#include <iostream.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/ioctl.h>
#include "UCTInclude.h"
#include "UCTGlobal.h"
#include "UCTTupleMgmt.h"
#include "UCTPacketMgmt.h"

// <sys/filio.h> is needed on SPARCs; it is currently enabled below.
// Comment out the following include on platforms that do not provide it.
#include <sys/filio.h>

/* function prototypes */
void ProcessMsg(int client, int count);

/* ---- state shared by the manager routines in this file ---- */

/* DoneList has one slot per worker node: 0 while that worker is still
   active, 1 once it has retired. */
int *DoneList;

/* WorkerCount is the number of workers that have not yet retired. */
int WorkerCount;

/* InTuple holds the most recently decoded incoming request. */
Tuple *InTuple;

/* OutTuple holds the tuple found in answer to the current request. */
Tuple *OutTuple;

/* PendingList has one slot per worker node; a slot holds that worker's
   unsatisfied "in" or "rd" request until it can be processed. */
Tuple **PendingList;

/* outArgs is the buffer that receives the output arguments of an
   executed function. */
char outArgs[TMEXEC_RTN_SIZE];

/* Manager Functions */
/* UCTupleManager() is called by UCTInit() to initialize the Tuple
   Manager.  It will set up non-blocking socket connections with
   each worker and then enter a main loop which checks for worker
   messages and processes them. */

void UCTupleManager() {
  int * ClntDescriptor,	/* socket descriptor to client */
      SrvrDescriptor;	/* socket descriptor for server */
  struct sockaddr_in BindInfo;
  int NBytes, Count=0, Flag=1;

  DoneList = new int[UCTNWorkNodes];
  memset(DoneList, 0, (sizeof(int) * UCTNWorkNodes));

  WorkerCount = UCTNWorkNodes;

  PendingList = new Tuple* [UCTNWorkNodes];
  for (int i=0; i < UCTNWorkNodes; i++)
    PendingList[i] = NULL;

  if ((SrvrDescriptor = socket(AF_INET,SOCK_STREAM,0)) < 0)
    Error("Failure creating socket at server.\n");

  /* make the socket nonblocking */
//  ioctl(SrvrDescriptor,FIONBIO,&Flag); 
  BindInfo.sin_family = AF_INET;
  BindInfo.sin_port = UCTPortNum;
  BindInfo.sin_addr.s_addr = INADDR_ANY;
  if (bind(SrvrDescriptor,(struct sockaddr *) &BindInfo,sizeof(BindInfo)) < 0) {
    close(SrvrDescriptor);
    Error("Failure binding server socket.\n");
  }

  if (listen(SrvrDescriptor, UCTNWorkNodes+1) < 0)
    Error("Failure to listen at server.\n");

  /* first set up the client sockets */
  ClntDescriptor = new int[UCTNWorkNodes];
  for (int j=0; j < UCTNWorkNodes; j++)  {
    while (1)  {
       ClntDescriptor[j] = accept(SrvrDescriptor,0,0);
       if (ClntDescriptor[j] > 0) {
         ioctl(ClntDescriptor[j], FIONBIO, &Flag); 
         break;
       }
    }
  }

  Flag = 0;	// Used to test if there was any activity this loop

  /* Main Manager Loop.  Loop through each active worker to see if they
     have a message for us.  After the last worker has been tested, run
     through the pending list to try and process pending "in" and "rd"
     calls.  Terminate loop when WorkerCount = 0, indicating that all workers
     have terminated. */

  while (WorkerCount) {
    if (!DoneList[Count]) {
      /* read header, which is always two ints long */
      NBytes = read(ClntDescriptor[Count], HeaderBuffer, headersize);

      /* if bytes were there to be read then process them */
      if (NBytes > 0) {
        ReadData(ClntDescriptor[Count], HeaderBuffer + NBytes, headersize - NBytes);
        Flag = 1;	//tell PendingList that activity has occurred this cycle

        /* ProcessMsg() will decode the incoming data into a UCTuple and
           then process the message.  It will also take care of returning
           data to the worker (except, of course, with out() operation. */
        ProcessMsg(ClntDescriptor[Count], Count);
      }
    }

    /* if true, we've cycled through all workers, so see if we can process
       anyone in the pending list */
    if (++Count == UCTNWorkNodes) {
      if (Flag) UpdatePendingList(PendingList, ClntDescriptor);
      Count = 0; Flag = 0;
    }
  }

  /* close the worker descriptors */
  for (int m=0; m < UCTNWorkNodes; m++)
    close (ClntDescriptor[m]);
}



/* ProcessMsg() is called by the manager to decode and process an incoming
   message. */

void ProcessMsg(int client, int count) {
  void (*fp)(char*, int, char*, int*);
  int argSize, outLen;
  char* Args;

  /* Decode the incoming packet and build a tuple out of it. */
  InTuple = DecodePacketSrvr(client);

  /* Handle tm_exec() calls. */
  if (InTuple->op == EXEC) {
    memcpy(&fp, InTuple->data, addrsize);	//copy the function pointer
    argSize = InTuple->datasize - addrsize; //calculate the argument size

    Args = new char[argSize];
    memcpy(Args, InTuple->data + addrsize, argSize); //get the function args

    fp(Args, argSize, outArgs, &outLen); //call function

    char * buffer = new char[outLen + intsize];   
    //copy return the output arguments
    memcpy(buffer, &outLen, intsize);
    memcpy(buffer+intsize, &outArgs, outLen);
    WriteData(client, buffer, outLen+intsize);

    delete [] Args;
    delete [] buffer;
    return;
  }

  /* For type OUT, insert this new tuple into the library.  Also, test if this
     was a termination msg "UCTEnd", and if so, decrement WorkerCount. */
  if (InTuple->op == OUT) {
    library[InTuple->keysize].Insert(InTuple);
    if (TestForEndMsg(InTuple)) {
      DoneList[count] = 1;
      WorkerCount--;
    }

    /* Send a receipt confirmation to the worker. */
    WriteData(client, "go", 3);
    return;
  }

  /* Otherwise, the type is IN or RD.  Find a matching UCTuple and
     send relevant data to the calling worker. If no match exists, the
     call to FindTuple will return NULL. */
  
  OutTuple = library[InTuple->keysize].FindTuple(InTuple, InTuple->op);

  /* If OutTuple is not NULL, we have data to return to the caller.
     Encode this data and ship it off. If it is NULL, place the InTuple
     in the pending queue for this worker, unless the original call was
     out(). */
  
  if (OutTuple)
    EncodePacketSrvr(client, InTuple, OutTuple);
  else PendingList[count] = InTuple;
}

/* LocalProcessMsg() applies a tuple operation directly to the library,
   with no socket involved.

   t - the request tuple.  For OUT it is inserted into the library and the
       function returns.  For any other op a matching tuple is looked up
       (Error() is called when none exists), and for every key position
       from 1 upward that is marked 'p' the matched tuple's field is copied
       to the address stored in the request's data.  The matched tuple is
       deleted when the op is IN; the request tuple is deleted in either
       case. */
void LocalProcessMsg(Tuple* t) {
  /* OUT: hand the tuple to the library and we are done. */
  if (t->op == OUT) {
    library[t->keysize].Insert(t);
    return;
  }

  OutTuple = library[t->keysize].FindTuple(t, t->op);
  if (OutTuple == NULL)
    Error("All calls must not block within user-specified function - (What a jackass!)\n");

  for (int field = 1; field < t->keysize; field++) {
    if (t->key[field] != 'p')
      continue;

    /* info[field][2] is used as a byte offset into data,
       info[field][1] as the field's byte count. */
    int requestOffset = t->info[field][2];
    int matchOffset = OutTuple->info[field][2];
    int byteCount = OutTuple->info[field][1];

    /* The request's data holds the destination address for this field. */
    void * destination;
    memcpy(&destination, t->data + requestOffset, addrsize);

    memcpy(destination, OutTuple->data + matchOffset, byteCount);
  }

  if (t->op == IN)
    delete OutTuple;
  delete t;
}
