\chapter{Parallel Python Threads and Multiprocessing Modules} 
\label{chap:pythr}

(Francis Hsu contributed sections of this chapter.)

There are a number of ways to write parallel Python code.\footnote{This 
chapter is shared by two of my open source books:
\url{http://heather.cs.ucdavis.edu/~matloff/158/PLN/ParProcBook.pdf} and
\url{http://heather.cs.ucdavis.edu/~matloff/Python/PLN/FastLanePython.pdf}.
If you wish to learn more about the topics covered in the book other than the
one you are now reading, please check the other!}

\section{The Python Threads and Multiprocessing Modules}

Python's thread system builds on the underlying OS threads.  They are
thus pre-emptible.  Note, though, that Python adds its own threads
manager on top of the OS thread system; see Section \ref{internals}.

\subsection{Python Threads Modules}

Python threads are accessible via two modules, {\bf thread.py} and {\bf
threading.py}.  The former is more primitive, thus easier to learn from,
and we will start with it.

\subsubsection{The {\tt thread} Module}
\label{threadmodex}

The example here involves a client/server pair.\footnote{It is
preferable here that the reader be familiar with basic network
programming.  See my tutorial at
\url{http://heather.cs.ucdavis.edu/~matloff/Python/PLN/FastLanePython.pdf}.  However,
the comments preceding the various network calls would probably be
enough for a reader without background in networks to follow what is
going on.} As you'll see from reading the comments at the start of the
files, the program does nothing useful, but is a simple illustration of
the principles.  We set up two invocations of the client; they keep
sending letters to the server; the server concatenates all the letters
it receives.

Only the server needs to be threaded.  It will have one thread for each
client.

Here is the client code, {\bf clnt.py}:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
# simple illustration of thread module

# two clients connect to server; each client repeatedly sends a letter,
# stored in the variable k, which the server appends to a global string
# v, and reports v to the client; k = '' means the client is dropping
# out; when all clients are gone, server prints the final string v

# this is the client; usage is

#    python clnt.py server_address port_number

import socket  # networking module
import sys

# create Internet TCP socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

host = sys.argv[1]  # server address
port = int(sys.argv[2])  # server port

# connect to server
s.connect((host, port))

while(1):
   # get letter
   k = raw_input('enter a letter:')
   s.send(k)  # send k to server
   # if stop signal, then leave loop
   if k == '': break
   v = s.recv(1024)  # receive v from server (up to 1024 bytes)
   print v  

s.close() # close socket
\end{Verbatim}

And here is the server, {\bf srvr.py}:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left] 
# simple illustration of thread module

# multiple clients connect to server; each client repeatedly sends a
# letter k, which the server adds to a global string v and echos back
# to the client; k = '' means the client is dropping out; when all
# clients are gone, server prints final value of v

# this is the server

import socket  # networking module
import sys

import thread  

# note the globals v and nclnt, and their supporting locks, which are
#    also global; the standard method of communication between threads is
#    via globals

# function for thread to serve a particular client, c
def serveclient(c):
   global v,nclnt,vlock,nclntlock
   while 1:
      # receive letter from c, if it is still connected
      k = c.recv(1)
      if k == '': break
      # concatenate v with k in an atomic manner, i.e. with protection
      #    by locks
      vlock.acquire()
      v += k
      vlock.release()
      # send new v back to client
      c.send(v)
   c.close()
   nclntlock.acquire()
   nclnt -= 1
   nclntlock.release()

# set up Internet TCP socket
lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = int(sys.argv[1])  # server port number
# bind lstn socket to this port 
lstn.bind(('', port))
# start listening for contacts from clients (backlog of up to 5 pending)
lstn.listen(5)

# initialize concatenated string, v
v = ''
# set up a lock to guard v
vlock = thread.allocate_lock()  

# nclnt will be the number of clients still connected
nclnt = 2
# set up a lock to guard nclnt
nclntlock = thread.allocate_lock()  

# accept calls from the clients
for i in range(nclnt):
   # wait for call, then get a new socket to use for this client,
   #    and get the client's address/port tuple (though not used)
   (clnt,ap) = lstn.accept()
   # start thread for this client, with serveclient() as the thread's
   #    function, with parameter clnt; note that parameter set must be
   #    a tuple; in this case, the tuple is of length 1, so a comma is
   #    needed
   thread.start_new_thread(serveclient,(clnt,))

# shut down the server socket, since it's not needed anymore 
lstn.close()

# wait for both threads to finish 
while nclnt > 0: pass

print 'the final value of v is', v
\end{Verbatim}

Make absolutely sure to run the programs before proceeding
further.\footnote{You can get them from the {\bf .tex} source file for
this tutorial, located wherever you picked up the {\bf .pdf} version.}
Here is how to do this:  

I'll refer to the machine on which you run the server as {\bf a.b.c},
and the two client machines as {\bf u.v.w} and {\bf x.y.z}.\footnote{You
could in fact run all of them on the same machine, with address name
{\bf localhost} or something like that, but it would be better on
separate machines.}  First, on the server machine, type

\begin{verbatim}
python srvr.py 2000
\end{verbatim}

and then on each of the client machines type

\begin{verbatim}
python clnt.py a.b.c 2000
\end{verbatim}

(You may need to try another port than 2000, anything above 1023.)

Input letters into both clients, in a rather random pattern, typing some
on one client, then on the other, then on the first, etc.  Then finally
hit Enter without typing a letter to one of the clients to end the
session for that client, type a few more characters in the other client,
and then end that session too.

The reason for threading the server is that the inputs from the clients
will come in at unpredictable times.  At any given time, the server
doesn't know which client will send input next, and thus doesn't know on
which client to call {\bf recv()}.  One way to solve this problem is by
having threads, which run ``simultaneously'' and thus give the server
the ability to read from whichever client has sent data.\footnote{Another 
solution is to use nonblocking I/O.  See this example in that context in
\url{http://heather.cs.ucdavis.edu/~matloff/Python/PyNet.pdf}.}

So, let's see the technical details.  We start with the ``main''
program.\footnote{Just as you should write the main program first, you
should read it first too, for the same reasons.}

\begin{Verbatim}[fontsize=\relsize{-2}]
vlock = thread.allocate_lock()  
\end{Verbatim}

Here we set up a {\bf lock variable} which guards {\bf
v}.  We will explain later why this is needed.  Note that in order to
use this function and others we needed to import the {\bf thread}
module.

\begin{Verbatim}[fontsize=\relsize{-2}]
nclnt = 2
nclntlock = thread.allocate_lock()  
\end{Verbatim}

We will need a mechanism to ensure that the ``main'' program, which also
counts as a thread, will be passive until both application threads have
finished.  The variable {\bf nclnt} will serve this purpose.  It will be
a count of how many clients are still connected.  The ``main'' program
will monitor this, and wrap things up later when the count reaches 0. 

\begin{Verbatim}[fontsize=\relsize{-2}]
thread.start_new_thread(serveclient,(clnt,))
\end{Verbatim}

Having accepted a client connection, the server sets up a thread for
serving it, via {\bf thread.start\_new\_thread()}.  The first argument
is the name of the application function which the thread will run, in
this case {\bf serveclient()}.  The second argument is a tuple
consisting of the set of arguments for that application function.  As
noted in the comment, this set is expressed as a tuple, and since in
this case our tuple has only one component, we use a comma to signal the
Python interpreter that this is a tuple.
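
The calling convention can be tried in isolation.  What follows is only a
minimal sketch, and it assumes Python 3, in which the {\bf thread} module
was renamed {\tt \_thread}.  The function {\bf greet()}, the list {\bf log}
and the lock {\bf done} are hypothetical names used just for this
illustration; they are not part of {\bf srvr.py}.

```python
import _thread   # Python 3 name of the module called thread in Python 2

log = []                          # shared list that the new thread writes into
done = _thread.allocate_lock()    # used only so that main can wait for the thread
done.acquire()                    # held until the thread releases it

def greet(name):
    # this function runs in the new thread
    log.append('hello ' + name)
    done.release()                # tell main that we are finished

# a one-element argument tuple needs the trailing comma;
# ('clnt') without the comma would just be a parenthesized string
_thread.start_new_thread(greet, ('clnt',))

done.acquire()                    # block until greet() has run
print(log)
```

Leaving out the comma would hand {\bf start\_new\_thread()} a plain string
instead of a tuple, which the call rejects.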

So, here we are telling Python's threads system to call our function
{\bf serveclient()}, supplying that function with the argument {\bf
clnt}.  The thread becomes ``active'' immediately, but this does not
mean that it starts executing right away.  All that happens is that the
threads manager adds this new thread to its list of threads, and marks
its current state as Run, as opposed to being in a Sleep state, waiting
for some event. 

By the way, this gives us a chance to show how clean and elegant
Python's threads interface is compared to what one would need in C/C++.
For example, in {\bf pthreads}, the function analogous to 
{\bf thread.start\_new\_thread()} has the signature

\begin{Verbatim}[fontsize=\relsize{-2}]
pthread_create (pthread_t *thread_id, const pthread_attr_t *attributes,
   void *(*thread_function)(void *), void *arguments);
\end{Verbatim}

What a mess!  For instance, look at the types in that third argument:  A
pointer to a function whose argument is a pointer to {\tt void} and whose
value is a pointer to {\tt void} (all of which would have to be {\tt
cast} when called).  It's such a pleasure to work in Python, where we
don't have to be bothered by low-level things like that.

Now consider our statement

\begin{Verbatim}[fontsize=\relsize{-2}]
while nclnt > 0: pass
\end{Verbatim}

The statement says that as long as at least one client is still active,
do nothing.  Sounds simple, and it is, but you should consider what is
really happening here.

Remember, the three threads---the two client threads, and the ``main''
one---will take turns executing, with each turn lasting a brief period
of time.  Each time ``main'' gets a turn, it will loop repeatedly on
this line.  But all that empty looping in ``main'' is wasted time.  What
we would really like is a way to prevent the ``main'' function from
getting a turn at all until the two clients are gone.  There are ways to
do this which you will see later, but we have chosen to remain simple
for now.

Now consider the function {\bf serveclient()}.  Any thread executing
this function will deal with only one particular client, the one
corresponding to the connection {\bf c} (an argument to the function).
So this {\bf while} loop does nothing but read from that particular
client.  If the client has not sent anything, the thread will block on 
the line

\begin{Verbatim}[fontsize=\relsize{-2}]
k = c.recv(1)
\end{Verbatim}

This thread will then be marked as being in Sleep state by the thread
manager, thus allowing the other client thread a chance to run.  If
neither client thread can run, then the ``main'' thread keeps getting
turns.  When a user at one of the clients finally types a letter, the
corresponding thread unblocks, i.e. the threads manager changes its
state to Run, so that it will soon resume execution.

Next comes the most important code for the purpose of this tutorial:

\begin{Verbatim}[fontsize=\relsize{-2}]
vlock.acquire()
v += k
vlock.release()
\end{Verbatim}

Here we are worried about a {\bf race condition}.  Suppose for example
{\bf v} is currently 'abx', and Client 0 sends {\bf k} equal to 'g'.
The concern is that this thread's turn might end in the middle of that
addition to {\bf v}, say right after the Python interpreter had formed
'abxg' but before that value was written back to {\bf v}.  This could be
a big problem.  The next thread might get to the same statement, take
{\bf v}, still equal to 'abx', and append, say, 'w', making {\bf v}
equal to 'abxw'.  Then when the first thread gets its next turn, it
would finish its interrupted action, and set {\bf v} to 'abxg'---which
would mean that the 'w' from the other thread would be lost.  

All of this hinges on whether the operation

\begin{Verbatim}[fontsize=\relsize{-2}]
v += k
\end{Verbatim}

is interruptible.  Could a thread's turn end somewhere in the midst of
the execution of this statement?  If not, we say that the operation is
{\bf atomic}.  If the operation were atomic, we would not need the
lock/unlock operations surrounding the above statement.  I investigated
this, using the methods described in Section \ref{gil}, and it appears to me
that the above statement is {\it not} atomic.
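
One way a reader might probe the question is to disassemble the statement
with the standard {\bf dis} module.  This is only a sketch under
assumptions: it is written for Python 3, it is not necessarily the method
of Section \ref{gil}, the wrapper function {\bf append\_letter()} is a
hypothetical name, and the exact opcode names differ from one Python
version to another.

```python
import dis

v = ''
k = 'g'

def append_letter():
    # the same statement as in serveclient(), isolated for inspection
    global v
    v += k

# collect the names of the bytecode instructions the statement compiles to
ops = [ins.opname for ins in dis.get_instructions(append_letter)]
print(ops)
```

If the listing shows one instruction that loads {\bf v} and a later,
separate one that stores it, then a thread's turn could conceivably end
between the two, which is exactly the scenario described above.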

Moreover, it's safer not to take a chance, especially since Python
compilers could vary or the virtual machine could change; after all, we
would like our Python source code to work even if the machine changes.

So, we need the lock/unlock operations:

\begin{Verbatim}[fontsize=\relsize{-2}]
vlock.acquire()
v += k
vlock.release()
\end{Verbatim}

The lock, {\bf vlock} here, can only be held by one thread at a time.
When a thread calls {\bf vlock.acquire()}, the Python interpreter will check
to see whether the lock is locked or unlocked right now.  In the latter
case, the interpreter will lock the lock and the thread will continue,
and will execute the statement which updates {\bf v}.  The thread will then
call {\bf vlock.release()}, i.e. the lock will go back to unlocked state.

If, on the other hand, a thread executes {\bf acquire()} on this
lock while it is locked, i.e. held by some other thread, its turn will
end and the interpreter will mark this thread as being in Sleep state,
waiting for the lock to be unlocked.  When whichever thread currently
holds the lock unlocks it, the interpreter will change the blocked
thread from Sleep state to Run state.
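
The locked and unlocked states can be observed even in a single thread, by
asking {\bf acquire()} not to block.  The following is a small sketch,
assuming Python 3 (module name {\tt \_thread}); the variables {\bf
got\_first}, {\bf got\_second} and {\bf got\_third} are hypothetical names
introduced only here.

```python
import _thread   # Python 3 name of the module called thread in Python 2

vlock = _thread.allocate_lock()

got_first = vlock.acquire()        # lock was free, so this succeeds at once
# a plain acquire() here would sleep forever, since nobody would release;
# passing False requests a non-blocking attempt instead
got_second = vlock.acquire(False)  # fails, because the lock is already held
vlock.release()                    # back to the unlocked state
got_third = vlock.acquire(False)   # succeeds now that the lock is free
vlock.release()

print(got_first, got_second, got_third)
```

In {\bf srvr.py} the blocking form is the right one, since a thread that
cannot get {\bf vlock} has nothing useful to do until the lock is free.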

Note that if our threads were non-preemptive, we would not need
these locks.

Note also the crucial role being played by the global nature of {\bf v}.
Global variables are used to communicate between threads.  In fact,
recall that this is one of the reasons that threads are so
popular---easy access to global variables.  Thus the dogma so often
taught in beginning programming courses that global variables must be
avoided is wrong; on the contrary, there are many situations in which
globals are necessary and natural.\footnote{I think that dogma is
presented in a far too extreme manner anyway.  See
\url{http://heather.cs.ucdavis.edu/~matloff/globals.html}. }

The same race-condition issues apply to the code

\begin{Verbatim}[fontsize=\relsize{-2}]
nclntlock.acquire()
nclnt -= 1
nclntlock.release()
\end{Verbatim}

Following is a Python program that finds prime numbers using threads.
Note carefully that it is not claimed to be efficient at all (it may
well run more slowly than a serial version); it is merely an
illustration of the concepts.  Note too that we are again using the
simple {\bf thread} module, rather than {\bf threading}.

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
#!/usr/bin/env python

import sys
import math
import thread

def dowork(tn):  # thread number tn
   global n,prime,nexti,nextilock,nstarted,nstartedlock,donelock
   donelock[tn].acquire()
   nstartedlock.acquire()
   nstarted += 1
   nstartedlock.release()
   lim = math.sqrt(n)
   nk = 0
   while 1:
      nextilock.acquire()
      k = nexti
      nexti += 1
      nextilock.release()
      if k > lim: break
      nk += 1
      if prime[k]:
         r = n / k
         for i in range(2,r+1):
            prime[i*k] = 0
   print 'thread', tn, 'exiting; processed', nk, 'values of k'
   donelock[tn].release()

def main():
   global n,prime,nexti,nextilock,nstarted,nstartedlock,donelock
   n = int(sys.argv[1])
   prime = (n+1) * [1]
   nthreads = int(sys.argv[2])
   nstarted = 0
   nexti = 2
   nextilock = thread.allocate_lock()  
   nstartedlock = thread.allocate_lock()  
   donelock = []
   for i in range(nthreads):
      d = thread.allocate_lock()  
      donelock.append(d) 
      thread.start_new_thread(dowork,(i,))
   while nstarted < nthreads: pass
   for i in range(nthreads):
      donelock[i].acquire()  
   print 'there are', reduce(lambda x,y: x+y, prime) - 2, 'primes'

if __name__ == '__main__':
    main()
\end{Verbatim}

So, let's see how the code works.

The algorithm is the famous Sieve of Eratosthenes:  We list all the
numbers from 2 to {\bf n}, then cross out all multiples of 2 (except 2),
then cross out all multiples of 3 (except 3), and so on.  The numbers
which get crossed out are composite, so the ones which remain at the end
are prime.

{\bf Line 32:}  We set up an array {\bf prime}, which is what we will be
``crossing out.''  The value 1 means ``not crossed out,'' so we start
everything at 1.  (Note how Python makes this easy to do, using list
``multiplication.'')

{\bf Line 33:}  Here we get the number of desired threads from the
command line.

{\bf Line 34:}  The variable {\bf nstarted} will show how many threads
have already started.  This will be used later, in Lines 43-45, in
determining when the {\bf main()} thread exits.  Since the various
threads will be writing this variable, we need to protect it with a
lock, on Line 37.

{\bf Lines 35-36:}  The variable {\bf nexti} will say which value we
should do ``crossing out'' by next.  If this is, say, 17, then it means
our next task is to cross out all multiples of 17 (except 17).  Again we
need to protect it with a lock.

{\bf Lines 39-42:}  We create the threads here.  The function executed
by the threads is named {\bf dowork()}.  We also create locks in an
array {\bf donelock}, which again will be used later on as a mechanism
for determining when {\bf main()} exits (Lines 44-45).

{\bf Lines 43-45:}  There is a lot to discuss here.  

To start, recall that in {\bf srvr.py}, our example in Section
\ref{threadmodex}, we didn't want the main thread to exit until the
child threads were done.\footnote{The effect of the main thread ending
earlier would depend on the underlying OS.  On some platforms, exit of
the parent may terminate the child threads, but on other platforms the
children continue on their own.}  So, Line 72 there, {\bf while nclnt >
0: pass}, was a {\bf busy wait},
repeatedly doing nothing ({\bf pass}).  That's a waste of time: each
time the main thread gets a turn to run, it repeatedly executes {\bf
pass} until its turn is over.

Here in our primes program, a premature exit by {\bf main()} would result in
printing out wrong answers.  On the other hand, we don't want {\bf
main()} to engage in a wasteful busy wait.  We could use {\bf join()} 
from {\bf threading.Thread} for this purpose, to be discussed later, but
here we take a different tack:  We set up a list of locks, one for each
thread, in a list {\bf donelock}.  Each thread initially acquires its
lock (Line 9), and releases it when the thread finishes its work (Line
27).  Meanwhile, {\bf main()} has been waiting to acquire those locks
(Line 45).  So, when the threads finish, {\bf main()} will move on to
Line 46 and print out the program's results.

But there is a subtle problem (threaded programming is notorious for
subtle problems), in that there is no guarantee that a thread will
execute Line 9 before {\bf main()} executes Line 45.  That's why we have
a busy wait in Line 43, to make sure all the threads acquire their locks
before {\bf main()} does.  Of course, we're trying to avoid busy waits,
but this one is quick.
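
For comparison, here is a sketch of one alternative arrangement, not the
one used in the program above: {\bf main()} itself acquires each ``done''
lock before the corresponding thread is started, so that there is no
start-up race and no need for the short busy wait.  It relies on the fact
that a Python lock may be released by a thread other than the one that
acquired it.  The sketch assumes Python 3 (module name {\tt \_thread}), and
the function {\bf run()} and the list {\bf results} are hypothetical names
used only for illustration.

```python
import _thread   # Python 3 name of the module called thread in Python 2

def dowork(tn, results, donelock):
    results[tn] = tn * tn      # stand-in for the real work of thread tn
    donelock[tn].release()     # signal that thread tn is finished

def run(nthreads):
    results = [None] * nthreads
    donelock = []
    for i in range(nthreads):
        d = _thread.allocate_lock()
        d.acquire()            # taken by main BEFORE thread i exists
        donelock.append(d)
    for i in range(nthreads):
        _thread.start_new_thread(dowork, (i, results, donelock))
    for i in range(nthreads):
        donelock[i].acquire()  # sleeps until thread i has released its lock
    return results

print(run(4))
```

The price of this variant is that the lock is acquired in one thread and
released in another, which some readers may find less clear than the
original scheme.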

{\bf Line 13:}  We need not check any ``crosser-outers'' that are larger
than $\sqrt{n}$.  

{\bf Lines 15-25:}  We keep trying ``crosser-outers'' until we reach
that limit (Line 20).  Note the need to use the lock in Lines 16-19.
In Line 22, we check the potential ``crosser-outer'' for primeness; if
we have previously crossed it out, we would just be doing duplicate work
if we used this {\bf k} as a ``crosser-outer.''
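
When experimenting with the threaded program, it helps to have a serial
version to check the counts against.  Below is a minimal sketch of the same
sieve without threads, assuming Python 3 (hence the integer division
operator {\tt //}); the function name {\bf count\_primes()} is hypothetical.
Unlike the program above, it marks 0 and 1 as nonprime at the outset rather
than subtracting 2 at the end.

```python
import math

def count_primes(n):
    prime = (n + 1) * [1]      # 1 means "not crossed out"
    prime[0] = prime[1] = 0    # 0 and 1 are not prime
    lim = int(math.sqrt(n))    # no crosser-outer above sqrt(n) is needed
    for k in range(2, lim + 1):
        if prime[k]:           # skip k if it was itself crossed out
            for i in range(2, n // k + 1):
                prime[i * k] = 0
    return sum(prime)

print(count_primes(100))
```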

Here's one more example, a type of Web crawler.  This one continually
monitors the access time of the Web, by repeatedly accessing a list of
``representative'' Web sites, say the top 100.  What's really different
about this program, though, is that we've reserved one thread for human
interaction.  The person can, whenever he/she desires, find for instance
the mean of recent access times.

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
import sys
import time
import os
import thread

class glbls:
   acctimes = []  # access times
   acclock = thread.allocate_lock()  # lock to guard access time data
   nextprobe = 0  # index of next site to probe
   nextprobelock = thread.allocate_lock()  # lock to guard nextprobe
   sites = open(sys.argv[1]).readlines()  # the sites to monitor
   ww = int(sys.argv[2])  # window width

def probe(me):
   if me > 0:
      while 1:
         # determine what site to probe next
         glbls.nextprobelock.acquire()
         i = glbls.nextprobe
         i1 = i + 1
         if i1 >= len(glbls.sites): i1 = 0
         glbls.nextprobe = i1
         glbls.nextprobelock.release()
         # do probe
         t1 = time.time()
         os.system('wget --spider -q '+glbls.sites[i1])
         t2 = time.time()
         accesstime = t2 - t1  
         glbls.acclock.acquire()
         # list full yet?
         if len(glbls.acctimes) < glbls.ww:
            glbls.acctimes.append(accesstime)
         else:
            glbls.acctimes = glbls.acctimes[1:] + [accesstime]
         glbls.acclock.release()
   else:
      while 1:
         rsp = raw_input('monitor: ')
         if rsp == 'mean': print mean(glbls.acctimes)
         elif rsp == 'median': print median(glbls.acctimes)
         elif rsp == 'all': print all(glbls.acctimes)

def mean(x):
   return sum(x)/len(x)

def median(x):
   y = x[:]  # copy, so that sorting does not reorder the shared list
   y.sort()
   return y[len(y)/2]  # a little sloppy

def all(x):
   return x

def main():
   nthr = int(sys.argv[3])  # number of threads
   for thr in range(nthr):
      thread.start_new_thread(probe,(thr,))
   while 1: continue

if __name__ == '__main__': 
   main() 

\end{Verbatim}
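
The window of recent access times can also be kept with a bounded queue
from the standard library.  The following sketch, which assumes Python 3,
shows only the window logic, with made-up access times and a hypothetical
window width of 3; in the threaded program the updates would still have to
be guarded by {\bf acclock}.

```python
from collections import deque

ww = 3                        # window width (hypothetical value)
acctimes = deque(maxlen=ww)   # oldest entry is dropped automatically when full

for accesstime in [1.0, 2.0, 4.0, 6.0]:   # made-up access times
    acctimes.append(accesstime)

def mean(x):
    return sum(x) / len(x)

def median(x):
    y = sorted(x)             # sorted() returns a new list; x keeps its order
    return y[len(y) // 2]     # a little sloppy, as in the program above

print(list(acctimes), mean(acctimes), median(acctimes))
```

Since {\bf sorted()} builds a new list, computing the median leaves the
time ordering of the window intact.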

\subsubsection{The {\tt threading} Module}  
\label{threadingmodex}

The program below treats the same network client/server application
considered in Section \ref{threadmodex}, but with the more sophisticated
{\bf threading} module.  The client program stays the same, since it
didn't involve threads in the first place.  Here is the new server code:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
# simple illustration of threading module

# multiple clients connect to server; each client repeatedly sends a
# value k, which the server adds to a global string v and echos back
# to the client; k = '' means the client is dropping out; when all
# clients are gone, server prints final value of v

# this is the server

import socket  # networking module
import sys
import threading 

# class for threads, subclassed from threading.Thread class 
class srvr(threading.Thread):
   # v and vlock are now class variables
   v = ''
   vlock = threading.Lock()
   id = 0  # I want to give an ID number to each thread, starting at 0
   def __init__(self,clntsock):
      # invoke constructor of parent class
      threading.Thread.__init__(self)
      # add instance variables
      self.myid = srvr.id
      srvr.id += 1
      self.myclntsock = clntsock 
   # this function is what the thread actually runs; the required name
   #    is run(); threading.Thread.start() calls threading.Thread.run(),
   #    which is always overridden, as we are doing here
   def run(self):
      while 1:
         # receive letter from client, if it is still connected
         k = self.myclntsock.recv(1)
         if k == '': break
         # update v in an atomic manner
         srvr.vlock.acquire()
         srvr.v += k
         srvr.vlock.release()
         # send new v back to client
         self.myclntsock.send(srvr.v)
      self.myclntsock.close()

# set up Internet TCP socket
lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = int(sys.argv[1])  # server port number
# bind lstn socket to this port 
lstn.bind(('', port))
# start listening for contacts from clients (backlog of up to 5 pending)
lstn.listen(5)

nclnt = int(sys.argv[2])  # number of clients

mythreads = []  # list of all the threads
# accept calls from the clients
for i in range(nclnt):
   # wait for call, then get a new socket to use for this client,
   #    and get the client's address/port tuple (though not used)
   (clnt,ap) = lstn.accept()
   # make a new instance of the class srvr 
   s = srvr(clnt)
   # keep a list of all threads
   mythreads.append(s)
   # threading.Thread.start calls threading.Thread.run(), which we
   #    overrode in our definition of the class srvr
   s.start()

# shut down the server socket, since it's not needed anymore 
lstn.close()

# wait for all threads to finish 
for s in mythreads:
   s.join()

print 'the final value of v is', srvr.v
\end{Verbatim}

Again, let's look at the main data structure first:

\begin{Verbatim}[fontsize=\relsize{-2}]
class srvr(threading.Thread):
\end{Verbatim}

The {\bf threading} module contains a class {\bf Thread}, any instance
of which represents one thread.  A typical application will subclass
this class, for two reasons.  First, we will probably have some
application-specific variables or methods to be used.  Second, the class
{\bf Thread} has a member method {\bf run()} which is meant to be
overridden, as you will see below.

Consistent with OOP philosophy, we might as well put the old globals in
as class variables:

\begin{Verbatim}[fontsize=\relsize{-2}]
v = ''
vlock = threading.Lock()
\end{Verbatim}

Note that the code setting up class variables is executed when the class
definition itself is executed, i.e. as the program starts up, as opposed
to when the first object of this class is created.
So, the lock is created right away.

\begin{Verbatim}[fontsize=\relsize{-2}]
id = 0
\end{Verbatim}

This is to set up ID numbers for each of the threads.  We don't use them
here, but they might be useful in debugging or in future enhancement of
the code.

\begin{Verbatim}[fontsize=\relsize{-2}]
def __init__(self,clntsock):
   ...
   self.myclntsock = clntsock

# ``main'' program
...
   (clnt,ap) = lstn.accept()
   s = srvr(clnt)
\end{Verbatim}

The ``main'' program, in creating an object of this class for the
client, will pass as an argument the socket for that client.  We then
store it as a member variable for the object.

\begin{Verbatim}[fontsize=\relsize{-2}]
def run(self):
   ...
\end{Verbatim}

As noted earlier, the {\bf Thread} class contains a member method {\bf
run()}.  This is a dummy, to be overridden with the application-specific
function to be run by the thread.  It is invoked by the method {\bf
Thread.start()}, called here in the main program.  As you can see above,
it is pretty much the same as the previous code in Section
\ref{threadmodex} which used the {\bf thread} module, adapted to the
class environment.
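
Stripped of the networking, the pattern looks as follows.  This is only a
sketch, assuming Python 3; the class {\bf worker} and its variables {\bf
total} and {\bf totallock} are hypothetical names, and the {\bf with}
statement is used as shorthand for the {\bf acquire()}/{\bf release()}
pair.

```python
import threading

class worker(threading.Thread):
    total = 0                      # class variable shared by all threads
    totallock = threading.Lock()   # lock guarding total
    def __init__(self, amount):
        threading.Thread.__init__(self)   # invoke constructor of parent class
        self.amount = amount
    def run(self):                 # start() arranges for this to be called
        with worker.totallock:     # same effect as acquire() ... release()
            worker.total += self.amount

threads = [worker(a) for a in (1, 2, 3)]
for t in threads:
    t.start()                      # run() executes in the new thread
for t in threads:
    t.join()                       # wait for each thread to finish
print(worker.total)
```

Note that the code calls {\bf start()}, never {\bf run()} directly;
calling {\bf run()} ourselves would simply execute it in the calling
thread.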

One thing that is quite different in this program is the way we end
it:

\begin{Verbatim}[fontsize=\relsize{-2}]
for s in mythreads:
   s.join()
\end{Verbatim}

The {\bf join()} method in the class {\bf Thread} blocks until the given
thread exits.  (The threads manager puts the main thread in Sleep state,
and when the given thread exits, the manager changes that state to Run.)
The overall effect of this loop, then, is that the main program will
wait at that point until all the threads are done.  They ``join'' the
main program.  This is a much cleaner approach than what we used
earlier, and it is also more efficient, since the main program will not
be given any turns in which it wastes time looping around doing nothing,
as in the program in Section \ref{threadmodex} in the line

\begin{Verbatim}[fontsize=\relsize{-2}]
while nclnt > 0: pass
\end{Verbatim}
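
The effect of {\bf join()} can be observed directly with the method {\bf
is\_alive()}.  The following is a small sketch, assuming Python 3; the
class {\bf sleeper} is a hypothetical name, and the call to {\bf
time.sleep()} merely stands in for real work.

```python
import threading
import time

class sleeper(threading.Thread):
    def run(self):
        time.sleep(0.2)            # stand-in for real work

s = sleeper()
s.start()
alive_before = s.is_alive()        # the thread is still sleeping at this point
s.join()                           # main sleeps here rather than looping
alive_after = s.is_alive()         # join() has returned, so the thread exited
print(alive_before, alive_after)
```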

Here we maintained our own list of threads.  However, we could also get
one via the call {\bf threading.enumerate()}.  If placed after the {\bf
for} loop in our server code above, for instance as

\begin{Verbatim}[fontsize=\relsize{-2}]
print threading.enumerate()
\end{Verbatim}

we would get output like

\begin{Verbatim}[fontsize=\relsize{-2}]
[<_MainThread(MainThread, started)>, <srvr(Thread-1, started)>,
<srvr(Thread-2, started)>]
\end{Verbatim}

Here's another example, which finds and counts prime numbers, again not
assumed to be efficient:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
#!/usr/bin/env python

# prime number counter, based on Python threading class

# usage:  python PrimeThreading.py n nthreads 
#   where we wish the count of the number of primes from 2 to n, and to
#   use nthreads to do the work

# uses Sieve of Eratosthenes:  write out all numbers from 2 to n, then
# cross out all the multiples of 2, then of 3, then of 5, etc., up to
# sqrt(n); what's left at the end are the primes

import sys
import math
import threading

class prmfinder(threading.Thread):
   n = int(sys.argv[1])
   nthreads = int(sys.argv[2])
   thrdlist = []  # list of all instances of this class
   prime = (n+1) * [1]  # 1 means assumed prime, until find otherwise
   nextk = 2  # next value to try crossing out with
   nextklock = threading.Lock()
   def __init__(self,id):
      threading.Thread.__init__(self)
      self.myid = id
   def run(self):
      lim = math.sqrt(prmfinder.n)
      nk = 0  # count of k's done by this thread, to assess load balance
      while 1:
         # find next value to cross out with
         prmfinder.nextklock.acquire()
         k = prmfinder.nextk
         prmfinder.nextk += 1
         prmfinder.nextklock.release()
         if k > lim: break
         nk += 1  # increment workload data
         if prmfinder.prime[k]:  # now cross out
            r = prmfinder.n / k
            for i in range(2,r+1):
               prmfinder.prime[i*k] = 0
      print 'thread', self.myid, 'exiting; processed', nk, 'values of k'

def main():
   for i in range(prmfinder.nthreads):
      pf = prmfinder(i)  # create thread i
      prmfinder.thrdlist.append(pf) 
      pf.start()
   for thrd in prmfinder.thrdlist: thrd.join()
   print 'there are', reduce(lambda x,y: x+y, prmfinder.prime) - 2, 'primes'

if __name__ == '__main__':
    main()
\end{Verbatim}

\subsection{Condition Variables}

\subsubsection{General Ideas}

We saw in the last section that {\bf threading.Thread.join()} avoids the
need for wasteful looping in {\bf main()}, while the latter is waiting
for the other threads to finish.  In fact, it is very common in threaded
programs to have situations in which one thread needs to wait for
something to occur in another thread.  Again, in such situations we
would not want the waiting thread to engage in wasteful looping.

The solution to this problem is {\bf condition variables}.  As the name
implies, these are variables used by code to wait for a certain
condition to occur.  Most threads systems include provisions for these,
and Python's {\bf threading} package is no exception.  

The {\bf pthreads} package, for instance, has a type {\bf pthread\_cond\_t}
for such variables, and has functions such as {\bf
pthread\_cond\_wait()}, which a thread calls to wait for an event to
occur, and {\bf pthread\_cond\_signal()}, which another thread calls to
announce that the event now has occurred.

But as is typical with Python in so many things, it is easier for us to
use condition variables in Python than in C.  At the first level, there
is the class {\bf threading.Condition}, which corresponds well to the
condition variables available in most threads systems.  However, at this
level condition variables are rather cumbersome to use, as not only do
we need to set up condition variables but we also need to set up extra
locks to guard them.  This is necessary in any threading system, but it
is a nuisance to deal with.
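
To make the bookkeeping concrete, here is a minimal sketch of a wait and an
announcement using {\bf threading.Condition}, assuming Python 3.  The names
{\bf ready}, {\bf seen}, {\bf waiter()} and {\bf announcer()} are
hypothetical.  For brevity the threads are created with the {\bf target}
argument of {\bf threading.Thread} instead of by subclassing.

```python
import threading

cond = threading.Condition()   # creates its own underlying lock by default
ready = False                  # the shared state that the condition is about
seen = []

def waiter():
    with cond:                 # the lock must be held before calling wait()
        while not ready:       # re-check the state each time we wake up
            cond.wait()        # releases the lock while sleeping
        seen.append('event observed')

def announcer():
    global ready
    with cond:                 # the lock must be held to notify, too
        ready = True
        cond.notify()          # wake one waiting thread

w = threading.Thread(target=waiter)
a = threading.Thread(target=announcer)
w.start(); a.start()
w.join(); a.join()
print(seen)
```

Both the waiting side and the announcing side must take the lock, and the
waiting side must loop on the shared flag; this is the overhead referred to
above.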

So, Python offers a higher-level class, {\bf threading.Event}, which is
just a wrapper for {\bf threading.Condition}, but which does all the
condition lock operations behind the scenes, relieving the programmer
of having to do this work.
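
A minimal sketch of the higher-level interface, assuming Python 3, is shown
here.  The names {\bf evnt}, {\bf order} and {\bf waiter()} are
hypothetical, and the thread is again created with the {\bf target}
argument for brevity.

```python
import threading

evnt = threading.Event()       # the internal flag starts out False
order = []

def waiter():
    evnt.wait()                # sleeps until some thread calls set()
    order.append('waiter resumed')

w = threading.Thread(target=waiter)
w.start()
order.append('main about to signal')
evnt.set()                     # announce the event; wakes all waiting threads
w.join()
print(order)
```

No explicit lock appears anywhere in the code; the {\bf Event} object
handles that internally.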

% \subsubsection{{\tt Event} Example}
% \label{eventex}
% 
% Following is an example of the use of {\bf threading.Event}.  It
% searches a given network host for servers at various ports on that host.
% (This is called a {\bf port scanner}.) As noted in a comment, the
% threaded operation used here would make more sense if many hosts were to
% be scanned, rather than just one, as each {\bf connect()} operation does
% take some time.  But even on the same machine, if a server is active but
% busy enough that we never get to connect to it, it may take a long for
% the attempt to timeout.  It is common to set up Web operations to be
% threaded for that reason.  We could also have each thread check a block
% of ports on a host, not just one, for better efficiency.
% 
% The use of threads is aimed at checking many ports in parallel, one per
% thread.  The program has a self-imposed limit on the number of threads.
% If {\bf main()} is ready to start checking another port but we are at
% the thread limit, the code in {\bf main()} waits for the number of
% threads to drop below the limit.  This is accomplished by a condition
% wait, implemented through the {\bf threading.Event} class.
% 
% \begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
% # portscanner.py:  checks for active ports on a given machine; would be
% # more realistic if checked several hosts at once; different threads
% # check different ports; there is a self-imposed limit on the number of
% # threads, and the event mechanism is used to wait if that limit is
% # reached
% 
% # usage:  python portscanner.py host maxthreads
% 
% import sys, threading, socket
% 
% class scanner(threading.Thread):
%    tlist = []  # list of all current scanner threads
%    maxthreads = int(sys.argv[2])  # max number of threads we're allowing
%    evnt = threading.Event()  # event to signal OK to create more threads
%    lck =  threading.Lock()  # lock to guard tlist
%    def __init__(self,tn,host):
%       threading.Thread.__init__(self)
%       self.threadnum = tn  # thread ID/port number
%       self.host = host  # checking ports on this host
%    def run(self):  
%       s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
%       try:
%          s.connect((self.host, self.threadnum))
%          print "%d:  successfully connected" % self.threadnum
%          s.close()
%       except: 
%          print "%d:  connection failed" % self.threadnum
%       # thread is about to exit; remove from list, and signal OK if we
%       # had been up against the limit
%       scanner.lck.acquire()
%       scanner.tlist.remove(self)
%       print "%d:  now active --" % self.threadnum, scanner.tlist
%       if len(scanner.tlist) == scanner.maxthreads-1:
%          scanner.evnt.set()
%          scanner.evnt.clear()
%       scanner.lck.release()
%    def newthread(pn,hst):
%       scanner.lck.acquire()
%       sc = scanner(pn,hst)
%       scanner.tlist.append(sc)
%       scanner.lck.release()
%       sc.start()
%       print "%d:  starting check" % pn
%       print "%d:  now active --" % pn, scanner.tlist
%    newthread = staticmethod(newthread)
% 
% def main():
%    host = sys.argv[1]
%    for i in range(1,100):
%       scanner.lck.acquire()
%       print "%d:  attempting check" % i
%       # check to see if we're at the limit before starting a new thread
%       if len(scanner.tlist) >= scanner.maxthreads:
%          # too bad, need to wait until not at thread limit
%          print "%d:  need to wait" % i
%          scanner.lck.release()
%          scanner.evnt.wait()
%       else:
%          scanner.lck.release()
%       scanner.newthread(i,host)
%    for sc in scanner.tlist:
%       sc.join()
% 
% if __name__ == '__main__':
%     main()
% \end{Verbatim}
% 
% As you can see, when {\bf main()} discovers that we are at our
% self-imposed limit of number of active threads, we back off by calling
% {\bf threading.Event.wait()}.  At that point {\bf main()}---which,
% recall, is also a thread---blocks.  It will not be given any more
% timeslices for the time being.  When some active thread exits, we have
% it call {\bf threading.Event.set()} and {\bf threading.Event.clear()}.
% The threads manager reacts to the former by moving all threads which had
% been waiting for this event---in our case here, only {\bf main()}---from
% Sleep state to Run state; {\bf main()} will eventually get another
% timeslice.  
% 
% The call to {\bf threading.Event.clear()} is crucial.  The word {\it
% clear} here means that {\bf threading.Event.clear()} is clearing the
% occurence of the event.  Without this, any subsequent call to {\bf
% threading.Event.wait()} would immediately return, even though the
% condition has not been met yet.  
% 
% Note carefully the use of locks.  The {\bf main()} thread adds items to
% {\bf tlist}, while the other threads delete items (delete themselves,
% actually) from it.  These operations must be atomic, and thus must be
% guarded by locks.
% 
% I've put in a lot of extra {\bf print} statements so that you can get an
% idea as to how the threads' execution is interleaved.  Try running the
% program.\footnote{Disclaimer:  Not guaranteed to be bug-free.}  But
% remember, the program may appear to hang for a long time if a server is
% active but so busy that the attempt to connect times out.

% \subsection{Warning}
% 
% In Python threads, as with most threads systems, when we have a {\bf
% wait()}/{\bf set()} pair, the former must be executed before the latter.
% If the latter is done first, its action will be immediately discarded by
% the threads system, and a subsequent {\bf wait()} will wait forever.
% 
% So, it's very important to write your code so that you don't call {\bf
% wait()} unless you are certain that it won't be executed before the
% corresponding {\bf set()}.

\subsubsection{Other {\tt threading} Classes}

The function {\bf Event.set()} ``wakes'' all threads that are waiting
for the given event.  That is fine if only one thread will ever be
waiting at a time.  But in more general applications, we sometimes want
to wake only one thread instead of all of them.  For this, we can revert
to working at the level of {\bf threading.Condition} instead of {\bf
threading.Event}.  There we have a choice between using {\bf notify()}
or {\bf notifyAll()} (spelled {\bf notify\_all()} in newer versions of
Python).

The latter is actually what is called internally by {\bf Event.set()}.
But {\bf notify()} instructs the threads manager to wake just one of
the waiting threads (we don't know which one).
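
The following sketch, with invented names and written in Python 3
syntax, illustrates the difference.  Three threads wait for
``permits''; the main thread first hands out one permit with {\bf
notify()}, then two more with {\bf notify\_all()}:

\begin{Verbatim}[fontsize=\relsize{-2}]
import threading

cond = threading.Condition()
permits = 0                     # guarded by cond's lock
finished = []                   # IDs of waiters that got a permit
onedone = threading.Event()     # set when some waiter gets through

def waiter(me):
    global permits
    with cond:
        while permits == 0:     # recheck after every wakeup
            cond.wait()
        permits -= 1
        finished.append(me)
    onedone.set()

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads: t.start()

with cond:
    permits = 1
    cond.notify()               # one permit, so waking one waiter suffices
onedone.wait()
with cond:
    afterfirst = len(finished)  # only one permit existed, so this is 1

with cond:
    permits = 2
    cond.notify_all()           # wake all the remaining waiters
for t in threads: t.join()
print(afterfirst, len(finished))
\end{Verbatim}

Had the main thread called {\bf notify()} rather than {\bf
notify\_all()} in the second step, one of the two remaining waiters
might have slept forever.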

The class {\bf threading.Semaphore} offers semaphore operations.  Other
classes of advanced interest are {\bf threading.RLock} and {\bf
threading.Timer}.
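
As one illustration, a semaphore gives a simple way to cap the number of
threads that are active in some piece of code at once.  The sketch below
uses Python 3 syntax and invented names, with the limit {\bf MAXACTIVE}
chosen arbitrarily; it records the largest number of workers ever inside
the guarded region at the same time:

\begin{Verbatim}[fontsize=\relsize{-2}]
import threading

MAXACTIVE = 2                      # illustrative limit
sem = threading.Semaphore(MAXACTIVE)
lock = threading.Lock()            # guards the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with sem:                      # blocks if MAXACTIVE workers are inside
        with lock:
            active += 1
            peak = max(peak, active)
        # ... the real work would go here ...
        with lock:
            active -= 1

threads = [threading.Thread(target=worker) for i in range(8)]
for t in threads: t.start()
for t in threads: t.join()
print(peak)    # never exceeds MAXACTIVE
\end{Verbatim}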

\subsection{Threads Internals}
\label{internals}

The thread manager acts like a ``mini-operating system.'' Just as a
real OS maintains a table of processes, a thread system's thread manager
maintains a table of threads.  When one thread gives up the CPU, or has
its turn pre-empted (see below), the thread manager looks in the table
for another thread to activate.  Whichever thread is activated will then
resume execution where it had left off, i.e. where its last turn ended. 

Just as a process is either in Run state or Sleep state, the same is
true for a thread.   A thread is either ready to be given a turn to run,
or is waiting for some event.  The thread manager will keep track of
these states, decide which thread to run when another has lost its turn,
etc.

\subsubsection{Kernel-Level Thread Managers}

Here each thread really is a process, and for example will show up
on Unix systems when one runs the appropriate {\bf ps} process-list
command, say {\bf ps axH}.  The threads manager is then the OS.  

The different threads set up by a given application program take turns
running, among all the other processes. 

This kind of thread system is used in the Unix {\bf pthreads} system,
as well as in Windows threads.

\subsubsection{User-Level Thread Managers}
\label{user}

User-level thread systems are ``private'' to the application.  Running
the {\bf ps} command on a Unix system will show only the original
application running, not all the threads it creates.  Here the threads
are not pre-empted; on the contrary, a given thread will continue to run
until it voluntarily gives up control of the CPU, either by calling some
``yield'' function or by calling a function by which it requests a wait
for some event to occur.\footnote{In typical user-level thread systems,
an external event, such as an I/O operation or a signal, will also
cause the current thread to relinquish the CPU.} 

A typical example of a user-level thread system is {\bf pth}.
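
To make the idea of voluntary yielding concrete, here is a toy
user-level ``thread manager'' built from Python generators.  It is only
an analogy, not how {\bf pth} or any real package is implemented, and
all names in it are invented.  Each task runs until it executes {\bf
yield}, at which point the manager gives a turn to the next task in its
table:

\begin{Verbatim}[fontsize=\relsize{-2}]
def task(name, nsteps, log):
    for i in range(nsteps):
        log.append('%s step %d' % (name, i))
        yield                    # voluntarily give up the "CPU"

def run(tasks):
    # round-robin "thread manager": resume each task where it left off
    ready = list(tasks)
    while ready:
        t = ready.pop(0)
        try:
            next(t)
            ready.append(t)      # not finished; go to the back of the line
        except StopIteration:
            pass                 # task has finished; drop it from the table

log = []
run([task('A', 2, log), task('B', 3, log)])
print(log)
\end{Verbatim}

The log comes out as A step 0, B step 0, A step 1, B step 1, B step 2.
Since a task can lose its turn only at a {\bf yield}, the interleaving
is entirely predictable.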

\subsubsection{Comparison}

Kernel-level threads have the advantage that they can be used on
multiprocessor systems, thus achieving true parallelism between threads.
This is a major advantage.

On the other hand, in my opinion user-level threads also have a major
advantage in that they allow one to produce code which is much easier to
write, is easier to debug, and is cleaner and clearer.  This in turn
stems from the non-preemptive nature of user-level threads; application
programs written in this manner typically are not cluttered up with lots
of lock/unlock calls (details on these below), which are needed in the
pre-emptive case.

\subsubsection{The Python Thread Manager}

Python ``piggybacks'' on top of the OS' underlying threads system.  A
Python thread is a real OS thread.   If a Python program has three
threads, for instance, there will be three entries in the {\bf ps}
output.

However, Python's thread manager imposes further structure on top of the
OS threads.  It keeps track of how long a thread has been executing, in
terms of the number of Python {\bf byte code} instructions that have
executed.\footnote{This is the ``machine language'' for the Python
virtual machine.}  When that reaches a certain number, by default 100,
the thread's turn ends.  In other words, the turn can be pre-empted
either by the hardware timer and the OS, or when the interpreter sees
that the thread has executed 100 byte code instructions.\footnote{The
author thanks Alex Martelli for a helpful clarification.}
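
One can query this setting from within Python.  The sketch below hedges
across versions: in the Python 2 line discussed here the count of byte
code instructions is available from {\bf sys.getcheckinterval()}, while
Python 3.2 and later use a time-based interval instead, reported in
seconds by {\bf sys.getswitchinterval()}.  The code simply prints
whatever the running interpreter provides:

\begin{Verbatim}[fontsize=\relsize{-2}]
import sys

# older interpreters: number of byte code instructions per turn
if hasattr(sys, 'getcheckinterval'):
    print('check interval: %d' % sys.getcheckinterval())
# Python 3.2 and later: length of a turn, in seconds
if hasattr(sys, 'getswitchinterval'):
    print('switch interval: %g' % sys.getswitchinterval())
\end{Verbatim}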

\subsubsection{The GIL}
\label{gil}

In the case of CPython (but not Jython or IronPython), there is a
global interpreter lock, the famous (or infamous) GIL.  It is set up to
ensure that only one thread runs at a time, in order to facilitate easy
garbage collection.

Suppose we have a C program with three threads, which I'll call X, Y and
Z.  Say currently Y is running.  After 30 milliseconds (or whatever the
quantum size has been set to by the OS), Y will be interrupted by the
timer, and the OS will start some other process.  Say the latter, which
I'll call Q, is a different, unrelated program.  Eventually Q's turn
will end too, and let's say that the OS then gives X a turn.  From the
point of view of our X/Y/Z program, i.e. ignoring Q, control has passed
from Y to X.  The key point is that the point within Y at which that
event occurs is random (with respect to where Y is at the time), based
on the time of the hardware interrupt.

By contrast, say my Python program has three threads, U, V and W.  Say V
is running.  The hardware timer will go off at a random time, and again
Q might be given a turn, {\it but} definitely neither U nor W will be
given a turn, because the Python interpreter had earlier made a call to
the OS which makes U and W wait for the GIL to become unlocked.

Let's look at this a little closer.  The key point to note is that the
Python interpreter itself is threaded, say using {\bf pthreads}.  For
instance, in our U/V/W example above, if you ran {\bf ps axH}, you
would see three Python processes/threads.  I just tried that on my
program {\bf thsvr.py}, which creates two threads, with a command-line
argument of 2000 for that program.  Here is the relevant portion of the
output of {\bf ps axH}:

\begin{Verbatim}[fontsize=\relsize{-2}]
28145 pts/5    Rl     0:09 python thsvr.py 2000
28145 pts/5    Sl     0:00 python thsvr.py 2000
28145 pts/5    Sl     0:00 python thsvr.py 2000
\end{Verbatim}

What has happened is the Python interpreter has spawned two child
threads, one for each of my threads in {\bf thsvr.py}, in addition to
the interpreter's original thread, which runs my {\bf main()}.  Let's
call those threads UP, VP and WP.  Again, these are the threads that the
OS sees, while U, V and W are the threads that I see---or think I see,
since they are just virtual.

The GIL is a {\bf pthreads} lock.  Say V is now running.  Again, what
that actually means on my real machine is that VP is running.  VP keeps
track of how long V has been executing, in terms of the number of Python
{\bf byte code} instructions that have executed.  When that reaches a
certain number, by default 100, VP will release the GIL by calling {\bf
pthread\_mutex\_unlock()} or something similar.  

The OS then says, ``Oh, were any threads waiting for that lock?''  It
then basically gives a turn to UP or WP (we can't predict which), which
then means that from my point of view U or W starts, say U.  Then VP and
WP are still in Sleep state, and thus so are my V and W.

So you can see that it is the Python interpreter, not the hardware
timer, that is determining how long a thread's turn runs, relative to
the other threads in my program.  Again, Q might run too, but within
this Python program there will be no control passing from V to U or W
simply because the timer went off; such a control change will only occur
when the Python interpreter wants it to.  This will be either after the
100 byte code instructions or when V reaches an I/O operation or other
wait-event operation.

So, the bottom line is that while Python uses the underlying OS threads
system as its base, it superimposes further structure in terms of
transfer of control between threads.

Most importantly, the presence of the GIL means that two Python threads
(spawned from the same program) cannot run at the same time---{\it even
on a multicore machine}.  This has been the subject of great
controversy.
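
A quick experiment along the following lines can make this visible.
This is only a sketch in Python 3 syntax with an arbitrary workload
size; actual timings depend on the machine and the Python build, so no
particular numbers are claimed here.  On a CPython with a GIL, one would
expect the two-thread version to take about as long as the serial one,
or longer:

\begin{Verbatim}[fontsize=\relsize{-2}]
import threading, time

def countdown(n):        # pure computation, no I/O
    while n > 0:
        n -= 1

N = 2000000              # arbitrary amount of work per call

start = time.time()
countdown(N); countdown(N)                   # serial version
serial = time.time() - start

start = time.time()
ts = [threading.Thread(target=countdown, args=(N,)) for i in range(2)]
for t in ts: t.start()
for t in ts: t.join()
threaded = time.time() - start               # two-thread version

print('serial %.2f sec, threaded %.2f sec' % (serial, threaded))
\end{Verbatim}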

\subsubsection{Implications for Randomness and Need for Locks}

I mentioned in Section \ref{user} that non-pre-emptive threading is nice
because one can avoid the code clutter of locking and unlocking (details
of lock/unlock below).  Since, barring I/O issues, a thread working on
the same data would seem to always yield control at exactly the same
point (i.e.  at 100 byte code instruction boundaries), Python would seem
to be deterministic and non-pre-emptive.  However, it will not quite be
so simple.  

First of all, there is the issue of I/O, which adds randomness.  There
may also be randomness in how the OS chooses the first thread to be run,
which could affect computation order and so on.

Finally, there is the question of atomicity in Python operations: The
interpreter will treat any Python virtual machine instruction as
indivisible, thus not needing locks in that case.  But the bottom line
will be that unless you know the virtual machine well, you should use
locks at all times.
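
To see why, one can disassemble a simple increment with the standard
{\bf dis} module.  In the sketch below (Python 3 syntax, invented
names), the statement {\tt count += 1} turns out to consist of several
byte code instructions, so a thread's turn could end between the load
and the store.  The second part shows the safe version, with a lock
around the update:

\begin{Verbatim}[fontsize=\relsize{-2}]
import dis, threading

def bump():
    global count
    count += 1

# the increment compiles to several byte code instructions (load, add,
# store), so a thread switch can fall between them
dis.dis(bump)

count = 0
countlock = threading.Lock()

def safeadder(nreps):
    global count
    for i in range(nreps):
        with countlock:          # load-add-store now done as a unit
            count += 1

ts = [threading.Thread(target=safeadder, args=(10000,)) for i in range(4)]
for t in ts: t.start()
for t in ts: t.join()
print(count)
\end{Verbatim}

With the lock in place the final count should be exactly 40000.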

\subsection{The {\tt multiprocessing} Module}
\label{mpmodule}

CPython's GIL is the subject of much controversy.  As we saw in Section
\ref{gil}, it prevents running true parallel applications when using the
{\bf thread} or {\bf threading} modules.

That might not seem to be too severe a restriction---after all if you
really need the speed, you probably won't use a scripting language in
the first place.  But a number of people took the point of view that,
given that they have decided to use Python no matter what, they would
like to get the best speed subject to that restriction.  So, there was
much grumbling about the GIL.

Thus, later the {\bf multiprocessing} module was developed, which
enables true parallel processing with Python on a multicore machine,
with an interface very close to that of the {\bf threading} module.

Moreover, one can run a program across machines!  In other words, the
{\bf multiprocessing} module allows one to run several threads not only
on the different cores of one machine, but on many machines at once, in
cooperation in the same manner that threads cooperate on one machine.
By the way, this idea is similar to something I did for Perl some years
ago (PerlDSM: A Distributed Shared Memory System for Perl. {\it
Proceedings of PDPTA 2002}, 63-68), and later did for R in the
package {\bf Rdsm}.  We will not cover the cross-machine case here.

So, let's go to our first example, a simulation application that will
find the probability of getting a total of exactly k dots when we roll n
dice:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
# dice probability finder, based on Python multiprocessing class

# usage:  python DiceProb.py n k nreps nthreads 
#   where we wish to find the probability of getting a total of k dots
#   when we roll n dice; we'll use nreps total repetitions of the
#   simulation, dividing those repetitions among nthreads threads

import sys
import random
from multiprocessing import Process, Lock, Value

class glbls:  # globals, other than shared
   n = int(sys.argv[1])
   k = int(sys.argv[2])
   nreps = int(sys.argv[3])
   nthreads = int(sys.argv[4])
   thrdlist = []  # list of all Process objects we create

def worker(id,tot,totlock):
   mynreps = glbls.nreps/glbls.nthreads
   r = random.Random()  # set up random number generator
   count = 0  # number of times get total of k
   for i in range(mynreps):
      if rolldice(r) == glbls.k:
         count += 1
   totlock.acquire()
   tot.value += count  
   totlock.release()
   # check for load balance
   print 'thread', id, 'exiting; total was', count

def rolldice(r):
   ndots = 0
   for roll in range(glbls.n):
      dots = r.randint(1,6)
      ndots += dots
   return ndots

def main():
   tot = Value('i',0)
   totlock = Lock()
   for i in range(glbls.nthreads):
      pr = Process(target=worker, args=(i,tot,totlock))
      glbls.thrdlist.append(pr)
      pr.start()
   for thrd in glbls.thrdlist: thrd.join()
   # adjust for truncation, in case nthreads doesn't divide nreps evenly
   actualnreps = glbls.nreps/glbls.nthreads * glbls.nthreads
   print 'the probability is',float(tot.value)/actualnreps

if __name__ == '__main__':
    main()
\end{Verbatim}

As in any simulation, the longer one runs it, the better the accuracy is
likely to be.  Here we run the simulation {\bf nreps} times, but divide
those repetitions among the threads.  This is an example of an
``embarrassingly parallel'' application, so we should get a good speedup
(not shown here).

So, how does it work?  The general structure looks similar to that of
the Python {\bf threading} module, using {\bf Process()} to create a
thread, {\bf start()} to get it running, {\bf Lock()} to create a
lock, {\bf acquire()} and {\bf release()} to lock and unlock a lock, and
so on.

The main difference, though, is that globals are not automatically
shared.  Instead, shared variables must be created using {\bf Value}
for a scalar and {\bf Array} for an array.  Unlike Python in general,
here one must specify a data type, `i' for integer and `d' for double
(floating-point).  (One can use {\bf Namespace} to create more complex
types, at some cost in performance.) One also specifies the initial
value of the variable.  One must pass these variables explicitly to the
functions to be run by the threads, in our case above the function {\bf
worker()}.  Note carefully that the shared variables are still accessed
syntactically as if they were globals.  
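
The following small sketch (Python 3 syntax; the names {\bf plain},
{\bf shared} and {\bf child()} are invented) contrasts an ordinary
global with a {\bf Value}.  It also uses {\bf get\_lock()}, the lock
that a {\bf Value} carries by default, as an alternative to creating a
separate {\bf Lock}:

\begin{Verbatim}[fontsize=\relsize{-2}]
from multiprocessing import Process, Value

plain = 0                       # ordinary global: each process has its own

def child(shared):
    global plain
    plain += 1                  # changes only this process's copy
    with shared.get_lock():     # Value comes with a lock of its own
        shared.value += 1       # changes the one shared copy

if __name__ == '__main__':
    shared = Value('i', 0)      # shared integer, initial value 0
    p = Process(target=child, args=(shared,))
    p.start()
    p.join()
    print('plain = %d, shared = %d' % (plain, shared.value))
\end{Verbatim}

The parent should report that {\bf plain} is still 0 while {\bf shared}
has become 1, since the child incremented only its own copy of the
ordinary global.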

Here's the prime number-finding program from before, now using {\bf
multiprocessing}:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
#!/usr/bin/env python

# prime number counter, based on Python multiprocessing class

# usage:  python PrimeThreading.py n nthreads 
#   where we wish the count of the number of primes from 2 to n, and to
#   use nthreads to do the work

# uses Sieve of Eratosthenes:  write out all numbers from 2 to n, then
# cross out all the multiples of 2, then of 3, then of 5, etc., up to
# sqrt(n); what's left at the end are the primes

import sys
import math
from multiprocessing import Process, Lock, Array, Value

class glbls:  # globals, other than shared
   n = int(sys.argv[1])
   nthreads = int(sys.argv[2])
   thrdlist = []  # list of all Process objects we create

def prmfinder(id,prm,nxtk,nxtklock):
   lim = math.sqrt(glbls.n)
   nk = 0  # count of k's done by this thread, to assess load balance
   while 1:
      # find next value to cross out with
      nxtklock.acquire()
      k = nxtk.value
      nxtk.value = nxtk.value + 1
      nxtklock.release()
      if k > lim: break
      nk += 1  # increment workload data
      if prm[k]:  # now cross out
         r = glbls.n / k
         for i in range(2,r+1):
            prm[i*k] = 0
   print 'thread', id, 'exiting; processed', nk, 'values of k'

def main():
   prime = Array('i',(glbls.n+1) * [1])  # 1 means prime, until find otherwise
   nextk = Value('i',2)  # next value to try crossing out with
   nextklock = Lock()
   for i in range(glbls.nthreads):
      pf = Process(target=prmfinder, args=(i,prime,nextk,nextklock))
      glbls.thrdlist.append(pf)
      pf.start()
   for thrd in glbls.thrdlist: thrd.join()
   print 'there are', reduce(lambda x,y: x+y, prime) - 2, 'primes'

if __name__ == '__main__':
    main()
\end{Verbatim}

The main new item in this example is use of {\bf Array()}.

One can use the {\bf Pool} class to create a set of threads, rather than
doing so ``by hand'' in a loop as above.  You can start them with
various initial values for the threads using {\bf Pool.map()}, which
works similarly to Python's ordinary {\bf map()} function.
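
As a sketch of how this might look for the dice simulation (Python 3
syntax; the function {\bf counthits()} and the particular argument
values are invented for illustration, and this is not the code used
above), each task receives a tuple of arguments and returns its count,
so that no shared variables or locks are needed:

\begin{Verbatim}[fontsize=\relsize{-2}]
import random
from multiprocessing import Pool

def counthits(args):
    # roll n dice nreps times; count how often the total is k
    n, k, nreps, seed = args
    r = random.Random(seed)
    hits = 0
    for rep in range(nreps):
        if sum(r.randint(1, 6) for die in range(n)) == k:
            hits += 1
    return hits

if __name__ == '__main__':
    nworkers, n, k, nreps = 4, 2, 7, 100000
    chunk = nreps // nworkers
    pool = Pool(nworkers)
    # one argument tuple per task, each with its own seed
    tasks = [(n, k, chunk, seed) for seed in range(nworkers)]
    counts = pool.map(counthits, tasks)
    pool.close()
    pool.join()
    prob = float(sum(counts)) / (chunk * nworkers)
    print('the probability is about %f' % prob)
\end{Verbatim}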

The {\bf multiprocessing} documentation warns that shared items may be
costly, and suggests using {\bf Queue} and {\bf Pipe} where
possible.  We will cover the former in the next section.  Note, though,
that in general it's difficult to get much speedup (or difficult even to
avoid slowdown!) with non-``embarrassingly parallel'' applications.

\subsection{The {\tt Queue} Module for Threads and Multiprocessing}
\label{queue}

Threaded applications often have some sort of work queue data structure.
When a thread becomes free, it will pick up work to do from the queue.
When a thread creates a task, it will add that task to the queue.

Clearly one needs to guard the queue with locks.  But Python provides
the {\bf Queue} module to take care of all the lock creation,
locking and unlocking, and so on.  This means we don't have to bother with
it, and the code will probably be faster.

{\bf Queue} is implemented for both {\bf threading} and {\bf
multiprocessing}, in almost identical forms.  This is good, because the
documentation for {\bf multiprocessing} is rather sketchy, so you can
turn to the docs for {\tt threading} for more details.

The function {\bf put()} in Queue adds an element to the end of the
queue, while {\bf get()} will remove the head of the queue, again
without the programmer having to worry about race conditions.  

Note that {\bf get()} will block if the queue is currently empty.
An alternative is to call it with {\bf block=False}, within a {\bf
try/except} construct.  One can also set timeout periods.
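
Here is a minimal sketch of both alternatives, using the threads version
of the queue class.  The module is named {\bf Queue} in Python 2 and
{\bf queue} in Python 3, which the code accounts for; the exception
raised when nothing is available is {\bf Empty}, from that same module:

\begin{Verbatim}[fontsize=\relsize{-2}]
try:
    import queue              # Python 3 name
except ImportError:
    import Queue as queue     # Python 2 name

q = queue.Queue()
for i in range(3):
    q.put(i)

got = []
while True:
    try:
        got.append(q.get(block=False))   # do not wait if queue is empty
    except queue.Empty:
        break
print(got)

try:
    q.get(timeout=0.1)        # wait at most 0.1 second
except queue.Empty:
    print('timed out')
\end{Verbatim}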

Here once again is the prime number example, this time done with {\bf
Queue}:

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
#!/usr/bin/env python

# prime number counter, based on Python multiprocessing class with
# Queue

# usage:  python PrimeThreading.py n nthreads 
#   where we wish the count of the number of primes from 2 to n, and to
#   use nthreads to do the work

# uses Sieve of Eratosthenes:  write out all numbers from 2 to n, then
# cross out all the multiples of 2, then of 3, then of 5, etc., up to
# sqrt(n); what's left at the end are the primes

import sys
import math
from multiprocessing import Process, Array, Queue

class glbls:  # globals, other than shared
   n = int(sys.argv[1])
   nthreads = int(sys.argv[2])
   thrdlist = []  # list of all Process objects we create

def prmfinder(id,prm,nxtk):
   nk = 0  # count of k's done by this thread, to assess load balance
   while 1:
      # find next value to cross out with; a nonblocking get() raises
      # an exception (Queue.Empty) when no work is left
      try: k = nxtk.get(False)
      except: break
      nk += 1  # increment workload data
      if prm[k]:  # now cross out
         r = glbls.n / k
         for i in range(2,r+1):
            prm[i*k] = 0
   print 'thread', id, 'exiting; processed', nk, 'values of k'

def main():
   prime = Array('i',(glbls.n+1) * [1])  # 1 means prime, until find otherwise
   nextk = Queue()  # next value to try crossing out with
   lim = int(math.sqrt(glbls.n)) + 1  # fill the queue with 2...sqrt(n)
   for i in range(2,lim): nextk.put(i)
   for i in range(glbls.nthreads):
      pf = Process(target=prmfinder, args=(i,prime,nextk))
      glbls.thrdlist.append(pf)
      pf.start()
   for thrd in glbls.thrdlist: thrd.join()
   print 'there are', reduce(lambda x,y: x+y, prime) - 2, 'primes'

if __name__ == '__main__':
    main()
\end{Verbatim}

The way {\bf Queue} is used here is to put all the possible
``crosser-outers,'' obtained in the variable {\bf nextk} in the previous
versions of this code, into a queue at the outset.  One then uses {\tt
get()} to pick up work from the queue.  Look Ma, no locks!

Below is an example of queues in an in-place quicksort.  (Again, the
reader is warned that this is just an example, not claimed to be
efficient.)

The work items in the queue are a bit more involved here.  They have the
form {\bf (i,j,k)}, with the first two elements of this tuple meaning
that the given array chunk corresponds to indices {\bf i} through {\bf
j} of {\bf x}, the original array to be sorted.  In other words,
whichever thread picks up this chunk of work will have the
responsibility of handling that particular section of {\bf x}.

Quicksort, of course, works by repeatedly splitting the original array
into smaller and more numerous chunks.  Here a thread will split its
chunk, taking the lower half for itself to sort, but placing the upper
half into the queue, to be available for other threads that have not been
assigned any work yet.  I've written the algorithm so that as soon as
all threads have gotten some work to do, no more splitting will occur.
That's where the value of {\bf k} comes in.  It tells us the split
number of this chunk.  If it's equal to {\bf nthreads-1}, this thread
won't split the chunk.

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
# Quicksort and test code, based on Python multiprocessing class and
# Queue

# code is incomplete, as some special cases such as empty subarrays
# need to be accounted for

# usage:  python QSort.py n nthreads 
#   where we wish to test the sort on a random list of n items, 
#   using nthreads to do the work

import sys
import random
from multiprocessing import Process, Array, Queue

class glbls:  # globals, other than shared
   nthreads = int(sys.argv[2])
   thrdlist = []  # list of all Process objects we create
   r = random.Random(9876543)

def sortworker(id,x,q):
   chunkinfo = q.get()
   i = chunkinfo[0]
   j = chunkinfo[1]
   k = chunkinfo[2]
   if k < glbls.nthreads - 1:  # need more splitting?
      splitpt = separate(x,i,j)
      q.put((splitpt+1,j,k+1))
      # now, what do I sort?  my own part ends at the split point, since
      # the chunk just queued starts at splitpt+1
      rightend = splitpt
   else: rightend = j
   tmp = x[i:(rightend+1)]  # need copy, as Array type has no sort() method
   tmp.sort()
   x[i:(rightend+1)] = tmp

def separate(xc, low, high):  # common algorithm; see Wikipedia
   pivot = xc[low]  # would be better to take, e.g., median of 1st 3 elts
   (xc[low],xc[high]) = (xc[high],xc[low])
   last = low
   for i in range(low,high):
      if xc[i] <= pivot:
         (xc[last],xc[i]) = (xc[i],xc[last])
         last += 1
   (xc[last],xc[high]) = (xc[high],xc[last])
   return last

def main():
   tmp = []
   n = int(sys.argv[1])
   for i in range(n): tmp.append(glbls.r.uniform(0,1))
   x = Array('d',tmp)  
   # work items have form (i,j,k), meaning that the given array chunk
   # corresponds to indices i through j of x, and that this is the kth
   # chunk that has been created, x being the 0th
   q = Queue()  # work queue
   q.put((0,n-1,0))  
   for i in range(glbls.nthreads):
      p = Process(target=sortworker, args=(i,x,q))
      glbls.thrdlist.append(p)
      p.start()
   for thrd in glbls.thrdlist: thrd.join()
   if n < 25: print x[:]

if __name__ == '__main__':
    main()
\end{Verbatim}

\subsection{Debugging Threaded and Multiprocessing Python Programs}

Debugging is always tough with parallel programs, including threads
programs.  It's especially difficult with pre-emptive threads; those
accustomed to debugging non-threads programs find it rather jarring to
see sudden changes of context while single-stepping through code.
Tracking down the cause of deadlocks can be very hard.  (Often just
getting a threads program to end properly is a challenge.)

Another problem which sometimes occurs is that if you issue a ``next''
command in your debugging tool, you may end up inside the internal
threads code.  In such cases, use a ``continue'' command or something
like that to extricate yourself.

Unfortunately, as of April 2010, I know of no debugging tool that works
with {\bf multiprocessing}.  However, one can do well with {\bf thread}
and {\bf threading}.

\section{Using Python with MPI}

({\bf Important note}:  As of April 2010, a much more widely used
Python/MPI interface is MPI4Py.  It works similarly to what is described
here.)

A number of interfaces of Python to MPI have been developed.\footnote{If
you are not familiar with Python, I have a quick tutorial at
\url{http://heather.cs.ucdavis.edu/~matloff/python.html}.}  A well-known
example is pyMPI, developed by a PhD graduate in computer science at
UCD, Patrick Miller.  

One writes one's pyMPI code, say in {\bf x.py}, by calling pyMPI
versions of the usual MPI routines.  To run the code, one then runs MPI
on the program {\bf pyMPI} with {\bf x.py} as a command-line argument.

Python is a very elegant language, and pyMPI does a nice job of
elegantly interfacing to MPI.  Following is a rendition of Quicksort in
pyMPI.  Don't worry if you haven't worked in Python before; the
``non-C-like'' Python constructs are explained in comments at the end
of the code. 

\begin{Verbatim}[fontsize=\relsize{-2},numbers=left]
# a type of quicksort; break array x (actually a Python "list") into 
# p quicksort-style piles, based on comparison with the first p-1 
# elements of x, where p is the number of MPI nodes; the nodes sort 
# their piles, then return them to node 0, which strings them all 
# together into the final sorted array 

import mpi  # load pyMPI module

# makes npls quicksort-style piles
def makepiles(x,npls):
   pivot = x[:npls]  # we'll use the first npls elements of x as pivots,
                     # i.e. we'll compare all other elements of x to these
   pivot.sort()  # sort() is a member function of the Python list class
   pls = []  # initialize piles list to empty
   lp = len(pivot)  # length of the pivot array
   # pls will be a list of lists, with the i-th list in pls storing the
   # i-th pile; the i-th pile will start with ID i (to enable 
   # identification later on) and pivot[i]
   for i in range(lp):  # i = 0,1,...lp-1
      pls.append([i,pivot[i]])  # build up array via append() member function
   pls.append([lp])
   for xi in x[npls:]:  # now place each element in the rest of x into
                        # its proper pile
      for j in range(lp):  # j = 0,1,...,lp-1
         if xi <= pivot[j]:
            pls[j].append(xi)
            break
         elif j == lp-1: pls[lp].append(xi)
   return pls

def main():
   if mpi.rank == 0:  # analog of calling MPI_Comm_rank()
      x = [12,5,13,61,9,6,20,1]  # small test case
      # divide x into piles to be disbursed to the various nodes
      pls = makepiles(x,mpi.size)
   else:  # all other nodes set their x and pls to empty
      x = []
      pls = []
   mychunk = mpi.scatter(pls)  # node 0 (not an explicit argument) disburses 
                               # pls to the nodes, each of which receives 
                               # its chunk in its mychunk
   newchunk = []  # will become sorted version of mychunk
   for pile in mychunk:
      # I need to sort my chunk but must remove the ID first
      plnum = pile.pop(0)  # ID
      pile.sort()
      # restore ID
      newchunk.append([plnum]+pile)  # the + is array concatenation
   # now everyone sends their newchunk lists, which node 0 (again an
   # implied argument) gathers together into haveitall
   haveitall = mpi.gather(newchunk)
   if mpi.rank == 0:
      haveitall.sort()
      # string all the piles together
      sortedx = [z for q in haveitall for z in q[1:]]
      print sortedx  

# common idiom for launching a Python program
if __name__ == '__main__': main()
\end{Verbatim}
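
To check the pile logic without an MPI installation, here is a minimal
single-process sketch of the same idea.  It is not pyMPI code: the names
{\bf pilesort()} and {\bf npivots} are hypothetical, the loop over the
piles merely stands in for the sorting that each node would do, and
there is no scatter or gather.  The pile IDs are omitted, since here the
piles never leave their original order.

```python
# single-process sketch of the pile-based sort above; no MPI involved
# (pilesort and npivots are illustrative names, not part of pyMPI)

def makepiles(x, npivots):
   # the first npivots elements of x, sorted, serve as the pivots
   pivots = sorted(x[:npivots])
   # pile i starts with pivots[i]; the extra last pile takes everything
   # larger than the largest pivot
   piles = [[p] for p in pivots] + [[]]
   for xi in x[npivots:]:
      for j, p in enumerate(pivots):
         if xi <= p:
            piles[j].append(xi)
            break
      else:  # loop ended without a break: xi exceeds every pivot
         piles[-1].append(xi)
   return piles

def pilesort(x, npivots):
   piles = makepiles(x, npivots)
   # in the MPI version, each node would sort its own piles
   for pile in piles:
      pile.sort()
   # the piles are already in pivot order, so concatenating them
   # gives the sorted array
   return [z for pile in piles for z in pile]

if __name__ == '__main__':
   print(pilesort([12, 5, 13, 61, 9, 6, 20, 1], 3))
```

For the test list from {\bf main()} above and three pivots, this prints
{\tt [1, 5, 6, 9, 12, 13, 20, 61]}.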

Some examples of use of other MPI functions:

\begin{Verbatim}[fontsize=\relsize{-2}]
mpi.send(mesgstring,destnodenumber)
(message,status) = mpi.recv()  # receive from anyone  
print message
(message,status) = mpi.recv(3)  # receive only from node 3
(message,status) = mpi.recv(3,ZMSG)  # receive only message type ZMSG, 
                                     # only from node 3 
(message,status) = mpi.recv(tag=ZMSG)  # receive from anyone, but 
                                       # only message type ZMSG
\end{Verbatim}

\subsection{Using PDB to Debug Threaded Programs}

Using PDB is a bit more complex when threads are involved.  One cannot,
for instance, simply do something like this:

\begin{Verbatim}[fontsize=\relsize{-2}]
pdb.py buggyprog.py
\end{Verbatim}

because the child threads will not inherit the PDB process from the main
thread.  You can still run PDB in the latter, but will not be able to
set breakpoints in threads.

What you can do, though, is invoke PDB from {\it within} the function which is
run by the thread, by calling {\bf pdb.set\_trace()} at one or more
points within the code:

\begin{Verbatim}[fontsize=\relsize{-2}]
import pdb
pdb.set_trace()
\end{Verbatim}

In essence, those become breakpoints. 

For example, in our program {\bf srvr.py} in Section \ref{threadmodex},
we could add a PDB call at the beginning of the loop in {\bf
serveclient()}:

\begin{Verbatim}[fontsize=\relsize{-2}]
while 1:
   import pdb
   pdb.set_trace()
   # receive letter from client, if it is still connected
   k = c.recv(1)
   if k == '': break
\end{Verbatim}

You then run the program directly through the Python interpreter as
usual, NOT through PDB.  When execution reaches a {\bf pdb.set\_trace()}
call, the program moves into debugging mode on its own.  At that point,
one can step through the code using the {\bf n} or {\bf s} commands,
query the values of variables, etc.

PDB's {\bf c} (``continue'') command still works.  Can one still use the
{\bf b} command to set additional breakpoints?  Yes, but depending on
the context, such a breakpoint might work only once, due to a scope
problem:  Leaving the scope where we invoked PDB causes removal of the
trace object.  This is why I suggested placing the {\bf
pdb.set\_trace()} call inside the loop above.

Of course, you can get fancier, e.g. setting up ``conditional
breakpoints,'' something like:

\begin{Verbatim}[fontsize=\relsize{-2}]
import sys

debugflag = int(sys.argv[1])  # 1 means activate the breakpoint
...
if debugflag == 1:
   import pdb
   pdb.set_trace()
\end{Verbatim}

Then, the debugger would run only if you asked for it on the command
line.  Or, you could have multiple {\bf debugflag} variables, for
activating/deactivating breakpoints at various places in the code.

Moreover, once you get the {\tt (Pdb)} prompt, you could set/reset
those flags, thus also activating/deactivating breakpoints.
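
As a minimal sketch of the multiple-flags idea, one could keep the flags
in a single global dictionary.  The names {\bf BREAKS} and {\bf
worker()} below are hypothetical, and the summing loop is just a
stand-in for real work done by a thread.

```python
# sketch: breakpoints that can be switched on and off by name
# (BREAKS and worker are illustrative names)
import pdb
import threading

# one flag per potential breakpoint; all are off by default
BREAKS = {'loopstart': False, 'loopend': False}

def worker(n, results):
   total = 0
   for i in range(n):
      if BREAKS['loopstart']:
         pdb.set_trace()  # enter PDB here only if the flag is on
      total += i
      if BREAKS['loopend']:
         pdb.set_trace()
   results.append(total)

if __name__ == '__main__':
   results = []
   t = threading.Thread(target=worker, args=(5, results))
   t.start()
   t.join()
   print(results)
```

With every flag off, the program simply prints {\tt [10]}.  If a flag is
turned on before the thread starts, the {\tt (Pdb)} prompt appears at
the corresponding point, and typing \verb|!BREAKS['loopend'] = True|
there activates the second breakpoint as well.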

Note that local variables which were set before invoking PDB, including
parameters, are not accessible to PDB.  

Make sure to insert code to maintain an ID number for each thread.  This
really helps when debugging.
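
One simple way to do this, sketched below under the assumption that the
{\bf threading} module is being used, is to pass each thread its ID as
an argument at creation time and tag every debugging message with it.
The names {\bf worker()}, {\bf runthreads()} and {\bf log} are
hypothetical.

```python
# sketch: give each thread an ID at creation and tag its output with it
# (worker, runthreads and log are illustrative names)
import threading

def worker(myid, log):
   for step in range(2):
      # tag every debugging message with the thread's ID;
      # appending to a shared list is safe in CPython
      log.append('thread %d: step %d' % (myid, step))

def runthreads(nthreads):
   log = []
   threads = []
   for i in range(nthreads):
      t = threading.Thread(target=worker, args=(i, log))  # i is the ID
      threads.append(t)
      t.start()
   for t in threads:
      t.join()
   return log

if __name__ == '__main__':
   for line in sorted(runthreads(3)):
      print(line)
```

The messages from different threads may be interleaved in any order in
{\bf log}, which is exactly why the ID in each message is useful; the
sort in the last lines is there only to make the printout tidy.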

\subsection{RPDB2 and Winpdb}

The Winpdb debugger
(\url{www.digitalpeers.com/pythondebugger/})\footnote{No, it's not just
for Microsoft Windows machines, in spite of the name.} is very good.
Among other things, it can be used to debug threaded code, curses-based
code and so on, which many debuggers can't.  Winpdb is a GUI front end
to the text-based RPDB2, which is in the same package.  I have a
tutorial on both at
\url{http://heather.cs.ucdavis.edu/~matloff/winpdb.html}.

Another very promising debugger that handles threads is PYDB, by Rocky
Bernstein (not to be confused with an earlier debugger of the same
name).  You can obtain it from \url{http://code.google.com/p/pydbgr/},
or get the older version at \url{http://bashdb.sourceforge.net/pydb/}.
Invoke it on your code {\bf x.py} by typing

\begin{Verbatim}[fontsize=\relsize{-2}]
$ pydb --threading x.py your_command_line_args_for_x
\end{Verbatim}


