#ifndef lint
static char     sccsid[] = "@(#)tmanager.c";
static char     Creator[] = "C 1991 G. Sxoinas csi.forth.gr \n";
static char     Mail[] = "sxoinas@ariadne.bitnet or sxoinas@csi.forth.gr";
#endif

/*
 * This file is part of the POSYBL (PrOgramming SYstem for distriButed
 * appLications, a free Linda implementation for Unix Networks) and contains
 * the the code of the Tuple Manager.
 * 
 * There are no restrictions on this code; however, if you make any changes, I
 * request that you document them so that I do not get credit or blame for your
 * modifications.
 * 
 * Written by Ioannis Schoinas (sxoinas@csd.uch.gr or sxoinas@csi.forth.gr),
 * Computer Science Department, University of Crete, Heraclion, Crete, Greece.
 * 
 * Note: Linda is a trademark of SCA, Inc.
 * 
 */

#include    "tmanager.h"

#ifdef STATS
static void
CopyStats(st)			/* copies the statistic information kept to a
				 * stats_message */
  register STATS_MESSAGE *st;
{
  gethostname(st->node, 32);
  st->LindaOutCalls = LindaOutCalls;
  st->LindaInCalls = LindaInCalls;
  st->LindaReadCalls = LindaReadCalls;
  st->LindaTemplateCalls = LindaTemplateCalls;
  st->Reset = Reset;
  st->meanT = (double) Tstat.sum / (double) Tstat.cnt;
  st->meanL = (double) Lstat.sum / (double) Lstat.cnt;
  st->meanO = (double) Ostat.sum / (double) Ostat.cnt;
}
#endif

static
int
FindTables(uid, rungroup, tspace, lbuffer, obuffer, create)	/* finds the table for a
								 * uid, rungroup pair */
  register int    uid, rungroup;
  register TABLE_OBJECT **tspace, **lbuffer, **obuffer;	/* here returns their
							 * addresses */
  register int    create;	/* if TRUE the tables are created, if they
				 * don't exist */
{
  register ENTRY *ptr;
  static int      info[5];
  static ENTRY    ent =		/* it must be as faster as possible, so with
				 * this tricks I save a few assignments */
  {
    (char *) info,
    2 * sizeof(int),
    3 * sizeof(int),
    NULL
  };

  static ENTRY    new =
  {
    NULL,
    2 * sizeof(int),
    3 * sizeof(int),
    NULL
  };

  info[0] = uid;
  info[1] = rungroup;
  ptr = my_hsearch(Users, &ent, FIND, UserMatch);
  if (create)
  {
    if (ptr == NULL)
    {
      new.buffer = (char *) malloc(5 * sizeof(int));
      ((int *) (new.buffer))[0] = uid;
      ((int *) (new.buffer))[1] = rungroup;
      ((int *) (new.buffer))[2] = (int) my_hcreate(TUPLE_SPACE_SIZE);
      ((int *) (new.buffer))[3] = (int) my_hcreate(LOCAL_TEMPLATES_BUFFER_SIZE);
      ((int *) (new.buffer))[4] = (int) my_hcreate(OTHER_TEMPLATES_BUFFER_SIZE);
      ptr = my_hsearch(Users, &new, ENTER, NULL);
    }
    if (tspace != NULL)
      *tspace = (TABLE_OBJECT *) (((int *) ptr->buffer)[2]);
    if (lbuffer != NULL)
      *lbuffer = (TABLE_OBJECT *) (((int *) ptr->buffer)[3]);
    if (obuffer != NULL)
      *obuffer = (TABLE_OBJECT *) (((int *) ptr->buffer)[4]);
    return (TRUE);
  }
  else
  {
    if (ptr != NULL)
    {
      if (tspace != NULL)
	*tspace = (TABLE_OBJECT *) (((int *) ptr->buffer)[2]);
      if (lbuffer != NULL)
	*lbuffer = (TABLE_OBJECT *) (((int *) ptr->buffer)[3]);
      if (obuffer != NULL)
	*obuffer = (TABLE_OBJECT *) (((int *) ptr->buffer)[4]);
      return (TRUE);
    }
    else
      return (FALSE);
  }
}

static void
TupleMsg2Entry(tuple, ent)	/* Copies the data from a tuple_message to an
				 * ENTRY */
  register TUPLE_MESSAGE *tuple;
  register ENTRY *ent;
{
  register char  *data = (char *) tuple->message_data;

  ent->keybytes = tuple->header.keybytes;
  ent->databytes = tuple->header.databytes;
  ent->buffer = (char *) malloc(ent->keybytes + ent->databytes);
  ent->rest = NULL;
  copybytes(ent->buffer, data, ent->keybytes);
  copybytes(ent->buffer + ent->keybytes, data + ent->keybytes, ent->databytes);
}

static void
TemplateMsg2Entry(template, ent)/* Copies the data from a template_message to
				 * an ENTRY */
  register TEMPLATE_MESSAGE *template;
  register ENTRY *ent;
{
  register char  *data = (char *) template->message_data;

  ent->keybytes = template->header.keybytes;
  ent->databytes = template->header.databytes;
  ent->buffer = (char *) malloc(ent->keybytes + ent->databytes);
  ent->rest = NULL;
  copybytes(ent->buffer, data, ent->keybytes);
  copybytes(ent->buffer + ent->keybytes, data + ent->keybytes, ent->databytes);
}

static void
Entry2TupleMsg(ent, tuple, uid, rungroup)
  register ENTRY *ent;
  register TUPLE_MESSAGE *tuple;
  register int    uid, rungroup;
{
  register char  *ptr = (char *) tuple->message_data;

  tuple->header.uid = uid;
  tuple->header.rungroup = rungroup;
  tuple->header.keybytes = ent->keybytes;
  tuple->header.databytes = ent->databytes;
  copybytes(ptr, ent->buffer, ent->keybytes + ent->databytes);
}

static void
Entry2TemplateMsg(ent, template)
  register ENTRY *ent;
  register TEMPLATE_MESSAGE *template;
{
  register char  *ptr = (char *) template->message_data;

  template->header.pid = ent->rest->pid;
  template->header.uid = ent->rest->uid;
  template->header.rungroup = ent->rest->rungroup;
  template->header.keybytes = ent->keybytes;
  template->header.databytes = ent->databytes;
  copybytes(ptr, ent->buffer, ent->keybytes + ent->databytes);
}

static void
SystemStats(from, msg, len, sysmsg)
  struct sockaddr_in *from;
  Rpc_Message     msg;
  int             len;
  register SYSTEM_MESSAGE *sysmsg;
{
  STATS_MESSAGE   st;

#ifdef STATS
  CopyStats(&st);
#endif
  sprintf(st.message, "");
  Rpc_Return(msg, sizeof(STATS_MESSAGE), &st, FALSE);
}

static void
SystemExit(from, msg, len, sysmsg)
  struct sockaddr_in *from;
  Rpc_Message     msg;
  int             len;
  register SYSTEM_MESSAGE *sysmsg;
{
  STATS_MESSAGE   st;

#ifdef STATS
  CopyStats(&st);
#endif
  if (sysmsg->uid != Uid)
  {
    sprintf(st.message, "Running under uid %d: You are not my owner", Uid);
    Rpc_Return(msg, sizeof(STATS_MESSAGE), &st, FALSE);
  }
  else
  {
#ifdef DEBUG
    printf("Tmanager: Exiting...\n\n");
#endif
    sprintf(st.message, "Exiting...");
    Rpc_Return(msg, sizeof(STATS_MESSAGE), &st, FALSE);
    if (access(FindLocalName(SOCKNAME_LOCAL), F_OK) == 0)
      unlink(FindLocalName(SOCKNAME_LOCAL));
    close(OutSocket);
    close(TcpCallsSocket);
    close(UdpCallsSocket);
    close(LocalCallsSocket);
    exit(2);
  }
}

static void
SystemReset(from, msg, len, sysmsg)
  struct sockaddr_in *from;
  Rpc_Message     msg;
  int             len;
  register SYSTEM_MESSAGE *sysmsg;
{
  STATS_MESSAGE   st;
  TABLE_OBJECT   *TupleSpace, *LocalsBuffer, *OthersBuffer;
  ENTRY          *ptr;
  static int      info[2];
  static ENTRY    ent =
  {
    (char *) info,
    2 * sizeof(int),
    0,
    NULL
  };

#ifdef DEBUG
  printf("Tmanager:Reseting for uid:%d, rungroup:%d\n",
	 sysmsg->uid, sysmsg->rungroup);
#endif
#ifdef STATS
  CopyStats(&st);
#endif
  if (FindTables(sysmsg->uid, sysmsg->rungroup,
		 &TupleSpace, &LocalsBuffer, &OthersBuffer, FALSE))
  {
    my_hdestroy(TupleSpace);
    my_hdestroy(LocalsBuffer);
    my_hdestroy(OthersBuffer);
    info[0] = sysmsg->uid;
    info[1] = sysmsg->rungroup;
    ptr = my_hsearch(Users, &ent, REMOVE, UserMatch);
    free(ptr->buffer);
    free(ptr);
    sprintf(st.message, "Reseting user (uid %d - rungroup %d)",
	    sysmsg->uid, sysmsg->rungroup);
  }
  else
    sprintf(st.message, "User (Uid %d - Rungroup %d) is not registered",
	    sysmsg->uid, sysmsg->rungroup);

  Rpc_Return(msg, sizeof(STATS_MESSAGE), &st, FALSE);
}

static void
SysGetPort(from, msg, len, data)
  register struct sockaddr_x *from;
  Rpc_Message     msg;
  int             len;
  char           *data;
{
  Rpc_Return(msg, sizeof(int), &UdpPort, FALSE);
}

static int
CheckLocalTemplates(table, ent)
  register TABLE_OBJECT *table;
  register ENTRY *ent;
{
  register ENTRY *buffer;
  register int    retval;
  register Rpc_Stat error;
  register RESPONSE_MESSAGE *response;

  buffer = my_hsearch(table, ent, REMOVE, TupleMatch);
  if (buffer == NULL)
    return (FALSE);

#ifdef DEBUG
  printf("Sending to process %d\n", buffer->rest->pid);
#endif
#ifdef STATS
  statreport(&Lstat, --Locals);
#endif
  if ((error = Rpc_Return(buffer->rest->msg, ent->databytes,
			  ent->buffer + ent->keybytes, FALSE)) != RPC_SUCCESS)
  {
    fprintf(stderr, "CheckLocalTemplates: Cannot send tuple to pid %d: %s\n",
	    buffer->rest->pid, Rpc_ErrorMessage(error));
    retval = FALSE;
  }
  else
    retval = (buffer->rest->proc == LINDA_IN);

  free(buffer->rest->msg);
  Rpc_EventDelete(buffer->rest->event);
  free(buffer->buffer);
  free(buffer->rest);
  free(buffer);

  if (!retval)
    retval = retval || CheckLocalTemplates(table, ent);
  else
    free(ent->buffer);
  return retval;
}

static int
CheckOthersTemplates(table, ent)
  register TABLE_OBJECT *table;
  register ENTRY *ent;
{
  register TUPLE_MESSAGE *tuplemsg;
  register int    msglen;
  register Rpc_Stat error;
  struct sockaddr_in sin;
  register ENTRY *buffer;

  buffer = my_hsearch(table, ent, REMOVE, TupleMatch);
  if (buffer == NULL)
    return (FALSE);

#ifdef STATS
  statreport(&Ostat, --Others);
#endif

  msglen = sizeof(TUPLE_HEADER) + ent->keybytes + ent->databytes;
  tuplemsg = (TUPLE_MESSAGE *) malloc(msglen);
  Entry2TupleMsg(ent, tuplemsg, buffer->rest->uid, buffer->rest->rungroup);

  Rpc_EventDelete(buffer->rest->event);
  sin.sin_addr.s_addr = buffer->rest->node.s_addr;
  free(buffer->rest);
  free(buffer->buffer);
  free(buffer);
  sin.sin_family = AF_INET;

#ifdef DEBUG
  printf("CheckOthersTemplates: Sending data to %s\n", inet_ntoa(sin.sin_addr));
#endif

  if (msglen <= MAXTEMLEN * RPC_ALIGN)
  {
    sin.sin_port = htons(UdpPort);
    if ((error = Rpc_Call(OutSocket, &sin, (Rpc_Proc) LINDA_NETOUT, msglen,
			  tuplemsg, 0, NULL, LINDA_NRETRY, &CallTimeOut))
	!= RPC_SUCCESS)
    {
      fprintf(stderr, "CheckOthersTemplates: Cannot send to %s, %s\n",
	      inet_ntoa(sin.sin_addr), Rpc_ErrorMessage(error));
      free(tuplemsg);
      return (FALSE);
    }
  }
  else
  {
    register int    OutSocket = Rpc_TcpCreate(False, 0);

    sin.sin_port = htons(TcpPort);
    if ((error = Rpc_Call(OutSocket, &sin, (Rpc_Proc) LINDA_NETOUT, msglen,
			  tuplemsg, 0, NULL, LINDA_NRETRY, &CallTimeOut))
	!= RPC_SUCCESS)
    {
      fprintf(stderr, "CheckOthersTemplates: Cannot send to %s, %s\n",
	      inet_ntoa(sin.sin_addr), Rpc_ErrorMessage(error));
      close(OutSocket);
      free(tuplemsg);
      return (FALSE);
    }
    free(tuplemsg);
    close(OutSocket);
  }
  free(ent->buffer);
  return TRUE;
}

static void
TemplatePurge(entry, event)
  register ENTRY *entry;
  register Rpc_Event event;
{
  register ENTRY *ptr;

#ifdef STATS
  statreport(&Ostat, --Others);
#endif
  ptr = my_hsearch(entry->rest->table, entry, REMOVE, EntryMatch);
  free(ptr->buffer);
  free(ptr->rest);
  free(ptr);
  Rpc_EventDelete(event);
}

static
int
Null()
{
  return TRUE;
}

static void
TemplateSend(entry, event)
  register ENTRY *entry;
  register Rpc_Event event;
{
  register int    i, msglen;
  register Rpc_Stat error;
  TEMPLATE_MESSAGE *template;
  static struct timeval TimeOut = {1, 0};	/* Its value doesn't matter
						 * since we just have the probe
						 * flag to be TRUE */

  msglen = entry->keybytes + entry->databytes + sizeof(TEMPLATE_HEADER);
  template = (TEMPLATE_MESSAGE *) malloc(msglen);
  Entry2TemplateMsg(entry, template);

  for (i = 0; i < NetworksNumber; i++)
  {
    error = Rpc_Broadcast(OutSocket, &BroadcastAddress[i],
			  (Rpc_Proc) LINDA_TEMPLATE, msglen, template,
			  0, NULL, 1, &TimeOut, Null, TRUE);
    if (error != RPC_SUCCESS)
    {
      Rpc_Error(entry->rest->msg, error);
      free(entry->rest->msg);
      Rpc_EventDelete(entry->rest->event);
      entry = my_hsearch(entry->rest->table, entry, REMOVE, EntryMatch);
      free(entry->rest);
      free(entry->buffer);
      free(entry);
    }
  }
  free(template);
}

static void
LindaTemplate(from, msg, len, template)
  register struct sockaddr_x *from;
  Rpc_Message     msg;
  int             len;
  register TEMPLATE_MESSAGE *template;
{
  static int      allowable_depth = 100;	/* Max allowable depth of
						 * LindaTemplate calls without
						 * return. When we exceed it we
						 * stop paying attention to the
						 * TemplateCallsSocket. Each
						 * uncompleted call results in
						 * about 4K stack space
						 * consumed. */
  TABLE_OBJECT   *TupleSpace, *LocalsBuffer, *OthersBuffer;
  register ENTRY *Tuple, *ptr;
  register TUPLE_MESSAGE *tuplemsg;
  register int    msglen = 0, uid, rungroup;
  register Rpc_Stat error;
  ENTRY           ent;
  struct sockaddr_in sin;

#ifdef STATS
  LindaTemplateCalls++;
#endif

  Rpc_Return(msg, 0, NULL, TRUE);

  uid = template->header.uid;
  rungroup = template->header.rungroup;
  FindTables(uid, rungroup, &TupleSpace, &LocalsBuffer, &OthersBuffer, TRUE);

  if (((struct sockaddr_in *) from)->sin_addr.s_addr ==
      LocalAddress.sin_addr.s_addr)
    return;

  /* Copy msg to en ENTRY structure */
  TemplateMsg2Entry(template, &ent);

  Tuple = my_hsearch(TupleSpace, &ent, REMOVE, TemplateMatch);
  if (Tuple == NULL)
  {
#ifdef DEBUG
    printf("Template: Not found\n");
#endif
    ent.rest = (EXTENSION *) malloc(sizeof(EXTENSION));
    ent.rest->node.s_addr = ((struct sockaddr_in *) from)->sin_addr.s_addr;
    ent.rest->pid = template->header.pid;
    ent.rest->msg = NULL;

    ptr = my_hsearch(OthersBuffer, &ent, FIND, MessageMatch);
    if (ptr != NULL)
    {
#ifdef STATS
      Reset++;
#endif
      free(ent.buffer);
      free(ent.rest);
      Rpc_EventReset(ptr->rest->event, &PurgeInterval);
    }
    else
    {
#ifdef STATS
      statreport(&Ostat, ++Others);
#endif
      ptr = my_hsearch(OthersBuffer, &ent, ENTER, NULL);
      ptr->rest->table = OthersBuffer;
      ptr->rest->uid = uid;
      ptr->rest->rungroup = rungroup;
      ptr->rest->event = Rpc_EventCreate(&PurgeInterval, TemplatePurge, ptr);
    }
  }
  else
  {
#ifdef STATS
    statreport(&Tstat, --Tuples);
#endif
    free(ent.buffer);

    msglen = sizeof(TUPLE_HEADER) + Tuple->keybytes + Tuple->databytes;
    tuplemsg = (TUPLE_MESSAGE *) malloc(msglen);
    Entry2TupleMsg(Tuple, tuplemsg, uid, rungroup);

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = ((struct sockaddr_in *) from)->sin_addr.s_addr;

#ifdef DEBUG
    printf("LindaTemplate: Sending data to %s\n", inet_ntoa(sin.sin_addr));
#endif


    allowable_depth--;
    if (allowable_depth < 0)
      Rpc_Ignore(TemplateCallsSocket);

    if (msglen <= MAXTEMLEN * RPC_ALIGN)
    {
      sin.sin_port = htons(UdpPort);
      if ((error = Rpc_Call(OutSocket, &sin, (Rpc_Proc) LINDA_NETOUT,
		       msglen, tuplemsg, 0, NULL, LINDA_NRETRY, &CallTimeOut))
	  != RPC_SUCCESS)
      {
	fprintf(stderr, "Template: Cannot send to %s, %s\n",
		inet_ntoa(sin.sin_addr), Rpc_ErrorMessage(error));
	/* Check just in case someone asked it while we were trying to send it */
	if (!CheckLocalTemplates(LocalsBuffer, Tuple))
	  if (!CheckOthersTemplates(OthersBuffer, Tuple))
	    if (!CheckLocalTemplates(LocalsBuffer, Tuple))	/* in case it is asked */
	    {
	      my_hsearch(TupleSpace, Tuple, ENTER, NULL);
#ifdef STATS
	      statreport(&Tstat, ++Tuples);
#endif
	    }
      }
      else
	free(Tuple->buffer);
    }
    else
    {
      register int    OutSocket = Rpc_TcpCreate(False, 0);

      sin.sin_port = htons(TcpPort);
      if ((error = Rpc_Call(OutSocket, &sin, (Rpc_Proc) LINDA_NETOUT,
		       msglen, tuplemsg, 0, NULL, LINDA_NRETRY, &CallTimeOut))
	  != RPC_SUCCESS)
      {
	fprintf(stderr, "Template: Cannot send to %s, %s\n",
		inet_ntoa(sin.sin_addr), Rpc_ErrorMessage(error));
	/* Check just in case someone asked it while we were trying to send it */
	if (!CheckLocalTemplates(LocalsBuffer, Tuple))
	  if (!CheckOthersTemplates(OthersBuffer, Tuple))
	    if (!CheckLocalTemplates(LocalsBuffer, Tuple))	/* in case it is asked */
	    {
	      my_hsearch(TupleSpace, Tuple, ENTER, NULL);
#ifdef STATS
	      statreport(&Tstat, ++Tuples);
#endif
	    }
      }
      else
	free(Tuple->buffer);
      close(OutSocket);
    }
    free(Tuple);
    free(tuplemsg);
    allowable_depth++;
    Rpc_Rewatch(TemplateCallsSocket, RPC_READABLE);
  }
}

static void
LindaOut(from, msg, len, tuple)
  register struct sockaddr_x *from;
  Rpc_Message     msg;
  register int    len;
  register TUPLE_MESSAGE *tuple;
{
  register int    uid, rungroup;
  TABLE_OBJECT   *TupleSpace, *LocalsBuffer, *OthersBuffer;
  ENTRY           ent;

  uid = tuple->header.uid;
  rungroup = tuple->header.rungroup;
#ifdef DEBUG
  printf("LindaOut called(uid %d, rungroup %d)\n", uid, rungroup);
#endif
#ifdef STATS
  LindaOutCalls++;
#endif

  FindTables(uid, rungroup, &TupleSpace, &LocalsBuffer, &OthersBuffer, TRUE);

  /* Copy msg's data to an ENTRY structure */
  TupleMsg2Entry(tuple, &ent);

  /* check if someone asked for such a tuple */
  if (!CheckLocalTemplates(LocalsBuffer, &ent))
    if (!CheckOthersTemplates(OthersBuffer, &ent))
      if (!CheckLocalTemplates(LocalsBuffer, &ent))	/* in case it is asked */
      {
#ifdef STATS
	statreport(&Tstat, ++Tuples);
#endif
	my_hsearch(TupleSpace, &ent, ENTER, NULL);
      }

  /* Return ok */
  Rpc_Return(msg, 0, NULL, FALSE);
}

static void
LindaIn(from, msg, len, template)
  register struct sockaddr_x *from;
  Rpc_Message     msg;
  register int    len;
  register TEMPLATE_MESSAGE *template;
{
  TABLE_OBJECT   *TupleSpace, *LocalsBuffer;
  register ENTRY *Tuple, *ptr;
  register Rpc_Stat error;
  ENTRY           ent;
  register int    uid, rungroup;

  uid = template->header.uid;
  rungroup = template->header.rungroup;
#ifdef DEBUG
  printf("LindaIn called(uid %d, rungroup %d)\n", uid, rungroup);
#endif
#ifdef STATS
  LindaInCalls++;
#endif

  FindTables(uid, rungroup, &TupleSpace, &LocalsBuffer, NULL, TRUE);

  /* Copy msg's data to an ENTRY structure */
  TemplateMsg2Entry(template, &ent);

  /* Search for a matching tuple */
  Tuple = my_hsearch(TupleSpace, &ent, REMOVE, TemplateMatch);
  if (Tuple == NULL)
  {
#ifdef DEBUG
    printf("In: Not found\n");
#endif
#ifdef STATS
    statreport(&Lstat, ++Locals);
#endif
    /* keep track of request */
    ent.rest = (EXTENSION *) malloc(sizeof(EXTENSION));
    ent.rest->pid = template->header.pid;
    ent.rest->uid = uid;
    ent.rest->rungroup = rungroup;
    ent.rest->msg = Rpc_GetMsg(msg);
    ent.rest->event = NULL;
    ent.rest->proc = LINDA_IN;
    ent.rest->table = LocalsBuffer;
    ptr = my_hsearch(LocalsBuffer, &ent, ENTER, NULL);
    ptr->rest->event = Rpc_EventCreate(&SendInterval, TemplateSend, ptr);
    TemplateSend(ptr, NULL);
  }
  else
  {
#ifdef STATS
    statreport(&Tstat, --Tuples);
#endif
    /* return tuple and cleanup */
    free(ent.buffer);
    if ((error = Rpc_Return(msg, Tuple->databytes,
		      Tuple->buffer + Tuple->keybytes, FALSE)) != RPC_SUCCESS)
    {
      fprintf(stderr, "LindaIn: Cannot send tuple to pid %d: %s\n",
	      template->header.pid, Rpc_ErrorMessage(error));
      my_hsearch(TupleSpace, Tuple, ENTER, NULL);
      free(Tuple);
    }
    else
    {
      free(Tuple->buffer);
      free(Tuple);
    }
  }
}

static void
LindaRead(from, msg, len, template)
  register struct sockaddr_x *from;
  Rpc_Message     msg;
  register int    len;
  register TEMPLATE_MESSAGE *template;
{
  TABLE_OBJECT   *TupleSpace, *LocalsBuffer;
  register ENTRY *Tuple, *ptr;
  ENTRY           ent;
  register int    uid, rungroup;
  register Rpc_Stat error;

  uid = template->header.uid;
  rungroup = template->header.rungroup;
#ifdef DEBUG
  printf("LindaIn called(uid %d, rungroup %d)\n", uid, rungroup);
#endif
#ifdef STATS
  LindaReadCalls++;
#endif

  FindTables(uid, rungroup, &TupleSpace, &LocalsBuffer, NULL, TRUE);

  /* Copy msg's data to an ENTRY structure */
  TemplateMsg2Entry(template, &ent);

  /* search for a matching tuple */
  Tuple = my_hsearch(TupleSpace, &ent, FIND, TemplateMatch);
  if (Tuple == NULL)
  {
#ifdef DEBUG
    printf("Rd: Not found\n");
#endif
#ifdef STATS
    statreport(&Lstat, ++Locals);
#endif

    /* keep track of request */
    ent.rest = (EXTENSION *) malloc(sizeof(EXTENSION));
    ent.rest->pid = template->header.pid;
    ent.rest->uid = uid;
    ent.rest->rungroup = rungroup;
    ent.rest->msg = Rpc_GetMsg(msg);
    ent.rest->event = NULL;
    ent.rest->proc = LINDA_READ;
    ent.rest->table = LocalsBuffer;
    ptr = my_hsearch(LocalsBuffer, &ent, ENTER, NULL);
    ptr->rest->event = Rpc_EventCreate(&SendInterval, TemplateSend, ptr);
    TemplateSend(ptr, NULL);
  }
  else
  {
    /* return tuple's data and cleanup */
    if ((error = Rpc_Return(msg, Tuple->databytes,
		      Tuple->buffer + Tuple->keybytes, FALSE)) != RPC_SUCCESS)
    {
      fprintf(stderr, "LindaRead: Cannot send tuple to pid %d: %s\n",
	      template->header.pid, Rpc_ErrorMessage(error));
    }
    free(ent.buffer);
  }
}

static void
SwapDouble(ptr)
  register double *ptr;
{
  int             a = *(int *) ptr;
  int             b = *(((int *) ptr) + 1);

  Rpc_SwapLong(&a);
  Rpc_SwapLong(&b);
  *(int *) ptr = b;
  *(((int *) ptr) + 1) = a;
}

char           *
SwapFieldData(ptr)		/* Swaps a tuple's or template's actual field */
  char           *ptr;
{
  register int    type = *(int *) ptr;
  register int    arrsize, retval, i;
  register char  *ptr2 = ptr;

  ptr += sizeof(int);
  switch (type)
  {
  case CHAR:
  case SHORT:
  case INT:
    Rpc_SwapLong(ptr);
    return (ptr + sizeof(int));
  case FLOAT:
    Rpc_SwapLong(ptr);
    return (ptr + sizeof(int));
  case DOUBLE:
    SwapDouble(ptr);
    return (ptr + sizeof(double));
  case CHAR_STRING:
    retval = strlen(ptr) + 1;
    adjust(retval);
    return (ptr + retval);
  case ARRAY_OF_CHAR:
    Rpc_SwapLong(ptr);
    arrsize = *(int *) ptr;
    retval = (arrsize * sizeof(char) + sizeof(int));
    adjust(retval);
    return (ptr + retval);
  case ARRAY_OF_SHORT:
    Rpc_SwapLong(ptr);
    arrsize = *(int *) ptr;
    ptr += sizeof(int);
    for (i = 0; i < arrsize; i++)
    {
      Rpc_SwapShort(ptr);
      ptr += sizeof(short);
    }
    retval = (arrsize * sizeof(short) + sizeof(int));
    adjust(retval);
    return (ptr2 + retval);
  case ARRAY_OF_INT:
  case ARRAY_OF_FLOAT:
    Rpc_SwapLong(ptr);
    arrsize = *(int *) ptr;
    ptr += sizeof(int);
    for (i = 0; i < arrsize; i++)
    {
      Rpc_SwapLong(ptr);
      ptr += sizeof(int);
    }
    return (ptr2 + (arrsize * sizeof(int) + sizeof(int)));
  case ARRAY_OF_DOUBLE:
    Rpc_SwapLong(ptr);
    arrsize = *(int *) ptr;
    ptr += sizeof(int);
    for (i = 0; i < arrsize; i++)
    {
      SwapDouble(ptr);
      ptr += sizeof(double);
    }
    return (ptr2 + (arrsize * sizeof(double) + sizeof(int)));
  }
}

void
SwapTemplate(len, template, unused)	/* swap function for incoming templates */
  int             len;
  TEMPLATE_MESSAGE *template;
  int             unused;
{
  register int    i, nfields;
  register char  *ptr;

  Rpc_SwapLong(&template->header.pid);
  Rpc_SwapLong(&template->header.uid);
  Rpc_SwapLong(&template->header.rungroup);
  Rpc_SwapLong(&template->header.keybytes);
  Rpc_SwapLong(&template->header.databytes);
  Rpc_SwapLong(&template->message_data[0]);
  nfields = template->message_data[0];
  ptr = (char *) &template->message_data[1];
  for (i = 0; i < nfields; i++)
  {
    Rpc_SwapLong(ptr);
    if (i == 0)
      ptr = SwapFieldData(ptr);
    else
      ptr += sizeof(int);
  }
  for (i = 1; i < nfields; i++)
  {
    Rpc_SwapLong(ptr);
    if (isformal((*(int *) ptr)))
      ptr += sizeof(int);
    else
      ptr = SwapFieldData(ptr);
  }
}

void
SwapTuple(len, tuple, unused)	/* swap function for outgoing tuples */
  int             len;
  TUPLE_MESSAGE  *tuple;
  int             unused;
{
  register int    i, nfields;
  register char  *ptr;

  Rpc_SwapLong(&tuple->header.uid);
  Rpc_SwapLong(&tuple->header.rungroup);
  Rpc_SwapLong(&tuple->header.keybytes);
  Rpc_SwapLong(&tuple->header.databytes);
  Rpc_SwapLong(&tuple->message_data[0]);
  nfields = tuple->message_data[0];
  ptr = (char *) &tuple->message_data[1];
  for (i = 0; i < nfields; i++)
  {
    Rpc_SwapLong(ptr);
    if (i == 0)
      ptr = SwapFieldData(ptr);
    else
      ptr += sizeof(int);
  }
  for (i = 1; i < nfields; i++)
  {
    Rpc_SwapLong(ptr);
    ptr = SwapFieldData(ptr);
  }
}

void
SwapSystem(len, msg, unused)	/* swap function for incoming 'system' messages */
  int             len;
  SYSTEM_MESSAGE *msg;
  int             unused;
{
  Rpc_SwapLong(&msg->uid);
  Rpc_SwapLong(&msg->rungroup);
}

void
SwapStats(len, stat, unused)	/* swap function for outgoing stats messages */
  int             len;
  STATS_MESSAGE  *stat;
  int             unused;
{
  Rpc_SwapLong(&stat->LindaOutCalls);
  Rpc_SwapLong(&stat->LindaInCalls);
  Rpc_SwapLong(&stat->LindaReadCalls);
  Rpc_SwapLong(&stat->LindaTemplateCalls);
  Rpc_SwapLong(&stat->Reset);
}


void
Usage(progname)
{
  fprintf(stderr, "Usage: %s [-n <network file>]\n", progname);
  exit(1);
}

void
LocalsCleanup(stream, serverdata)	/* Bloody procedure. It must check all
					 * the LocalTemplates tables for all
					 * the users to find pending templates
					 * from this stream and destroy them */
  int             stream;
  Rpc_Opaque      serverdata;
{
  register int    i, j;
  register ENTRY *ent;
  register ELEMENT *ulist, *llist;
  register TABLE_OBJECT *locals;

  for (i = 0; i < Users->Size; i++)
    if (Users->Table[i] != NULL)
    {
      for (ulist = Users->Table[i]; ulist != NULL; ulist = ulist->next)
      {
	locals = ((TABLE_OBJECT **) ulist->item.buffer)[3];
	for (j = 0; j < locals->Size; j++)
	  if (locals->Table[j] != NULL)
	  {
	    for (llist = locals->Table[j]; llist != NULL; llist = llist->next)
	      if (llist->item.rest != NULL)
		if (Rpc_MessageSocket(llist->item.rest->msg) == stream)
		{
		  free(llist->item.rest->msg);
		  Rpc_EventDelete(llist->item.rest->event);
		  ent = my_hsearch(llist->item.rest->table, &llist->item,
				   REMOVE, EntryMatch);
		  free(ent->rest);
		  free(ent->buffer);
		  free(ent);
		}
	  }
      }
    }
}


main(argc, argv)
  int             argc;
  char          **argv;
{
  char            c, *netfile = NULL;
  extern char    *optarg;
  extern int      optind, opterr;
 
  /* modified by NM: */
  int myport;  /* takes the place of LINDA_PORT */

  myport = atoi(argv[1]);
  UdpPort = myport;
  TcpPort = myport + 1;
  BroadPort = myport + 2;
  /* end of modifications by NM */
  Uid = getuid();
  signal(SIGPIPE, SIG_IGN);	/* ignore SIGPIPE produced when a child exits
				 * without closing its socket */
  signal(SIGQUIT, exit);	/* just to have a way to exit it when 'system
				 * exit' fails and I want profiling statistics
				 * to be produced */
  get_myaddress(&LocalAddress);	/* Store the address in a global variable to
				 * avoid unneeded calls to get_myaddress() */

  Users = my_hcreate(USERS_TABLE_SIZE);	/* Create Users hash table */

  LoadNetworks(netfile, BroadcastAddress, &NetworksNumber, BroadPort);	/* Load the netwroks */

  OutSocket = Rpc_UdpCreate(False, 0);	/* Create the socket that will be used
					 * for issuing  Udp calls to other
					 * Tuple Managers */

  if (OutSocket < 0)
  {
    fprintf(stderr, "UdpCreate:Cannot create socket\n");
    exit(1);
  }
  TcpCallsSocket = Rpc_TcpCreate(True, TcpPort);	/* Create the socket
							 * that will be used for
							 * handling Tcp calls
							 * from other Tuple
							 * Managers */
  if (TcpCallsSocket < 0)
  {
    fprintf(stderr, "TcpCreate:Cannot create socket, port %d\n", TcpPort);
    exit(1);
  }
  /* define the server for these calls */
  Rpc_ServerCreate(TcpCallsSocket, (Rpc_Proc) LINDA_NETOUT, LindaOut,
		   SwapTuple, Rpc_SwapNull, NULL, NULL);

  UdpCallsSocket = Rpc_UdpCreate(True, UdpPort);	/* Create the socket
							 * that will be used for
							 * handling Udp calls
							 * from other Tuple
							 * Managers */
  if (UdpCallsSocket < 0)
  {
    fprintf(stderr, "UdpCreate:Cannot create socket, port %d\n", UdpPort);
    exit(1);
  }

  /* define the servers */
  Rpc_ServerCreate(UdpCallsSocket, (Rpc_Proc) LINDA_NETOUT, LindaOut,
		   SwapTuple, Rpc_SwapNull, NULL, NULL);
  Rpc_ServerCreate(UdpCallsSocket, (Rpc_Proc) SYSTEM_EXIT, SystemExit,
		   SwapSystem, SwapStats, NULL, NULL);
#ifdef STATS
  Rpc_ServerCreate(UdpCallsSocket, (Rpc_Proc) SYSTEM_STATS, SystemStats,
		   SwapSystem, SwapStats, NULL, NULL);
  Tstat = Ostat = Lstat = Zstat;
#endif
  Rpc_ServerCreate(UdpCallsSocket, (Rpc_Proc) SYSTEM_RESET, SystemReset,
		   SwapSystem, SwapStats, NULL, NULL);

  TemplateCallsSocket = Rpc_UdpCreate(True, BroadPort);	/* Create the socket
							 * that will be used for
							 * handling Template
							 * calls */
  if (TemplateCallsSocket < 0)
  {
    fprintf(stderr, "UdpCreate:Cannot create socket, port %d\n", BroadPort);
    exit(1);
  }
  /* define the server */
  Rpc_ServerCreate(TemplateCallsSocket, (Rpc_Proc) LINDA_TEMPLATE,
		   LindaTemplate, SwapTemplate, Rpc_SwapNull, NULL, NULL);

  if (access(FindLocalName(SOCKNAME_LOCAL), F_OK) == 0)
    unlink(FindLocalName(SOCKNAME_LOCAL));	/* delete any previous socket
						 * file */
  /* Create the socket that will be used for handling local calls */
  LocalCallsSocket = Rpc_UnixSTCreate(True, FindLocalName(SOCKNAME_LOCAL));
  if (LocalCallsSocket < 0)
  {
    fprintf(stderr, "UnixSTCreate:Cannot create socket\n");
    fprintf(stderr, "UnixSTCreate:Cannot create socket, %s\n",
	    FindLocalName(SOCKNAME_LOCAL));
    exit(1);
  }
  /* define the servers */
  Rpc_ServerCreate(LocalCallsSocket, (Rpc_Proc) LINDA_OUT, LindaOut,
		   Rpc_SwapNull, Rpc_SwapNull, (Rpc_Opaque) 0, LocalsCleanup);
  Rpc_ServerCreate(LocalCallsSocket, (Rpc_Proc) SYSTEM_GETPORT, SysGetPort,
		   Rpc_SwapNull, Rpc_SwapNull, (Rpc_Opaque) 0, LocalsCleanup);
  Rpc_ServerCreate(LocalCallsSocket, (Rpc_Proc) LINDA_IN, LindaIn,
		   Rpc_SwapNull, Rpc_SwapNull, (Rpc_Opaque) 0, LocalsCleanup);
  Rpc_ServerCreate(LocalCallsSocket, (Rpc_Proc) LINDA_READ, LindaRead,
		   Rpc_SwapNull, Rpc_SwapNull, (Rpc_Opaque) 0, LocalsCleanup);
  /* start my operation */
  Rpc_Run();
}

