/* A general use of pthreads to implement a workpile.  A workpile holds work
 * that needs to be done, and all available threads monitor the workpile and do
 * any work that comes in.  There is a work_put routine to put work in the
 * workpile, but there is no work_get routine since the workpile is implicitly
 * monitored.  Note that the workpile is general, and can be used by an program
 * that uses pthreads.
 * 
 * All code here comes from the book "Programming With Threads" by Steve
 * Kleiman (srk@netapp.com), Devang Shah (devang.shah@eng.sun.com), and
 * Bart Smaalders (bart.smaalders@eng.sun.com).  Check out the official book
 * description at http://www.sun.com/books/catalog/kleiman  -MW */

#include <stdlib.h>
#include <assert.h>
#include <pthread.h>

/* Workpile controller */
typedef void (*work_proc_t)(void *);

typedef struct workpile_struct {
	pthread_mutex_t lock;		/* mutex for this structure */
	pthread_cond_t work_wait;	/* workers waiting for work */
	pthread_cond_t finish_wait;	/* to wait for workers to finish */
	int max_pile;			/* length of workpile array */
	work_proc_t worker_proc;	/* work procedure */
	int n_working;			/* number of workers working */
	int n_waiting;			/* number of workers waiting for work */
	int n_pile;			/* number of pointers in the workpile */
	int inp;			/* FIFO input pointer */
	int outp;			/* FIFO output pointer */
	void *pile[1];			/* array of pointers - the workpile */
} *workpile_t;

/*
 * Worker thread routine. Continuously looks for work, calls the
 * worker_proc associated with the workpile to do work.
 */
static void
worker(workpile_t wp)
{
	void *ptr;

	/* Lock up the critical section so no one else can touch the queue while
	 * this thread is checking it to see if it has any work in it. -MW */
	pthread_mutex_lock(&wp->lock);
	wp->n_working++;

	for (;;) {
		/* Wait for new work to do -MW */
		while (wp->n_pile == 0) {
			/* If this thread is the last one working, wake up
			 * anyone blocked on finish_wait -MW */
			if (--wp->n_working == 0)
				pthread_cond_signal(&wp->finish_wait);

			/* Mark yourself as waiting, and block on work_wait.  The
			 * pthread_cond_wait has an implicit unlock of the mutex
			 * variable, so we don't have to worry about doing that.
			 * Also, if woken, the thread will reqcquire the lock
			 * before it continues on in the code -MW */
			wp->n_waiting++;
			pthread_cond_wait(&wp->work_wait, &wp->lock);

			/* When the thread wakes up, it must make the
			 * appropriate changes.  If there is work to do, it will			 * leave the while loop.  Otherwise, it will just
			 * block on work_wait again. -MW */
			wp->n_waiting--;
			wp->n_working++;
		}

		/* Grab the next thing from the pile and update the pointers. The
		 * thread is guaranteed to have the lock at this point, so we
		 * don't have to worry about race conditions. -MW */
		wp->n_pile--;
		ptr = wp->pile[wp->outp];

		/* It is a circular queue of work, so we have to use the mod (%)
		 * function to assure that we wrap around the array. -MW */
		wp->outp = (wp->outp + 1) % wp->max_pile;

		/* Unlock the mutex so another thread may proceed to the work queue */
		pthread_mutex_unlock(&wp->lock);

		/* Call application worker routine. */
		(*wp->worker_proc)(ptr);

		/* The thread must lock the mutex again so it can check to see if the
		 * workpile has anything in it.  If we didn't lock the mutex here, we
		 * would run into race conditions because some other thread could be
		 * altering the work queue (a few lines above) -MW */
		pthread_mutex_lock(&wp->lock);
	}

	/* NOT REACHED */
}

/*
 * Allocates and initializes a workpile that holds max_pile entries.
 * worker_proc is called to process each work item on the queue.
 */
workpile_t
work_init(int max_pile, work_proc_t worker_proc, int n_threads)
{
	int err;
	pthread_t t;

	/* Create a new workpile_t to hold all data items of interest to the
	 * program that initiated the workpile. -MW */
	workpile_t wp = (workpile_t)
		malloc(sizeof (struct workpile_struct) +
			(max_pile * sizeof (void *)));

	/* If malloc didn't fail, do the following: -MW */
	if (wp != NULL) {
		/* Initialize the mutex and 2 conditions.  The first argument is the
		 * name of the mutex/condition.  The second argument is NULL (uses all
		 * default values). -MW */
		pthread_mutex_init(&wp->lock, NULL);
		pthread_cond_init(&wp->work_wait, NULL);
		pthread_cond_init(&wp->finish_wait, NULL);

		/* Set up all variables to initial, meaningful values. -MW */
		wp->max_pile = max_pile;
		wp->worker_proc = worker_proc;
		wp->n_working = wp->n_waiting = wp->n_pile = 0;
		wp->inp = wp->outp = 0;

		/* Create the number of threads passed in. -MW */
		while (n_threads--) {
			/* The pthread_create function has 4 arguments:
			 * 1) The name of the pthread,
			 * 2) The attributes of the pthread (NULL ==> default)
			 * 3) The function that this thread operates on, and
			 * 4) The arguments to the thread-function  -MW */
			err = pthread_create(&t, NULL,
				(void *(*)(void *))worker, (void *)wp);
			assert(err == 0);
		}
	}

	return (wp);
}

/* Puts ptr in workpile. Called at the outset, or within a worker. */
void
work_put(workpile_t wp, void *ptr)
{
	/* Lock the mutex because we are about to modify the workpile.  We need
	 * exclusive access to the workpile, or else there will be race conditions.
	 * -MW */
	pthread_mutex_lock(&wp->lock);

	/* If anyone is waiting on the workpile, send a preemptive signal to them.
	 * They will not be able to leave their "wait" state until we unlock the
	 * mutex, so it is okay to send the signal at this point. -MW */
	if (wp->n_waiting)
		pthread_cond_signal(&wp->work_wait);

	/* Make sure that there is enough room in the workpile. -MW */
	assert(wp->n_pile != wp->max_pile);

	/* Add the new item to the workpile, and update the appropriate variables.
	 * Don't forget to use the mod (%) function when updating the in-pointer,
	 * because the workpile is a circular queue. -MW */
	wp->n_pile++;
	wp->pile[wp->inp] = ptr;
	wp->inp = (wp->inp + 1) % wp->max_pile;

	/* Unlock the mutex and allow anyone we woke up with our earlier signal (to
	 * work_wait) to start working. -MW */
	pthread_mutex_unlock(&wp->lock);
}

/* Wait until all work is done and workers quiesce. */
void
work_wait(workpile_t wp)
{
	/* Lock the mutex because we are checking values of the workpile, so it
	 * would be bad if they changed between our check and our action (i.e. it
	 * is a critical sectione). -MW */
	pthread_mutex_lock(&wp->lock);

	/* Just block if you are at this point, but there is still work to do (or
	 * some other thread is still working). A pthread_cond_wait has an implicit
	 * unlock of the mutex, so we don't have to worry about unlocking it.
	 * Also, if woken, the thread will reqacquire the mutex before it
	 * continuing on in the code. -MW */
	while(wp->n_pile !=0 || wp->n_working != 0)
		pthread_cond_wait(&wp->finish_wait, &wp->lock);

	/* Unlock the mutex to allow another thread to continue. -MW */
	pthread_mutex_unlock(&wp->lock);
}

