

/*  

    Demonstration of pth and I/O, especially a comparison of ordinary
    UNIX read() and pth_read().

    There will be two I/O threads, pointed to by IOT[0] and IOT[1],
    to check two sockets, and there will be two work threads, pointed
    to by WT[0] and WT[1], to process the data from those sockets.  Each
    remote process (invoked from its own window) will send a single-digit
    integer; the corresponding work thread on the server will add 1 to
    it, and the I/O thread will send the result back to the originator.

    Usage:

       svr read_flag port_number

    where read_flag is 1 for read(), 2 for pth_read() (stored in the
    variable ReadFtn), and port_number (stored in the variable Port)
    is any integer above 1024 (if you re-run the program soon after a
    previous run, use a different number, because the old port remains
    temporarily in use).  After starting the server, start two
    invocations of the client program (Clnt.C), with usage

       clnt port_number

    and keep typing in single-digit numbers to them.  The user does
    NOT need to alternate between typing to the two remote processes.

*/


#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/ioctl.h>

#include <pth.h>

// Set to 1 by WorkFtn() just before a work thread waits on its
// condition variable, and checked by IOFtn() once when an I/O thread
// starts.  It is needed to avoid a situation at the beginning of
// execution, in which an I/O thread posts a signal before a work
// thread has had a chance to post a wait.
int FirstWaitPosted = 0;

// Selects the read routine used by IOFtn(): 1 means ordinary read()
// (the client socket is then made nonblocking); any other value means
// pth_read().  Set from argv[1] in main().
int ReadFtn;  // see introductory comments above

// Port number the server socket is bound to; set from argv[2] in main().
int Port;  // see introductory comments above

// State for one I/O thread: the sockets it works with, the one-byte
// message buffer, and the handle of the pth thread serving the client.
class IOThread  {

   public:

      // socket descriptor connected to this thread's client
      int ClntDescriptor;
      // listening socket descriptor of the server
      int SrvrDescriptor;
      // the most recent byte received from the client
      char Buf;
      // handle returned by pth_spawn() for this object's thread
      pth_t ThreadID;

      // the argument is the thread number: 0 for IOT[0], 1 for IOT[1]
      IOThread(int WID);

};

// State for one work thread: the condition variable (and its mutex) on
// which the thread waits for data, and the handle of the pth thread.
class WorkThread  {

   public:

      // condition variable notified when a byte is ready to be processed
      pth_cond_t WorkThreadCond;
      // mutex acquired around every wait on / notify of WorkThreadCond
      pth_mutex_t WorkThreadMutex;
      // handle returned by pth_spawn() for this object's thread
      pth_t ThreadID;

      // the argument is the thread number: 0 for WT[0], 1 for WT[1]
      WorkThread(int WID);

      // notify WorkThreadCond, then yield
      void SignalWorkThread();

      // wait on WorkThreadCond
      void WaitWorkThread();

};

// The two I/O thread objects and the two work thread objects, indexed
// by thread number (0 or 1).  IOT[i] and WT[i] cooperate on the same
// client: the I/O thread signals WT[i], and the work thread updates
// IOT[i]->Buf.  Both arrays are filled in by main().
IOThread *IOT[2];
WorkThread *WT[2];

// Body of an I/O thread.  WID (0 or 1) selects the IOThread/WorkThread
// pair this thread serves.  The thread repeatedly reads one byte from
// its client into IOT[WID]->Buf, signals the matching work thread, and
// then writes IOT[WID]->Buf back to the client.
//
// Returns NULL once the client has closed the connection; otherwise it
// loops forever.
//
// NOTE(review): pth_spawn() is given this function through a cast to
// void *(*)(void *) although it takes an int; this relies on the
// platform passing both kinds of argument the same way.
void *IOFtn(int WID) 

{  int NRead;

   // need to yield first, to avoid a situation in which an I/O 
   // thread starts early, and posts a signal before the work thread
   // has had a chance to post a wait
   if (!FirstWaitPosted) pth_yield(NULL);

   while (1)  {
      IOT[WID]->Buf = 0;
      if (ReadFtn == 1)
         NRead = read(IOT[WID]->ClntDescriptor,&IOT[WID]->Buf,1); 
      else
         NRead = pth_read(IOT[WID]->ClntDescriptor,&IOT[WID]->Buf,1); 
      if (NRead > 0)  {
         // a byte arrived: let the work thread process it, then echo
         // the buffer back to the client
         WT[WID]->SignalWorkThread();  // note:  will call pth_yield()
         if (write(IOT[WID]->ClntDescriptor,&IOT[WID]->Buf,1) < 0)
            perror("write");
      }
      else if (NRead == 0)  {
         // a return value of 0 means end-of-file, i.e. the client has
         // closed the connection; stop serving it instead of spinning
         // on a dead socket forever
         close(IOT[WID]->ClntDescriptor);
         IOT[WID]->ClntDescriptor = -1;
         return NULL;
      }
      // NRead < 0:  nothing could be read this time (for ReadFtn = 1
      // this is the normal "no data yet" result on the nonblocking
      // socket), so give the other threads a turn and try again
      else pth_yield(NULL);
   }

}

// Sets up the I/O side for client number WID (0 or 1).
//
// For WID == 0 this creates the server socket, binds it to Port and
// starts listening; for any other WID it reuses the server socket
// already stored in IOT[0].  It then calls accept() to obtain the
// client connection, makes the client socket nonblocking when ordinary
// read() is being demonstrated (ReadFtn == 1), and finally spawns the
// pth thread that runs IOFtn() with WID as its argument.
//
// A failure in any of these steps is reported on stderr and terminates
// the program, since the thread cannot do anything useful without its
// sockets.
IOThread::IOThread(int WID) 

{  int Flag;
   struct sockaddr_in BindInfo;  

   this->Buf = 0;

   // set up the server socket only once
   if (WID == 0)  {
      if ((this->SrvrDescriptor = socket(AF_INET,SOCK_STREAM,0)) < 0)  {
         perror("socket");
         exit(EXIT_FAILURE);
      }
      // clear the whole structure so that no field is left holding
      // stack garbage
      memset(&BindInfo,0,sizeof(BindInfo));
      BindInfo.sin_family = AF_INET;
      // NOTE(review): sin_port is normally given in network byte order
      // (htons(Port)).  The raw value is kept here because the client
      // program must use the same convention -- confirm against the
      // client before changing this.
      BindInfo.sin_port = Port;
      BindInfo.sin_addr.s_addr = INADDR_ANY;
      if (bind(this->SrvrDescriptor, (sockaddr *) &BindInfo,sizeof(BindInfo))
          < 0)  {
         perror("bind");
         exit(EXIT_FAILURE);
      }
      if (listen(this->SrvrDescriptor,5) < 0)  {
         perror("listen");
         exit(EXIT_FAILURE);
      }
   }
   else this->SrvrDescriptor = IOT[0]->SrvrDescriptor;

   if ((this->ClntDescriptor = accept(this->SrvrDescriptor,NULL,NULL)) < 0)  {
      perror("accept");
      exit(EXIT_FAILURE);
   }
   if (ReadFtn == 1)  {
      // set up nonblocking status
      Flag = 1;
      if (ioctl(this->ClntDescriptor,FIONBIO,&Flag) < 0)  {
         perror("ioctl");
         exit(EXIT_FAILURE);
      }
   }
   // the thread number is passed in place of the usual pointer
   // argument; the explicit casts are required because C++ has no
   // implicit conversion from int to void *
   this->ThreadID = pth_spawn(PTH_ATTR_DEFAULT, 
      (void * (*) (void *)) IOFtn,(void *)(long) WID);  
   if (this->ThreadID == NULL)  {
      perror("pth_spawn");
      exit(EXIT_FAILURE);
   }
}

// Body of a work thread.  WID (0 or 1) selects the IOThread/WorkThread
// pair this thread belongs to.  Each time the thread is woken up it
// adds 1 to the byte held in IOT[WID]->Buf.  The loop never ends.
void *WorkFtn(int WID) 

{  for (;;)  {
      // record that a wait is about to be posted; IOFtn() checks this
      // flag before deciding whether to yield at startup
      FirstWaitPosted = 1;
      // sleep until the I/O thread signals that data is ready
      WT[WID]->WaitWorkThread();  
      // the "work": increment the byte received from the client
      ++IOT[WID]->Buf;
   }
}

// Initializes this object's mutex and condition variable, then spawns
// the pth thread that runs WorkFtn() with WID as its argument.
WorkThread::WorkThread(int WID) 

{  pth_mutex_init(&WorkThreadMutex);
   pth_cond_init(&WorkThreadCond);

   // the thread number is passed in place of the usual pointer argument
   void *SpawnArg = (void *) WID;
   ThreadID = pth_spawn(PTH_ATTR_DEFAULT,(void * (*)(void *)) WorkFtn,
      SpawnArg);  
}

void WorkThread::SignalWorkThread()

{
   pth_mutex_acquire(&this->WorkThreadMutex,FALSE,NULL);
   pth_cond_notify(&this->WorkThreadCond,FALSE);
   pth_mutex_release(&this->WorkThreadMutex);
   pth_yield(NULL);
}

void WorkThread::WaitWorkThread()

{  pth_mutex_acquire(&this->WorkThreadMutex,FALSE,NULL);
   pth_cond_await(&this->WorkThreadCond,&this->WorkThreadMutex,NULL);
   pth_mutex_release(&this->WorkThreadMutex);
}

// Program entry point.
//
// Usage:  svr read_flag port_number
//
// Stores the two command-line arguments in ReadFtn and Port, creates
// the two IOThread/WorkThread pairs (which spawn the pth threads), and
// then yields forever so that the spawned threads get to run.
// Returns 1 if the arguments are missing or pth cannot be initialized.
int main(int argc, char **argv)

{  int I;

   // both arguments are required; without this check a missing
   // argument would make atoi() dereference a null pointer
   if (argc < 3)  {
      fprintf(stderr,"usage: svr read_flag port_number\n");
      return 1;
   }

   if (!pth_init())  {
      fprintf(stderr,"pth_init() failed\n");
      return 1;
   }
   ReadFtn = atoi(argv[1]);
   Port = atoi(argv[2]);
   for (I = 0; I < 2; I++)  {
      IOT[I] = new IOThread(I);
      WT[I] = new WorkThread(I);
   }
   // the main thread has nothing else to do; keep handing the
   // processor to the I/O and work threads
   while (1) pth_yield(NULL);
}

      

