\chapter{Cloud Computing}
\label{chap:cloud}


In {\bf cloud computing}, the idea is that a large corporation that has
many computers could sell time on them, for example to make profitable
use of excess capacity.   The typical customer would have occasional
need for large-scale computing---and often large-scale data storage.
The customer would submit a program to the cloud computing vendor, who
would run it in parallel on the vendor's many machines (unseen, thus
forming the ``cloud''), then return the output to the customer.  

Google, Yahoo! and Amazon, among others, have recently gotten into the
cloud computing business.  Moreover, universities, businesses, research
labs and so on are setting up their own small clouds, typically on
clusters (a bunch of computers on the same local network, possibly with
central controlling software for job management).  

The paradigm that has become standard in the cloud today is MapReduce,
developed by Google.  In rough form, the approach is as follows.
Various nodes server as {\it mappers}, and others serve as {\it
reducers}.\footnote{Of course, some or all of these might be threads on
the same machine.}

The terms {\it map} and {\bf reduce} are in the functional programming
sense.  In the case of {\bf reduce}, the idea is similar to reduction
oeprations we've seen earlier in this book, such as the {\bf reduction} 
clause in OpenMP and {\bf MPI\_Reduce()} for MPI.  So, reducers in
Hadoop perform operations such as summation, finding minima or maxima,
and so on.

In this chapter we give a very brief introduction to Hadoop, today's
open-source application of choice of MapReduce.  

\section{Platforms and Modes}

In terms of platforms, Hadoop is basically a Linux product.  Quoting
from the Hadoop Quick Start,
\url{http://hadoop.apache.org/common/docs/r0.20.2/quickstart.html#Supported+Platforms}:

\begin{quote}
Supported Platforms:

\begin{itemize}

\item GNU/Linux is supported as a development and production platform.
Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes. 

\item Win32 is supported as a {\it development platform}. Distributed
operation has not been well tested on Win32, so it is not supported as a
{\it production platform}. 

\end{itemize}

\end{quote}

Hadoop runs in one of three modes, of varying degrees of parallelism:

\begin{itemize}

\item standalone mode:  Single mapper, single reducer, mainly useful for
testing.

\item pseudo-distributed mode:  Single node, but multiple mapper and
reducer threads.

\item fully-distributed mode:  Multiple nodes, multiple mappers and
reducers.

\end{itemize}

\section{Overview of Operations}

Here is a sumary of how a Hadoop application runs:

\begin{itemize}

\item Divide the input data into chunks of records.  (In many cases,
this is already the case, as a very large data file might be distributed
across many machines.)

\item Send the chunks to the mappers.

\item Each mapper does some transformation (a ``map,'' in functional
programming terms) to each record in its chunks, and sends the output
records to Hadoop.

\item Hadoop collects the transformed records, splits them into chunks,
sorts them, and then sends the chunks to the reducers.

\item Each reducer does some summary operation (functional programming
``reduce''), producing output records.

\item Hadoop collects all those output records and concatenates
them, producing the final output.

\end{itemize}

\section{Role of Keys}

The sorting operation, called the {\bf shuffle} phase, is based on a
{\bf key} defined by the programmer.  The key defines groups.  If for
instance we wish to find the total number of men and women in a certain
debate, the key would be gender.  The reducer would do addition, in this
case adding 1s, one 1 for each person, but keeping a separate sum for
each gender.

During the shuffle stage, Hadoop sends all records for a given key, e.g.
all men, to one reducer.  In other words, records for a given key will
never be split across multiple reducers.  (Keep in mind, though, that
typically a reducer will have the records for many keys.)  

\section{Hadoop Streaming}

Actually Hadoop is really written for Java or C++ applications.
However, Hadoop can work with programs in any language under Hadoop's
Streaming option, by reading from STDIN and writing to STDOUT, in text,
line-oriented form in both cases.  In other words, any executable
program, be it Java, C/C++, Python, R, shell scripts or whatever, can
run in Hadoop in streaming mode.

Everything is text-file based.  Mappers input lines of text, and output
lines of text.  Reducers input lines of text, and output lines of text.
The final output is lines of text.

Streaming mode may be less efficient, but it is simple to develop
programs in it, and efficient enough in many applications.  Here we
present streaming mode.

So, STDIN and STDOUT are key in this mode, and as mentioned earlier,
input and output are done in terms of lines of text.  One additional
requirement, though, is that the line format for both mappers and
reducers must be

\begin{lstlisting}
key \t value
\end{lstlisting}

where \lstinline{\t} is the TAB character.

The usage of text format does cause some slowdown in numeric programs,
for the conversion of strings to numbers and vice versa, but again,
Hadoop is not designed for maximum efficiency.

\section{Example:  Word Count}
\label{hadoopwordcount}

The typical introductory example is word count in a group of text files.
One wishes to determine what words are in the files, and how many times
each word appears.  Let's simplify that a bit, so that we simply want a
count of the number of words in the files, not an individual count for
each word.

The initial input is the lines of the files (combined internally by
Hadoop into one superfile).  The mapper program breaks a line into
words, and emits (key,value) pairs in the form of (0,1).  Our key here,
0, is arbitrary and meaningless, but we need to have one.

In the reducer stage, all those (key,value) pairs get sorted by the
Hadoop internals (which has no effect in this case), and then fed into
the reducers.  Since there is only one key, 0, only one reducer will
actually be involved.  The latter adds up all its input values, i.e. all
the 1s, yielding a grand total number of words in all the files.

Here's the pseudocode:

{\bf mapper:}

\begin{lstlisting}[numbers=left]
for each line in STDIN
   break line into words, placed in wordarray
   for each word in wordarray
      # we have found 1 word
      print '0', '1' to STDOUT  
\end{lstlisting}

{\bf reducer:}

\begin{lstlisting}[numbers=left]
count = 0
for each line in STDIN
   split line into (key,value) # i.e. (0,1) here
   count += value   # i.e. add 1 to count
print count  
\end{lstlisting}

In terms of the key 0, the final output tells us how many words there
were of type 0.  Since we arbitrarily considered all words to be of type
0, the final output is simply an overall word count.

\section{Example:  Maximum Air Temperature by Year}

A common Hadoop example on the Web involves data with the format for
each line

\begin{lstlisting}
year month day high_temperature air_quality
\end{lstlisting}

It is desired to find the maximum air temperature for each year.

The code would be very similar to the outline in the word count example
above, but now we have a real key---the year.  So, in the end, the final
output will be a listing of maximum temperature by year.

Our mapper here will simply be a text processor, extracting year and
temperature from each line of text.  (This is a common paradigm for
Hadoop applications.) The reducer will then do the maximum-finding work.  

Here is the pseudocode:

{\bf mapper:}

\begin{lstlisting}[numbers=left]
for each line in STDIN
   extract year and temperature
   print year, temperature to STDOUT  
\end{lstlisting}

We have to be a bit more careful in the case of the reducer.  Remember,
though no year will be split across reducers, each reducer will likely
receive the data for more than one year.  It needs to find and output
the maximum temperature for each of those years.\footnote{The results
from the various reducers will then in turn be reduced, yielding the max
temps for all years.}

Since Hadoop internals sort the output of the mappers by key, our
reducer code can expect a bunch of records for one year, then a bunch
for another year and so on.  So, as the reducer goes through its input
line by line, it needs to detect when one bunch ends and the next
begins.  When such an event occurs, it outputs the max temp for the
bunch that just ended.  

Here is the pseudocode:

{\bf reducer:}

\begin{lstlisting}[numbers=left]
currentyear = NULL
currentmax = "-infinity"
for each line in STDIN
   split line into year, temperature
   if year == currentyear:  # still in the current bunch
      currentmax = max(currentmax,temperature)
   else:  # encountered a new bunch
      # print summary for previous bunch
      if currentyear not NULL:  
         print currentyear, currentmax
      # start our bookkeeping for the new bunch
      currentyear = year
      currentmax = temperature
print currentyear, currentmax
\end{lstlisting}

\section{Role of Disk Files}

Hadoop has its own file system, HDFS, which is built on top of the
native OS' file system of the machines.  


Very large files are possible, in some cases spanning more than one
disk/machine.  Indeed, this is the typical goal of Hadoop---to easily
parallelize operations on a very large database.  Files are typically
gigabytes or terabytes in size.  Moreover, there may be thousands of
clusters, and millions of files.

This raises serious reliability issues.  Thus HDFS is replicated, with
each HDFS block existing in at least 3 copies, i.e.  on at least 3
separate disks.  

Disk files play a major role in Hadoop programs:

\begin{itemize}

\item Input is from a file in the HDFS system.

\item The output of the mappers goes to temporary files in the native
OS' file system.

\item Final output is to a file in the HDFS system.  As noted earlier,
that file may be distributed across several disks/machines.

\end{itemize}

Note that by having the input and output files in HDFS, we minimize
communications costs in shipping the data.  The slogan used is ``Moving
computation is cheaper than moving data.''

\section{The Hadoop Shell}

The HDFS can be accessed via a set of Unix-like commands.  For example,

\begin{lstlisting}[numbers=left]
hadoop fs -mkdir somedir
\end{lstlisting}

creates a file {\bf somedir} in my HDFS (invisible to me).  Then

\begin{lstlisting}[numbers=left]
hadoop fs -put gy* somedir
\end{lstlisting}

copies all my files whose names begin with ``gy'' to {\bf somedir}, and

\begin{lstlisting}[numbers=left]
hadoop fs -ls somedir
\end{lstlisting}

lists the file names in the directory {\bf somedir}. 

See \url{http://hadoop.apache.org/common/} for a list of available
commands.

\section{Running Hadoop}

You run the above word count example something like this, say on the UCD
CSIF machines.  

Say my input data is in the directory {\bf indata} on my HDFS, and I
want to write the output to a new directory {\bf outdata}.   Say I've
placed the mapper and reducer programs in my home directory (non-HDFS).
I could then run

\begin{lstlisting}
$ hadoop jar \
     /usr/local/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar \
      -input indata -output outdata \
      -mapper mapper.py -reducer reducer.py \
      -file /home/matloff/mapper.py \
      -file /home/matloff/reducer.py
\end{lstlisting}

This tells Hadoop to run a Java {\bf .jar} file, which in our case here
contains the code to run streaming-mode Hadoop, with the specified input
and output data locations, and with the specified mapper and reducer
functions.  The {\bf -file} flag indicates the locations of those
functions (not needed if they are in my shell search path).

I could then run 

\begin{lstlisting}[numbers=left]
hadoop fs -ls outdata
\end{lstlisting}

to see what files were produced, say {\bf part\_00000}, and then type

\begin{lstlisting}[numbers=left]
hadoop fs -cat outdata/part_00000
\end{lstlisting}

to view the results.

Note that the {\bf .py} files must be executable, both in terms of file
permissions and in terms of invoking the Python interpreter, the latter
done by including 

\begin{lstlisting}
#!/usr/bin/env python
\end{lstlisting}

as the first line in the two Python files.

\section{Example:  Transforming an Adjacency Graph}

Yet another rendition of the app in Section \ref{transgraph}, but this
time with a bit of problem, which will illustrate a limitation of
Hadoop.  

To review:

Say we have a graph with adjacency matrix

\begin{equation}
\left (
\begin{array}{rrrr}
0 & 1 & 0 & 0 \\
1 & 0 & 0 & 1 \\
0 & 1 & 0 & 1 \\
1 & 1 & 1 & 0 \\
\end{array}
\right )
\end{equation}

with row and column numbering starting at 0, not 1.  We'd like to
transform this to a two-column matrix that displays the links, in this
case

\begin{equation}
\left (
\begin{array}{rr}
0 & 1 \\
1 & 0 \\
1 & 3 \\
2 & 1 \\
2 & 3 \\
3 & 0 \\
3 & 1 \\
3 & 2 \\
\end{array}
\right )
\end{equation}

Suppose further that we require this listing to be in lexicographical
order, sorted on source vertex and then on destination vertex.  

At first, this seems right up Hadoop's alley.  After all, Hadoop does
sorting for us within groups automatically, and we could set up one
group per row of the matrix, in other words make the row number the key.

We will actually do this below, but there is a fundamental problem:
Hadoop's simple elegance hinges on there being an independence between
the lines in the input file.  We should be able to process them one line
at a time, independently of other lines.

The problem with this is that we will have many mappers, each reading
only some rows of the adjacency matrix.  Then for any given row, the
mapper handling that row doesn't know what row number this row had in
the original matrix.  So we have no key!

The solution is to add a column to the matrix, containing the original
row numbers.  The matrix above, for instance, would become

\begin{equation}
\left (
\begin{array}{rrrrr}
0 & 0 & 1 & 0 & 0 \\
1 & 1 & 0 & 0 & 1 \\
2 & 0 & 1 & 0 & 1 \\
3 & 1 & 1 & 1 & 0 \\
\end{array}
\right )
\end{equation}

Adding this column may be difficult to do, if the matrix is very large
and already distributed over many machines.  Assuming we do this,
though, here is the mapper code (real code this time, not
pseudocode):\footnote{There is a quick introduction to Python in
Appendix \ref{chap:pyintro}}.

\begin{lstlisting}[numbers=left]
#!/usr/bin/env python

# map/reduce pair inputs a graph adjacency matrix and outputs a list of
# links; if say row 3, column 8 of the input is 1, then there will be a
# row (3,8) in the final output
 
import sys

for line in sys.stdin:
   tks = line.split() # get tokens
   srcnode = tks[0]
   links = tks[1:]
   for dstnode in range(len(links)):
      if links[dstnode] == '1':
          toprint = '%s\t%s' % (srcnode, dstnode) 
          print toprint
\end{lstlisting}

And the reducer code:

\begin{lstlisting}[numbers=left]
#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    print line  # could remove the \t
\end{lstlisting}

Note that the row number, needed for other reasons, is also serving as
our Hadoop key variable.

Recall that in the word count and yearly temperature examples above, the
reducer did the main work, with the mappers playing only a supporting
role.  In this case here, it's the opposite, with the reducers doing
little more than printing what they're given.  {\it However}, keep in
mind that Hadoop itself did a lot of the work, with its shuffle phase,
which produced the sorting that we required in the output.

Here's the same code in R:

{\bf mapper:}

\begin{lstlisting}[numbers=left]
#!/usr/bin/env Rscript

# map/reduce pair inputs a graph adjacency matrix and outputs a list of
# links; if say row 3, column 8 of the input is 1, then there will be a
# row (3,8) in the final output
 
con <- file("stdin", open = "r") 
mapin <- readLines(con)  # better not to read all at once, but keep simple
for (line in mapin) {
   tks <- strsplit(line,split=" ")
   tks <- tks[[1]]
   srcnode <- tks[1]
   links <- tks[-1]
   for (dstnode in 1:length(links)) {
      if (links[dstnode] == '1')  
         cat(srcnode,"\t",dstnode,"\n",sep="")
   }
}
\end{lstlisting}

{\bf reducer:}

\begin{lstlisting}[numbers=left]
#!/usr/bin/env Rscript

con <- file("stdin", open = "r") 
mapin <- readLines(con)
for (line in mapin) {
   line <- strsplit(line,split="\t")  # remove \t
   line <- line[[1]]
   cat(line,"\n")
}
\end{lstlisting}

\section{Example:  Identifying Outliers}

In any large data set, there are various errors, say 3-year-olds who are
listed as 7 feet tall. One way to try to track these down is to comb the
data for outliers, which are data points (rows in the data set) that are
far from the others. These may not be erroneous, but they are
``suspicious,'' and we want to flag them for closer inspection.

In this simple version, we will define an outlier point to be one for
which at least one of its variables is in the upper p proportion {\it in
its group}. Say for example p is 0.02, and our groups are male adults
and female adults, with our data variables being height and weight. Then
if the height for some man were in the upper 2\% of all men in the data
set, we'd flag him as an outlier; we'd do the same for weight. Note that
he'd be selected if either his height or his weight were in the top 2\%
for that variable among men in the data set.  Of course, we could also
look at the bottom 2\%, or those whose Euclidean distance as vectors are
in the most distant 2\% from the centroid of the data in a group, etc.

We'll define groups in terms of combinations of variables.  These might
be, say, Asian male lawyers, female Kentucky natives registered as
Democrats, etc.

{\bf mapper:}

\begin{lstlisting}[numbers=left]
#!/usr/bin/env Rscript

# map/reduce pair inputs a data matrix, forms groups according to
# combinations of specified integer variables, and then outputs the
# indices of outliers in each group

# any observation that has at least one variable in the top p% of its
# group is considered an outlier

# the first column of the input matrix is the observation number,
# starting at 0

# the group number for an observation is the linear index in
# lexicographical terms

# in Hadoop command line, use -mapper "olmap.R '0.4 2 3 6 16'" to
# specify parameters:

# p (given as a decimal number, e.g. 0.02); d, the number
# of data variables; g, the number of grouping variables; and finally g
# numbers which are the upper bounds for the last g-1 group variables
# (the lower bounds are always assumed to be 0)

init <- function() {
   ca <- commandArgs(trailingOnly=T)
   pars <- ca[1]
   pars <- strsplit(pars,split=" ")[[1]]
   pars <- pars[-1]
   ndv <<- as.integer(pars[1])
   ngv <<- as.integer(pars[2])
   # a few position variables, used in findgrp() below
   grpstart <<- 2
   grpend <<- grpstart + ngv - 1
   datastart <<- 2+ngv
   dataend <<- 1 + ngv + ndv
   # get upper bounds, and their reverse-cumulative products
   ubds <<- as.integer(pars[3:(1+ngv)])
   ubdsprod <<- vector(length=ngv-1)
   for (i in 1:(ngv-1)) 
      ubdsprod[i] <<- prod(ubds[i:(ngv-1)])
}

# converts vector of group variables to group number
findgrp <- function(grpvars) {
   sum <- 0
   for (i in 1:(ngv-1)) {
      m <- grpvars[i]
      sum <- sum + m * ubdsprod[i] 
   }
   return(sum+grpvars[ngv])
}

# test 
init()
con <- file("stdin", open = "r") 
mapin <- readLines(con)  # better not to read all at once
for (line in mapin) {
   tks <- strsplit(line,split=" ")
   tks <- tks[[1]]
   rownum <- tks[1]
   grpvars <- tks[grpstart:grpend]
   grpvars <- as.integer(grpvars)
   grpnum <- findgrp(grpvars)
   cat(grpnum,"\t",rownum," ",tks[datastart:dataend],"\n")
}
\end{lstlisting}

{\bf reducer:}

\begin{lstlisting}[numbers=left]
#!/usr/bin/env Rscript

# see comments in olmap.R

# in Hadoop command line, use -reducer "olred.R '0.4 2 3 6 16'" or
# similar

# R's quantile() too complicated
quantl <- function(x,q) {
   return(sort(x)[ceiling(length(x)*q)])
}

init <- function() {
   ca <- commandArgs(trailingOnly=T)
   pars <- ca[1]
   pars <- strsplit(pars,split=" ")[[1]]
   # pars <-  c("0.4","2","3","6","16")  # for little test
   # pars <-  c("0.1","2","2","3")  # for big test
   p <<- as.double(pars[1])
   ndv <<- as.integer(pars[2])
}

emitoutliers <- function(datamat) {
   # find the upper p quantile for each variable (skip row number)
   toohigh <- apply(datamat[,-(1:2),drop=F],2,quantl,1-p)
   for (i in 1:nrow(datamat)) {
      if (any(datamat[i,-(1:2)] >= toohigh))
         cat(datamat[i,],"\n")
   }
}

# test
init()
con <- file("stdin", open = "r") 
mapin <- readLines(con)  
oldgrpnum <- -1
nmapin <- length(mapin)
for (i in 1:nmapin) {
   line <- mapin[i]
   tks <- strsplit(line,split="\t")[[1]]
   grpnum <- as.integer(tks[1])
   tmp <- strsplit(tks[2],split=" ")[[1]]
   tmp <- tmp[tmp != ""]  # delete empty element
   row <- as.numeric(tmp)
   if (oldgrpnum == -1) {
      datamat <- matrix(c(grpnum,row),nrow=1)
      oldgrpnum <- grpnum
   } else if (grpnum == oldgrpnum) {
      datamat <- rbind(datamat,c(grpnum,row))
      if (i == nmapin) emitoutliers(datamat)
   } else {  # new group
      emitoutliers(datamat)
      datamat <- matrix(c(grpnum,row),nrow=1)
      oldgrpnum <- grpnum
   }
}
\end{lstlisting}

\section{Debugging Hadoop Streaming Programs}

One advantage of the streaming approach is that mapper and reducer
programs can be debugged via normal tools, since those programs can be
run on their own, without Hadoop, simply by using the Unix/Linux pipe
capability.  

This all depends on the fact that Hadoop essentially does the following
Unix shell computation:

\begin{lstlisting}
cat inputfile | mapperprog | sort -n | reducerprog
\end{lstlisting}

(Omit the {\bf -n} if the key is a string.)

The mapper alone works like

\begin{lstlisting}
cat inputfile | mapperprog 
\end{lstlisting}

You thus can use whatever debugging tool you favor, to debug the
mapper and reducer code separately.

Note, though, that the above pipe is {\it not quite} the same as Hadoop.
the pipe doesn't break up the data, and there may be subtle problems
arising as a result.  But overall, the above approach provides a quick
and easy first attempt at debugging.

The {\bf userlogs} subdirectory of your Hadoop logs directory contains
files that may be helpful, such as {\bf stderr}.

\section{It's a Lot More Than Just Programming}

The real challenge in Hadoop is often not the programming, but rather
the minimization of overhead.  This involves things like tuning the file
system, the number of mappers and reducers, and so on.  These topics are
beyond the scope of this book.

% \section{Evaluation}
% 
% Note again that this works well only on problems of a certain structure.
% Also, some say that the idea has been overpromoted; see for instance
% ``MapReduce: A Major Step Backwards,'' {\it The Database Column},
% by Professor David DeWitt,
% \url{ http://www.databasecolumn.com/2008/01/mapreduce-a-major-step-back.html}
% 
% Nevetheless, it has become very popular in some applications.


