\chapter{Introduction to Parallel R}
\label{chap:r} 

\section{Why Is R Featured in This Book?}

Our main language in the remaining chapters of this book will continue
to be C/C++, but we will also have a number of R examples.  Why feature R?

\begin{itemize}

\item R is the most widely-used programming language for statistics and
data manipulation.  In this era of Big Data, a number of ``parallel R''
packages have  been developed.  In particular, the package {\bf
parallel} is now a standard part of the R base distribution.

\item The widespread usage of R is epitomized by the fact that Google
has developed its own internal style guidelines for it.\footnote{I
personally do not like those guidelines, preferring my own, but the mere
fact that Google has set up its own guidelines shows the significance
they place on R.}  Oracle now includes R with its Big Data Appliance
package.

\item R will often be very convenient for the purpose of illustrating
various parallel algorithms.
% \footnote{For the same reason, R is
% attractive to an instructor in that it is easy to formulate problems for
% examinations!}  
This convenience arises from the fact that R has
built-in vector and matrix types, as well as a complex number type.  

\end{itemize}

Python also has various parallelization libraries, notably {\bf
multiprocessing}.  The topic of parallel Python in Chapter
\ref{chap:pythr}.

{\bf Examples in this chapter will be kept simple.}  But parallel R can
be applied to parallelize very large, complex problems.

There is a 5-minute tutorial in Appendix \ref{chap:rquickstart} of this
book.  In reading that material, keep in mind the role of R lists.

{\bf The key to parallel R---list manipulation:}

Many parallel R packages depend heavily on R lists.  Both input and
output arguments often take the form of lists, as do the return values.
The reader may wish to review the material on R lists in Appendix
\ref{chap:rquickstart}. 

\section{R and Embarrassing Parallel Problems}

It should be noted that parallel R packages tend to work well only on
embarrassingly parallel problems.  Recall that this was defined (in Sec.
\ref{embpar}) to mean algorithms that are not only easily parallelized
but that also have relatively small communication needs.\footnote{This
latter condition eliminates many iterative algorithms, even though they
may be easily parallelized.}  As we know, it is often the case that only
embarrassingly parallel algorithms give good performance, but is
especially so in the case of R, for the following reasons.  

The functional programming nature of R implies that technically, any
write to a single vector or matrix element, say

\begin{lstlisting}
x[3] <- 8
\end{lstlisting}

means that the entire vector or matrix is rewritten.\footnote{Element
assignment in R {\it is} a function call, with arguments in the above
case being {\bf x}, 3 and 8.}  There are exceptions to this (probably
more and more in each new version of R), but generally we must accept
that parallel vector and matrix code will be expensive in
R.\footnote{R's new Reference classes may change this somewhat.}

For non-embarrassingly parallel applications, one should consider
interfacing R to parallel C code, which is described in Section
\ref{cfromr}.

\section{Some Parallel R Packages}

Here are a few parallel R packages:

\begin{itemize}

\item Message-passing or scatter/gather (Section \ref{scattergather}):
{\bf Rmpi}, {\bf snow}, {\bf foreach}, {\bf rmr} (cloud), {\bf Rhipe}
(cloud), {\bf multicore}\footnote{The {\bf multicore} package runs on
multicore, i.e. shared memory, machines, but does not share data in the
read/write sense.}, {\bf rzmq}

\item Shared-memory:  {\bf Rdsm}, {\bf bigmemory}

\item GPU: {\bf gputools}, {\bf rgpu}

\end{itemize}

A far more extensive list is at
\url{http://cran.r-project.org/web/views/HighPerformanceComputing.html}.

Starting with version 2.14, the R base distribution includes a {\bf
parallel} package, consisting of {\bf snow} and {\bf multicore}.
(Earlier versions must download these separately.)  For this reason,
both of these are covered here.  I will also cover {\bf Rdsm/bigmemory}
and {\bf gputools}.  

\section{Installing and Loading the Packages}

{\bf Installing:}

As noted, if you have R 2.14 or later, you already have {\bf snow} and
{\bf multicore}.  In general, with the exception of {\bf Rgpu}, all of
the packages above are downloadable/installable from CRAN, the official
R repository for contributed code, at \url{http://cran.r-project.org}.
Here's what to do, say for {\bf snow}:

Suppose you want to install in the directory {\bf /a/b/c/}.  The easiest
way to do so is use R's {\bf install.packages()} function, say:

\begin{lstlisting}
> install.packages("snow","/a/b/c/")
\end{lstlisting}

This will install {\bf snow} in the directory {\bf /a/b/c/{\bf snow}}.

You'll need to arrange for the directory {\bf /a/b/c} (not {\bf
/a/b/c/snow}) to be added to your R library search path.  I recommend
placing a line

\begin{lstlisting}
.libPaths("/a/b/c/")
\end{lstlisting}

in a file {\bf .Rprofile} in your home directory (this is an R startup
file).

In some cases, due to issues such as locations of libraries, you may
need to install a CRAN package ``by hand.''  See Section \ref{gpuinstall} 
and \ref{rgpu} below.

{\bf Loading a package:}

Call {\bf library()} to load a package.  For instance, to load {\bf
parallel}, type:

\begin{lstlisting}
> library(parallel)
\end{lstlisting}

\section{The R snow Package}
\label{snow}

The real virtue of {\bf snow} is its simplicity.  The concept is simple,
the implementation is simple, and very little can go wrong.
Accordingly, it may be the most popular type of parallel R in use today.

The {\bf snow} package runs directly via network sockets (probably the
most common usage, since the user does not have to install anything
besides {\bf snow}), or on top of {\bf Rmpi} (R's interface to MPI), PVM
or NWS.

It operates under a scatter/gather model (Sec.
\ref{scattergather}).

For instance, just as the ordinary R function {\bf apply()} applies the
same function to all rows of a matrix (example below), the {\bf snow}
function {\bf parApply()} does that in parallel, across multiple
machines; different machines will work on different rows.  (Instead of
running on several machines, we might one several {\bf snow} clients on
one multicore machine.)

\subsection{Usage}

After loading {\bf snow}, by typing

\begin{lstlisting}
> library(snow)
\end{lstlisting}

one sets up a {\bf snow} cluster, by calling the {\bf snow} function
{\bf makeCluster()}.  The named argument {\bf type} of that function
indicates the networking platform, e.g. ``MPI'' or ``SOCK." The last
indicates that you wish {\bf snow} to run on TCP/IP sockets that it
creates itself, rather than going through MPI etc.

In the examples here, I used ``SOCK,'' on machines named {\bf pc48} and
{\bf pc49}, setting up the cluster this way:\footnote{If you are on a
shared-file system group of machines, try to stick to ones for which the
path to R is the same for all, to avoid problems.}

\begin{lstlisting}
> cls <- makeCluster(type="SOCK",c("pc48","pc49"))
\end{lstlisting}

Note that the above R code sets up {\bf worker nodes} at the machines
named {\bf pc48} and {\bf pc49}; these are in addition to the {\bf
manager node}, which is the machine on which that R code is
executed.  

By the way, if you want to make worker nodes on the same machine as the
manager (typically on a multicore machine), use {\bf localhost} as the
machine name.

There are various other optional arguments.  One you may find useful is
{\bf outfile}, which records the result of the call in the file {\bf
outfile}.  This can be helpful for debugging if the call fails.

\subsection{Example:  Matrix-Vector Multiply, Using parApply()}

To introduce {\bf snow}, consider a simple example of multiplication of
a vector by a matrix.  We set up a test matrix:

\begin{lstlisting}[numbers=left]
> a <- matrix(c(1:12),nrow=6)
> a
     [,1] [,2]
[1,]    1    7
[2,]    2    8
[3,]    3    9
[4,]    4   10
[5,]    5   11
[6,]    6   12
\end{lstlisting}

We will multiply the vector $(1,1)^{T}$ (T meaning transpose) by our
matrix {\bf a}.  In this small example, of course, we would do that
directly:

\begin{lstlisting}[numbers=left]
> a %*% c(1,1)
     [,1]
[1,]    8
[2,]   10
[3,]   12
[4,]   14
[5,]   16
[6,]   18
\end{lstlisting}


But let's see how we could do it using R's {\bf apply()} function, still
in serial form, as it will set the stage for extending to parallel
computation.

R's {\bf apply()} function calls a user-specified, scalar-valued
function to each of the rows (or each of the columns) of a
user-specified matrix.  This returns a vector.  To use {\bf apply()} for
our matrix-times-vector problem here, define a dot product function:

\begin{lstlisting}
> dot <- function(x,y) {return(x%*%y)}
\end{lstlisting}

Now call {\bf apply()}:

\begin{lstlisting}
> apply(a,1,dot,c(1,1))
[1]  8 10 12 14 16 18
\end{lstlisting}

This call applies the function {\bf dot()} to each row (indicated by the
1, with 2 meaning column instead of row) of the matrix {\bf a}; a row
plays the role of the first argument to {\bf dot()}, and with c(1,1)
playing the role of the second argument.  In other words, the first call
to {\bf dot()} will be

\begin{lstlisting}
dot(c(1,7),c(1,1))
\end{lstlisting}

The {\bf snow} library function {\bf parApply()} then extends {\bf
apply()} to parallel computation.  Let's use it to parallelize our
matrix multiplication, across our the machines in our cluster {\bf cls}:

\begin{lstlisting}
> parApply(cls,a,1,dot,c(1,1))
[1]  8 10 12 14 16 18
\end{lstlisting}

What {\bf parApply()} did was to send some of the rows of the matrix to
each node, also sending them the function {\bf dot()} and the argument
{\bf c(1,1)}.  Each node applied {\bf dot()} to each of the rows it was
given, and then returned the results to be assembled by the manager node.

R's {\bf apply()} function is normally used in scalar-valued situations,
meaning that {\bf f()} in a call {\bf apply(m,i,f)} is scalar-valued.
If {\bf f()} is vector-valued, then a matrix will be returned instead of
a vector, with each column of that matrix being the result of calling
{\bf f()} on a row or column of {\bf m}.  The same holds for {\bf
parApply()}.

\subsection{Other snow Functions:  clusterApply(), clusterCall() Etc.}

In the last section we introduced the {\bf parApply()} function.  Its
general call form is

\begin{itemize}

\item {\bf parApply():}

The call 

\begin{lstlisting}
parApply(cls,m,DIM,f,...)}
\end{lstlisting}

\end{itemize}

results in the rows of the matrix {\bf m} parceled out to the various
worker nodes of {\bf cls}, at which {\bf f()} will be applied to each
row, with optional parameters in the position indicated by the ellipsis.
The argument {\bf DIM} is 1 for row operations, 2 for columns.

The return value is a vector (or possibly a matrix, as noted above).

The virtue of {\bf snow} is its simplicity.  Thus it does not have a lot
of complex functions.  But there is certainly more than just {\bf
parApply()}.  Here are a few more:

\begin{itemize}

\item {\bf clusterApply():}

This function may be the most heavily-used function in {\bf snow}.  The
call 

\begin{lstlisting}
clusterApply(cls,individualargs,f,...)}
\end{lstlisting}

runs {\bf f()} at each worker node in {\bf cls}.  Here {\bf
individualargs} is an R list (if it is a vector, it will be converted to
a list).  When {\bf f()} is called at node i of the cluster, its
arguments will be as follows.  The first argument will be the i$^{th}$
element of the {\bf individualargs}, i.e. {\bf individualargs[[i]]}.  If
arguments indicated by the ellipsis are in the call (optional), then
these will be passed to {\bf f()} as its second, third and so on
arguments.

If {\bf individualargs} has more elements than the number of nodes in
the cluster, then {\bf cls} will be recycled (treating it as a vector),
so that most or all nodes will call {\bf f()} on more than element of
{\bf individualargs}.

The return value is an R list, whose i$^{th}$ component is the result of
the call to {\bf f()} on the i$^{th}$ element of {\bf individualargs}.

Typically the list {\bf individualargs} consists of the work to be split
up and done in parallel.

\item {\bf clusterApplyLB():}

This is a load-balancing form of {\bf clusterApply()}, aimed at
addressing the performance issues we discussed on Chapter
\ref{chap:issues}.  

To explain the difference between the two forms of the cluster-apply
operation, suppose our cluster consists of 10 nodes, and that we have 25
tasks for them to do (i.e. {\bf individualargs} has length 25).  With
{\bf clusterApply()}, the following will occur:

\begin{itemize}

\item The first 10 tasks will be sent to the workers, one task per
worker.

\item The manager will wait for all 10 tasks to complete, and then will
send out the next 10.

\item The manager will wait for these 10 tasks to complete, and then will
send out the remaining 5.

\item The manager will wait for the results of these 5, and then return
the 25 results to the caller.

\end{itemize}

By contrast, with {\bf clusterApplyLB()}, the event flow will go this
way:

\begin{itemize}

\item The first 10 tasks will be sent to the workers, one task per
worker.

\item When some node finishes, the manager will take action right away,
sending the 11$^{th}$ task to this node, even though the others aren't
done.

\item The manager will continue in this fashion, giving each node a new
task as soon as the node finishes its old one, until all the tasks are
done.

\item The manager will then return the 25 results to the caller.

\end{itemize}

In the language of Chapter \ref{chap:issues}, and of Section
\ref{schedulework} of our OpenMP chapter, {\bf clusterApply()} employs a
{\bf static} scheduling policy, while {\bf clusterApplyLB()} uses a
dynamic one; chunk size is 1.

\item {\bf clusterCall():}

The function {\bf clusterCall(cls,f,...)} sends the function {\bf f()},
and the set of arguments (if any) represented by the ellipsis above to
each worker node, where {\bf f()} is evaluated on the arguments.  The
return value is an R list, with the i$^{th}$ element is the result of
the computation at node i in the cluster.  (It might seem at first that
each node will return the same value, but typically the {\tt f()} will
make use of variables special to the node, thus yielding different
results.)

\item {\bf clusterExport():}

The function {\bf clusterExport(cls,varlist)} copies the variables whose
names appear in the character vector {\bf varlist} to each worker in the
cluster {\bf cls}.  You can use this, for instance, to avoid constant
shipping of large data sets from the master to the workers, at great
communications costs.  With this function, you are able to ship a
quantity just once; you call {\bf clusterExport()} on the corresponding
variables, and then access those variables at worker nodes as
(node-specific) globals.  Again, the return value is an R list, with the
i$^{th}$ element is the result of the computation at node i in the
cluster.

By default, the variables to be exported must be global on the manager
node.

Note carefully that once you export a variable, say {\bf x}, from the
manager to the workers, their copies become independent of the one at
the manager (and independent of each other).  If one copy changes, that
change will not be reflected in the other copies.

\item {\bf clusterEvalQ():}

The function {\bf clusterEvalQ(cls,expression)} runs {\bf
expression} at each worker node in {\bf cls}.  

\end{itemize}

\subsection{Example:  Parallel Sum}

Let's go over one more toy problem, in which we have {\bf snow} do
parallel summation.  We'll do a simpler version, then a more advanced
one.

\begin{lstlisting}[numbers=left]
parsum <- function(cls,x) {
   # partition the indices of x among the cluster nodes (nothing 
   # is actually sent to them yet)
   xparts <- clusterSplit(cls,x)
   # now send to the nodes and have them sum
   tmp <- clusterApply(cls,xparts,sum)
   # now finish, combing the individual sums into the grand total
   tot <- 0
   for (i in 1:length(tmp)) tot <- tot + tmp[[i]]
   return(tot)
}
\end{lstlisting}

Let's test it on a two-worker cluster {\bf cls}:

\begin{lstlisting}
> x
[1]  1  2  3  4  5  6  5 12 13
> parsum1(cls,x)
[1] 51
\end{lstlisting}

Good.  Now, how does it work?

The basic idea is to break our vector into chunks, then distribute the
chunks to the worker nodes.  Each of the latter will sum its chunk, and
send the sum back to the manager node.  The latter will sum the sums,
giving us the grand total as desired.

In order to break our vector {\bf x} into chunks to send to the workers,
we'll first turn to the {\bf snow} function {\bf clusterSplit()}.  That
function inputs an R vector and breaks it into as many chunks as we have
worker nodes, just what we want.

For example, with {\bf x} as above on a two-worker cluster, we get

\begin{lstlisting}
> xparts <- clusterSplit(cls,x)
> xparts
[[1]]
[1] 1 2 3 4

[[2]]
[1]  5  6  5 12 13
\end{lstlisting}

Sure enough, our R list {\bf xparts} has one chunk of {\bf x} in one of
its components, and the other chunk of {\bf x} in the other component.
These two chunks are now sent to our two worker nodes:

\begin{lstlisting}
> tmp <- clusterApply(cls,xparts,sum)
> tmp
[[1]]
[1] 10

[[2]]
[1] 41
\end{lstlisting}

Again, {\bf clusterApply()}, like most {\bf snow} functions, returns its
results in an R list, which we've assigned to {\bf tmp}.  The contents
of the latter are

\begin{lstlisting}
> tmp
[[1]]
[1] 10

[[2]]
[1] 41
\end{lstlisting}

i.e. the sum of each chunk of {\bf x}.

To get the grand total, we can't merely call R's {\bf sum()} function on
{\bf tmp}:

\begin{lstlisting}
> sum(tmp)
Error in sum(tmp) : invalid 'type' (list) of argument
\end{lstlisting}

This is because {\bf sum()} works on vectors, not lists.  So, we just
wrote a loop to add everything together:

\begin{lstlisting}
tot <- 0
for (i in 1:length(tmp)) tot <- tot + tmp[[i]]
\end{lstlisting}

Note that we need double brackets to access list elements.

We can improve the code a little by replacing the above loop code by a
call to R's {\bf Reduce()} function, which works like the reduction
operators we saw in Sections \ref{ompreduction} and \ref{mpireduction}.
(Note, though, that this is a serial operation here, not parallel.)
It takes the form {\bf Reduce(f,y)} for a function {\bf f()} and a list
{\bf y}, and essentially does

\begin{lstlisting}
z <- y[1]
for (i in 2:length(y)) z <- f(z,y[i]) 
\end{lstlisting}

Using {\bf Reduce()} makes for more compact, readable code, and in some
cases may speed up execution (not an issue here, since we'll have only a
few items to sum).  Moreover, {\bf Reduce()} changes {\bf tmp} from an R
list to a vector for us, solving the problem we had above when we tried
to apply {\bf sum()} to {\bf tmp} directly.

Here's the new code:

\begin{lstlisting}[numbers=left]
parsum <- function(cls,x) {
   xparts <- clusterSplit(cls,x)
   tmp <- clusterApply(cls,xparts,sum)
   Reduce(sum,tmp)  # implicit return()
}
\end{lstlisting}

Note that in R, absent an explcit {\bf return()} call, the last value
computed is returned, in this case the value produced by {\bf Reduce()}.

{\bf Reduce()} is a very handy function in R in general, and with {\bf
snow} in particular.  Here's an example in which we combine several
matrices into one:

\begin{lstlisting}
> Reduce(rbind,list(matrix(5:8,nrow=2),3:4,c(-1,1)))
     [,1] [,2]
[1,]    5    7
[2,]    6    8
[3,]    3    4
[4,]   -1    1
\end{lstlisting}

The {\bf rbind()} functions has two operands, but in the situation above
we have three.  Calling {\bf Reduce()} solves that problem.

\subsection{Example:  Inversion of Block-Diagonal Matrices}
\label{blkd}

Suppose we have a block-diagonal matrix, such as

$$
\left (
   \begin{array}{cccc}
   1 & 2 & 0 & 0 \\
   3 & 4 & 0 & 0 \\
   0 & 0 & 8 & 1 \\
   0 & 0 & 1 & 5 
   \end{array}
\right )     
$$

\noindent
and we wish to find its inverse.  This is an embarrassingly parallel
problem:  If we have two processes, we simply have one process invert that
first 2x2 submatrix, have the second process invert the second 2x2
submatrix, and we then place the inverses back in the same diagonal
positions.  

Communication costs might not be too bad here, since inversion of an nxn
matrix takes $O(n^3)$ time while communication is only $O(n^2)$.

Here we'll discuss {\bf snow} code for inverting block-diagonal matrices.

\begin{lstlisting}[numbers=left]
# invert a block diagonal matrix m, whose sizes are given in szs;
# return value is the inverted matrix
bdiaginv <- function(cls,m,szs) {
   nb <- length(szs)  # number of blocks
   dgs <- list()   # will form args for clusterApply()
   rownums <- getrng(szs)
   for (i in 1:nb) {
      rng <- rownums[i,1]:rownums[i,2]
      dgs[[i]] <- m[rng,rng]
   }
   invs <- clusterApply(cls,dgs,solve)
   for (i in 1:nb) {
      rng <- rownums[i,1]:rownums[i,2]
      m[rng,rng] <- invs[[i]]
   }
   m  
}

# find row number ranges for the blocks, returned in a # 2-column 
# matrix; blkszs = block sizes
getrng <- function(blkszs) {
   col2 <- cumsum(blkszs)  # cumulative sums function
   col1 <- col2 - (blkszs-1)
   cbind(col1,col2)  # column bind
}
\end{lstlisting}

Let's test it:

\begin{lstlisting}
> m
     [,1] [,2] [,3] [,4] [,5]
[1,]    1    2    0    0    0
[2,]    7    8    0    0    0
[3,]    0    0    1    2    3
[4,]    0    0    2    4    5
[5,]    0    0    1    1    1
> bdiaginv(cls,m,c(2,3))
          [,1]       [,2] [,3] [,4] [,5]
[1,] -1.333333  0.3333333    0    0    0
[2,]  1.166667 -0.1666667    0    0    0
[3,]  0.000000  0.0000000    1   -1    2
[4,]  0.000000  0.0000000   -3    2   -1
[5,]  0.000000  0.0000000    2   -1    0
\end{lstlisting}

Note the {\bf szs} argument here, which contains the sizes of the
blocks.  Since we had one 2x2 block and a 3x3 one, the sizes were 2 and
3, hence the {\bf c(2,3)} argument in our call.  

The use of {\bf clusterApply()} here is similar to our earlier one.  The
main point in the code is to keep track of the positions of the blocks
within the big matrix.  To that end, we wrote {\bf getrng()}, which
returns the starting and ending row numbers for the various blocks.  we
use that to set up the argument {\bf dg} to be fed into {\bf
clusterApply()}:

\begin{lstlisting}
for (i in 1:nb) {
   rng <- rownums[i,1]:rownums[i,2]
   dgs[[i]] <- m[rng,rng]
\end{lstlisting}

Keep in mind that the expression {\bf m[rng,rng]} extracts a subset
of the rows and columns of {\bf m}, in this case the i$^{th}$ block.

\subsection{Example:  Mutual Outlinks}
\label{rmutlinks}

Consider the example of Section \ref{mutlinks}.  We have
a network graph of some kind, such as Web links.  For any two
vertices, say any two Web sites, we might be interested in mutual
outlinks, i.e. outbound links that are common to two Web sites.

The {\bf snow} code below finds the mean number of mutual outlinks, among
all pairs of sites in a set of Web sites.

\begin{lstlisting}[numbers=left]
# snow version of mutual links problem

library(snow)

mtl <- function(ichunks,m) {
   n <- ncol(m)
   matches <- 0
   for (i in ichunks) {
      if (i < n) {
         rowi <- m[i,]
         matches <- matches +
            sum(m[(i+1):n,] %*% as.vector(rowi))
      }
   }
   matches
}

# returns the mean number of mutual outlinks in m, computing on the
# cluster cls
mutlinks <- function(cls,m) {
   n <- nrow(m)
   nc <- length(cls)
   # determine which worker gets which chunk of i
   options(warn=-1)
   ichunks <- split(1:n,1:nc)
   options(warn=0)
   counts <- clusterApply(cls,ichunks,mtl,m)
   do.call(sum,counts) / (n*(n-1)/2)
}
\end{lstlisting}

For each row in {\bf m}, we will count mutual links in all rows below
that one.  To distribute the work among the worker nodes, we could have
a call to {\bf clusterSplit()} along the lines of

\begin{lstlisting}
clusterSplit(cls,1:nrow(m))
\end{lstlisting}

But this would presents a load imbalance problem, discussed in  Section
\ref{mutlinks}.  For instance, suppose again we have two worker nodes,
and there are 100 rows.  If we were to use {\bf clusterSplit()} as in
the last section, the first worker would be doing a lot more row
comparisons than would the second worker.

One solution to this problem would be to randomize the row numbers
before calling {\bf clusterSplit()}.  Another approach, taken in our
full code above, is to use R's {\bf split()} function.

What does {\bf split()} do?  It forms chunks of its first argument,
according to ``categories'' specified in the second.  Look at this
example:

\begin{lstlisting}
> split(2:5,c('a','b'))
$a
[1] 2 4

$b
[1] 3 5
\end{lstlisting}

Here the categories are 'a' and 'b'.  The {\bf split()} function
requires the second argument to be the same length as the first, so it
will first {\bf recycle} the second argument to 'a','b','a','b','a'.
The split will take 2,3,4,5 and treat 2 and 4 as being in categort 'a',
and 3 and 5 to be category 'b'.  The function returns a list
accordingly.

Now coming back to our above {\bf snow} example, and again assuming
two workers and  {\bf m} 100x100, the code

\begin{lstlisting}
nc <- length(cls)
ichunks <- split(1:n,1:nc)
\end{lstlisting}

produces a list of two components, with the odd-numbered rows in one
component and the evens in the other.  Our call,

\begin{lstlisting}
counts <- clusterApply(cls,ichunks,mtl,m)
\end{lstlisting}

then results in good load balance between the two workers.

Note that the call needed includ {\bf m} as an argument (which becomes
an argument to {\bf mtl()}).  Otherwise the workers would have no {\bf
m} to work it.  One alternative would have been to use {\bf
clusterExport()} to ship {\bf m} to the workers, at which it then would
be a global variable accessible by {\bf mtl()}.

By the way, the calls to {\bf options()} tell R not to warn us that it
did recycling.  It doesn't usually do so, but it will for {\bf split()}.

Then to get the grand total from the output list of individual sums, we
could have used {\bf Reduce()} again, but for variety utilized R's {\bf
do.call()} function.  That function does exactly what its name implies:
It will extract the elements of the list {\bf counts}, and then plug
them as arguments into {\bf sum()}!  (In general, {\bf do.call()} is
useful when we wish to call a certain function on a set of arguments
whose number won't be known until run time.)

As noted, instead of {\bf split()}, we could have randomized the rows:

\begin{lstlisting}
tmp <- clusterSplit(cls,order(runif(nrow(m))))
\end{lstlisting}

This generates a random number in (0,1) for each row, then finds the
order of these numbers.  If for instance the third number is the
20$^{th}$-smallest, element 3 of the output of {\bf order()} will be 20.
This amounts to finding a random permutation of the row numbers of {\bf
m}.

% \subsection{Example:  Doing a Scatter Operation}
% 
% A common operation in message-passing parallel systems is {\bf
% scatter/gather} (Section \ref{scattergather}).  Actually, {\bf snow} is
% fundamentally a scatter/gather-based package, but the operation itself
% may take some doing.  Here's how we could set this up for matrices in
% {\bf snow}.  
% 
% Recall that {\bf clusterApply()} requires an R list as its first
% argument.  So, if for instance we wish to scatter the rows of a matrix,
% we'll need to first construct a list whose elements are those rows.
% 
% Here's the code to do this:
% 
% \begin{lstlisting}[numbers=left]
% # returns a list of the matrix m's rows (rowcol=1) or columns 
% mat2lst <- function(m,rowcol=1) {
%    if (rowcol == 1) m <- t(m)
%    dm <- as.data.frame(m)
%    lapply(dm,function(col) col)
% }
% \end{lstlisting}
% 
% Note the line
% 
% \begin{lstlisting}
% lapply(dm,function(col) col)
% \end{lstlisting}
% 
% The second argument in \lstinline{lapply()} needs to be a function.  It
% could be the name of a function, or a function defined ``on the spot''
% (like {\it anonymous} functions in, say, Python), as is the case here.
% 
% On the R-help online discussion list, Petr Pikal suggested this
% approach:
% 
% \begin{lstlisting}[numbers=left]
% # returns a list of the matrix m's rows
% mat2lst <- function(m) {
%    split(m, 1:nrow(m))
% }
% \end{lstlisting}
% 
% This works because (a) an R matrix is actually a vector, store in
% row-major order and (b) in R a shorter vector is recycled to the length
% of the longer vector.
% 
% This second approach seems to work faster for rows, as no transpose
% operation is done.
% 
% Calling {\bf clusterApply()} with {\bf mrows}, the output of {\bf
% mat2lst(),} above would scatter the rows of the matrix to the various
% workers.  By the way, the workers may also need to know {\it which} rows
% they are given.  We could arrange that by appending a column to {\bf m}
% before calling {\bf mat2lst()}:
% 
% \begin{lstlisting}
% m <- cbind(m,1:nrow(m))
% \end{lstlisting}

\subsection{Example:  Transforming an Adjacency Matrix}
\label{snowadj}

Here is a {\bf snow} version of the code in Section \ref{transgraph}.
To review, here is the problem:

Say we have a graph with adjacency matrix

\begin{equation}
\left (
\begin{array}{rrrr}
0 & 1 & 0 & 0 \\
1 & 0 & 0 & 1 \\
0 & 1 & 0 & 1 \\
1 & 1 & 1 & 0 \\
\end{array}
\right )
\end{equation}

with row and column numbering starting at 0, not 1.  We'd like to
transform this to a two-column matrix that displays the links, in this
case

\begin{equation}
\left (
\begin{array}{rr}
0 & 1 \\
1 & 0 \\
1 & 3 \\
2 & 1 \\
2 & 3 \\
3 & 0 \\
3 & 1 \\
3 & 2 \\
\end{array}
\right )
\end{equation}

For instance, there is a 1 on the far right, second row of the above
matrix, meaning that in the graph there is an edge from vertex 1 to
vertex 3.  This results in the row (1,3) in the transformed matrix seen
above.

Here is code to do this computation in {\bf snow}:

\begin{lstlisting}[numbers=left]
tg <- function(cls,m) {
   n <- nrow(m)
   rowschunks <- clusterSplit(cls,1:n)  # make chunks of row numbers
   m1 <- cbind(1:n,m)  # prepend col of row numbers to m
   # now make the chunks of rows themselves
   tmp <- lapply(rowschunks,function(rchunk) m1[rchunk,])
   # launch the computation
   tmp <- clusterApply(cls,tmp,tgonchunk)
   do.call(rbind,tmp)  # combine into one large matrix
}

# a worker works on a chunk of rows
tgonchunk <- function(rows) {
   # note:  matrix space allocation not efficient
   mat <- NULL
   nc <- ncol(rows)
   for (i in 1:nrow(rows)) {
      row <- rows[i,]
      rownum <- row[1]
      for (j in 2:nc) {
         if (row[j] == 1) {
            if (is.null(mat)) {
               mat <- matrix(c(rownum,j-1),ncol=2)
            } else
               mat <- rbind(mat,c(rownum,j-1))
         }
      }
   }
   return(mat)
}
\end{lstlisting}

What is new here?  First, since we desired the output matrix to be in
lexicographical order, we needed a way to keep track of the original
indices of the rows.  So, we added a column for those numbers to {\bf
m}:

\begin{lstlisting}
m1 <- cbind(1:n,m)  # prepend col of row numbers to m
\end{lstlisting}

Second, note the use of R's {\bf lapply()} function.  Just as {\bf
apply()} calls a specified function on each row (or each column) of a
matrix, {\bf lapply()} calls a specified function on each element of a
list.  The output will also be a list.

In our case here, we need to feed the row chunks of {\bf m} into {\bf
clusterApply()}, but the latter requires that we do that via a list.
We could have done that using a {\bf for} loop, adding row chunks to a
list one by one, but it is more compact to use {\bf lapply()}.

In the end, the manager node receives many parts of the new matrix,
which must be combined.  It's natural to do that with the {\bf rbind()}
function, but again we need to overcome the fact that the parts are
packaged in an R list.  It's handy to use {\bf do.call()} again, though
{\bf Reduce()} would have worked too.

Note carefully that although it's natural to use {\bf rbind()} as
mentioned in the preceding paragraph, it's not efficient.  This is
because each call to {\bf rbind()} causes a new matrix to be allocated,
a time-consuming action.  It would be better to allocate, say, 50 rows
at a time, and fill in the rows as we build the matrix.  Whenever we
would use up all of a matrix, we would start a new one, and then return
all the matrices in a list.

\subsection{Example:  Setting Node IDs and Notification of Cluster Size}

Recall that in OpenMP there are functions {\bf omp\_get\_thread\_num()}
and {\bf omp\_get\_num\_threads()} that report a thread's ID number and
the total number of threads.  In MPI, the corresponding functions are
{\bf MPI\_Comm\_rank()} and {\bf MPI\_Comm\_size()}.  It would be nice
to have such functions (or such functionality) in {\bf snow}.  Here is
code for that purpose:

\begin{lstlisting}[numbers=left]
# sets a list myinfo as a global variable in the worker nodes in the
# cluster cls, with myinfo$id being the ID number of the worker and
# myinfo$nwrkrs being the number of workers in the cluster; called from
# the manager node
setmyinfo <- function(cls) {
   setmyinfo <- function(i,n) {
      myinfo <<- list(id = i, nwrkrs = n)
   }
   ncls <- length(cls)
   clusterApply(cls,1:ncls,setmyinfo,ncls)
}
\end{lstlisting}

Yes, R does allow defining a function within a function.  Note by the
way the use of the superassignment operator, \verb#<<-#, which assigns
to the global level.

After this call, any code executed by a worker node can then determine
its node number, e.g. in code such as

\begin{lstlisting}
if (myinfo$id == 1) ...
\end{lstlisting}

Or, we could send code from the manager to be executed on the workers:

\begin{lstlisting}
> setmyinfo(cls)
[[1]]
[[1]]$id
[1] 1

[[1]]$nwrkrs
[1] 2


[[2]]
[[2]]$id
[1] 2

[[2]]$nwrkrs
[1] 2

> clusterEvalQ(cls,myinfo$id)
[[1]]
[1] 1

[[2]]
[1] 2
\end{lstlisting}

In that first case, since {\bf clusterApply()} returns a value, it was
printed out.  In the second case, the call

\begin{lstlisting}
clusterEvalQ(cls,myinfo$id)
\end{lstlisting}

asks each worker to evaluate the expression \textbf{myinfo\$id};
{\bf clusterEvalQ()} then returns the results of evaluating the
expression at each worker node.

\subsection{Shutting Down a Cluster}

Don't forget to stop your clusters before exiting R, by calling
{\bf stopCluster(clustername)}.

\section{The multicore Package}

As the name implies, the {\bf multicore} package is used to exploit the
power of multicore machines.  This might seem odd: Since {\bf snow} can
be used on either a (physical) cluster of machines or on a multicore
machine, while {\bf multicore} can only be used on the latter, one might
wonder what, if anything, is to be gained by using {\bf multicore}.  The
answer is that one might gain in performance, as will be explained.

The package's main function, {\bf mclapply()}, is similar in syntax to
{\bf snow}'s {\bf clusterApply()}, and similarly parcels tasks out to
the various worker nodes.  

The worker nodes here, though, are just different processes on the same
machine.  Say for example you are running {\bf multicore} on a quad-core
machine.  Calling {\bf mclapply()} will start (a default value of) 4 new
invocations of R on your machine, each of which will work on a piece of
your application in parallel.  Each invocation has exactly the same R
variables set up as your original R process did before the call.  Thus
all the variables are shared initially (note that qualifier), and you as
the programmer do not take any special action to transfer variables from
the manager node to the worker nodes, quite a contrast to {\bf snow}.

The way all this is accomplished is that {\bf mclapply()} calls your OS'
{\bf fork()} function.  (It is thus limited to Unix-family OSs, such as
Linux and Macs.)  The process that is forked is R itself, with one new
copy per desired worker node.  

The workers thus start with copies of R all sharing whatever variables
existed at the time of the fork (including locals in the function that
you had call {\bf mclapply()}).  Thus your code does not have to copy
these variables to the workers, which automatically have access to them.
But note carefully that the variables are shared only initially, and a
write to one copy is NOT reflected in the other copies (including the
original one).  

The copying of the initial values of the variables from the manager node
to the worker nodes is done on a {\bf copy-on-write} basis, meaning that
data isn't copied to a node until (and unless) the node tries to access
that data.  The granularity is at the virtual memory page level (Section
\ref{howvmworks}).  Again, the OS handles this, not R.

Thus some physical copying does occur eventually, done by the OS, so
{\bf multicore} will not have as much advantage over {\bf snow} as one
might think.  However, there may be some latency-hiding advantage
(Section \ref{latencybandwidth}).  It may be the case that not all
workers need to access some variable at the same time, so one worker
might do so while others are doing actual computation.  

Note too that, in contrast to {\bf snow}, in which a cluster is set up
once per session and then repeatedly reused at each {\bf snow} function
call, with {\bf multicore} the worker R processes are set up again from
scratch each time a {\bf multicore} function is called.

\subsection{Example:  Transforming an Adjacency Matrix, multicore
Version}

Same application as in Section \ref{snowadj}, and indeed the function
{\bf tgonchunk()} below is just a modified version of what we had in the
{\bf snow} code.

The call

\begin{lstlisting}
mclapply(starts,tgonchunk,m1,chunksize,mc.cores=ncores)
\end{lstlisting}

applies the function {\bf tgonchunk()} to every element of the vector
{\bf starts} (changed to an R list first), with {\bf m1} and {\bf
chunksize} serving as additional arguments to {\bf mclapply()}.

\begin{lstlisting}[numbers=left]
# transgraph problem, R multicore version

# arguments:
#    m:  the input matrix 
#    ncores:  desired number of cores to use
tgmc <- function(m,ncores) {
   n <- nrow(m)
   chunksize <- floor(n/ncores)
   starts <- seq(1,n,chunksize)
   m1 <- cbind(1:n,m)  # prepend col of row numbers to m
   tmp <- mclapply(starts,tgonchunk,m1,chunksize,mc.cores=ncores)
   do.call(rbind,tmp)
}

# a worker works on a chunk of rows
tgonchunk <- function(start,m1,chunksize) {
   # note:  matrix space allocation not efficient
   outmat <- NULL
   end <- start + chunksize - 1
   nrm <- nrow(m1)
   if (end > nrm) end <- nrm
   ncm <- ncol(m1)
   for (i in start:end) {
      rownum <- m1[i,1]
      for (j in 2:ncm) {
         if (m1[i,j] == 1) {
            if (is.null(outmat)) {
               outmat <- matrix(c(rownum,j-1),ncol=2)
            } else
               outmat <- rbind(outmat,c(rownum,j-1))
         }
      }
   }
   return(outmat)
}
\end{lstlisting}

% \section{Rmpi}
% \label{rmpi}
% 
% The {\bf Rmpi} package provides an interface from R to MPI.  (MPI is
% covered in detail in Chapter \ref{chap:mpi}).  Its author is Hao Yu of
% the University of Western Ontario.
% 
% It is arguably the most versatile of the parallel R packages, as it
% allows any node to communicate directly with any other node, without
% passing through a ``middleman.''  (The latter is the manager program in
% {\bf snow}, and the server in {\bf Rdsm}.)  This could enable major
% reductions in communications costs, and thus major increases in speed.
% Moreover, {\bf Rmpi} is more efficient in communication than is {\bf
% snow}, for example.  
% 
% So, although many applications using {\bf Rmpi} could be done more
% simply in {\bf snow}, the former can bring better performance.
% 
% On the other hand, coding in {\bf Rmpi} generally requires more work
% than for the other packages.  In addition, MPI is quite finicky, and
% subtle errors in your setup may prevent it from running; that problem
% may be compounded if you run R and MPI together.  (In online R
% discussion groups, one of the most common types of queries concerns
% getting {\bf Rmpi} to run.)  
% 
% {\bf Rmpi} will not be used in this book, but here is an overview of how
% to use it:
% 
% \subsection{Usage}
% 
% Fire up MPI, and then from R load in {\bf Rmpi}, by typing
% 
% \begin{lstlisting}
% > library(Rmpi)
% \end{lstlisting}
% 
% Then start {\bf Rmpi}:
% 
% \begin{lstlisting}
% > mpi.spawn.Rslaves()
% \end{lstlisting}
% 
% On some systems, the call to {\bf mpi.spawn.Rslaves()} may encounter
% problems.  An alternate method of launching the worker processes is to
% copy the {\bf Rprofile} file in the {\bf Rmpi} distribution to {\bf
% .Rprofile} in your home directory.  Then start R, say for two workers
% and a manager, by running something like (for the LAM case)
% 
% \begin{lstlisting}
% mpirun -c 3 R --no-save -q
% \end{lstlisting}
% 
% This will start R on all machines in the group you started MPI on.
% 
% \subsection{Available Functions}
% 
% Most standard MPI functions are available, as well as many extras.
% Here are just a few examples: 
% 
% \begin{itemize}
% 
% \item {\bf mpi.comm.size():}  
% 
% Returns the number of MPI processes, including the master that spawned
% the other processes.
% 
% \item {\bf mpi.comm.rank():}  
% 
% Returns the rank of the process that executes it.
% 
% \item {\bf mpi.send(), mpi.recv()}:  
% 
% The usual send/receive operations.
% 
% \item {\bf mpi.bcast(), mpi.scatter(), mpi.gather():}  
% 
% The usual broadcast, scatter and gather operations.
% 
% \end{itemize}



% \subsection{Example:  Inversion of a Diagonal-Block Matrix}
% 
% Below is the {\bf Rmpi} code, for general n x n matrices of the
% block-diagonal form introduced in Section \ref{blkd}, but to keep this
% introductory Rmpi example simple, we'll assume that there are only two
% blocks, of the same size, with n/2 rows and n/2 columns, and with the
% manager itself doing half the work:
% 
% \begin{lstlisting}[numbers=left]
% parinv <- function(blkdg) {
%    n <- nrow(blkdg)
%    k <- n/2  # block size
%    # send the worker its task function
%    mpi.bcast.Robj2slave(bkinv) 
%    # get worker started running its task
%    mpi.bcast.cmd(bkinv())  
%    # send worker data to feed its task
%    mpi.send.Robj(blkdg[(k+1):n,(k+1):n],dest=1,tag=1)  
%    # prepare output matrix
%    outmat <- matrix(rep(0,n^2),nrow=n,ncol=n)
%    # manager does its task
%    outmat[1:k,1:k] <- solve(blkdg[1:k,1:k])  
%    # receive result from worker and place in output matrix
%    outmat[(k+1):n,(k+1):n] <-  
%       mpi.recv.Robj(source=1,tag=2) 
%    return(outmat)
% }
% 
% # function for worker to execute
% bkinv <- function() {
%    # receive data from manager
%    blk <- mpi.recv.Robj(source=0,tag=1)  
%    # invert matrix and send back to manager
%    mpi.send.Robj(solve(blk),dest=0,tag=2)  
% }
% 
% test <- function() {
%    nb <- 800
%    nb1 <- nb+1
%    nbx2 <- 2*nb
%    blk <- matrix(runif(nb^2),nrow=nb)  
%    mat <- matrix(rep(0,nbx2^2),nrow=nbx2)
%    mat[1:nb,1:nb] <- blk
%    mat[nb1:nbx2,nb1:nbx2] <- blk
%    print(system.time(parinv(mat)))  
%    print(system.time(solve(mat)))  
% }
% \end{lstlisting}

\section{Rdsm}

My {\bf Rdsm} package can be used as a threads system regardless of
whether you are on a NOW or a multicore machine.  It is an extension of
a similar package I wrote in 2002 for Perl, called PerlDSM.  (N.
Matloff, PerlDSM: A Distributed Shared Memory System for Perl, {\it
Proceedings of PDPTA 2002}, 2002, 63-68.)  The major advantages of {\bf
Rdsm} are:

\begin{itemize}

\item It uses a shared-memory programming model, which as noted in
Section \ref{sharedbetter}, is commonly considered in the parallel
processing community to be clearer than messag-passing. 

\item It allows full use of R's debugging tools.

\end{itemize}

{\bf Rdsm} gives the R programmer a shared memory view, but the objects
are not physically shared.  Instead, they are stored in a server and
accessed through network sockets,\footnote{Or, {\bf Rdsm} can be used
with the {\bf bigmemory} package, as seen in Section \ref{bigmemory}.}
thus enabling a threads-like view for R programmers even on NOWs.  There
is no manager/worker structure here.  All of the R processes execute the
same code, as peers.

Shared objects in {\bf Rdsm} can be numerical vectors or matrices, via
the classes {\bf dsmv} and {\bf dsmm}, or R lists, using the class {\bf
dsml}.  Communication with the server in the vector and matrix cases is
done in binary form for efficiency, while serialization is used for
lists.  There is as a built-in variable {\bf myinfo} that gives a
process' ID number and the total number of processes, analogous to the
information obtained in {\bf Rmpi} from the functions {\bf
mpi.comm.rank()} and {\bf mpi.comm.size()}. 

To install, again use {\bf install.packages()} as above.  There is
built-in documentation, but it's best to read through the code {\bf
MatMul.R} in the {\bf examples} directory of the {\bf Rdsm} distribution
first.  It is heavily commented, with the goal of serving as an
introduction to the package.

\subsection{Example:  Inversion of Block-Diagonal Matrices}

Let's see how the block-diagonal matrix inversion example from Section
\ref{blkd} can be handled in {\bf Rdsm}.

\begin{lstlisting}[numbers=left]
# invert a block diagonal matrix m, whose sizes are given in szs; here m
# is either an Rdsm or bigmemory shared variable; no return
# value--inversion is done in-place; it is assumed that there is one
# thread for each block

bdiaginv <- function(bd,szs) {
   # get number of rows of bd
   nrdb <- if(class(bd) == "big.matrix") dim(bd)[1] else bd$size[1]
   rownums <- getrng(nrdb,szs)
   myid <- myinfo$myid
   rng <- rownums[myid,1]:rownums[myid,2]
   bd[rng,rng] <- solve(bd[rng,rng])
   barr()  # barrier
}

# find row number ranges for the blocks, returned in a 2-column matrix; 
# matsz = number of rows in matrix, blkszs = block sizes
getrng <- function(matsz, blkszs) {
   nb <- length(blkszs)
   rwnms <- matrix(nrow=nb,ncol=2)
   for (i in 1:nb) {
      # i-th block will be in rows (and cols)  i1:i2
      i1 <- if (i==1) 1 else i2 + 1
      i2 <- if (i == nb) matsz else i1 + blkszs[i] - 1
      rwnms[i,] <- c(i1,i2)
   }
   rwnms
}
\end{lstlisting}

The parallel work is basically done in four lines:

\begin{lstlisting}
myid <- myinfo$myid
rng <- rownums[myid,1]:rownums[myid,2]
bd[rng,rng] <- solve(bd[rng,rng])
barr()  # barrier
\end{lstlisting}

compared to about 11 lines in the {\bf snow} implementation above.  This 
illustrates the power of the shared-memory programming model over
message passing.

\subsection{Example:  Web Probe}

In the general programming community, one major class of applications,
even on a serial platform, is parallel I/O.  Since each I/O operation
may take a long time (by CPU standards), it makes sense to do them in
parallel if possible.  {\bf Rdsm} facilitates doing this in R.

The example below repeatedly cycles through a large list of Web sites,
taking measurements on the time to access each one.  The data are stored
in a shared variable {\bf accesstimes}; the {\bf n} most recent access
times are stored.  Each {\bf Rdsm} process works on one Web site at a
time.

An unusual feature here is that one of the processes immediately exits,
returning to the R interactive command line.  This allows the user to
monitor the data that is being collected.  Remember, the shared
variables are still accessible to that process.  Thus while the other
processes are continually adding data to {\bf accesstimes} (and deleted
one item for each one added), the user can give commands to the exited
process to analyze the data, say with histograms, as the collection
progresses.

Note the use of lock/unlock operations here, with the {\bf Rdsm}
variables of the same names.

\begin{lstlisting}[numbers=left]
# if the variable accesstimes is length n, then the Rdsm vector
# accesstimes stores the n most recent probed access times, with element
# i being the i-th oldest

# arguments:
#    sitefile: IPs, one Web site per line
#    ww: window width, desired length of accesstimes
webprobe <- function(sitefile,ww) {
   # create shared variables
   cnewdsm("accesstimes","dsmv","double",rep(0,ww))
   cnewdsm("naccesstimes","dsmv","double",0)
   barr()  # Rdsm barrier
   # last thread is intended simply to provide access to humans, who
   # can do analyses on the data, typing commands, so have it exit this
   # function and return to the R command prompt
   # built-in R list myinfo has components to give thread ID number and
   # overall number of threads
   if (myinfo$myid == myinfo$nclnt) {
      print("back to R now")
      return()
   } else {  # the other processes continually probe the Web:
      sites <- scan(sitefile,what="")  # read from URL file
      nsites <- length(sites)
      repeat {
         # choose random site to probe
         site <- sites[sample(1:nsites,1)]
         # now probe it, recording the access time
         acc <- system.time(system(paste("wget --spider -q",site)))[3]
         # add to accesstimes, in sliding-window fashion
         lock("acclock")
         if (naccesstimes[1] < ww) {
            naccesstimes[1] <- naccesstimes[1] + 1
            accesstimes[naccesstimes[1]] <- acc
         } else {
            # out with the oldest, in with the newest
            newvec <- c(accesstimes[-1],acc)
            accesstimes[] <- newvec
         }
         unlock("acclock")
      }
   }
}
\end{lstlisting}

\subsection{The bigmemory Package}
\label{bigmemory}

Jay Emerson and Mike Kane developed the {\bf bigmemory} package when I
was developing {\bf Rdsm}; neither of us knew about the other.

The {\bf bigmemory} package is not intended to provide a threads
environment.  Instead, it is used to deal with a hard limit R has:
No R object can be larger than $2^{31}-1$ bytes.  This holds even if you
have a 64-bit machine with lots of RAM.  The {\bf bigmemory} package
solves the problem on a multicore machine, by making use of operating
system calls to set up shared memory between processes.\footnote{It can
also be used on distributed systems, by exploiting OS services to map
memory to files.}  

In principle, {\bf bigmemory} could be used for threading, but the
package includes no infrastructure for this.  However, one can use {\bf
Rdsm} in conjunction with {\bf bigmemory}, an advantage since the latter
is very efficient.  

Using {\bf bigmemory} variables in {\bf Rdsm} is quite simple:  Instead
of calling {\bf cnewdsm()} to create a shared variable, call {\bf newbm()}.

\section{R with GPUs}

The blinding speed of GPUs (for certain problems) is sure to of interest
to more and more R users in the coming years.  

As of today, the main vehicle for writing GPU code is CUDA, on NVIDIA
graphics cards.  CUDA is a slight extension of C.

You may need to write your own CUDA code, in which case you need to use
the methods of Section \ref{cfromr}.  But in many cases you can get what
you need in ready-made form, via the two main packages for GPU programming
with R, {\bf gputools} and {\bf rgpu}.  Both deal mainly with linear
algebra operations.  The remainder of this section will deal with these
packages.

\subsection{Installation}
\label{gpuinstall}

Note that, due to issues involving linking to the CUDA libraries, in
the cases of these two packages, you probably will {\it not} be able to
install them by merely calling {\bf install.packages()}.   The
alternative I recommend works as follows:

\begin{itemize}

\item Download the package in {\bf .tar.gz} form.

\item Unpack the package, producing a directory that we'll call {\bf x}.

\item Let's say you wish to install to {\bf /a/b/c}.

\item Modify some files within {\bf x}.

\item Then run

\begin{lstlisting}
R CMD INSTALL -l /a/b/c x
\end{lstlisting}

\end{itemize}

Details will be shown in the following sections.

\subsection{The gputools Package}

In installing {\bf gputools}, I downloaded the source from the CRAN R
repository site, and unpacked as above.  I then removed the subcommand

\begin{verbatim}
-gencode arch=compute_20,code=sm_20
\end{verbatim}

from the file {\bf Makefile.in} in the {\bf src} directory.  I also
made sure that my shell startup file included my CUDA executable and
library paths, {\bf /usr/local/cuda/bin} and {\bf /usr/local/cuda/lib}.

I then ran {\bf R CMD INSTALL} as above.  I tested it by trying 
{\bf gpuLm.fit()}, the {\bf gputools} version of R's regular {\bf
lm.fit()}.

The package offers various linear algebra routines, such as matrix
multiplication, solution of Ax = b (and thus matrix inversion), and
singular value decomposition, as well as some computation-intensive
operations such as linear/generalize linear model estimation and
hierarchical clustering.

Here for instance is how to find the square of a matrix {\bf m}:

\begin{lstlisting}
> m2 <- gpuMatMult(m,m)
\end{lstlisting}

The {\bf gpuSolve()} function works like the R {\bf solve()}.  The call
\lstinline{gpuSolve(a,b)} will solve the linear system ax = b, for a
square matrix {\bf a} and vector {\bf b}.  If the second argument is
missing, then $a^{-1}$ will be returned.

\subsection{The rgpu Package}
\label{rgpu}

In installing {\bf rgpu}, I downloaded the source code from
\url{https://gforge.nbic.nl/frs/?group_id=38} and unpacked as above.  
I then changed the file {\bf Makefile}, with the modified lines being

\begin{lstlisting}[numbers=left]
LIBS = -L/usr/lib/nvidia -lcuda -lcudart -lcublas
CUDA_INC_PATH ?= /home/matloff/NVIDIA_GPU_Computing_SDK/C/common/inc
R_INC_PATH ?= /usr/include/R
\end{lstlisting}

The first line was needed to pick up {\bf -lcuda}, as with {\bf
gputools}.  The second line was needed to acquire the file {\bf cutil.h}
in the NVIDIA SDK, which I had installed earlier at the location see
above.

For the third line, I made a file {\bf z.c} consisting solely of the
line

\begin{lstlisting}
#include <R.h>
\end{lstlisting}

and ran

\begin{lstlisting}
R CMD SHLIB z.c
\end{lstlisting}

just to see whether the R include file was.

As of May 2010, the routines in {\bf rgpu} are much less extensive than
those of {\bf gputools}.  However, one very nice feature of {\bf rgpu} 
is that one can compute matrix expressions without bringing intermediate
results back from the device memory to the host memory, which would be a
big slowdown.  Here for instance is how to compute the square of the
matrix {\bf m}, plus itself:

\begin{lstlisting}
> m2m <- evalgpu(m %*% m + m)
\end{lstlisting}

\section{Parallelism Via Calling C from R}
\label{cfromr}

Parallel R aims to be faster than ordinary R.  But even if that aim is
achieved, it's still R, and thus potentially slow.

One must always decide how much effort one is willing to devote to
optimization.  For the fastest code, we should not write in C, but
rather in assembly language.  Similarly, one must decide whether to
stick purely to R, or go to the faster C.  If parallel R gives you the
speed you need in your application, fine; if not, though, you should
consider writing part of your application in C, with the main part still
written in R.  You may find that placing the parallelism in the C
portion of your code is good enough, while retaining the convenience of
R for the rest of your code.

\subsection{Calling C from R}

In C, two-dimensional arrays are stored in row-major order, in contrast
to R's column-major order.   For instance, if we have a 3x4 array, the
element in the second row and second column is element number 5 of the
array when viewed linearly, since there are three elements in the first
column and this is the second element in the second column.  Of course,
keep in mind that C subscripts begin at 0, rather than at 1 as with R.
In writing your C code to be interfaced to R, you must keep these issues
in mind.

All the arguments passed from R to C are received by C as pointers.
Note that the C function itself must return {\tt void}.  Values that we
would ordinarily return must in the R/C context be communicated through the
function's arguments, such as {\tt result} in our example below.

\subsection{Example:  Extracting Subdiagonals of a Matrix}

As an example, here is C code to extract subdiagonals from a square
matrix.\footnote{I wish to thank my former graduate assistant, Min-Yu
Huang, who wrote an earlier version of this function.} The code is in a
file {\bf sd.c}:

\begin{lstlisting}[numbers=left]
#include <R.h>  // required

// arguments:
//    m:  a square matrix
//    n:  number of rows/columns of m
//    k:  the subdiagonal index--0 for main diagonal, 1 for first
//        subdiagonal, 2 for the second, etc.
//    result:  space for the requested subdiagonal, returned here

void subdiag(double *m, int *n, int *k, double *result)
{
  int nval = *n, kval = *k;
  int stride = nval + 1;
  for (int i = 0, j = kval; i < nval-kval; ++i, j+= stride)
     result[i] = m[j];
}
\end{lstlisting}

For convenience, you can compile this by rubnning R in a terminal
window, which will invoke GCC:

\begin{lstlisting}[numbers=left]
% R CMD SHLIB sd.c
gcc -std=gnu99 -I/usr/share/R/include      -fpic  -g -O2 -c sd.c -o sd.o
gcc -std=gnu99 -shared  -o sd.so sd.o   -L/usr/lib/R/lib -lR
\end{lstlisting}

Note that here R showed us exactly what it did in invoking GCC.  This
allows us to do some customization.

But note that this simply produced a dynamic library, {\bf sd.o}, not an
executable program.  (On Windows this would presumably be a {\bf .dll}
file.)  So, how is it executed?  The answer is that it is loaded into R,
using R's {\bf dyn.load()} function.  Here is an example:

\begin{lstlisting}[numbers=left]
> dyn.load("sd.so")
> m <- rbind(1:5, 6:10, 11:15, 16:20, 21:25)
> k <- 2
> .C("subdiag", as.double(m), as.integer(dim(m)[1]), as.integer(k),
result=double(dim(m)[1]-k))
[[1]]
 [1]  1  6 11 16 21  2  7 12 17 22  3  8 13 18 23  4  9 14 19 24  5 10 15 20 25

[[2]]
[1] 5

[[3]]
[1] 2

$result
[1] 11 17 23
\end{lstlisting}

Note that we needed to allocate space for {\tt result} in our call, in a
variable we've named {\tt result}.  The value placed in there by our 
function is seen above to be correct.

\subsection{Calling C OpenMP Code from R}

Since OpenMP is usable from C, that makes it in turn usable from R.
(See Chapter \ref{chap:omp} for a detailed discussion of OpenMP.)

The code is compiled and then loaded into R as in Section
\ref{cfromr}, though with the additional step of specifying the
{\tt -fopenmp} command-line option in both invocations of GCC 
(which you run by hand, instead of using {\bf R CMD SHLIB}).

\subsection{Calling CUDA Code from R}

The same principles apply here, but one does have to be careful with
libraries and the like.

As before, we want to compile not to an executable file, but to a dynamic
library file.  Here's how, for the C file {\bf mutlinksforr.cu}
presented in the next section, the compile command is

\begin{lstlisting}
pc41:~% nvcc -g -G -I/usr/local/cuda/include -Xcompiler 
   "-I/usr/include/R -fpic" -c mutlinksforr.cu -o mutlinks.o -arch=sm_11
pc41:~% nvcc -shared -Xlinker "-L/usr/lib/R/lib -lR" 
   -L/usr/local/cuda/lib mutlinks.o -o meanlinks.so
\end{lstlisting}

The product of this was {\bf meanlinks.so}.  I then tested it on R:

\begin{lstlisting}
> dyn.load("meanlinks.so")
> m <- rbind(c(0,1,1,1),c(1,0,0,1),c(1,0,0,1),c(1,1,1,0))
> ma <- rbind(c(0,1,0),c(1,0,0),c(1,0,0))
> .C("meanout",as.integer(m),as.integer(4),mo=double(1))
[[1]]
 [1] 0 1 1 1 1 0 0 1 1 0 0 1 1 1 1 0

[[2]]
[1] 4

$mo
[1] 1.333333

> .C("meanout",as.integer(ma),as.integer(3),mo=double(1))
[[1]]
[1] 0 1 1 1 0 0 0 0 0

[[2]]
[1] 3

$mo
[1] 0.3333333
\end{lstlisting}

\subsection{Example:  Mutual Outlinks}

We again take as our example the mutual-outlinks example from Section
\ref{mutlinks}.  Here is an R/CUDA version:

\begin{lstlisting}[numbers=left]
// CUDA example:  finds mean number of mutual outlinks, among all pairs
// of Web sites in our set

#include <cuda.h>
#include <stdio.h>

// the following is needed to avoid variable name mangling
extern "C" void meanout(int *hm, int *nrc, double *meanmut);

// for a given thread number tn, calculates pair, the (i,j) to be
// processed by that thread; for nxn matrix
__device__ void findpair(int tn, int n, int *pair)
{  int sum=0,oldsum=0,i;
   for(i=0; ;i++) {
      sum += n - i - 1;
      if (tn <= sum-1) {
         pair[0] = i;
         pair[1] = tn - oldsum + i + 1;
         return;
      }
      oldsum = sum;
   }
}

// proc1pair() processes one pair of Web sites, i.e. one pair of rows in
// the nxn adjacency matrix m; the number of mutual outlinks is added to
// tot
__global__ void proc1pair(int *m, int *tot, int n)
{
   // find (i,j) pair to assess for mutuality
   int pair[2];
   findpair(threadIdx.x,n,pair);
   int sum=0; 
   // make sure to account for R being column-major order; R's i-th row
   // is our i-th column here
   int startrowa = pair[0],
       startrowb = pair[1];
   for (int k = 0; k < n; k++) 
      sum += m[startrowa + n*k] * m[startrowb + n*k];
   atomicAdd(tot,sum);
}

// meanout() is called from R
// hm points to the link matrix, nrc to the matrix size, meanmut to the output
void meanout(int *hm, int *nrc, double *meanmut)
{
    int n = *nrc,msize=n*n*sizeof(int);
    int *dm, // device matrix
        htot, // host grand total
        *dtot; // device grand total
    cudaMalloc((void **)&dm,msize);
    cudaMemcpy(dm,hm,msize,cudaMemcpyHostToDevice);
    htot = 0;
    cudaMalloc((void **)&dtot,sizeof(int));
    cudaMemcpy(dtot,&htot,sizeof(int),cudaMemcpyHostToDevice);
    dim3 dimGrid(1,1);
    int npairs = n*(n-1)/2;
    dim3 dimBlock(npairs,1,1);
    proc1pair<<<dimGrid,dimBlock>>>(dm,dtot,n);
    cudaThreadSynchronize();
    cudaMemcpy(&htot,dtot,sizeof(int),cudaMemcpyDeviceToHost);
    *meanmut = htot/double(npairs);
    cudaFree(dm);
    cudaFree(dtot);
}

\end{lstlisting}

The code is hardly optimal.  We should, for instance, have more than one
thread per block.

\section{Debugging R Applications}

The built-in debugging facilities in R are primitive, but alternatives
are available.

\subsection{Text Editors}  

However, if you are a Vim editor fan, I've developed a tool that greatly
enhances the power of R's debugger.  Download {\bf edtdbg} from R's CRAN
repository.  It's also available for Emacs.

Vitalie Spinu's {\tt ess-tracebug} runs under Emacs.  It was modeled
roughly on {\tt edtdbg}, but has more Emacs-specific features
than does {\tt edtdbg}.

\subsection{IDEs}

I'm personally not a fan of IDEs, but some excellent ones are available.

REvolution Analytics, a firm that offers R consulting and develops
souped-up versions of R, offers an IDE for R that includes nice
debugging facilities.  It is only available on Windows, and even then
only for those who have Microsoft Visual Studio.

The developers of StatET, a platform-independent Eclipse-based IDE for
R added a debugging tool in May 2011.

The people developing RStudio, another  a platform-independent IDE for
R, also plan to begin work on a debugger, beginning summer 2011.

\subsection{The Problem of Lack of a Terminal}

Parallel R packages such as {\bf Rmpi}, {\bf snow}, {\bf foreach} and so
on do not set up a terminal for each process, thus making it impossible
to use R's debugger on the workers.  What then can one do to debug apps
for those packages?  Let's consider {\bf snow} for concreteness.

First, one should debug the underlying single-worker function, such as {\bf
mtl()} in our mutual outlinks example in Section \ref{rmutlinks}.  Here
one would set up some artificial values of the arguments, and then use
R's ordinary debugging facilities.

This may be sufficient.  However, the bug may be in the arguments
themselves, or in the way we set them up.  Then things get more
difficult.  It's hard to even print out trace information, e.g. values
of variables, since {\bf print()} won't work in the worker processes.
The {\bf message()} function may work for some of these packages; if
not, you may have to resort to using {\bf cat()} to write to a file.  

{\bf Rdsm} allows full debugging, as there is a separate terminal window
for each process.

\subsection{Debugging C Called from R}

For parallel R that is implemented via R calls to C code, producing a
dynamically-loaded library as in Section \ref{cfromr}, debugging is a
little more involved.  First start R under GDB, then load the library to
be debugged.  At this point, R's interpreter will be looping,
anticipating reading an R command from you.  Break the loop by hitting
ctrl-c, which will put you back into {\it GDB's} interpreter.  Then set
a breakpoint at the C function you want to debug, say {\bf subdiag()} in
our example above.  Finally, tell GDB to continue, and it will then stop
in your function!  Here's how your session will look:

\begin{lstlisting}[numbers=left]
$ R -d gdb
GNU gdb 6.8-debian
...
(gdb) run
Starting program: /usr/lib/R/bin/exec/R
...
> dyn.load("sd.so")
\end{lstlisting}

\section{Other R Examples in This Book}

See these examples (some nonparallel):

in Sections \ref{ompjacobi}, \ref{onedimdft} (nonparallel)
and \ref{smoothing} (nonparallel).

\begin{itemize}

\item Parallel Jacobi iteration of linear equations, Section \ref{ompjacobi}.

\item Matrix computation of 1-D FFT, Section \ref{onedimdft} (can
paralleliza using parallel matrix multiplication). 

\item Parallel computation of 2-D FFT, Section \ref{rfft}.

\item Image smoothing, Section \ref{smoothing}.

\end{itemize}




