Revision 1.2 |
Revision 1.10 |
|
#include <assert.h>
|
#include <stdlib.h> #include <stdio.h> #include <errno.h>
|
#include <stdlib.h> #include <stdio.h> #include <errno.h>
|
#include "pth.h"
|
#include <unistd.h>
|
|
#include <sys/types.h>
#include <fcntl.h>
#include "defines.h"
#include "cow.h"
#include "schedule.h"
|
#include "config.h"
|
#include "config.h"
|
#include "pmsg.h"
|
|
|
|
// response.h/c
|
/*
 * depot() -- thread entry point that keeps a stack of idle handler messages
 * and hands incoming requests to them.
 *
 * NOTE(review): this listing interleaves two revisions of the function (the
 * text is headed "Revision 1.2" and "Revision 1.10"); lines holding only a
 * vertical bar look like column separators from a diff view, so the text does
 * not compile as shown. Confirm against the real depot.c before editing code.
 *
 * _arg is cast to int and used as max, the capacity of Stack.
 *
 * Initialization: creates the DEPOT message port and waits on it until max
 * messages have been placed on Stack, then reaches the init_bar barrier.
 *
 * Main loop: every message taken from the port is either
 *   PUSH - the message is put back on Stack (and, under SUSP, the thread
 *          named by msg->id is suspended), or
 *   POP  - the top Stack entry is removed and a message is forwarded to that
 *          entry's port (Stack[top]->mp); when Stack is empty a 503 response
 *          is sent and the connection is closed instead.
 * Any other action value is reported and the process exits.
 *
 * The final return of NULL follows for (;;) loops that contain no break.
 */
void *depot(void *_arg) {
|
int response_503(int);
// defined in cow.c
extern pth_barrier_t init_bar;
void *depot(void *_arg)
{
FILE *hands_log;
|
|
/* max: Stack capacity taken from _arg; top: number of entries currently on Stack */
int max = (int)_arg; int top = 0; pth_msgport_t mp;
|
int max = (int)_arg; int top = 0; pth_msgport_t mp;
|
Line 25 |
Line 21 |
/* Stack holds max message pointers; a failed allocation is fatal (perror + exit) */
pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *)); if (NULL == Stack) {
|
pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *)); if (NULL == Stack) {
|
perror("malloc failed in depot()\n");
exit(-1);
|
perror("malloc failed in depot()");
exit(-1);
|
/* Initialization: wait on the DEPOT port until max messages have been stacked */
} mp = pth_msgport_create(DEPOT); ev = pth_event(PTH_EVENT_MSG, mp); while (top < max) {
|
} mp = pth_msgport_create(DEPOT); ev = pth_event(PTH_EVENT_MSG, mp); while (top < max) {
|
pth_wait(ev);
while (pth_msgport_pending(mp)) {
if (top == max) {
perror("ERROR: stack overflow \n");
exit(-1);
}
msg = (pmsg *) pth_msgport_get(mp);
if (NULL == msg) {
perror("pth_msgport_get (49)");
exit(-1);
}
Stack[top++] = msg;
|
pth_wait(ev);
while (pth_msgport_pending(mp)) {
if (top == max) {
perror("ERROR: stack overflow");
exit(-1);
}
msg = (pmsg *) pth_msgport_get(mp);
if (NULL == msg) {
perror("pth_msgport_get");
exit(-1);
}
Stack[top++] = msg;
}
|
}
|
|
}
|
}
|
pth_barrier_reach(&init_bar); /* end of initialization */
|
pth_barrier_reach(&init_bar); /* end of initialization */
|
// suspend hands on the stack
|
// suspend hands on the stack
|
#include "schedule.h" /* enable|disable SUSPension of handlers */
|
|
#ifdef SUSP for (i = 0; i < top; i++) {
|
#ifdef SUSP for (i = 0; i < top; i++) {
|
if (FALSE == pth_suspend(Stack[i]->id)) {
perror("pth_suspend fail");
exit(-1);
}
|
if (FALSE == pth_suspend(Stack[i]->id)) {
perror("pth_suspend fail");
exit(-1);
}
|
|
--cow_nohands_running;
|
} #endif /* SUSP */
|
} #endif /* SUSP */
|
// main loop
|
/*
|
|
* Main loop. Wait for an event indicating that we have a message on
* our message port. Grab the message and figure out whether it's
* a handler push (i.e. one of the handlers is done processing its
* request and needs to be put back on the idle queue) or a handler
* pop (i.e. a new request has arrived and we need to assign a
* handler to it).
*/
|
for (;;) {
|
for (;;) {
|
/* If the wait does not report an event, log it and go around the loop again */
if (!(pth_wait(ev) > 0)) {
|
int i, pending;
|
perror("depot:67 :: pth_msgport_get");
continue;
}
//else
for (;;) // sub for loop
{
msg = (pmsg *) pth_msgport_get(mp);
|
|
|
|
if (msg == NULL) {
perror("depot:76 :: pth_msgport_get");
exit(errno);
}
else {
|
/* Wait for a 'new message' event */
if (!(pth_wait(ev) > 0)) {
perror(__FILE__ " :: pth_wait");
continue;
}
|
#include <sys/types.h>
#include <fcntl.h>
#include "defines.h"
|
|
|
|
/* PUSH: the connection count drops by one and the message goes back on Stack */
if (PUSH == msg->act) {
extern int cow_cur_conn;
cow_cur_conn--;
// sanity check - prevent overflow
if (top == max) {
perror("depot.c :: stack overflow \n");
exit(-1);
}
Stack[top++] = msg;
|
/* Get all pending messages from the message port */
pending = pth_msgport_pending(mp);
for (i = 0; i < pending; ++i) {
msg = (pmsg*) pth_msgport_get(mp);
assert(msg != NULL);
/*
* Is this a PUSH message (i.e. handler finished processing
* a request and wants to return to the pool)?
|
|
*/
if (PUSH == msg->act) {
assert(top < max);
Stack[top++] = msg;
|
#ifdef SUSP
|
#ifdef SUSP
|
if (FALSE == pth_suspend(msg->id)) {
perror("pth_suspend failed in depot.c\n");
exit(-23);
}
|
/* Suspend the handler thread; scheduler will ignore it now */
if (FALSE == pth_suspend(msg->id)) {
perror("pth_suspend failed in depot.c");
exit(-23);
|
|
}
--cow_nohands_running;
|
#endif /* SUSP */
|
#endif /* SUSP */
|
}
else if (POP == msg->act) {
// POP i.e. assign a job
|
|
|
|
// sanity check - prevent underflow: nothing is left on Stack to take the job,
// so yield, answer 503 on the fd and close it (EBADF from close is tolerated)
if (1 > top) {
pth_yield(NULL);
response_503(msg->fd);
if (close(msg->fd)) {
if (EBADF != errno) {
perror("close");
exit(errno);
}
}
|
/*
* Otherwise, the message should be a POP (i.e. we just
* got a new request and need to assign it to one of the
* handlers from the pool).
*/
} else if (POP == msg->act) {
/*
* Set the message's action field to 'NONE' -- not really
* necessary anymore, but it should help us catch bugs if we
* start reusing messages we shouldn't be.
|
|
*/
msg->act = NONE;
|
|
|
|
/*
* Prevent handler pool underflow; just send back a 503 error
* if there are no available handlers. An alternative would
* be to spawn a new handler thread here (see farther down).
*/
if (top <= 0) {
/* msg is viewed as a reqnode so it can be linked back into pmsgpool_first below */
reqnode* rn = (reqnode*)msg;
|
|
|
/*
|
/* Send back 'unavailable' message and close connection */
|
|
#ifdef USE_TCPCORK
STATIC_RESPONSE(msg->fd, 503);
cow_close_socket(msg->fd);
#else
STATIC_RESPONSE(msg->buf, 503);
cow_close_socket(msg->buf);
#endif
--cow_cur_conn;
/* Return the message structure to the pmsg pool */
rn->next = pmsgpool_first;
pmsgpool_first = rn;
continue;
}
/**********************************************************************
* Takashi's code for spawning a new handler; he left it commented out
* so I'll do the same for now. -- mdr
*
|
// alternatively, spawn a new hand. void *handler(void*); pth_attr_t attr; attr = pth_attr_new();
|
// alternatively, spawn a new hand. void *handler(void*); pth_attr_t attr; attr = pth_attr_new();
|
new_msg->fd = msg->fd;
|
new_msg->fd = msg->fd;
|
pth_spawn (attr, handler, (void *) ((long) msg->fd));
|
pth_spawn (attr, handler, (void *) ((long) msg->fd));
|
*/
|
**********************************************************************/
|
continue;
}
/* This variant allocates a fresh pmsg that carries only the fd to the chosen handler */
if (NULL == (new_msg = (pmsg *) malloc(sizeof(pmsg)))) {
printf("new_req malloc failed\n");
exit(-1);
}
new_msg->fd = msg->fd;
|
|
#ifdef SUSP
|
#ifdef SUSP
|
if (FALSE == pth_resume(Stack[top - 1]->id)) {
perror("pth_resume in depot fialed\n");
exit(-22);
}
|
/* Put the assigned handler back on the scheduling queue */
if (FALSE == pth_resume(Stack[top - 1]->id)) {
perror("pth_resume in depot failed");
exit(-22);
|
|
}
++cow_nohands_running;
|
#endif
|
#endif
|
--top;
if (TRUE !=
pth_msgport_put(Stack[top]->mp,
(pth_message_t *) new_msg)) {
perror("ERROR: pth_msgport_put failed in depot()");
exit(-2);
}
}
else {
perror("ERROR: unknown action in depot()\n");
exit(-2);
|
|
|
|
}
}
} // end of sub for loop
} // end of main for loop
|
/* Decrement the number of handlers remaining in the pool */
--top;
/* Forward the request to the handler */
|
|
if (TRUE != pth_msgport_put(Stack[top]->mp,
(pth_message_t *)msg)) {
perror(__FILE__ " :: pth_msgport_put");
exit(-2);
}
/*
* We should never get here -- there aren't any other possible
* message types.
*/
} else {
printf("ERROR: unknown message action in depot()\n");
exit(-2);
}
} /* End of inner for loop */
} /* End of main for loop */
|
return (void *)NULL; }
|
return (void *)NULL; }
|