CVS diff for depot.c between 1.10 and 1.3:

Revision 1.3 Revision 1.10
 
#include <assert.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <errno.h>
#include <stdlib.h> 
#include <stdio.h> 
#include <errno.h>
#include "pth.h" 
#include <unistd.h>
 
#include <sys/types.h>
#include <fcntl.h>
#include "defines.h"
#include "cow.h"
#include "schedule.h"
#include "config.h"
#include "config.h"
#include "pmsg.h" 
 

            

            
// response.h/c
void *depot(void *_arg) { 
int response_503(int);

// defined in cow.c
extern pth_barrier_t init_bar;

void *depot(void *_arg)
{
    FILE *hands_log;
 
    int max = (int)_arg; 
int top = 0; 
pth_msgport_t mp; 
    int max = (int)_arg; 
int top = 0; 
pth_msgport_t mp; 
Line 24 Line 20
    pmsg *new_msg; 
pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *)); 
 
    pmsg *new_msg; 
pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *)); 
 
    if (errno) perror("ERROR WHEN STARTING"); 
 
    if (NULL == Stack) {
    if (NULL == Stack) {
	perror("malloc failed in depot()\n");
	exit(-1); 
        perror("malloc failed in depot()");
        exit(-1); 

mp = pth_msgport_create(DEPOT);

mp = pth_msgport_create(DEPOT);
    printf("** DEBUG **  After creation, depot msgport has %d penging msgs.\n", pth_msgport_pending(mp)); 
 
    ev = pth_event(PTH_EVENT_MSG, mp); 
 
while (top < max) {
    ev = pth_event(PTH_EVENT_MSG, mp); 
 
while (top < max) {
	pth_wait(ev);
        printf("(DEBUG) Got an event on the msgport; there are now %d messages.\n", pth_msgport_pending(mp));
	while (pth_msgport_pending(mp)) {
	    if (top == max) {
		perror("ERROR: stack overflow \n");
		exit(-1);
	    }
	    msg = (pmsg *) pth_msgport_get(mp);
	    if (NULL == msg) {
		perror("pth_msgport_get (49)");
		exit(-1);

	    }
        pth_wait(ev);
        while (pth_msgport_pending(mp)) {
            if (top == max) {
                perror("ERROR: stack overflow");
                exit(-1);
            }
            msg = (pmsg *) pth_msgport_get(mp);
            if (NULL == msg) {
                perror("pth_msgport_get");
                exit(-1);
            }
            Stack[top++] = msg;
        } 
	    Stack[top++] = msg;
	}
 
    }
    }
    pth_barrier_reach(&init_bar);	/* end of initialization */ 
    pth_barrier_reach(&init_bar);       /* end of initialization */ 
 
// suspend hands on the stack
 
// suspend hands on the stack
#include "schedule.h"		/* enabler|disable SUSPention of handler */ 
 
#ifdef SUSP 
for (i = 0; i < top; i++) {
#ifdef SUSP 
for (i = 0; i < top; i++) {
	if (FALSE == pth_suspend(Stack[i]->id)) {
	    perror("pth_suspend fail");
	    exit(-1);
	} 
        if (FALSE == pth_suspend(Stack[i]->id)) {
            perror("pth_suspend fail");
            exit(-1);
        }
 
        --cow_nohands_running;

#endif /* SUSP */ 

#endif /* SUSP */ 
    // main loop 
    /*
 
     * Main loop.  Wait for an event indicating that we have a message on
     * our message port.  Grab the message and figure out whether it's
     * a handler push (i.e. one of the handlers is done processing its
     * request and needs to be put back on the idle queue) or a handler
     * pop (i.e. a new request has arrived and we need to assign a
     * handler to it).
     */
    for (;;) {
    for (;;) {
	if (!(pth_wait(ev) > 0)) {
        int i, pending; 
	    perror("depot:67 :: pth_wait");
	    continue;
	}
	//else
	for (;;)		// sub for loop
	{
	    msg = (pmsg *) pth_msgport_get(mp);
 

            

            
	    if (msg == NULL) {
		perror("depot:76 :: pth_msgport_get");
		exit(errno);
	    }
	    else {
        /* Wait for a 'new message' event */
        if (!(pth_wait(ev) > 0)) {
            perror(__FILE__ " :: pth_wait");
            continue;
        } 
#include <sys/types.h>
#include <fcntl.h>
#include "defines.h"
 

            

            
		if (PUSH == msg->act) {
		    extern int cow_cur_conn;
		    cow_cur_conn--;
		    // sanity check - prevent overflow
		    if (top == max) {
			perror("depot.c :: stack overflow \n");
			exit(-1);
		    }
		    Stack[top++] = msg; 
        /* Get all pending messages from the message port */
        pending = pth_msgport_pending(mp);
        for (i = 0; i < pending; ++i) {
            msg = (pmsg*) pth_msgport_get(mp);
            assert(msg != NULL);

            /*
             * Is this a PUSH message (i.e. handler finished processing
             * a request and wants to return to the pool)?
 
             */
            if (PUSH == msg->act) {
                assert(top < max);
                Stack[top++] = msg;
#ifdef SUSP
#ifdef SUSP
		    if (FALSE == pth_suspend(msg->id)) {
			perror("pth_suspend failed in depot.c\n");
			exit(-23);
		    } 
                /* Suspend the handler thread; scheduler will ignore it now */
                if (FALSE == pth_suspend(msg->id)) {
                    perror("pth_suspend failed in depot.c");
                    exit(-23);
 
                }
                --cow_nohands_running;
#endif /* SUSP */
#endif /* SUSP */
		}
		else if (POP == msg->act) {
		    // POP i.e. assign a job 
 

            

            
		    // sanity check - prevent overflow
		    if (1 > top) {
			pth_yield(NULL);
			response_503(msg->fd);
			if (close(msg->fd)) {
			    if (EBADF != errno) {
				perror("close");
				exit(errno);
			    }
			} 
            /*
             * Otherwise, the message should be a POP (i.e. we just
             * got a new request and need to assign it to one of the
             * handlers from the pool.
             */
            } else if (POP == msg->act) {
                /*
                 * Set the message's action field to 'NONE' -- not really
                 * necessary anymore, but it should help us catch bugs if we
                 * start reusing messages we shouldn't be.
 
                 */
                msg->act = NONE;

            

            
 
                /*
                 * Prevent handler pool underflow; just send back a 503 error
                 * if there are no available handlers.  An alternative would
                 * be to spawn a new handler thread here (see farther down).
                 */
                if (top <= 0) {
                    reqnode* rn = (reqnode*)msg; 

            

            
/* 
                    /* Send back 'unavailable' message and close connection */
 
#ifdef USE_TCPCORK
                    STATIC_RESPONSE(msg->fd, 503);
                    cow_close_socket(msg->fd);
#else
                    STATIC_RESPONSE(msg->buf, 503);
                    cow_close_socket(msg->buf);
#endif
                    --cow_cur_conn;

                    /* Return the message structure to the pmsg pool */
                    rn->next = pmsgpool_first;
                    pmsgpool_first = rn;
                    continue;
                }
/**********************************************************************
 * Takashi's code for spawning a new handler; he left it commented out
 * so I'll do the same for now.   -- mdr
 *
// alternatively, spawn a new hand. 
void *handler(void*); 
pth_attr_t attr; 
attr = pth_attr_new();
// alternatively, spawn a new hand. 
void *handler(void*); 
pth_attr_t attr; 
attr = pth_attr_new();
	  new_msg->fd = msg->fd; 
new_msg->fd = msg->fd; 
pth_spawn (attr, handler, (void *) ((long) msg->fd));
pth_spawn (attr, handler, (void *) ((long) msg->fd));
*/
 **********************************************************************/ 
			continue;
		    }

		    if (NULL == (new_msg = (pmsg *) malloc(sizeof(pmsg)))) {
			printf("new_req malloc failed\n");
			exit(-1);
		    }
		    new_msg->fd = msg->fd;
 
 
#ifdef SUSP
 
#ifdef SUSP
		    if (FALSE == pth_resume(Stack[top - 1]->id)) {
			perror("pth_resume in depot fialed\n");
			exit(-22);
		    } 
                /* Put the assigned handler back on the scheduling queue */
                if (FALSE == pth_resume(Stack[top - 1]->id)) {
                    perror("pth_resume in depot failed");
                    exit(-22);
 
                }
                ++cow_nohands_running;
#endif
#endif
		    --top;
		    if (TRUE !=
			pth_msgport_put(Stack[top]->mp,
					(pth_message_t *) new_msg)) {
			perror("ERROR: pth_msgport_put failed in depot()");
			exit(-2);
		    }
		}
		else {
		    perror("ERROR: unknown action in depot()\n");
		    exit(-2); 
 

            

            
		}
	    }
	}			// end of sub for loop
    }				// end of main for loop 
                /* Decrement the number of handlers remaining in the pool */
                --top;

                /* Forward the request to the handler */
 
                if (TRUE != pth_msgport_put(Stack[top]->mp,
                                            (pth_message_t *)msg)) {
                    perror(__FILE__ " :: pth_msgport_put");
                    exit(-2);
                }

            /*
             * We should never get here -- there aren't any other possible
             * message types.
             */
            } else {
                printf("ERROR: unknown message action in depot()\n");
                exit(-2);
            }
        }                       /* End of inner for loop */
    }                           /* End of main for loop */
    return (void *)NULL; 

    return (void *)NULL; 



Legend
Lines deleted from 1.10  
Lines Modified
  Lines added in revision 1.3