\chapter{Introduction to MPI}
\label{chap:mpi}

MPI is the {\it de facto} standard for message-passing software.

\section{Overview}

\subsection{History}

Though (small) shared-memory machines have come down radically in price,
to the point at which a dual-core PC is now commonplace in the home,
historically shared-memory machines were available only to the ``very
rich''---large banks, national research labs and so on.  This led to
interest in message-passing machines.  

The first ``affordable'' message-machine type was the Hypercube,
developed by a physics professor at Cal Tech.  It consisted of a number
of {\bf processing elements} (PEs) connected by fast serial I/O cards.
This was in the range of university departmental research labs.  It was
later commercialized by Intel and NCube.

Later, the notion of {\bf networks of workstations} (NOWs) became
popular.  Here the PEs were entirely independent PCs, connected via a
standard network.  This was refined a bit, by the use of more suitable
network hardware and protocols, with the new term being {\bf clusters}.

All of this necessitated the development of standardized software tools
based on a message-passing paradigm.  The first popular such tool was
Parallel Virtual Machine (PVM).  It still has its adherents today, but
has largely been supplanted by the Message Passing Interface (MPI).

MPI itself later became MPI 2.  Our document here is intended mainly for
the original. 

\subsection{Structure and Execution}

MPI is merely a set of Application Programmer Interfaces (APIs), called
from user programs written in C, C++ and other languages.  It has many
implementations, with some being open source and generic, while others
are proprietary and fine-tuned for specific commercial hardware.

Suppose we have written an MPI program {\bf x}, and will run it on four
machines in a cluster.  Each machine will be running its own copy of
{\bf x}.  Official MPI terminology refers to this as four {\bf
processes}.  Now that multicore machines are commonplace, one might
indeed run two or more cooperating MPI processes---where now we use the
term {\it processes} in the real OS sense---on the same multicore
machine.  In this document, we will tend to refer to the various MPI
processes as {\bf nodes}, with an eye to the cluster setting.

Though the nodes are all running the same program, they will likely be
working on different parts of the program's data.  This is called the
Single Program Multiple Data (SPMD) model.  This is the typical
approach, but there could be different programs running on different
nodes.  Most of the APIs involve a node sending information to, or
receiving information from, other nodes.

\subsection{Implementations}

Two of the most popular implementations of MPI are MPICH and LAM.  MPICH
offers more tailoring to various networks and other platforms, while LAM
runs on networks.  Introductions to MPICH and LAM can be found, for
example, at
\url{http://heather.cs.ucdavis.edu/~matloff/MPI/NotesMPICH.NM.html} and
\url{http://heather.cs.ucdavis.edu/~matloff/MPI/NotesLAM.NM.html},
respectively. 

LAM is no longer being developed, and has been replaced by Open MPI (not
to be confused with OpenMP).  Personally, I still prefer the simplicity
of LAM.  It is still being maintained.

\subsection{Performance Issues} 

Mere usage of a parallel language on a parallel platform does not
guarantee a performance improvement over a serial version of your
program.  The central issue here is the overhead involved in internode
communication.

Infiniband, one of the fastest cluster networks commercially available,
has a {\bf latency} of about 1.0-3.0 microseconds, meaning that it takes
the first bit of a packet that long to get from one node on an
Infiniband switch to another.  Comparing that to the nanosecond time
scale of CPU speeds, one can see that the communications overhead can
destroy a program's performance.  And Ethernet is quite a bit slower
than Infiniband.

Latency is quite different from {\bf bandwidth}, which is the number of
bits sent per second.  Say the latency is 1.0 microsecond and the
bandwidth is 1 gigabit, i.e. 1000000000 bits per second or 1000 bits
per microsecond.  Say the message is 2000 bits long.  Then the first bit 
of the message arrives after 1 microsecond, and the last bit arrives
after an additional 2 microseconds.  In other words, the message is does
not arrive fully at the destination until 3 microseconds after it is
sent.

In the same setting, say bandwidth is 10 gigabits.  Now the message
would need 1.2 seconds to arrive fully, in spite of a 10-fold increase
in bandwidth.  So latency is a major problem even if the bandwidth is
high.  

For this reason, the MPI applications that run well on networks tend to
be of the ``embarrassingly parallel'' type, with very little
communication between the processes.

Of course, if your platform is a shared-memory multiprocessor
(especially a multicore one, where communication between cores is
particularly fast) and you are running all your MPI processes on that
machine, the problem is less severe.  In fact, some implementations of
MPI communicate directly through shared memory in that case, rather than
using the TCP/IP or other network protocol.

\section{Review of Earlier Example}

Though the presentation in this chapter is self-contained, you may wish
to look first at the somewhat simpler example in Section \ref{mpiex}, a
pipelined prime number finder.

\section{Example:  Dijkstra Algorithm}

\subsection{The Algorithm}

The code implements the Dijkstra algorithm for finding the shortest
paths in an undirected graph.  Pseudocode for the algorithm is

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
Done = {0}
NonDone = {1,2,...,N-1}
for J = 1 to N-1 Dist[J] = infinity`
Dist[0] = 0
for Step = 1 to N-1
   find J such that Dist[J] is min among all J in NonDone
   transfer J from NonDone to Done
   NewDone = J
   for K = 1 to N-1
      if K is in NonDone
         Dist[K] = min(Dist[K],Dist[NewDone]+G[NewDone,K])
\end{Verbatim}

At each iteration, the algorithm finds the closest vertex J to 0 among
all those not yet processed, and then updates the list of minimum
distances to each vertex from 0 by considering paths that go through J.
Two obvious potential candidate part of the algorithm for
parallelization are the ``find J'' and ``for K'' lines, and the above
OpenMP code takes this approach.

\subsection{The MPI Code}

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
// Dijkstra.c

// MPI example program:  Dijkstra shortest-path finder in a
// bidirectional graph; finds the shortest path from vertex 0 to all
// others

// command line arguments:  nv print dbg

// where:  nv is the size of the graph; print is 1 if graph and min
// distances are to be printed out, 0 otherwise; and dbg is 1 or 0, 1
// for debug

// node 0 will both participate in the computation and serve as a
// "manager"

#include <stdio.h>
#include <mpi.h>

#define MYMIN_MSG 0
#define OVRLMIN_MSG 1
#define COLLECT_MSG 2

// global variables (but of course not shared across nodes)

int nv,  // number of vertices
    *notdone, // vertices not checked yet
    nnodes,  // number of MPI nodes in the computation
    chunk,  // number of vertices handled by each node
    startv,endv,  // start, end vertices for this node
    me,  // my node number 
    dbg; 
unsigned largeint,  // max possible unsigned int
         mymin[2],  // mymin[0] is min for my chunk,
                    // mymin[1] is vertex which achieves that min
         othermin[2],  // othermin[0] is min over the other chunks
                       // (used by node 0 only)
                       // othermin[1] is vertex which achieves that min
         overallmin[2],  // overallmin[0] is current min over all nodes,
                         // overallmin[1] is vertex which achieves that min
         *ohd,  // 1-hop distances between vertices; "ohd[i][j]" is
                // ohd[i*nv+j]
         *mind;  // min distances found so far

double T1,T2;  // start and finish times 

void init(int ac, char **av)
{  int i,j,tmp; unsigned u;
   nv = atoi(av[1]);
   dbg = atoi(av[3]);
   MPI_Init(&ac,&av);
   MPI_Comm_size(MPI_COMM_WORLD,&nnodes);
   MPI_Comm_rank(MPI_COMM_WORLD,&me);
   chunk = nv/nnodes;  
   startv = me * chunk; 
   endv = startv + chunk - 1;
   u = -1;
   largeint = u >> 1;
   ohd = malloc(nv*nv*sizeof(int));
   mind = malloc(nv*sizeof(int));
   notdone = malloc(nv*sizeof(int)); 
   // random graph
   // note that this will be generated at all nodes; could generate just
   // at node 0 and then send to others, but faster this way
   srand(9999);
   for (i = 0; i < nv; i++)  
      for (j = i; j < nv; j++)  {
         if (j == i) ohd[i*nv+i] = 0;
         else  {
            ohd[nv*i+j] = rand() % 20;
            ohd[nv*j+i] = ohd[nv*i+j];
         }
      }
   for (i = 0; i < nv; i++)  {
      notdone[i] = 1;
      mind[i] = largeint;
   }
   mind[0] = 0;
   while (dbg) ;  // stalling so can attach debugger
}

// finds closest to 0 among notdone, among startv through endv
void findmymin()
{  int i;
   mymin[0] = largeint; 
   for (i = startv; i <= endv; i++)
      if (notdone[i] && mind[i] < mymin[0])  {
         mymin[0] = mind[i];
         mymin[1] = i;
      }
}

void findoverallmin()
{  int i;
   MPI_Status status;  // describes result of MPI_Recv() call 
   // nodes other than 0 report their mins to node 0, which receives
   // them and updates its value for the global min
   if (me > 0)  
      MPI_Send(mymin,2,MPI_INT,0,MYMIN_MSG,MPI_COMM_WORLD);
   else  {
      // check my own first
      overallmin[0] = mymin[0];
      overallmin[1] = mymin[1];
      // check the others
      for (i = 1; i < nnodes; i++)  {
         MPI_Recv(othermin,2,MPI_INT,i,MYMIN_MSG,MPI_COMM_WORLD,&status);
         if (othermin[0] < overallmin[0])  {
            overallmin[0] = othermin[0];
            overallmin[1] = othermin[1];
         }
      }
   }
}

void updatemymind()  // update my mind segment
{  // for each i in [startv,endv], ask whether a shorter path to i 
   // exists, through mv
   int i, mv = overallmin[1]; 
   unsigned md = overallmin[0];
   for (i = startv; i <= endv; i++)
      if (md + ohd[mv*nv+i] < mind[i])  
         mind[i] = md + ohd[mv*nv+i];
}

void disseminateoverallmin()
{  int i;
   MPI_Status status;  
   if (me == 0)  
      for (i = 1; i < nnodes; i++)  
         MPI_Send(overallmin,2,MPI_INT,i,OVRLMIN_MSG,MPI_COMM_WORLD);
   else 
      MPI_Recv(overallmin,2,MPI_INT,0,OVRLMIN_MSG,MPI_COMM_WORLD,&status);
}

void updateallmind()  // collects all the mind segments at node 0
{  int i;
   MPI_Status status;  
   if (me > 0)  
      MPI_Send(mind+startv,chunk,MPI_INT,0,COLLECT_MSG,MPI_COMM_WORLD);
   else  
      for (i = 1; i < nnodes; i++)  
         MPI_Recv(mind+i*chunk,chunk,MPI_INT,i,COLLECT_MSG,MPI_COMM_WORLD,
            &status);
}

void printmind()  // partly for debugging (call from GDB)
{  int i;
   printf("minimum distances:\n");
   for (i = 1; i < nv; i++)
      printf("%u\n",mind[i]);
}

void dowork()
{  int step,  // index for loop of nv steps
       i;
   if (me == 0) T1 = MPI_Wtime();
   for (step = 0; step < nv; step++)  {
      findmymin();
      findoverallmin();
      disseminateoverallmin();
      // mark new vertex as done 
      notdone[overallmin[1]] = 0;  
      updatemymind(startv,endv);
   }
   updateallmind();
   T2 = MPI_Wtime();
}

int main(int ac, char **av)
{  int i,j,print;
   init(ac,av);
   dowork();  
   print = atoi(av[2]);
   if (print && me == 0)  {
      printf("graph weights:\n");
      for (i = 0; i < nv; i++)  {
         for (j = 0; j < nv; j++)  
            printf("%u  ",ohd[nv*i+j]);
         printf("\n");
      }
      printmind();
   }
   if (me == 0) printf("time at node 0: %f\n",(float)(T2-T1));
   MPI_Finalize();
}

\end{Verbatim}

The various MPI functions will be explained in the next section.

\subsection{Introduction to MPI APIs}

\subsubsection{MPI\_Init() and MPI\_Finalize()}

These are required for starting and ending execution of an MPI program.
Their actions may be implementation-dependent.  For instance, if our
platform is an Ethernet-based cluster , {\bf MPI\_Init()} will probably
set up the TCP/IP sockets via which the various nodes communicate with
each other.  On an Infiniband-based cluster, connections in the special
Infiniband network protocol will be established.  On a shared-memory
multiprocessor, an implementation of MPI that is tailored to that
platform would take very different actions.

\subsubsection{MPI\_Comm\_size() and MPI\_Comm\_rank()}

In our function {\bf init()} above, note the calls

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Comm_size(MPI_COMM_WORLD,&nnodes);
MPI_Comm_rank(MPI_COMM_WORLD,&me);
\end{Verbatim}

The first call determines how many nodes are participating in our
computation, placing the result in our variable {\bf nnodes}.  Here {\bf
MPI\_COMM\_WORLD} is our node group, termed a {\bf communicator} in MPI
parlance.  MPI allows the programmer to subdivide the nodes into groups,
to facilitate performance and clarity of code.  Note that for some
operations, such as barriers, the only way to apply the operation to a
proper subset of all nodes is to form a group.  The totality of all
groups is denoted by {\bf MPI\_COMM\_WORLD}.  In our program here, we
are not subdividing into groups.

The second call determines this node's ID number, called its {\bf rank},
within its group.  As mentioned earlier, even though the nodes are all
running the same program, they are typically working on different parts
of the program's data.  So, the program needs to be able to sense which
node it is running on, so as to access the appropriate data.  Here we
record that information in our variable {\bf me}.

\subsubsection{MPI\_Send()}

To see how MPI's basic send function works, consider our line above,

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Send(mymin,2,MPI_INT,0,MYMIN_MSG,MPI_COMM_WORLD);
\end{Verbatim}

Let's look at the arguments:

\begin{itemize}

\item [{\bf mymin:}] 

We are sending a set of bytes.  This argument states the address at
which these bytes begin.

\item [{\bf 2, MPI\_INT:}]

This says that our set of bytes to be sent consists of 2 objects of type
{\bf MPI\_INT}.  That means 8 bytes on 32-bit machines, so why not just
collapse these two arguments to one, namely the number 8?  Why did the
designers of MPI bother to define data types?  The answer is that we
want to be able to run MPI on a heterogeneous set of machines, with MPI
serving as the ``broker'' between them in case different architectures
among those machines handle data differently.

First of all, there is the issue of {\bf endianness}.  Intel machines,
for instance, are {\bf little-endian}, which means that the least
significant byte of a memory word has the smallest address among bytes
of the word.  Sun SPARC chips, on the other hand, are {\bf big-endian},
with the opposite storage scheme.  If our set of nodes included machines
of both types, straight transmission of sequences of 8 bytes might mean
that some of the machines literally receive the data backwards!

Secondly, these days 64-bit machines are becoming more and more common.
Again, if our set of nodes were to include both 32-bit and 64-bit words,
some major problems would occur if no conversion were done.

\item [{\bf 0:}]

We are sending to node 0.

\item [{\bf MYMIN\_MSG:}]

This is the message type, programmer-defined in our line

\begin{Verbatim}[fontsize=\relsize{-2}]
#define MYMIN_MSG 0
\end{Verbatim}

Receive calls, described in the next section, can ask to receive only
messages of a certain type.

\item [{\bf MPI\_COMM\_WORLD:}]

This is the node group to which the message is to be sent.  Above, where
we said we are sending to node 0, we technically should say we are
sending to node 0 within the group {\bf MPI\_COMM\_WORLD}.

\end{itemize}

\subsubsection{MPI\_Recv()}

Let's now look at the arguments for a basic receive:

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Recv(othermin,2,MPI_INT,i,MYMIN_MSG,MPI_COMM_WORLD,&status);
\end{Verbatim}

\begin{itemize}

\item [{\bf othermin:}]

The received message is to be placed at our location {\bf othermin}.

\item [{\bf 2,MPI\_INT:}]

Two objects of {\bf MPI\_INT} type are to be received.

\item [{\bf i:}]

Receive only messages of from node {\bf i}.  If we did not care what
node we received a message from, we could specify the value
{\bf MPI\_ANY\_SOURCE}.

\item [{\bf MYMIN\_MSG:}]

Receive only messages of type {\bf MYMIN\_MSG}.  If we did not care what
type of message we received, we would specify the value {\bf
MPI\_ANY\_TAG}.

\item [{\bf MPI\_COMM\_WORLD:}]

Group name.

\item [{\bf status:}]

Recall our line

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Status status;  // describes result of MPI_Recv() call 
\end{Verbatim}

The type is an MPI {\bf struct} containing information about the
received message.  Its primary fields of interest are {\bf MPI\_SOURCE},
which contains the identity of the sending node, and {\bf MPI\_TAG},
which contains the message type.  These would be useful if the receive
had been done with {\bf MPI\_ANY\_SOURCE} or {\bf MPI\_ANY\_TAG}; the
status argument would then tell us which node sent the message and what
type the message was.

\end{itemize}

\section{Collective Communications}

MPI features a number of {\bf collective communication} capabilities, a
number of which are used in the following refinement of our Dijkstra
program:

\subsection{Example:  Refined Dijkstra Code}

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
// Dijkstra.coll1.c

// MPI example program:  Dijkstra shortest-path finder in a
// bidirectional graph; finds the shortest path from vertex 0 to all
// others; this version uses collective communication

// command line arguments:  nv print dbg

// where:  nv is the size of the graph; print is 1 if graph and min
// distances are to be printed out, 0 otherwise; and dbg is 1 or 0, 1
// for debug

// node 0 will both participate in the computation and serve as a
// "manager"

#include <stdio.h>
#include <mpi.h>

// global variables (but of course not shared across nodes)

int nv,  // number of vertices
    *notdone, // vertices not checked yet
    nnodes,  // number of MPI nodes in the computation
    chunk,  // number of vertices handled by each node
    startv,endv,  // start, end vertices for this node
    me,  // my node number 
    dbg; 
unsigned largeint,  // max possible unsigned int
         mymin[2],  // mymin[0] is min for my chunk,
                    // mymin[1] is vertex which achieves that min
         overallmin[2],  // overallmin[0] is current min over all nodes,
                         // overallmin[1] is vertex which achieves that min
         *ohd,  // 1-hop distances between vertices; "ohd[i][j]" is
                // ohd[i*nv+j]
         *mind;  // min distances found so far

double T1,T2;  // start and finish times 

void init(int ac, char **av)
{  int i,j,tmp; unsigned u;
   nv = atoi(av[1]);
   dbg = atoi(av[3]);
   MPI_Init(&ac,&av);
   MPI_Comm_size(MPI_COMM_WORLD,&nnodes);
   MPI_Comm_rank(MPI_COMM_WORLD,&me);
   chunk = nv/nnodes;  
   startv = me * chunk; 
   endv = startv + chunk - 1;
   u = -1;
   largeint = u >> 1;
   ohd = malloc(nv*nv*sizeof(int));
   mind = malloc(nv*sizeof(int));
   notdone = malloc(nv*sizeof(int)); 
   // random graph
   // note that this will be generated at all nodes; could generate just
   // at node 0 and then send to others, but faster this way
   srand(9999);
   for (i = 0; i < nv; i++)  
      for (j = i; j < nv; j++)  {
         if (j == i) ohd[i*nv+i] = 0;
         else  {
            ohd[nv*i+j] = rand() % 20;
            ohd[nv*j+i] = ohd[nv*i+j];
         }
      }
   for (i = 0; i < nv; i++)  {
      notdone[i] = 1;
      mind[i] = largeint;
   }
   mind[0] = 0;
   while (dbg) ;  // stalling so can attach debugger
}

// finds closest to 0 among notdone, among startv through endv
void findmymin()
{  int i;
   mymin[0] = largeint; 
   for (i = startv; i <= endv; i++)
      if (notdone[i] && mind[i] < mymin[0])  {
         mymin[0] = mind[i];
         mymin[1] = i;
      }
}

void updatemymind()  // update my mind segment
{  // for each i in [startv,endv], ask whether a shorter path to i 
   // exists, through mv
   int i, mv = overallmin[1]; 
   unsigned md = overallmin[0];
   for (i = startv; i <= endv; i++)
      if (md + ohd[mv*nv+i] < mind[i])  
         mind[i] = md + ohd[mv*nv+i];
}

void printmind()  // partly for debugging (call from GDB)
{  int i;
   printf("minimum distances:\n");
   for (i = 1; i < nv; i++)
      printf("%u\n",mind[i]);
}

void dowork()
{  int step,  // index for loop of nv steps
       i;
   if (me == 0) T1 = MPI_Wtime();
   for (step = 0; step < nv; step++)  {
      findmymin();
      MPI_Reduce(mymin,overallmin,1,MPI_2INT,MPI_MINLOC,0,MPI_COMM_WORLD);
      MPI_Bcast(overallmin,1,MPI_2INT,0,MPI_COMM_WORLD);
      // mark new vertex as done 
      notdone[overallmin[1]] = 0;  
      updatemymind(startv,endv);
   }
   // now need to collect all the mind values from other nodes to node 0
   MPI_Gather(mind+startv,chunk,MPI_INT,mind,chunk,MPI_INT,0,MPI_COMM_WORLD);
   T2 = MPI_Wtime();
}

int main(int ac, char **av)
{  int i,j,print;
   init(ac,av);
   dowork();  
   print = atoi(av[2]);
   if (print && me == 0)  {
      printf("graph weights:\n");
      for (i = 0; i < nv; i++)  {
         for (j = 0; j < nv; j++)  
            printf("%u  ",ohd[nv*i+j]);
         printf("\n");
      }
      printmind();
   }
   if (me == 0) printf("time at node 0: %f\n",(float)(T2-T1));
   MPI_Finalize();
}
\end{Verbatim}

The new calls will be explained in the next section.

\subsection{MPI\_Bcast()}

In our original Dijkstra example, we had a loop

\begin{Verbatim}[fontsize=\relsize{-2}]
for (i = 1; i < nnodes; i++)  
   MPI_Send(overallmin,2,MPI_INT,i,OVRLMIN_MSG,MPI_COMM_WORLD);
\end{Verbatim}

in which node 0 sends to all other nodes.  We can replace this by

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Bcast(overallmin,2,MPI_INT,0,MPI_COMM_WORLD);
\end{Verbatim}

In English, this call would say, 

\begin{quote}

At this point all nodes participate in a broadcast operation, in which
node 0 sends 2 objects of type {\bf MPI\_INT}.  The source of the data
will be located at address {\bf overallmin} at node 0, and the other
nodes will receive the data at a location of that name.

\end{quote}

Note my word ``participate'' above.  The name of the function is
``broadcast,'' which makes it sound like only node 0 executes this line
of code, which is not the case; all the nodes in the group (in this
case that means all nodes in our entire computation) execute this line.
The only difference is the action; most nodes participate by receiving,
while node 0 participates by sending.

Actually, this call to {\bf MPI\_Bcast()} is doing more than replacing
the loop, since the latter had been part of an if-then-else that checked
whether the given process had rank 0 or not.

Why might this be preferable to using an explicit loop?  First, it would
obviously be much clearer.  That makes the program easier to write,
easier to debug, and easier for others (and ourselves, later) to read.

But even more importantly, using the broadcast may improve performance.
We may, for instance, be using an implementation of MPI which is
tailored to the platform on which we are running MPI.  If for instance
we are running on a network designed for parallel computing, such as
Myrinet or Infiniband, an optimized broadcast may achieve a much higher
performance level than would simply a loop with individual send calls.
On a shared-memory multiprocessor system, special machine instructions
specific to that platform's architecture can be exploited, as for
instance IBM has done for its shared-memory machines.  Even on an
ordinary Ethernet, one could exploit Ethernet's own broadcast mechanism,
as had been done for PVM, a system like MPI (G. Davies and N. Matloff,
Network-Specific Performance Enhancements for PVM, {\it Proceedings  of
the  Fourth IEEE International Symposium on High-Performance Distributed
Computing}, 1995, 205-210).

\subsubsection{MPI\_Reduce()/MPI\_Allreduce()}

Look at our call

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Reduce(mymin,overallmin,1,MPI_2INT,MPI_MINLOC,0,MPI_COMM_WORLD);
\end{Verbatim}

above.  In English, this would say,

\begin{quote} At this point all nodes in this group participate in a
``reduce'' operation.  The type of reduce operation is {\bf
MPI\_MINLOC}, which means that the minimum value among the nodes will be
computed, and the index attaining that minimum will be recorded as well.
Each node contributes a value to be checked, and an associated index,
from a location {\bf mymin} in their programs; the type of the pair is
{\bf MPI\_2INT}.  The overall min value/index will be computed by
combining all of these values at node 0, where they will be placed at a
location {\bf overallmin}.  \end{quote}

MPI also includes a function {\bf MPI\_Allreduce()}, which does the same
operation, except that instead of just depositing the result at one
node, it does so at all nodes.  So for instance our code above,

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Reduce(mymin,overallmin,1,MPI_2INT,MPI_MINLOC,0,MPI_COMM_WORLD);
MPI_Bcast(overallmin,1,MPI_2INT,0,MPI_COMM_WORLD);
\end{Verbatim}

could be replaced by

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Allreduce(mymin,overallmin,1,MPI_2INT,MPI_MINLOC,MPI_COMM_WORLD);
\end{Verbatim}

Again, these can be optimized for particular platforms.

Here is a table of MPI reduce operations:

\begin{tabular}{|r|r|}
\hline
MPI\_MAX & max \\
MPI\_MIN & min \\
MPI\_SUM & sum \\
MPI\_PROD & product \\
MPI\_LAND & wordwise boolean and \\
MPI\_LOR & wordwise boolean or \\
MPI\_LXOR & wordwise exclusive or \\
MPI\_BAND & bitwise boolean and \\
MPI\_BOR & bitwise boolean or \\
MPI\_BXOR & bitwise exclusive or \\
MPI\_MAXLOC & max value and location or \\
MPI\_MINLOC & min value and location or \\
\hline
\end{tabular}

\subsubsection{MPI\_Gather()/MPI\_Allgather()}

A classical approach to parallel computation is to first break the data for
the application into chunks, then have each node work on its chunk, and
then gather all the processed chunks together at some node.  The MPI
function {\bf MPI\_Gather()} does this.

In our program above, look at the line

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Gather(mind+startv,chunk,MPI_INT,mind,chunk,MPI_INT,0,MPI_COMM_WORLD);
\end{Verbatim}

In English, this says,

\begin{quote}

At this point all nodes participate in a gather operation, in which each
node contributes data, consisting of {\bf chunk} number of MPI integers,
from a location {\bf mind+startv} in its program.  All that data is
strung together and deposited at the location {\bf mind} in the program
running at node 0.

\end{quote}

There is also {\bf MPI\_Allgather()}, which places the result at all
nodes, not just one.

\subsubsection{The MPI\_Scatter()}

This is the opposite of {\bf MPI\_Gather()}, i.e. it breaks long data
into chunks which it parcels out to individual nodes.  

Here is MPI code to count the number of edges in a directed graph.  (A
link from i to j does not necessarily imply one from j to i.)  In the
context here, {\bf me} is the node's rank; {\bf nv} is the number of
vertices; {\bf oh} is the one-hop distance matrix; and {\bf nnodes} is
the number of MPI processes.  At the beginning only the process of rank
0 has a copy of {\bf oh}, but it sends that matrix out in chunks to the
other nodes, each of which stores its chunk in an array {\bf ohchunk}. 

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
MPI_Scatter(oh, nv*nv, MPI_INT, ohchunk, nv/nnodes, MPI_INT, 0,
MPI_COMM_WORLD);
mycount = 0;
for (i = 0; i < nv*nv/nnodes)
   if (ohchunk[i] != 0) mycount++;
MPI_Reduce(&mycount,&numedge,1,MPI_INT,MPI_SUM,0,MPI_COMM_WORLD);
if (me == 0) printf("there are %d edges\n",numedge);
\end{Verbatim}

\subsubsection{The MPI\_Barrier()}  

This implements a barrier for a given communicator.  The name of the
communicator is the sole argument for the function.  

Explicit barriers are less common in message-passing programs than in
the shared-memory world.

\subsection{Creating Communicators} 

Again, a communicator is a subset (either proper or improper) of all of
our nodes.  MPI includes a number of functions for use in creating
communicators.  Some set up a virtual ``topology'' among the nodes.  

For instance, many physics problems consist of solving differential
equations in two- or three-dimensional space, via approximation on a
grid of points.  In two dimensions, groups may consists of rows in the
grid.

Here's how we might divide an MPI run into two groups (assumes an even
number of MPI processes to begin with):

\begin{Verbatim}[fontsize=\relsize{-2}]
MPI_Comm_size(MPI_COMM_WORLD,&nnodes);
MPI_Comm_rank(MPI_COMM_WORLD,&me);
...
// declare variables to bind to groups
MPI_Group worldgroup, subgroup; 
// declare variable to bind to a communicator
MPI_Comm subcomm; 
...
int i,startrank,nn2 = nnodes/2;
int *subranks = malloc(nn2*sizeof(int));
if (me < nn2) start = 0;
else start = nn2;
for (i = 0; i < nn2; i++) 
   subranks[i] = i + start;
// bind the world to a group variable
MPI_Comm_group(MPI_COMM_WORLD, &worldgroup); 
// take worldgroup the nn2 ranks in "subranks" and form group 
// "subgroup" from them 
MPI_Group_incl(worldgroup, nn2, subranks, subgroup);
// create a communicator for that new group
MPI_Comm_create(MPI_COMM_WORLD, subgroup, subcomm); 
// get my rank in this new group
MPI_Group_rank (subgroup, &subme); 
\end{Verbatim}

You would then use {\bf subcomm} instead of MPI\_COMM\_WORLD
whenever you wish to, say, broadcast, only to that group.

\section{Buffering, Synchrony and Related Issues}

As noted several times so far, interprocess communication in parallel
systems can be quite expensive in terms of time delay.  In this section
we will consider some issues which can be extremely important in this
regard.

\subsection{Buffering, Etc.}

To understand this point, first consider situations in which MPI is
running on some network, under the TCP/IP protocol.  Say an MPI program
at node A is sending to one at node B.

It is extremely import to keep in mind the levels of abstraction here.
The OS's TCP/IP stack is running at the Session, Transport and Network
layers of the network.  MPI---meaning the MPI internals---is running
above the TCP/IP stack, in the Application layers at A and B.  And the
MPI user-written application could be considered to be running at a
``Super-application'' layer, since it calls the MPI internals.  (From
here on, we will refer to the MPI internals as simply ``MPI.'')

MPI at node A will have set up a TCP/IP socket to B during the user
program's call to {\bf MPI\_Init()}.  The other end of the socket will
be a corresponding one at B.  This setting up of this socket pair as
establishing a {\bf connection} between A and B.  When node A calls {\bf
MPI\_Send()}, MPI will write to the socket, and the TCP/IP stack will
transmit that data to the TCP/IP socket at B.  The TCP/IP stack at B
will then send whatever bytes come in to MPI at B.

Now, it is important to keep in mind that in TCP/IP the totality of
bytes sent by A to B during lifetime of the connection is considered one
long message.  So for instance if the MPI program at A calls {\bf
MPI\_Send()} five times, the MPI internals will write to the socket five
times, but the bytes from those five messages will not be perceived by
the TCP/IP stack at B as five messages, but rather as just one long
message (in fact, only part of one long message, since more may be yet
to come).

MPI at B continually reads that ``long message'' and breaks it back into
MPI messages, keeping them ready for calls to {\bf MPI\_Recv()} from the
MPI application program at B.  Note carefully that phrase, {\it keeping
them ready}; it refers to the fact that the order in which the MPI
application program requests those messages may be different from the
order in which they arrive.

On the other hand, looking again at the TCP/IP level, even though all
the bytes sent are considered one long message, it will physically be
sent out in pieces.  These pieces don't correspond to the pieces written
to the socket, i.e. the MPI messages.  Rather, the breaking into pieces
is done for the purpose of {\bf flow control}, meaning that the TCP/IP
stack at A will not send data to the one at B if the OS at B has no room
for it.  The {\bf buffer} space the OS at B has set up for receiving
data is limited.  As A is sending to B, the TCP layer at B is telling
its counterpart at A when A is allowed to send more data.  

Think of what happens the MPI application at B calls {\bf MPI\_Recv()},
requesting to receive from A, with a certain tag T.  Say the first
argument is named {\bf x}, i.e.  the data to be received is to be
deposited at {\bf x}.  If MPI sees that it already has a message of tag
T, it will have its {\bf MPI\_Recv()} function return the message to the
caller, i.e. to the MPI application at B.  {\bf If no such message has
arrived yet, MPI won't return to the caller yet, and thus the caller
blocks.}

{\bf MPI\_Send()} can block too.  If the platform and MPI implementation
is that of the TCP/IP network context described above, then the send
call will return when its call to the OS' {\bf write()} (or equivalent,
depending on OS) returns, but that could be delayed if the OS' buffer
space is full.  On the other hand, another implementation could require
a positive response from B before allowing the send call to return.

%  \subsection{Nonbuffered Communication}
%  
%  The above analysis applies to MPI applications that run on top of
%  TCP/IP, with a natural buffering system.  In fact, there is likely
%  additional buffering as well.  By contrast, some other platforms may not
%  have any buffering at all.  This is not the usual situation, but it
%  could be the case, for instance, when the underlying platform is a
%  shared-memory multiprocessor and the MPI implementation takes advantage
%  of that structure, rather than just using TCP/IP.

Note that buffering slows everything down.  In our TCP scenario above,
{\bf MPI\_Recv()} at B must copy messages from the OS' buffer space to
the MPI application program's program variables, e.g.  {\bf x} above.
This is definitely a blow to performance.  That in fact is why networks
developed specially for parallel processing typically include mechanisms
to avoid the copying.  Infiniband, for example, has a Remote Direct
Memory Access capability, meaning that A can write directly to {\bf x}
at B.  Of course, if our implementation uses {\bf synchronous}
communication, with A's send call not returning until A gets a response
from B, we must wait even longer.

Technically, the MPI standard states that {\bf MPI\_Send(x,...)} will
return only when it is safe for the application program to write over
the array which it is using to store its message, i.e. {\bf x}.  As we
have seen, there are various ways to implement this, with performance
implications.  Similarly, {\bf MPI\_Recv(y,...)} will return only when
it is safe to read {\bf y}.

%  So, we may either have a no-buffering situation forced upon us, or may
%  opt for no buffering for performance reasons.  But that has a big
%  implication:  Node A cannot call {\bf MPI\_Send()} until node B has
%  called {\bf MPI\_Recv()}; otherwise B may be using the space at {\bf x}, 
%  in which case A's premature {\bf MPI\_Send()} would ruin things at that
%  location.  That would mean that B would have to inform A when it calls
%  {\bf MPI\_Recv()}.  This is called {\bf synchronous} communication.
%  Clearly, this can be a major cause of slowdown if not handled carefully.

\subsection{Safety}

With {\bf synchronous} communication, deadlock is a real risk.  Say A
wants to send two messages to B, of types U and V, but that B wants to
receive V first.  Then A won't even get to send V, because in preparing
to send U it must wait for a notice from B that B wants to read U---a
notice which will never come, because B sends such a notice for V first.
This would not occur if the communication were asynchronous.

But beyond formal deadlock, programs can fail in other ways, even with
buffering, as buffer space is always by nature finite.  A program can
fail if it runs out of buffer space, either at the sender or the
receiver.  See
\url{www.llnl.gov/computing/tutorials/mpi_performance/samples/unsafe.c}
for an example of a test program which demonstrates this on a certain
platform, by deliberating overwhelming the buffers at the receiver.

In MPI terminology, asynchronous communication is considered {\bf
unsafe}.  The program may run fine on most systems, as most systems are
buffered, but fail on some systems.  Of course, as long as you know your
program won't be run in nonbuffered settings, it's fine, and since there
is potentially such a performance penalty for doing things
synchronously, most people are willing to go ahead with their ``unsafe''
code.

\subsection{Living Dangerously}

If one is sure that there will be no problems of buffer overflow and so
on, one can use variant send and receive calls provided by MPI, such as
{\bf MPI\_Isend()} and {\bf MPI\_Irecv()}.  The key difference between
them and {\bf MPI\_Send()} and {\bf MPI\_Recv()} is that they return
immediately, and thus are termed {\bf nonblocking}.  Your code can go on
and do other things, not having to wait.

This does mean that at A you cannot touch the data you are sending until
you determine that it has either been buffered somewhere or has reached
{\bf x} at B.  Similarly, at B you can't use the data at {\bf x} until
you determine that it has arrived.  Such determinations can be made via
{\bf MPI\_Wait()}.  In other words, you can do your send or receive,
then perform some other computations for a while, and then call {\bf
MPI\_Wait()} to determine whether you can go on.  Or you can call {\bf
MPI\_Probe()} to ask whether the operation has completed yet.

\subsection{Safe Exchange Operations}

In many applications A and B are swapping data, so both are sending and
both are receiving.  This too can lead to deadlock.  An obvious solution
would be, for instance, to have the lower-rank node send first and the
higher-rank node receive first.  

But a more convenient, safer and possibly faster alternative would be to
use MPI's {\bf MPI\_Sendrecv()} function.  Its prototype is

\begin{Verbatim}[fontsize=\relsize{-2}]
intMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, 
   int dest, int sendtag, int source, int recvtag, MPI_Comm comm, 
   MPI_Status *status) 
\end{Verbatim}

Note that the sent and received messages can be of different lengths and
can use different tags.

\section{Use of MPI from Other Languages}

MPI is a vehicle for parallelizing C/C++, but some clever people have
extended the concept to other languages, such as the cases of Python and
R that we treat in Chapters \ref{chap:pythr} and \ref{chap:r}.

\section{Other MPI Examples in This Book}

\begin{itemize}

\item The pipelined prime number finder in Chapter \ref{chap:intro}.

\item Bucket sort with sampling, in Section \ref{bsort}.

\end{itemize}




