CVS diff for depot.c between 1.10 and 1.1:

Revision 1.1 Revision 1.10
 
#include <assert.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <errno.h>
#include <stdlib.h> 
#include <stdio.h> 
#include <errno.h>
#include "pth.h"
#include <unistd.h> 
#include "config.h"
#include "pmsg.h"

// response.h/c
int response_503 (int);

// defined in cow.c
extern pth_barrier_t init_bar;

void *
depot (void *_arg)
{
  FILE *hands_log;

  int max = (int) _arg;
  int top = 0;
  pth_msgport_t mp;
  pth_event_t ev;
  int i;
  pmsg *msg;
  pmsg *new_msg;
  pmsg **Stack = (pmsg **) malloc (max * sizeof (pmsg *));

  if (NULL == Stack)
  {
    perror ("malloc failed in depot()\n");
    exit (-1);
  }
  mp = pth_msgport_create (DEPOT);
  ev = pth_event (PTH_EVENT_MSG, mp);

  while (top < max)
  {
    pth_wait (ev);
    while (pth_msgport_pending (mp))
    {
      if (top == max)
      {
	perror ("ERROR: stack overflow \n");
	exit (-1);
      }
      msg = (pmsg *) pth_msgport_get (mp);
      if (NULL == msg)
      {
	perror ("pth_msgport_get (49)");
	exit (-1);

      }
      Stack[top++] = msg;
    }
  }
  pth_barrier_reach (&init_bar);	/* end of initialization */

  // suspend hands on the stack
#include "schedule.h"		/* enabler|disable SUSPention of handler */
#ifdef SUSP
  for (i = 0; i < top; i++)
  {
    if (FALSE == pth_suspend (Stack[i]->id))
    {
      perror ("pth_suspend fail");
      exit (-1);
    }
  }
#endif /* SUSP */

  // main loop
  for (;;)
  {
    if (!(pth_wait (ev) > 0))
    {
      perror ("depot.c :: pth_msgport_get");
      continue;
    }
    //else
    for (;;)			// sub for loop
    {
      msg = (pmsg *) pth_msgport_get (mp);

      if (msg == NULL)
      {
	perror ("depot.c :: pth_msgport_get");
	exit (errno);
      }
      else
      {
 
#include <sys/types.h> 
#include <fcntl.h> 
#include "defines.h"
#include <sys/types.h> 
#include <fcntl.h> 
#include "defines.h"
 
#include "cow.h"
#include "schedule.h"
#include "config.h" 

            

            
	if (PUSH == msg->act)
	{
	  extern int cow_cur_conn;
	  cow_cur_conn--;
	  // sanity check - prevent overflow
	  if (top == max)
	  {
	    perror ("depot.c :: stack overflow \n");
	    exit (-1);
	  }
	  Stack[top++] = msg; 
void *depot(void *_arg) {
    int max = (int)_arg;
    int top = 0;
    pth_msgport_t mp;
    pth_event_t ev;
    int i;
    pmsg *msg;
    pmsg *new_msg;
    pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *));

    if (NULL == Stack) {
 
        perror("malloc failed in depot()");
        exit(-1);
    }
    mp = pth_msgport_create(DEPOT);
    ev = pth_event(PTH_EVENT_MSG, mp);

    while (top < max) {
        pth_wait(ev);
        while (pth_msgport_pending(mp)) {
            if (top == max) {
                perror("ERROR: stack overflow");
                exit(-1);
            }
            msg = (pmsg *) pth_msgport_get(mp);
            if (NULL == msg) {
                perror("pth_msgport_get");
                exit(-1);
            }
            Stack[top++] = msg;
        }
    }
    pth_barrier_reach(&init_bar);       /* end of initialization */

    // suspend hands on the stack
#ifdef SUSP
#ifdef SUSP
	  if (FALSE == pth_suspend (msg->id))
	  {
	    perror ("pth_suspend failed in depot.c\n");
	    exit (-23);
	  } 
    for (i = 0; i < top; i++) {
        if (FALSE == pth_suspend(Stack[i]->id)) {
            perror("pth_suspend fail");
            exit(-1);
        }
 
        --cow_nohands_running;
    }
#endif /* SUSP */
#endif /* SUSP */
	}
	else if (POP == msg->act)
	{
	  // POP i.e. assign a job 
 

            

            
	  // sanity check - prevent overflow
	  if (1 > top)
	  {
	    pth_yield (NULL);
	    response_503 (msg->fd);
	    if (close (msg->fd))
	    {
	      if (EBADF != errno)
	      {
		perror ("close");
    /*
     * Main loop.  Wait for an event indicating that we have a message on
     * our message port.  Grab the message and figure out whether it's
     * a handler push (i.e. one of the handlers is done processing its
     * request and needs to be put back on the idle queue) or a handler
     * pop (i.e. a new request has arrived and we need to assign a
     * handler to it).
     */
    for (;;) {
        int i, pending; 
		exit (errno);
	      }
	    }
 

            

            
 
        /* Wait for a 'new message' event */
        if (!(pth_wait(ev) > 0)) {
            perror(__FILE__ " :: pth_wait");
            continue;
        } 

            

            
/* 
        /* Get all pending messages from the message port */
 
        pending = pth_msgport_pending(mp);
        for (i = 0; i < pending; ++i) {
            msg = (pmsg*) pth_msgport_get(mp);
            assert(msg != NULL);

            /*
             * Is this a PUSH message (i.e. handler finished processing
             * a request and wants to return to the pool)?
             */
            if (PUSH == msg->act) {
                assert(top < max);
                Stack[top++] = msg;
#ifdef SUSP
                /* Suspend the handler thread; scheduler will ignore it now */
                if (FALSE == pth_suspend(msg->id)) {
                    perror("pth_suspend failed in depot.c");
                    exit(-23);
                }
                --cow_nohands_running;
#endif /* SUSP */

            /*
             * Otherwise, the message should be a POP (i.e. we just
             * got a new request and need to assign it to one of the
             * handlers from the pool.
             */
            } else if (POP == msg->act) {
                /*
                 * Set the message's action field to 'NONE' -- not really
                 * necessary anymore, but it should help us catch bugs if we
                 * start reusing messages we shouldn't be.
                 */
                msg->act = NONE;

                /*
                 * Prevent handler pool underflow; just send back a 503 error
                 * if there are no available handlers.  An alternative would
                 * be to spawn a new handler thread here (see farther down).
                 */
                if (top <= 0) {
                    reqnode* rn = (reqnode*)msg;

                    /* Send back 'unavailable' message and close connection */
#ifdef USE_TCPCORK
                    STATIC_RESPONSE(msg->fd, 503);
                    cow_close_socket(msg->fd);
#else
                    STATIC_RESPONSE(msg->buf, 503);
                    cow_close_socket(msg->buf);
#endif
                    --cow_cur_conn;

                    /* Return the message structure to the pmsg pool */
                    rn->next = pmsgpool_first;
                    pmsgpool_first = rn;
                    continue;
                }
/**********************************************************************
 * Takashi's code for spawning a new handler; he left it commented out
 * so I'll do the same for now.   -- mdr
 *
// alternatively, spawn a new hand. 
void *handler(void*); 
pth_attr_t attr; 
attr = pth_attr_new();
// alternatively, spawn a new hand. 
void *handler(void*); 
pth_attr_t attr; 
attr = pth_attr_new();
	  new_msg->fd = msg->fd; 
new_msg->fd = msg->fd; 
pth_spawn (attr, handler, (void *) ((long) msg->fd));
pth_spawn (attr, handler, (void *) ((long) msg->fd));
*/
 **********************************************************************/ 
	    continue;
	  }

	  if (NULL == (new_msg = (pmsg *) malloc (sizeof (pmsg))))
	  {
	    printf ("new_req malloc failed\n");
	    exit (-1);
	  }
	  new_msg->fd = msg->fd;
 
 
#ifdef SUSP
 
#ifdef SUSP
	  if (FALSE == pth_resume (Stack[top - 1]->id))
	  {
	    perror ("pth_resume in depot fialed\n");
	    exit (-22);
	  } 
                /* Put the assigned handler back on the scheduling queue */
                if (FALSE == pth_resume(Stack[top - 1]->id)) {
                    perror("pth_resume in depot failed");
                    exit(-22);
                }
 
                ++cow_nohands_running;
#endif
#endif
	  --top;
	  if (TRUE !=
	      pth_msgport_put (Stack[top]->mp, (pth_message_t *) new_msg))
	  {
	    perror ("ERROR: pth_msgport_put failed in depot()");
	    exit (-2);
	  }
	}
	else
	{
	  perror ("ERROR: unknown action in depot()\n");
	  exit (-2); 
 

            

            
	}
      }
    }				// end of sub for loop
  }				// end of main for loop
  return (void *) NULL; 
                /* Decrement the number of handlers remaining in the pool */
                --top;

                /* Forward the request to the handler */
                if (TRUE != pth_msgport_put(Stack[top]->mp,
 
                                            (pth_message_t *)msg)) {
                    perror(__FILE__ " :: pth_msgport_put");
                    exit(-2);
                }

            /*
             * We should never get here -- there aren't any other possible
             * message types.
             */
            } else {
                printf("ERROR: unknown message action in depot()\n");
                exit(-2);
            }
        }                       /* End of inner for loop */
    }                           /* End of main for loop */

    return (void *)NULL;




Legend
Lines deleted from 1.10  
Lines Modified
  Lines added in revision 1.1