\chapter{Introduction to Parallel Sorting} 
\label{chap:sort}

Sorting is one of the most common operations in parallel processing
applications.  For example, it is central to many parallel database
operations, and important in areas such as image processing, statistical
methodology and so on.  A number of different types of parallel sorting
schemes have been developed. Here we look at some of these schemes.  

\section{Quicksort}

You are probably familiar with the idea of quicksort: First break the
original array into a ``small-element'' pile and a ``large-element''
pile, by comparing to a {\bf pivot} element.  In a naive implementation,
the first element of the array serves as the pivot, but better
performance can be obtained by taking, say, the median of the first
three elements.  Then ``recurse'' on each of the two piles, and then
string the results back together again.

This is an example of the {\bf divide and conquer} approach seen in so
many serial algorithms.  It is easily parallelized (though
load-balancing issues may arise).  Here, for instance, we might assign
one pile to one thread and the other pile to another thread.

Suppose the array to be sorted is named {\bf x}, and consists of {\bf n}
elements.  

\subsection{The Separation Process}

A major issue is how we separate the data into piles.

In a naive implementation, the piles would be put into new arrays, but
this is bad in two senses:  It wastes memory space, and wastes time,
since much copying of arrays needs to be done.  A better implementation
places the two piles back into the original array {\bf x}.  The
following C code does that.  

The function {\bf separate()} is intended to be used in a recursive
quicksort operation.  It operates on {\bf x[l]} through {\bf x[h]}, a
subarray of {\bf x} that itself may have been formed at an earlier stage
of the recursion.  It forms two piles from those elements, and placing
the piles back in the same region {\bf x[l]} through {\bf x[h]}.  It
also has a return value, showing where the first pile ends.

\begin{Verbatim}[fontsize=\relsize{-2}]
int separate(int l, int h)
{  int ref,i,j,k,tmp;
   ref = x[h]; i = l-1; j = h;
   do  {
      do i++; while (x[i] < ref && i < h);
      do j--; while (x[j] > ref && j > l);
      tmp = x[i];  x[i] = x[j];  x[j] = tmp;
   } while (j > i);
   x[j] = x[i];  x[i] = x[h];  x[h] = tmp;
   return i;
}
\end{Verbatim}

The function {\bf separate()} rearranges the subarray, returning a value
{\bf m}, so that:

\begin{itemize}

\item {\bf x[l]} through {\bf x[m-1]} are less than {\bf x[m]},

\item {\bf x[m+1]} through {\bf x[h]} are greater than {\bf x[m]}, and

\item {\bf x[m]} is in its ``final resting place,'' meaning that {\bf
x[m]} will never move again for the remainder of the sorting process.
(Another way of saying this is that the current {\bf x[m]} is the {\bf
m}-th smallest of all the original {\bf x[i]}, {\bf i} = 0,1,...,{\bf
n}-1.)

\end{itemize}

By the way, {\bf x[l]} through {\bf x[m-1]} will also be in their final
resting places as a group.  They may be exchanging places with each
other from now on, but they will never again leave the range {\bf i}
though {\bf m-1} within the {\bf x} array as a whole.  A similar
statement holds for {\bf x[m+1]} through {\bf x[n-1]}.

Another approach is to do a prefix scan.  As an illustration, consider
the array

\begin{equation}
28 ~~~~ 35 ~~~~ 12  ~~~~ 5 ~~~~ 13  ~~~~ 6  ~~~~ 8 ~~~~ 10  ~~~~ 168
\end{equation}

We'll take the first element, 28, as the pivot, and form a new array of
1s and 0s, where 1 means ``less than the pivot'':

\begin{tabular}{rrrrrrrrr}
28 & 35 & 12  & 5 & 13  & 6  & 48 & 10 & 168 \\
1  &  0 &  1  & 1 &  1  & 1  & 0  &  1 &  0 \\
\end{tabular}

Now form the prefix scan (Chapter \ref{chap:prefix}) of that second
array, with respect to addition.  It will be an {\it exclusive} scan
(Section \ref{prefiximps}).  This gives us

\begin{tabular}{rrrrrrrrr}
28 & 35 & 12  & 5 & 13  & 6  & 48 & 10 & 168 \\
0  &  0 &  1  & 1 &  1  & 1  & 0  &  1 &  0 \\
0  &  0 &  0  & 1 &  2  & 3  & 3  &  4 &  4 \\
\end{tabular}

Now, the key point is that for every element 1 in that second row, the
corresponding element in the third row shows where the first-row element
should be placed under the separation operation!  Here's why:

The elements 12, 5, 13, 6 and 10 should go in the first pile, which in
an in-place separation would means indices 0, 1, 2, 3, and 4.  Well, as
you can see above, these are precisely the values shown in the third
row for 12, 5, 13, 6 and 10, all of which have 1s in the second row.

The pivot, 28, then should immediately follow that low pile, i.e. it
should be placed at index 5.

We can simply place the high pile at the remaining indicies, 6 through 8
(though we'll do it more systematically below).

In general for an array of length k, we:

\begin{itemize}

\item form the second row of 1s and 0s indicating $<$ pivot

\item form the third row, the exclusive prefix scan

\item for each 1 in the second row, place the corresponding element in
row 1 into the spot indicated by row 3

\item place the pivot in the place indicated by 1 plus m, the largest value
in row 3

\item form row 4, equal to (0,1,...,k-1) minus row 3 plus m

\item for each 0 in the second row, place the corresponding element in
row 1 into the spot indicated by row 4

\end{itemize}

Note that this operation, using scan, could be used an an alternative to
the {\bf separate()} function above.  But it could be done in parallel;
more on this below.

\subsection{Example: OpenMP Quicksort}
\label{sharedqs}

Here is OpenMP code which performs quicksort in the shared-memory
paradigm (adapted from code in the OpenMP Source Code Repository,
\url{http://www.pcg.ull.es/ompscr/}):

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
void qs(int *x, int l, int h)
{  int newl[2], newh[2], i, m;
   m = separate(x,l,h);
   newl[0] = l;  newh[0] = m-1;
   newl[1] = m+1;  newh[1] = h;
   #pragma omp parallel
   {
      #pragma omp for nowait
      for (i = 0; i < 2; i++)
         qs(newl[i],newh[i]);
   }
}
\end{Verbatim}

Note the {\bf nowait} clause.  Since different threads are operating on
different portions of the array, they need not be synchronized.

Recall that another implementation, using the {\bf task} directive, was
given earlier in Section \ref{taskdir}.

In both of these implementations, we used the function {\bf separate()}
defined above.  So, different threads apply different separation
operations to different subarrays.  An alternative would be to place the
parallelism in the separation operation itself, using the parallel
algorithms for prefix scan in Chapter \ref{chap:prefix}.

% Note too that this implementation requires {\bf nested parallelism},
% meaning that when a thread created from one {\bf \#pragma omp parallel}
% directive itself hits such a directive, it will create more threads.
% Some OpenMP compilers allow this, such as the Omni OpenMP compiler when
% used with StackThreads.

% A variant on this which might achieve better load balancing would set up
% a {\bf task pool}, consisting of an array of ({\bf l}, {\bf h}) pairs.
% Initially the pool consists of just [0,n-1].  The function {\bf qs()}
% would then become iterative instead of recursive, with its main loop
% looking something like this for an array of length n:
% 
% \begin{Verbatim}[fontsize=\relsize{-2}]
% fetch an (l,h) pair from the task pool
% while not done
%    call separate() on x[l] through x[h], yielding m
%    if m < h
%       add (m+1,h) to the task pool
%    h = m-1
%    if l == h
%       fetch [l,h] from the task pool
% \end{Verbatim}
% 
% This pseudocode is missing important details.  For example, How does the
% iteration within a thread stop?  The key lies in pairs of the form
% (i,i), which I'll call {\it singletons}.  The sort is done when the
% number of singletons reaches n.  

\subsection{Hyperquicksort}

This algorithm was originally developed for hypercubes, but can be used
on any message-passing system having a power of 2 for the number of
nodes.\footnote{See Chapter \ref{chap:msg} for definitions of
hypercube terms.}

It is assumed that at the beginning each PE contains some chunk of the
array to be sorted. After sorting, each PE will contain some chunk of
the \underbar{sorted} array, meaning that:

\begin{itemize}

\item each chunk is itself in sorted form

\item for all cases of \( i<j \), the elements at PE i are less than the
elements at PE j

\end{itemize}

If the sorted array itself were our end, rather than our means to
something else, we could now collect it at some node, say node 0.
If, as is more likely, the sorting is merely an intermediate step in a
larger distributed computation, we may just leave the chunks at the
nodes and go to the next phase of work.

Say we are on a d-cube.  The intuition behind the algorithm is quite
simple:

\begin{Verbatim}[fontsize=\relsize{-2}]
for i = d downto 1
   for each i-cube:
      root of the i-cube broadcasts its median to all in the i-cube, 
         to serve as pivot
      consider the two (i-1)-subcubes of this i-cube
      each pair of partners in the (i-1)-subcubes exchanges data:
         low-numbered PE gives its partner its data larger than pivot
         high-numbered PE gives its partner its data smaller than pivot
\end{Verbatim}

To avoid deadlock, have the lower-numbered partner send then receive,
and vice versa for the higher-numbered one.  Better, in MPI, use {\bf
MPI\_SendRcv()}.

After the first iteration, all elements in the lower (d-1)-cube are less
than all elements in higher (d-1)-cube.  After d such steps, the array
will be sorted.

\section{Mergesorts}
\label{mergesort}

\subsection{Sequential Form}

In its serial form, mergesort has the following pseudocode:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
// initially called with l = 0 and h = n-1, where n is the length of the
// array and is assumed here to be a power of 2
void seqmergesort(int *x, int l, int h)
{  seqmergesort(x,0,h/2-1);
   seqmergesort(x,h/2,h);
   merge(x,l,h);
}
\end{Verbatim}

The function {\bf merge()} should be done in-place, i.e. without using
an auxiliary array.  It basically codes the operation shown in
pseudocode for the message-passing case in Section \ref{basicstuff}.

\subsection{Shared-Memory Mergesort} 

This is similar to the patterns for shared-memory quicksort in Section 
\ref{sharedqs} above.

\subsection{Message Passing Mergesort on a Tree Topology}
\label{basicstuff}

First, we organize the processing nodes into a binary tree. This is
simply from the point of view of the software, rather than a physical
grouping of the nodes.  We will assume, though, that the number of nodes
is one less than a power of 2.

To illustrate the plan, say we have seven nodes in all. We could label
node 0 as the root of the tree, label nodes 1 and 2 to be its two
children, label nodes 3 and 4 to be node 1's children, and finally label
nodes 5 and 6 to be node 2's children.

It is assumed that the array to be sorted is initially distributed in
the leaf nodes (recall a similar situation for hyperquicksort), i.e.
nodes 3-6 in the above example. The algorithm works best if there are
approximately the same number of array elements in the various leaves.

In the first stage of the algorithm, each leaf node applies a regular
sequential sort to its current holdings. Then each node begins sending
its now-sorted array elements to its parent, one at a time, in ascending
numerical order.

Each nonleaf node then will merge the lists handed to it by its two
children.  Eventually the root node will have the entire sorted array.
Specifically, each nonleaf node does the following:

\begin{Verbatim}[fontsize=\relsize{-2}]
do
   if my left-child datum < my right-child datum
      pass my left-child datum to my parent
   else
      pass my right-child datum to my parent
until receive the "no more data" signal from both children
\end{Verbatim}

There is quite a load balancing issue here.  On the one hand, due to
network latency and the like, one may get better performance if each
node accumulates a chunk of data before sending to the parent, rather
than sending just one datum at a time.  Otherwise, ``upstream'' nodes
will frequently have no work to do.

On the other hand, the larger the chunk size, the earlier the leaf nodes
will have no work to do.  So for any particular platform, there will be
some optimal chunk size, which would need to be determined by
experimentation.

% The term \textit{two-way} here refers to the fact that each node merges
% two lists together.
% 
% 
% \subsection{Special Case: Hypercube Hardware}
% 
% Though earlier we said that the parent-child relationships were only in
% the software and did not reflect they physical configuration of the
% nodes, hypercube hardware does present an excellent opportunity to
% design the parent-child relationships around the hardware.
% 
% Here every node will act as a leaf node, and some nodes will act as
% their own children. Two or more processes will run on each of the nodes
% playing dual roles.  Then the software's tree is mapped to the nodes as
% follows:
% 
% \begin{Verbatim}[fontsize=\relsize{-2}]
%                            0
% 
%                   0                 1
% 
%               0       2          1       3                   
% 
%             0   4   2   6      1   5   3   7
% \end{Verbatim}
% 
% So 0 is its own child, grandchild and great-grandchild.
% 
% \subsection{A Refinement: Three-Way Merge Sort}
% 
% At the beginning of the Two-Way Merge Sort, the nonleaf nodes are idle,
% wasting precious parallelism. To remedy this, the Three-Way Merge Sort
% assumes that all nodes start with a chunk of the array. The operation is
% the same as before, except that now each node merges together
% \underbar{three} lists---left child, right child and its own list---and
% passes the merged array to its parent.

% \subsection{Hypercube Mergesort}
% \label{hypermerge}
% 
% This is similar to hyperquicksort, the main difference being that the
% numbers of data items are balanced at all times.  It again is set up
% so that in the end the sorted array is distributed across all the nodes.
% Here is the pseudocode:
% 
% \begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
% // assumes data is initially distributed at the nodes, and that the
% // array length n is divisible by the number of nnodes; the
% // dimension of the cube is d; the variable "me" is my node ID
% void parmergesort(int me, int mydata, int n, int nnodes, int d)
% {  chunksize = n / nnodes;
%    do a sequential sort of mydata
%    for dim = 1 to d
%       for i = dim-1 downto 0
%          do compare-exchange with my bit-i neighbor 
% }
% \end{Verbatim}

\subsection{Compare-Exchange Operations}
\label{compareexchange}

These are key to many sorting algorithms.

A {\bf compare-exchange}, also known as {\bf compare-split}, simply
means in English, ``Let's pool our data, and then I'll take the lower
half and you take the upper half.''  Each node executes the following
pseudocode:

\begin{Verbatim}[fontsize=\relsize{-2}]
send all my data to partner
receive all my partner's data 
if I have a lower id than my partner
   I keep the lower half of the pooled data 
else
   I keep the upper half of the pooled data 
\end{Verbatim}

\subsection{Bitonic Mergesort}

{\bf Definition:}
A sequence $(a_0, a_1,.., a_{k-1})$ is called {\bf bitonic} if either of
the following conditions holds:

\begin{itemize}

\item [(a)] The sequence is first nondecreasing then nonincreasing,
meaning that for some r

$$
(a_0 \leq a_1 \leq ... \leq a_r \geq a_{r+1} \geq a_{n-1})
$$

\item [(b)] The sequence can be converted to the form in (a) by
rotation, i.e. by moving the last k elements from the right end to the
left end, for some k. 

\end{itemize}

As an example of (b), the sequence (3,8,12,15,14,5,1,2) can be rotated
rightward by two element positions to form (1,2,3,8,12,15,14,5).  Or we
could just rotate by one element, moving the 2 to forming
(2,3,8,12,15,14,5,1).

Note that the definition includes the cases in which the sequence is
purely nondecreasing (r = n-1) or purely nonincreasing (r = 0).  

Also included are ``V-shape'' sequences, in which the numbers first
decrease then increase, such as (12,5,2,8,20).  By (b), these can be
rotated to form (a), with (12,5,2,8,20) being rotated to form
(2,8,20,12,5), an ``A-shape'' sequence.

(For convenience, from here on I will use the terms {\it increasing} and
{\it decreasing} instead of {\it nonincreasing} and {\it
nondecreasing}.)

Suppose we have bitonic sequence $(a_0, a_1,.., a_{k-1})$, where k is a
power of 2.  Rearrange the sequence by doing compare-exchange operations
between $a_i$ and $a_{n/2+i})$, i = 0,1,...,n/2-1.  Then it is not hard
to prove that the new $(a_0, a_1,.., a_{k/2-1})$ and $(a_{k/2},
a_{k/2+1},.., a_{k-1})$ are bitonic, and every element of that first
subarray is less than or equal to every element in the second one.  

So, we have set things up for yet another divide-and-conquer attack:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
// x is bitonic of length n, n a power of 2
void sortbitonic(int *x, int n)
{  do the pairwise compare-exchange operations
   if (n > 2) {
      sortbitonic(x,n/2);
      sortbitonic(x+n/2,n/2);
   }
}
\end{Verbatim}

This can be parallelized in the same ways we saw for Quicksort earlier.

So much for sorting bitonic sequences.  But what about general
sequences?  

We can proceed as follows, using our function {\bf sortbitonic()} above:

\begin{enumerate}

\item For each i = 0,2,4,...,n-2:

   \begin{itemize}

   \item Each of the pairs $(a_i,a_{i+1})$, i = 0,2,...,n-2 is bitonic,
   since {\it any} 2-element array is bitonic!  

   \item Apply {\bf sortbitonic()} to  $(a_i,a_{i+1})$.  In this case,
   we are simply doing a compare-exchange.

   \item If i/2 is odd, reverse the pair, so that this pair and the pair
   immediately preceding it now form a 4-element bitonic sequence.

   \end{itemize}

\item For each i = 0,4,8,...,n-4:

   \begin{itemize}

   \item Apply {\bf sortbitonic()} to  $(a_i,a_{i+1},a_{i+2},a_{i+3})$.

   \item If i/4 is odd, reverse the quartet, so that this quartet and
   the quartet immediately preceding it now form an 8-element bitonic
   sequence.

   \end{itemize}

\item Keep building in this manner, until get to a single sorted
n-element list.

\end{enumerate}

There are many ways to parallelize this.  In the hypercube case, the
algorithm consists of doing compare-exchange operations with all
neighbors, pretty much in the same pattern as hyperquicksort.  

\section{The Bubble Sort and Its Cousins}

\subsection{The Much-Maligned Bubble Sort}

Recall the {\bf bubble sort}:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
void bubblesort(int *x, int n)
{  for i = n-1 downto 1
      for j = 0 to i
         compare-exchange(x,i,j,n)
}
\end{Verbatim}

Here the function {\bf compare-exchange()} is as in Section
\ref{compareexchange} above.  In the context here, it boils down to

\begin{Verbatim}[fontsize=\relsize{-2}]
if x[i] > x[j]
   swap x[i] and x[j]
\end{Verbatim}

In the first {\bf i} iteration, the largest element ``bubbles'' all the
way to the right end of the array.  In the second iteration, the
second-largest element bubbles to the next-to-right-end position, and so
on.

You learned in your algorithms class that this is a very inefficient
algorithm---when used serially.  But it's actually rather usable in
parallel systems.

For example, in the shared-memory setting, suppose we have one thread
for each value of {\bf i}.  Then those threads can work in parallel, as
long as a thread with a larger value of {\bf i} does not overtake a
thread with a smaller {\bf i}, where ``overtake'' means working on a
larger {\bf j} value.

Once again, it probably pays to chunk the data.  In this case, 
{\bf compare-exchange()} fully takes on the meaning it had in Section
\ref{compareexchange}.

\subsection{A Popular Variant:  Odd-Even Transposition}

A popular variant of this is the {\bf odd-even transposition sort}.  The
pseudocode for a shared-memory version is:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
// the argument "me" is this thread's ID
void oddevensort(int *x, int n, int me)
{  for i = 1 to n 
      if i is odd
         if me is even
           compare-exchange(x,me,me+1,n)
         else  // me is odd
           compare-exchange(x,me,me-1,n)
      else  // i is even
         if me is even
           compare-exchange(x,me,me-1,n)
         else  // me is odd
           compare-exchange(x,me,me+1,n)
\end{Verbatim}

If the second or third argument of {\bf compare-exchange()} is less than
0 or greater than {\bf n}-1, the function has no action.

This looks a bit complicated, but all it's saying is that, from the
point of view of an even-numbered element of {\bf x}, it trades with its
right neighbor during odd phases of the procedure and with its left
neighbor during even phases.

Again, this is usually much more effective if done in chunks.

\subsection{Example:  CUDA Implementation of Odd/Even Transposition Sort}
\label{cudaoddeven}

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
#include <stdio.h>
#include <stdlib.h>
#include <cuda.h>

// compare and swap; copies from the f to t, swapping f[i] and
// f[j] if the higher-index value is smaller; it is required that i < j
__device__ void cas(int *f,int *t,int i,int j, int n, int me)
{  
   if (i < 0 || j >= n) return;
   if (me == i) {
      if (f[i] > f[j]) t[me] = f[j];
      else t[me] = f[i];
   } else {  // me == j
      if (f[i] > f[j]) t[me] = f[i];
      else t[me] = f[j];
   }
}

// does one iteration of the sort
__global__ void oekern(int *da, int *daaux, int n, int iter)
{  int bix = blockIdx.x;  // block number within grid
   if (iter % 2) { 
      if (bix % 2) cas(da,daaux,bix-1,bix,n,bix);
      else cas(da,daaux,bix,bix+1,n,bix);
   } else {
      if (bix % 2) cas(da,daaux,bix,bix+1,n,bix);
      else cas(da,daaux,bix-1,bix,n,bix);
   }
}

// sorts the array ha, length n, using odd/even transp. sort; 
// kept simple for illustration, no optimization 
void oddeven(int *ha, int n)
{
   int *da; 
   int dasize = n * sizeof(int);
   cudaMalloc((void **)&da,dasize);
   cudaMemcpy(da,ha,dasize,cudaMemcpyHostToDevice);
   // the array daaux will serve as "scratch space" 
   int *daaux;  
   cudaMalloc((void **)&daaux,dasize);
   dim3 dimGrid(n,1);
   dim3 dimBlock(1,1,1);
   int *tmp;
   for (int iter = 1; iter <= n; iter++) {
      oekern<<<dimGrid,dimBlock>>>(da,daaux,n,iter);
      cudaThreadSynchronize();
      if (iter < n) {
         // swap pointers
         tmp = da;
         da = daaux;
         daaux = tmp;
      } else
         cudaMemcpy(ha,daaux,dasize,cudaMemcpyDeviceToHost);
   }
}
\end{Verbatim}

Recall that in CUDA code, separate blocks of threads cannot
synchronize with each other.  Unless we deal with just a single block,
this necessitates limiting the kernel to a single iteration of the
algorithm, so that as iterations progress, execution alternates between
the device and the host.  

Moreover, we do not take advantage of shared memory.  One possible
solution would be to use {\bf \_\_syncthreads()} within each block for
most of the compare-and-exchange operations, and then having the host
take care of the operations on the boundaries between blocks.

\section{Shearsort}

In some contexts, our hardware consists of a two-dimensional mesh of
PEs.  A number of methods have been developed for such settings, one of
the most well known being Shearsort, developed by Sen, Shamir and the
eponymous Isaac Scherson of UC Irvine.  Again, the data is assumed to be
initially distributed among the PEs.  Here is the pseudocode:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
for i = 1 to ceiling(log2(n)) + 1
    if i is odd
       sort each even row in descending order
       sort each odd row in ascending order
    else
       sort each column is ascending order
\end{Verbatim}

At the end, the numbers are sorted in a ``snakelike'' manner.

For example:

\begin{tabular}{|l|l|}
\hline
6 & 12 \\ \hline 
5 & 9 \\ \hline 
\end{tabular}

\begin{tabular}{|l|l|}
\hline
6 & 12 \\ \hline 
9 & 5 \\ \hline 
\end{tabular}

\begin{tabular}{|l|l|}
\hline
6 & 5 \\ \hline 
9 & 12 \\ \hline 
\end{tabular}

\begin{tabular}{|l|l|}
\hline
5 & 6 $\downarrow$ \\ \hline 
12 & $\leftarrow$ 9 \\ \hline 
\end{tabular}

No matter what kind of system we have, a natural domain decomposition
for this problem would be for each process to be responsible for a group
of rows.  There then is the question about what to do during the
even-numbered iterations, in which column operations are done.  This can
be handled via a parallel matrix transpose operation.  In MPI, the
function {\bf MPI\_Alltoall()} may be useful.

\section{Bucket Sort with Sampling}
\label{bsort}

For concreteness, suppose we are using MPI on message-passing hardware,
say with 10 PEs.  As usual in such a setting, suppose our data is
initially distributed among the PEs.

Suppose we knew that our array to be sorted is a random sample from the
uniform distribution on (0,1).  In other words, about 20\% of our array
will be in (0,0.2), 38\% will be in (0.45,0.83) and so on.  

What we could do is assign PE0 to the interval (0,0.1), PE1 to (0.1,0.2)
etc.  Each PE would look at its local data, and distribute it to the
other PEs according to this interval scheme.  Then each PE would do a
local sort.

In general, we don't know what distribution our data comes from.  We
solve this problem by doing sampling.  In our example here, each PE
would sample some of its local data, and send the sample to PE0.  From
all of these samples, PE0 would find the decile values, i.e. 10th
percentile, 20th percentile,..., 90th percentile.  These values, called
{\bf splitters} would then be broadcast to all the PEs, and they would
then distribute their local data to the other PEs according to these
intervals.  

OpenMP code for this was given in Section \ref{ompbsort}.  Here is
similar MPI code below (various improvements could be made, e.g. with
broadcast):

\begin{lstlisting}[numbers=left]
// bucket sort, bin boundaries known in advance

// node 0 is manager, all else worker nodes; node 0 sends full data, bin
// boundaries to all worker nodes; i-th worker node extracts data for
// bin i-1, sorts it, sends sorted chunk back to node 0; node 0 places
// sorted results back in original array

// not claimed efficient; e.g. could be better to have manager place
// items into bins

#include <mpi.h>

#define MAX_N 100000  // max size of original data array
#define MAX_NPROCS 100  // max number of MPI processes
#define DATA_MSG 0  // manager sending original data
#define BDRIES_MSG 0  // manager sending bin boundaries
#define CHUNKS_MSG 2  // workers sending their sorted chunks

int nnodes,  // 
    n,  // size of full array
    me,  // my node number 
    fulldata[MAX_N],
    tmp[MAX_N],
    nbdries,  // number of bin boundaries
    counts[MAX_NPROCS];
float bdries[MAX_NPROCS-2];  // bin boundaries

int debug,debugme; 

init(int argc, char **argv)
{  
   int i;
   debug = atoi(argv[3]); 
   debugme = atoi(argv[4]); 
   MPI_Init(&argc,&argv);
   MPI_Comm_size(MPI_COMM_WORLD,&nnodes);
   MPI_Comm_rank(MPI_COMM_WORLD,&me); 
   nbdries = nnodes - 2;
   n = atoi(argv[1]); 
   int k = atoi(argv[2]);  // for random # gen
   // generate random data for test purposes
   for (i = 0; i < n; i++) fulldata[i] = rand() % k;
   // generate bin boundaries for test purposes
   for (i = 0; i < nbdries; i++) {
      bdries[i] = i * (k+1) / ((float) nnodes);
   }
}

void managernode()
{  
   MPI_Status status;
   int i;
   int lenchunk;  // length of a chunk received from a worker
   // send full data, bin boundaries to workers
   for (i = 1; i < nnodes; i++) {
      MPI_Send(fulldata,n,MPI_INT,i,DATA_MSG,MPI_COMM_WORLD);
      MPI_Send(bdries,nbdries,MPI_FLOAT,i,BDRIES_MSG,MPI_COMM_WORLD);
   }
   // collect sorted chunks from workers, place them in their proper
   // positions within the original array
   int currposition = 0;
   for (i = 1; i < nnodes; i++) {
      MPI_Recv(tmp,MAX_N,MPI_INT,i,CHUNKS_MSG,MPI_COMM_WORLD,&status);
      MPI_Get_count(&status,MPI_INT,&lenchunk);
      memcpy(fulldata+currposition,tmp,lenchunk*sizeof(int));
      currposition += lenchunk;
   }
   if (n < 25) {
      for (i = 0; i < n; i++) printf("%d ",fulldata[i]);
      printf("\n");
   }
}

// adds xi to the part array, increments npart, the length of part
void grab(int xi, int *part, int *npart)
{
    part[*npart] = xi;
    *npart += 1;
}

int cmpints(int *u, int *v)
{  if (*u < *v) return -1;
   if (*u > *v) return 1;
   return 0;
}

void getandsortmychunk(int *tmp, int n, int *chunk, int *lenchunk)
{
   int i,count = 0;
   int workernumber = me - 1;
      if (me == debugme) while (debug) ;
   for (i = 0; i < n; i++) {
       if (workernumber == 0) {
          if (tmp[i] <= bdries[0]) grab(tmp[i],chunk,&count);
       }
       else if (workernumber < nbdries-1) {
          if (tmp[i] > bdries[workernumber-1] && 
              tmp[i] <= bdries[workernumber]) grab(tmp[i],chunk,&count);
       } else
          if (tmp[i] > bdries[nbdries-1]) grab(tmp[i],chunk,&count);
   }
   qsort(chunk,count,sizeof(int),cmpints);
   *lenchunk = count;
}

void workernode()
{
   int n,fulldata[MAX_N],  // size and storage of full data
       chunk[MAX_N],
       lenchunk,
       nbdries;  // number of bin boundaries
   float bdries[MAX_NPROCS-1];  // bin boundaries
   MPI_Status status;
   MPI_Recv(fulldata,MAX_N,MPI_INT,0,DATA_MSG,MPI_COMM_WORLD,&status);
   MPI_Get_count(&status,MPI_INT,&n);
   MPI_Recv(bdries,MAX_NPROCS-2,MPI_FLOAT,0,BDRIES_MSG,MPI_COMM_WORLD,&status);
   MPI_Get_count(&status,MPI_FLOAT,&nbdries);
   getandsortmychunk(fulldata,n,chunk,&lenchunk);
   MPI_Send(chunk,lenchunk,MPI_INT,0,CHUNKS_MSG,MPI_COMM_WORLD);
}

int main(int argc,char **argv)
{  
   int i;
   init(argc,argv);
   if (me == 0) managernode();
   else workernode();
   MPI_Finalize();
}
\end{lstlisting}

\section{Radix Sort}

The radix sort is essentially a special case of a bucket sort.  If we
have 16 threads, say, we could determine a datum's bucket by its lower 4
bits.  As long as our data is uniformly distributed under the mod 16
operation, we would not need to do any sampling.

The CUDPP GPU library uses a radix sort.  The buckets are formed one bit
at a time, using segmented scan as above.

\section{Enumeration Sort}

This one is really simple.  Take for instance the array (12,5,13,18,6).
There are 2 elements less than 12, so in the end, it should go in
position 2 of the sorted array, (5,6,12,13,18).

Say we wish to sort {\bf x}, which for convenience we assume contains no
tied values.  Then the pseudocode for this algorithm, placing the
results in {\bf y}, is

\begin{Verbatim}[fontsize=\relsize{-2}]
for all i in 0...n-1:
   count = 0
   elt = x[i]
   for all j in 0...n-1:
      if x[j] < elt then count++
   y[count] = elt
\end{Verbatim}

The outer (or inner) loop is easily parallelized.

