\chapter{Introduction to Parallel Processing}
\label{chap:intro} 

Parallel machines provide a wonderful opportunity for applications with
large computational requirements.  Effective use of these machines,
though, requires a keen understanding of how they work.  This chapter
provides an overview.

\section{Overview:  Why Use Parallel Systems?}

\subsection{Execution Speed}

There is an ever-increasing appetite among some types of computer users
for faster and faster machines. This was epitomized in a statement by
the late Steve Jobs, founder/CEO of Apple and Pixar. He noted that when
he was at Apple in the 1980s, he was always worried that some other
company would come out with a faster machine than his. But now at Pixar,
whose graphics work requires extremely fast computers, he is always
\underbar{hoping} someone produces faster machines, so that he can use
them! 

A major source of speedup is the parallelizing of operations. Parallel
operations can be either within-processor, such as with pipelining or
having several ALUs within a processor, or between-processor, in which
many processor work on different parts of a problem in parallel. Our
focus here is on between-processor operations.

For example, the Registrar's Office at UC Davis uses shared-memory
multiprocessors for processing its on-line registration work.  Online
registration involves an enormous amount of database computation.  In
order to handle this computation reasonably quickly, the program
partitions the work to be done, assigning different portions of the
database to different processors.  The database field has contributed
greatly to the commercial success of large shared-memory machines.

As the Pixar example shows, highly computation-intensive applications
like computer graphics also have a need for these fast parallel
computers. No one wants to wait hours just to generate a single image,
and the use of parallel processing machines can speed things up
considerably. For example, consider \textbf{ray tracing} operations.
Here our code follows the path of a ray of light in a scene, accounting
for reflection and absorbtion of the light by various objects. Suppose
the image is to consist of 1,000 rows of pixels, with 1,000 pixels per
row.  In order to attack this problem in a parallel processing manner
with, say, 25 processors, we could divide the image into 25 squares of
size 200x200, and have each processor do the computations for its
square.

Note, though, that it may be much more challenging than this implies.
First of all, the computation will need some communication between the
processors, which hinders performance if it is not done carefully.
Second, if one really wants good speedup, one may need to take into
account the fact that some squares require more computation work than
others.  More on this below.

\subsection{Memory}

Yes, execution speed is the reason that comes to most people's minds
when the subject of parallel processing comes up.  But in many
applications, an equally important consideration is memory capacity.
Parallel processing application often tend to use huge amounts of
memory, and in many cases the amount of memory needed is more than can
fit on one machine.  If we have many machines working together,
especially in the message-passing settings described below, we can
accommodate the large memory needs.

\subsection{Distributed Processing}

In the above two subsections we've hit the two famous issues in computer
science---time (speed) and space (memory capacity).  But there is a
third reason to do parallel processing, which actually has its own name,
{\bf distributed processing}.  In a distributed database, for instance,
parts of the database may be physically located in widely dispersed
sites.  If most transactions at a particular site arise locally, then
we would make more efficient use of the netowrk, and so on.

\subsection{Our Focus Here}

In this book, the primary emphasis is on processing speed.

\section{Parallel Processing Hardware}

This is not a hardware course, but since the goal of using parallel
hardware is speed, the efficiency of our code is a major issue.  That in
turn means that we need a good understanding of the underlying hardware
that we are programming.  In this section, we give an overview of
parallel hardware.

\subsection{Shared-Memory Systems}

\subsubsection{Basic Architecture}

Here many CPUs share the same physical memory.  This kind of
architecture is sometimes called MIMD, standing for Multiple Instruction
(different CPUs are working independently, and thus typically are
executing different instructions at any given instant), Multiple Data
(different CPUs are generally accessing different memory locations at
any given time).

Until recently, shared-memory systems cost hundreds of thousands of
dollars and were affordable only by large companies, such as in the
insurance and banking industries.  The high-end machines are indeed
still quite expensive, but now {\bf dual-core} machines, in which two
CPUs share a common memory, are commonplace in the home.

\subsubsection{SMP Systems}

A Symmetric Multiprocessor (SMP) system has the following structure:

\includegraphics{Images/UMABus.jpg} 

Here and below:

\begin{itemize}

\item The Ps are processors, e.g. off-the-shelf chips such as Pentiums.

\item The Ms are \textbf{memory modules}. These are physically separate
objects, e.g. separate boards of memory chips.  It is typical that there
will be the same number of memory modules as processors.  In the
shared-memory case, the memory modules collectively form the entire
shared address space, but with the addresses being assigned to the
memory modules in one of two ways:

\begin{itemize}

\item (a)

High-order interleaving. Here consecutive addresses are in the
\underbar{same} M (except at boundaries). For example, suppose for
simplicity that our memory consists of addresses 0 through 1023, and
that there are four Ms.  Then M0 would contain addresses 0-255, M1 would
have 256-511, M2 would have 512-767, and M3 would have
768-1023.

We need 10 bits for addresses (since $1024 = 2^{10}$).  The two
most-significant bits would be used to select the module number (since
$4 = 2^2$); hence the term {\it high-order} in the name of this design.
The remaining eight bits are used to select the word within a module.

\item (b)

Low-order interleaving. Here consecutive addresses are in consecutive
memory modules (except when we get to the right end). In the example
above, if we used low-order interleaving, then address 0 would be in M0,
1 would be in M1, 2 would be in M2, 3 would be in M3, 4 would be back in
M0, 5 in M1, and so on.

Here the two least-significant bits are used to determine the module
number.

\end{itemize}

\item To make sure only one processor uses the bus at a time, standard bus
arbitration signals and/or arbitration devices are used. 

\item There may also be \textbf{coherent caches}, which we will discuss later. 

\end{itemize}

\subsection{Message-Passing Systems}

\subsubsection{Basic Architecture}

Here we have a number of independent CPUs, each with its own independent
memory.  The various processors communicate with each other via networks
of some kind.

\subsubsection{Example:  Networks of Workstations (NOWs)}

Large shared-memory multiprocessor systems are still very expensive.  A
major alternative today is networks of workstations (NOWs).  Here one
purchases a set of commodity PCs and networks them for use as parallel
processing systems. The PCs are of course individual machines, capable
of the usual uniprocessor (or now multiprocessor) applications, but by
networking them together and using parallel-processing software
environments, we can form very powerful parallel systems.

The networking does result in a significant loss of performance. This
will be discussed in Chapter \ref{chap:msg}. But even without these techniques,
the price/performance ratio in NOW is much superior in many applications
to that of shared-memory hardware.

One factor which can be key to the success of a NOW is the use of a fast
network, fast both in terms of hardware and network protocol.  Ordinary
Ethernet and TCP/IP are fine for the applications envisioned by the
original designers of the Internet, e.g. e-mail and file transfer, but
is slow in the NOW context.  A good network for a NOW is, for instance,
Infiniband.  

NOWs have become so popular that there are now ``recipes'' on how to
build them for the specific purpose of parallel processing.  The term
{\bf Beowulf} come to mean a cluster of PCs, usually with a fast network
connecting them, used for parallel processing.  Software packages such
as ROCKS (\url{http://www.rocksclusters.org/wordpress/}) have been
developed to make it easy to set up and administer such systems.

\subsection{SIMD}

In contrast to MIMD systems, processors in SIMD---Single Instruction,
Multiple Data---systems execute in lockstep.  At any given time, all
processors are executing the same machine instruction on different data.

Some famous SIMD systems in computer history include the ILLIAC and
Thinking Machines Corporation's CM-1 and CM-2.  Also, DSP (``digital
signal processing'') chips tend to have an SIMD architecture.

But today the most prominent example of SIMD is that of GPUs---graphics
processing units.  In addition to powering your PC's video cards, GPUs
can now be used for general-purpose computation.  The architecture is
fundamentally shared-memory, but the individual processors do execute in
lockstep, SIMD-fashion.

\section{Programmer World Views}

\subsection{Example:  Matrix-Vector Multiply}
\label{matvec}

To explain the paradigms, we will use the term \textbf{nodes}, where
roughly speaking one node corresponds to one processor, and use the
following example:

\begin{quote}

Suppose we wish to multiply an nx1 vector X by an nxn matrix A, putting
the product in an nx1 vector Y, and we have p processors to share the
work. 

\end{quote}

In all the forms of parallelism, each node would be assigned some of the
rows of A, and would multiply X by them, thus forming part of Y.

Note that in typical applications, the matrix A would be very large, say
thousands of rows and thousands of columns.  Otherwise the computation
could be done quite satisfactorily in a {\bf sequential}, i.e.
nonparallel manner, making parallel processing unnecessary..

\subsection{Shared-Memory}

\subsubsection{Programmer View}
\label{sharedview}

In implementing the matrix-vector multiply example of Section
\ref{matvec} in the shared-memory paradigm, the arrays for A, X and Y
would be held in common by all nodes. If for instance node 2 were to
execute

\begin{Verbatim}[fontsize=\relsize{-2}]
   Y[3] = 12;
\end{Verbatim}

and then node 15 were to subsequently execute

\begin{Verbatim}[fontsize=\relsize{-2}]
   print("%d\n",Y[3]);
\end{Verbatim}

then the outputted value from the latter would be 12.

Computation of the matrix-vector product AX would then involve the nodes
somehow deciding which nodes will handle which rows of A.  Each node
would then multiply its assigned rows of A times X, and place the result
directly in the proper section of Y.


Today, programming on shared-memory multiprocessors is typically done
via \textbf{threading}.  (Or, as we will see in other chapters, by
higher-level code that runs threads underneath.) A \textbf{thread} is
similar to a \textbf{process} in an operating system (OS), but with much
less overhead.  Threaded applications have become quite popular in even
uniprocessor systems, and Unix,\footnote{Here and below, the term {\it
Unix} includes Linux.} Windows, Python, Java and Perl all support
threaded programming.  

In the typical implementation, a thread is a special case of an OS
process.  One important difference is that the various threads of a
program share memory.  (One can arrange for processes to share memory
too in some OSs, but they don't do so by default.)

On a uniprocessor system, the threads of a program take turns executing,
so that there is only an illusion of parallelism.  But on a
multiprocessor system, one can genuinely have threads running in
parallel.  Again, though, they must still take turns with other
processes running on the machine.  Whenever a processor becomes
available, the OS will assign some ready thread to it.  So, among other
things, this says that a thread might actually run on different
processors during different turns.

\label{needknowos}
{\bf Important note:}  Effective use of threads requires a basic
understanding of how processes take turns executing.  See Section
\ref{processes} in the appendix of this book for this material.

One of the most popular threads systems is Pthreads, whose name is short
for POSIX threads.  POSIX is a Unix standard, and the Pthreads system
was designed to standardize threads programming on Unix.  It has since
been ported to other platforms.

\subsubsection{Example: Pthreads Prime Numbers Finder}
\label{pthreadsprime}

Following is an example of Pthreads programming, in which we determine
the number of prime numbers in a certain range.  Read the comments at
the top of the file for details; the threads operations will be
explained presently.

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left] 
// PrimesThreads.c

// threads-based program to find the number of primes between 2 and n;
// uses the Sieve of Eratosthenes, deleting all multiples of 2, all
// multiples of 3, all multiples of 5, etc. 

// for illustration purposes only; NOT claimed to be efficient

// Unix compilation:  gcc -g -o primesthreads PrimesThreads.c -lpthread -lm

// usage:  primesthreads n num_threads

#include <stdio.h>
#include <math.h>
#include <pthread.h>  // required for threads usage

#define MAX_N 100000000
#define MAX_THREADS 25

// shared variables
int nthreads,  // number of threads (not counting main())
    n,  // range to check for primeness
    prime[MAX_N+1],  // in the end, prime[i] = 1 if i prime, else 0
    nextbase;  // next sieve multiplier to be used
// lock for the shared variable nextbase
pthread_mutex_t nextbaselock = PTHREAD_MUTEX_INITIALIZER;
// ID structs for the threads
pthread_t id[MAX_THREADS];

// "crosses out" all odd multiples of k
void crossout(int k)
{  int i;
   for (i = 3; i*k <= n; i += 2)  {
      prime[i*k] = 0;
   }
}

// each thread runs this routine
void *worker(int tn)  // tn is the thread number (0,1,...)
{  int lim,base,
       work = 0;  // amount of work done by this thread
   // no need to check multipliers bigger than sqrt(n)
   lim = sqrt(n);
   do  {
      // get next sieve multiplier, avoiding duplication across threads
      // lock the lock
      pthread_mutex_lock(&nextbaselock);
      base = nextbase;
      nextbase += 2;
      // unlock
      pthread_mutex_unlock(&nextbaselock);
      if (base <= lim)  {
         // don't bother crossing out if base known composite
         if (prime[base])  {
            crossout(base);
            work++;  // log work done by this thread
         }
      }
      else return work; 
   } while (1);
}

main(int argc, char **argv)
{  int nprimes,  // number of primes found 
       i,work;
   n = atoi(argv[1]);
   nthreads = atoi(argv[2]);
   // mark all even numbers nonprime, and the rest "prime until
   // shown otherwise"
   for (i = 3; i <= n; i++)  {
      if (i%2 == 0) prime[i] = 0;
      else prime[i] = 1;
   }
   nextbase = 3;
   // get threads started
   for (i = 0; i < nthreads; i++)  {
      // this call says create a thread, record its ID in the array
      // id, and get the thread started executing the function worker(), 
      // passing the argument i to that function
      pthread_create(&id[i],NULL,worker,i);
   }

   // wait for all done
   for (i = 0; i < nthreads; i++)  {
      // this call says wait until thread number id[i] finishes
      // execution, and to assign the return value of that thread to our
      // local variable work here
      pthread_join(id[i],&work);
      printf("%d values of base done\n",work);
   }
   
   // report results
   nprimes = 1;
   for (i = 3; i <= n; i++)
      if (prime[i])  {
         nprimes++;
      }
   printf("the number of primes found was %d\n",nprimes);

}
\end{Verbatim}

To make our discussion concrete, suppose we are running this program
with two threads.  Suppose also the both threads are running
simultaneously most of the time.  This will occur if they aren't
competing for turns with other big threads, say if there are no other
big threads, or more generally if the number of other big threads is
less than or equal to the number of processors minus two.  (Actually,
the original thread is {\bf main()}, but it lies dormant most of the
time, as you'll see.)

Note the global variables:

\begin{Verbatim}[fontsize=\relsize{-2}]
int nthreads,  // number of threads (not counting main())
    n,  // range to check for primeness
    prime[MAX_N+1],  // in the end, prime[i] = 1 if i prime, else 0
    nextbase;  // next sieve multiplier to be used
pthread_mutex_t nextbaselock = PTHREAD_MUTEX_INITIALIZER;
pthread_t id[MAX_THREADS];
\end{Verbatim}

This will require some adjustment for those who've been taught that
global variables are ``evil.''  All communication between threads is via
global variables,\footnote{Or more accurately, nonlocal variables.  They
at least will be higher in the stack than the function currently being
executed.} so if they are evil, they are a necessary evil.  Personally I
think the stern admonitions against global variables are overblown
anyway.  See \url{http://heather.cs.ucdavis.edu/~matloff/globals.html},
and in the shared-memory context,
\url{}http://software.intel.com/en-us/articles/global-variable-reconsidered/?wapkw=%28parallelism%29}.

As mentioned earlier, the globals are shared by all
processors.\footnote{Technically, we should say ``shared by all
threads'' here, as a given thread does not always execute on the same
processor, but at any instant in time each executing thread is at some
processor, so the statement is all right.} If one processor, for
instance, assigns the value 0 to {\bf prime{[}35{]}} in the function
{\bf crossout()}, then that variable will have the value 0 when accessed
by any of the other processors as well.  On the other hand, local
variables have different values at each processor; for instance, the
variable {\bf i} in that function has a different value at each
processor.

Note that in the statement

\begin{Verbatim}[fontsize=\relsize{-2}]
pthread_mutex_t nextbaselock = PTHREAD_MUTEX_INITIALIZER;
\end{Verbatim}

the right-hand side is not a constant.  It is a macro call, and is thus
something which is executed.

In the code

\begin{Verbatim}[fontsize=\relsize{-2}]
pthread_mutex_lock(&nextbaselock);
base = nextbase
nextbase += 2
pthread_mutex_unlock(&nextbaselock);
\end{Verbatim}

we see a \textbf{critical section} operation which is
typical in shared-memory programming.  In this context here, it means
that we cannot allow more than one thread to execute

\begin{Verbatim}[fontsize=\relsize{-2}]
base = nextbase;
nextbase += 2;
\end{Verbatim}

at the same time.  A common term used for this is that we wish the
actions in the critical section to collectively be {\bf atomic}, meaning
not divisible among threads.  The calls to {\bf pthread\_mutex\_lock()}
and {\bf pthread\_mutex\_unlock()} ensure this.  If thread A is
currently executing inside the critical section and thread B tries to
lock the lock by calling {\bf pthread\_mutex\_lock()}, the call will
block until thread B executes {\bf pthread\_mutex\_unlock()}.

Here is why this is so important:  Say currently {\bf nextbase} has the
value 11.  What we want to happen is that the next thread to read {\bf
nextbase} will ``cross out'' all multiples of 11.  But if we allow
two threads to execute the critical section at the same time, the
following may occur:

\begin{itemize}

\item thread A reads {\bf nextbase}, setting its value of {\bf base} to
11

\item thread B reads {\bf nextbase}, setting its value of {\bf base} to
11

\item thread A adds 2 to {\bf nextbase}, so that {\bf nextbase} becomes
13

\item thread B adds 2 to {\bf nextbase}, so that {\bf nextbase} becomes
15

\end{itemize}

Two problems would then occur:

\begin{itemize}

\item Both threads would do ``crossing out'' of multiples of 11, 
duplicating work and thus slowing down execution speed.

\item We will never ``cross out'' multiples of 13.

\end{itemize}

Thus the lock is crucial to the correct (and speedy) execution of the
program.

Note that these problems could occur either on a uniprocessor or
multiprocessor system.  In the uniprocessor case, thread A's turn might
end right after it reads {\bf nextbase}, followed by a turn by B which
executes that same instruction.  In the multiprocessor case, A and B
could literally be running simultaneously, but still with the action by
B coming an instant after A.

This problem frequently arises in parallel database systems.  For
instance, consider an airline reservation system. If a flight has only
one seat left, we want to avoid giving it to two different customers who
might be talking to two agents at the same time. The lines of code in
which the seat is finally assigned (the \textbf{commit} phase, in
database terminology) is then a critical section.

A critical section is always a potential bottlement in a parallel
program, because its code is serial instead of parallel.  In our program
here, we may get better performance by having each thread work on, say,
five values of {\bf nextbase} at a time.  Our line

\begin{Verbatim}[fontsize=\relsize{-2}]
nextbase += 2;
\end{Verbatim}

would become

\begin{Verbatim}[fontsize=\relsize{-2}]
nextbase += 10;
\end{Verbatim}

That would mean that any given thread would need to go through the
critical section only one-fifth as often, thus greatly reducing
overhead.  On the other hand, near the end of the run, this may result
in some threads being idle while other threads still have a lot of work
to do.


Note this code.

\begin{Verbatim}[fontsize=\relsize{-2}]
for (i = 0; i < nthreads; i++)  {
   pthread_join(id[i],&work);
   printf("%d values of base done\n",work);
}
\end{Verbatim}

This is a special case of of {\bf barrier}.

A barrier is a point in the code that all threads must reach before
continuing.  In this case, a barrier is needed in order to prevent
premature execution of the later code

\begin{Verbatim}[fontsize=\relsize{-2}]
for (i = 3; i <= n; i++)
   if (prime[i])  {
      nprimes++;
   }
\end{Verbatim}

which would result in possibly wrong output if we start counting primes
before some threads are done.

Actually, we could have used Pthreads' built-in barrier function.  We
need to declare a barrier variable, e.g.

\begin{lstlisting}
pthread_barrier_t barr;
\end{lstlisting}

and then call it like this:

\begin{lstlisting}
pthread_barrier_wait(&barr);
\end{lstlisting}

The {\bf pthread\_join()} function actually causes the given thread to
exit, so that we then ``join'' the thread that created it, i.e. {\bf
main()}.  Thus some may argue that this is not really a true barrier.

Barriers are very common in shared-memory programming, and will be
discussed in more detail in Chapter \ref{chap:shared}. 

\subsubsection{Role of the OS}

Let's again ponder the role of the OS here.  What happens when a thread
tries to lock a lock:

\begin{itemize}

\item The lock call will ultimately cause a system call, causing the OS
to run.

\item The OS maintains the locked/unlocked status of each lock, so it
will check that status.  

\item Say the lock is unlocked (a 0), the OS sets it to locked (a 1), and
the lock call returns.  The thread enters the critical section.

\item When the thread is done, the unlock call unlocks the lock, similar
to the locking actions.

\item If the lock is locked at the time a thread makes a lock call, the
call will block.  The OS will mark this thread as waiting for the lock.
When whatever thread currently using the critical section unlocks the
lock, the OS will relock it and unblock the lock call of the waiting
thread.

\end{itemize}

Note that {\bf main()} is a thread too, the original thread that spawns
the others.  However, it is dormant most of the time, due to its calls
to {\bf pthread\_join()}.

Finally, keep in mind that although the globals variables are shared,
the locals are not.  Recall that local variables are stored on a stack.
Each thread (just like each process in general) has its own stack.  When
a thread begins a turn, the OS prepares for this by pointing the stack
pointer register to this thread's stack.

\subsubsection{Debugging Threads Programs}
\label{debugthreads}

Most debugging tools include facilities for threads.  Here's an overview
of how it works in GDB.

First, as you run a program under GDB, the creation of new threads will
be announced, e.g.

\begin{Verbatim}[fontsize=\relsize{-2}]
(gdb) r 100 2
Starting program: /debug/primes 100 2
[New Thread 16384 (LWP 28653)]
[New Thread 32769 (LWP 28676)]
[New Thread 16386 (LWP 28677)]
[New Thread 32771 (LWP 28678)]
\end{Verbatim}

You can do backtrace ({\bf bt}) etc. as usual.  Here are some
threads-related commands:

\begin{itemize}

\item {\tt info threads} (gives information on all current threads)

\item {\tt thread 3} (change to thread 3)

\item {\tt break 88 thread 3} (stop execution when thread 3 reaches
source line 88)

\item {\tt break 88 thread 3 if x==y} (stop execution when thread 3 reaches
source line 88 and the variables x and y are equal)

\end{itemize}

Of course, many GUI IDEs use GDB internally, and thus provide the above
facilities with a GUI wrapper.  Examples are DDD, Eclipse and NetBeans.

\subsubsection{Higher-Level Threads}

The OpenMP library gives the programmer a higher-level view of
threading.  The threads are there, but rather hidden by higher-level
abstractions.  We will study OpenMP in detail in Chapter \ref{chap:omp},
and use it frequently in the succeeding chapters, but below is an
introductory example.  

\subsubsection{Example:  Sampling Bucket Sort}
\label{ompbsort}

This code implements the sampling bucket sort of Section \ref{bsort}.

\begin{lstlisting}[numbers=left]
// OpenMP introductory example:  sampling bucket sort

// compile:  gcc -fopenmp -o bsort bucketsort.c

// set the number of threads via the environment variable
// OMP_NUM+THREADS, e.g. in the C shell

// setenv OMP_NUM_THREADS 8

#include <omp.h>  // required
#include <stdlib.h>

// needed for call to qsort()
int cmpints(int *u, int *v) 
{  if (*u < *v) return -1;
   if (*u > *v) return 1;
   return 0;
}

// adds xi to the part array, increments npart, the length of part
void grab(int xi, int *part, int *npart)
{
    part[*npart] = xi;
    *npart += 1;
}

// finds the min and max in y, length ny, 
// placing them in miny and maxy
void findminmax(int *y, int ny, int *miny, int *maxy)
{  int i,yi;
   *miny = *maxy = y[0];
   for (i = 1; i < ny; i++) {
      yi = y[i];
      if (yi < *miny) *miny = yi;
      else if (yi > *maxy) *maxy = yi;
   }
}

// sort the array x of length n
void bsort(int *x, int n)
{  // these are local to this function, but shared among the threads
   float *bdries; int *counts;
   #pragma omp parallel  
   // entering this block activates the threads, each executing it
   {  // these are local to each thread:
      int me = omp_get_thread_num();
      int nth = omp_get_num_threads();
      int i,xi,minx,maxx,start;
      int *mypart;
      float increm;
      int SAMPLESIZE;
      // now determine the bucket boundaries; nth - 1 of them, by
      // sampling the array to get an idea of its range 
      #pragma omp single  // only 1 thread does this, implied barrier at end
      {
         if (n > 1000) SAMPLESIZE = 1000;
         else SAMPLESIZE = n / 2;
         findminmax(x,SAMPLESIZE,&minx,&maxx);
         bdries = malloc((nth-1)*sizeof(float));
         increm = (maxx - minx) / (float) nth;
         for (i = 0; i < nth-1; i++) 
            bdries[i] = minx + (i+1) * increm;
         // array to serve as the count of the numbers of elements of x
         // in each bucket
         counts = malloc(nth*sizeof(int));
      }
      // now have this thread grab its portion of the array; thread 0
      // takes everything below bdries[0], thread 1 everything between
      // bdries[0] and bdries[1], etc., with thread nth-1 taking
      // everything over bdries[nth-1]
      mypart = malloc(n*sizeof(int)); int nummypart = 0;
      for (i = 0; i < n; i++) {  
         if (me == 0) {
            if (x[i] <= bdries[0]) grab(x[i],mypart,&nummypart);
         }
         else if (me < nth-1) {
            if (x[i] > bdries[me-1] && x[i] <= bdries[me]) 
               grab(x[i],mypart,&nummypart);
         } else
            if (x[i] > bdries[me-1]) grab(x[i],mypart,&nummypart);
      }
      // now record how many this thread got
      counts[me] = nummypart;
      // sort my part
      qsort(mypart,nummypart,sizeof(int),cmpints);
      #pragma omp barrier  // other threads need to know all of counts
      // copy sorted chunk back to the original array; first find start point
      start = 0;
      for (i = 0; i < me; i++) start += counts[i];
      for (i = 0; i < nummypart; i++) {
         x[start+i] = mypart[i];
      }
   }
   // implied barrier here; main thread won't resume until all threads
   // are done
}

int main(int argc, char **argv)
{  
   // test case
   int n = atoi(argv[1]), *x = malloc(n*sizeof(int));
   int i;
   for (i = 0; i < n; i++) x[i] = rand() % 50;
   if (n < 100) 
      for (i = 0; i < n; i++) printf("%d\n",x[i]);
   bsort(x,n);
   if (n <= 100) {
      printf("x after sorting:\n");
      for (i = 0; i < n; i++) printf("%d\n",x[i]);
   }
}
\end{lstlisting}

Details on OpenMP are presented in Chapter \ref{chap:omp}.  Here is an
overview of a few of the OpenMP constructs available:

\begin{itemize}

\item \lstinline{#pragma omp for}

In our example above, we wrote our own code to assign specific threads to
do specific parts of the work.  An alternative is to write an ordinary
{\bf for} loop that iterates over all the work to be done, and then ask
OpenMP to assign specific iterations to specific threads.  To do this,
insert the above pragam just before the loop.

\item \lstinline{#pragma omp critical}

The block that follows is implemented as a critical section.  OpenMP
sets up the locks etc. for you, alleviating you of work and alleviating
your code of clutter.

\end{itemize}

\subsection{Message Passing}

\subsubsection{Programmer View}
\label{msgview}

Again consider the matrix-vector multiply example of Section
\ref{matvec}.  In contrast to the shared-memory case, in the
message-passing paradigm all nodes would have \underbar{separate} copies
of A, X and Y.  Our example in Section \ref{sharedview} would now
change.  in order for node 2 to send this new value of Y{[}3{]} to node
15, it would have to execute some special function, which would be
something like

\begin{Verbatim}[fontsize=\relsize{-2}]
   send(15,12,"Y[3]");
\end{Verbatim}

and node 15 would have to execute some kind of {\bf receive()} function.

To compute the matrix-vector product, then, would involve the following.
One node, say node 0, would distribute the rows of A to the various
other nodes.  Each node would receive a different set of nodes.  The
vector X would be sent to all nodes.  Each node would then multiply X by
the node's assigned rows of A, and then send the result back to node 0.
The latter would collect those results, and store them in Y.

\subsubsection{Example:  MPI Prime Numbers Finder}
\label{mpiex}

Here we use the MPI system, with our hardware being a NOW.

MPI is a popular public-domain set of interface functions, callable from
C/C++, to do message passing.  We are again counting primes, though in
this case using a \textbf{pipelining} method.  It is similar to hardware
pipelines, but in this case it is done in software, and each ``stage''
in the pipe is a different computer.

The program is self-documenting, via the comments.

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
/* MPI sample program; NOT INTENDED TO BE EFFICIENT as a prime
   finder, either in algorithm or implementation 

   MPI (Message Passing Interface) is a popular package using
   the "message passing" paradigm for communicating between
   processors in parallel applications; as the name implies,
   processors communicate by passing messages using "send" and 
   "receive" functions

   finds and reports the number of primes less than or equal to N

   uses a pipeline approach:  node 0 looks at all the odd numbers (i.e.
   has already done filtering out of multiples of 2) and filters out
   those that are multiples of 3, passing the rest to node 1; node 1
   filters out the multiples of 5, passing the rest to node 2; node 2
   then removes the multiples of 7, and so on; the last node must check
   whatever is left

   note that we should NOT have a node run through all numbers
   before passing them on to the next node, since we would then
   have no parallelism at all; on the other hand, passing on just
   one number at a time isn't efficient either, due to the high
   overhead of sending a message if it is a network (tens of
   microseconds until the first bit reaches the wire, due to
   software delay); thus efficiency would be greatly improved if 
   each node saved up a chunk of numbers before passing them to 
   the next node */

#include <mpi.h>  // mandatory 

#define PIPE_MSG 0  // type of message containing a number to be checked
#define END_MSG 1  // type of message indicating no more data will be coming 

int NNodes,  // number of nodes in computation
    N,  // find all primes from 2 to N 
    Me;  // my node number 
double T1,T2;  // start and finish times 

void Init(int Argc,char **Argv)
{  int DebugWait;  
   N = atoi(Argv[1]); 
   // start debugging section
   DebugWait = atoi(Argv[2]);
   while (DebugWait) ;  // deliberate infinite loop; see below
   /* the above loop is here to synchronize all nodes for debugging;
      if DebugWait is specified as 1 on the mpirun command line, all 
      nodes wait here until the debugging programmer starts GDB at 
      all nodes (via attaching to OS process number), then sets
      some breakpoints, then GDB sets DebugWait to 0 to proceed; */
   // end debugging section
   MPI_Init(&Argc,&Argv);  // mandatory to begin any MPI program 
   // puts the number of nodes in NNodes 
   MPI_Comm_size(MPI_COMM_WORLD,&NNodes);
   // puts the node number of this node in Me 
   MPI_Comm_rank(MPI_COMM_WORLD,&Me); 
   // OK, get started; first record current time in T1 
   if (Me == NNodes-1) T1 = MPI_Wtime();
}

void Node0()
{  int I,ToCheck,Dummy,Error;
   for (I = 1; I <= N/2; I++)  {
      ToCheck = 2 * I + 1;  // latest number to check for div3
      if (ToCheck > N) break;
      if (ToCheck % 3 > 0)  // not divis by 3, so send it down the pipe
         // send the string at ToCheck, consisting of 1 MPI integer, to
         // node 1 among MPI_COMM_WORLD, with a message type PIPE_MSG
         Error = MPI_Send(&ToCheck,1,MPI_INT,1,PIPE_MSG,MPI_COMM_WORLD);
         // error not checked in this code
   }
   // sentinel
   MPI_Send(&Dummy,1,MPI_INT,1,END_MSG,MPI_COMM_WORLD);
}

void NodeBetween()
{  int ToCheck,Dummy,Divisor;
   MPI_Status Status;  
   // first received item gives us our prime divisor
   // receive into Divisor 1 MPI integer from node Me-1, of any message
   // type, and put information about the message in Status
   MPI_Recv(&Divisor,1,MPI_INT,Me-1,MPI_ANY_TAG,MPI_COMM_WORLD,&Status);
   while (1)  {
      MPI_Recv(&ToCheck,1,MPI_INT,Me-1,MPI_ANY_TAG,MPI_COMM_WORLD,&Status);
      // if the message type was END_MSG, end loop
      if (Status.MPI_TAG == END_MSG) break;
      if (ToCheck % Divisor > 0) 
         MPI_Send(&ToCheck,1,MPI_INT,Me+1,PIPE_MSG,MPI_COMM_WORLD);
   }
   MPI_Send(&Dummy,1,MPI_INT,Me+1,END_MSG,MPI_COMM_WORLD);
}

NodeEnd()
{  int ToCheck,PrimeCount,I,IsComposite,StartDivisor;
   MPI_Status Status;  
   MPI_Recv(&StartDivisor,1,MPI_INT,Me-1,MPI_ANY_TAG,MPI_COMM_WORLD,&Status);
   PrimeCount = Me + 2;  /* must account for the previous primes, which
                            won't be detected below */
   while (1)  {
      MPI_Recv(&ToCheck,1,MPI_INT,Me-1,MPI_ANY_TAG,MPI_COMM_WORLD,&Status);
      if (Status.MPI_TAG == END_MSG) break;
      IsComposite = 0;
      for (I = StartDivisor; I*I <= ToCheck; I += 2)
         if (ToCheck % I == 0)  {
            IsComposite = 1;
            break;
         }
      if (!IsComposite) PrimeCount++;  
   }
   /* check the time again, and subtract to find run time */
   T2 = MPI_Wtime();
   printf("elapsed time = %f\n",(float)(T2-T1));
   /* print results */
   printf("number of primes = %d\n",PrimeCount);
}

int main(int argc,char **argv)
{  Init(argc,argv);
   // all nodes run this same program, but different nodes take
   // different actions
   if (Me == 0) Node0();
   else if (Me == NNodes-1) NodeEnd();
        else NodeBetween();
   // mandatory for all MPI programs 
   MPI_Finalize();
}

/* explanation of "number of items" and "status" arguments at the end 
   of MPI_Recv():

   when receiving a message you must anticipate the longest possible
   message, but the actual received message may be much shorter than
   this; you can call the MPI_Get_count() function on the status
   argument to find out how many items were actually received

   the status argument will be a pointer to a struct, containing the
   node number, message type and error status of the received
   message

   say our last parameter is Status; then Status.MPI_SOURCE
   will contain the number of the sending node, and 
   Status.MPI_TAG will contain the message type; these are
   important if used MPI_ANY_SOURCE or MPI_ANY_TAG in our
   node or tag fields but still have to know who sent the
   message or what kind it is */
\end{Verbatim}

The set of machines can be heterogeneous, but MPI ``translates'' for you
automatically.  If say one node has a big-endian CPU and another has a
little-endian CPU, MPI will do the proper conversion.

\subsection{Scatter/Gather}

Technically, the {\bf scatter/gather} programmer world view is a special
case of message passing.  However, it has become so pervasive as to
merit its own section here.

In this paradigm, one node, say node 0, serves as a {\bf manager}, while
the others serve as {\bf workers}.  The manager sends work to the
workers, who process the data and return the results to the manager.
The latter receives the results and combines them into the final
product.

The matrix-vector multiply example in Section \ref{msgview} is an
example of scatter/gather.  

As noted, scatter/gather is very popular.  Here are some examples of
packages that use it:

\begin{itemize}

\item MPI includes scatter and gather functions (Section
\ref{scattergather}).

\item Cloud computing (Section \ref{mapreduce}) is basically a
scatter/gather operation.

\item The {\bf snow} package (Section \ref{snow}) for the  R language is
also a scatter/gather operation.

\end{itemize}


