/* A general use of pthreads to implement a workpile. A workpile holds work * that needs to be done, and all available threads monitor the workpile and do * any work that comes in. There is a work_put routine to put work in the * workpile, but there is no work_get routine since the workpile is implicitly * monitored. Note that the workpile is general, and can be used by an program * that uses pthreads. * * All code here comes from the book "Programming With Threads" by Steve * Kleiman (srk@netapp.com), Devang Shah (devang.shah@eng.sun.com), and * Bart Smaalders (bart.smaalders@eng.sun.com). Check out the official book * description at http://www.sun.com/books/catalog/kleiman -MW */ #include #include #include /* Workpile controller */ typedef void (*work_proc_t)(void *); typedef struct workpile_struct { pthread_mutex_t lock; /* mutex for this structure */ pthread_cond_t work_wait; /* workers waiting for work */ pthread_cond_t finish_wait; /* to wait for workers to finish */ int max_pile; /* length of workpile array */ work_proc_t worker_proc; /* work procedure */ int n_working; /* number of workers working */ int n_waiting; /* number of workers waiting for work */ int n_pile; /* number of pointers in the workpile */ int inp; /* FIFO input pointer */ int outp; /* FIFO output pointer */ void *pile[1]; /* array of pointers - the workpile */ } *workpile_t; /* * Worker thread routine. Continuously looks for work, calls the * worker_proc associated with the workpile to do work. */ static void worker(workpile_t wp) { void *ptr; /* Lock up the critical section so no one else can touch the queue while * this thread is checking it to see if it has any work in it. -MW */ pthread_mutex_lock(&wp->lock); wp->n_working++; for (;;) { /* Wait for new work to do -MW */ while (wp->n_pile == 0) { /* If this thread is the last one working, wake up * anyone blocked on finish_wait -MW */ if (--wp->n_working == 0) pthread_cond_signal(&wp->finish_wait); /* Mark yourself as waiting, and block on work_wait. The * pthread_cond_wait has an implicit unlock of the mutex * variable, so we don't have to worry about doing that. * Also, if woken, the thread will reqcquire the lock * before it continues on in the code -MW */ wp->n_waiting++; pthread_cond_wait(&wp->work_wait, &wp->lock); /* When the thread wakes up, it must make the * appropriate changes. If there is work to do, it will * leave the while loop. Otherwise, it will just * block on work_wait again. -MW */ wp->n_waiting--; wp->n_working++; } /* Grab the next thing from the pile and update the pointers. The * thread is guaranteed to have the lock at this point, so we * don't have to worry about race conditions. -MW */ wp->n_pile--; ptr = wp->pile[wp->outp]; /* It is a circular queue of work, so we have to use the mod (%) * function to assure that we wrap around the array. -MW */ wp->outp = (wp->outp + 1) % wp->max_pile; /* Unlock the mutex so another thread may proceed to the work queue */ pthread_mutex_unlock(&wp->lock); /* Call application worker routine. */ (*wp->worker_proc)(ptr); /* The thread must lock the mutex again so it can check to see if the * workpile has anything in it. If we didn't lock the mutex here, we * would run into race conditions because some other thread could be * altering the work queue (a few lines above) -MW */ pthread_mutex_lock(&wp->lock); } /* NOT REACHED */ } /* * Allocates and initializes a workpile that holds max_pile entries. * worker_proc is called to process each work item on the queue. */ workpile_t work_init(int max_pile, work_proc_t worker_proc, int n_threads) { int err; pthread_t t; /* Create a new workpile_t to hold all data items of interest to the * program that initiated the workpile. -MW */ workpile_t wp = (workpile_t) malloc(sizeof (struct workpile_struct) + (max_pile * sizeof (void *))); /* If malloc didn't fail, do the following: -MW */ if (wp != NULL) { /* Initialize the mutex and 2 conditions. The first argument is the * name of the mutex/condition. The second argument is NULL (uses all * default values). -MW */ pthread_mutex_init(&wp->lock, NULL); pthread_cond_init(&wp->work_wait, NULL); pthread_cond_init(&wp->finish_wait, NULL); /* Set up all variables to initial, meaningful values. -MW */ wp->max_pile = max_pile; wp->worker_proc = worker_proc; wp->n_working = wp->n_waiting = wp->n_pile = 0; wp->inp = wp->outp = 0; /* Create the number of threads passed in. -MW */ while (n_threads--) { /* The pthread_create function has 4 arguments: * 1) The name of the pthread, * 2) The attributes of the pthread (NULL ==> default) * 3) The function that this thread operates on, and * 4) The arguments to the thread-function -MW */ err = pthread_create(&t, NULL, (void *(*)(void *))worker, (void *)wp); assert(err == 0); } } return (wp); } /* Puts ptr in workpile. Called at the outset, or within a worker. */ void work_put(workpile_t wp, void *ptr) { /* Lock the mutex because we are about to modify the workpile. We need * exclusive access to the workpile, or else there will be race conditions. * -MW */ pthread_mutex_lock(&wp->lock); /* If anyone is waiting on the workpile, send a preemptive signal to them. * They will not be able to leave their "wait" state until we unlock the * mutex, so it is okay to send the signal at this point. -MW */ if (wp->n_waiting) pthread_cond_signal(&wp->work_wait); /* Make sure that there is enough room in the workpile. -MW */ assert(wp->n_pile != wp->max_pile); /* Add the new item to the workpile, and update the appropriate variables. * Don't forget to use the mod (%) function when updating the in-pointer, * because the workpile is a circular queue. -MW */ wp->n_pile++; wp->pile[wp->inp] = ptr; wp->inp = (wp->inp + 1) % wp->max_pile; /* Unlock the mutex and allow anyone we woke up with our earlier signal (to * work_wait) to start working. -MW */ pthread_mutex_unlock(&wp->lock); } /* Wait until all work is done and workers quiesce. */ void work_wait(workpile_t wp) { /* Lock the mutex because we are checking values of the workpile, so it * would be bad if they changed between our check and our action (i.e. it * is a critical sectione). -MW */ pthread_mutex_lock(&wp->lock); /* Just block if you are at this point, but there is still work to do (or * some other thread is still working). A pthread_cond_wait has an implicit * unlock of the mutex, so we don't have to worry about unlocking it. * Also, if woken, the thread will reqacquire the mutex before it * continuing on in the code. -MW */ while(wp->n_pile !=0 || wp->n_working != 0) pthread_cond_wait(&wp->finish_wait, &wp->lock); /* Unlock the mutex to allow another thread to continue. -MW */ pthread_mutex_unlock(&wp->lock); }