Revision 1.1 |
Revision 1.10 |
|
#include <assert.h>
|
#include <stdlib.h> #include <stdio.h> #include <errno.h>
|
#include <stdlib.h> #include <stdio.h> #include <errno.h>
|
#include "pth.h"
|
#include <unistd.h>
|
#include "config.h"
#include "pmsg.h"
// response.h/c
int response_503 (int);
// defined in cow.c
extern pth_barrier_t init_bar;
void *
depot (void *_arg)
{
FILE *hands_log;
int max = (int) _arg;
int top = 0;
pth_msgport_t mp;
pth_event_t ev;
int i;
pmsg *msg;
pmsg *new_msg;
pmsg **Stack = (pmsg **) malloc (max * sizeof (pmsg *));
if (NULL == Stack)
{
perror ("malloc failed in depot()\n");
exit (-1);
}
mp = pth_msgport_create (DEPOT);
ev = pth_event (PTH_EVENT_MSG, mp);
while (top < max)
{
pth_wait (ev);
while (pth_msgport_pending (mp))
{
if (top == max)
{
perror ("ERROR: stack overflow \n");
exit (-1);
}
msg = (pmsg *) pth_msgport_get (mp);
if (NULL == msg)
{
perror ("pth_msgport_get (49)");
exit (-1);
}
Stack[top++] = msg;
}
}
pth_barrier_reach (&init_bar); /* end of initialization */
// suspend hands on the stack
#include "schedule.h" /* enabler|disable SUSPention of handler */
#ifdef SUSP
for (i = 0; i < top; i++)
{
if (FALSE == pth_suspend (Stack[i]->id))
{
perror ("pth_suspend fail");
exit (-1);
}
}
#endif /* SUSP */
// main loop
for (;;)
{
if (!(pth_wait (ev) > 0))
{
perror ("depot.c :: pth_msgport_get");
continue;
}
//else
for (;;) // sub for loop
{
msg = (pmsg *) pth_msgport_get (mp);
if (msg == NULL)
{
perror ("depot.c :: pth_msgport_get");
exit (errno);
}
else
{
|
|
#include <sys/types.h> #include <fcntl.h> #include "defines.h"
|
#include <sys/types.h> #include <fcntl.h> #include "defines.h"
|
|
#include "cow.h"
#include "schedule.h"
#include "config.h"
|
|
|
if (PUSH == msg->act)
{
extern int cow_cur_conn;
cow_cur_conn--;
// sanity check - prevent overflow
if (top == max)
{
perror ("depot.c :: stack overflow \n");
exit (-1);
}
Stack[top++] = msg;
|
void *depot(void *_arg) {
int max = (int)_arg;
int top = 0;
pth_msgport_t mp;
pth_event_t ev;
int i;
pmsg *msg;
pmsg *new_msg;
pmsg **Stack = (pmsg **) malloc(max * sizeof(pmsg *));
if (NULL == Stack) {
|
|
perror("malloc failed in depot()");
exit(-1);
}
mp = pth_msgport_create(DEPOT);
ev = pth_event(PTH_EVENT_MSG, mp);
while (top < max) {
pth_wait(ev);
while (pth_msgport_pending(mp)) {
if (top == max) {
perror("ERROR: stack overflow");
exit(-1);
}
msg = (pmsg *) pth_msgport_get(mp);
if (NULL == msg) {
perror("pth_msgport_get");
exit(-1);
}
Stack[top++] = msg;
}
}
pth_barrier_reach(&init_bar); /* end of initialization */
// suspend hands on the stack
|
#ifdef SUSP
|
#ifdef SUSP
|
if (FALSE == pth_suspend (msg->id))
{
perror ("pth_suspend failed in depot.c\n");
exit (-23);
}
|
for (i = 0; i < top; i++) {
if (FALSE == pth_suspend(Stack[i]->id)) {
perror("pth_suspend fail");
exit(-1);
}
|
|
--cow_nohands_running;
}
|
#endif /* SUSP */
|
#endif /* SUSP */
|
}
else if (POP == msg->act)
{
// POP i.e. assign a job
|
|
|
|
// sanity check - prevent overflow
if (1 > top)
{
pth_yield (NULL);
response_503 (msg->fd);
if (close (msg->fd))
{
if (EBADF != errno)
{
perror ("close");
|
/*
* Main loop. Wait for an event indicating that we have a message on
* our message port. Grab the message and figure out whether it's
* a handler push (i.e. one of the handlers is done processing its
* request and needs to be put back on the idle queue) or a handler
* pop (i.e. a new request has arrived and we need to assign a
* handler to it).
*/
for (;;) {
int i, pending;
|
exit (errno);
}
}
|
|
|
|
|
/* Wait for a 'new message' event */
if (!(pth_wait(ev) > 0)) {
perror(__FILE__ " :: pth_wait");
continue;
}
|
|
|
/*
|
/* Get all pending messages from the message port */
|
|
pending = pth_msgport_pending(mp);
for (i = 0; i < pending; ++i) {
msg = (pmsg*) pth_msgport_get(mp);
assert(msg != NULL);
/*
* Is this a PUSH message (i.e. handler finished processing
* a request and wants to return to the pool)?
*/
if (PUSH == msg->act) {
assert(top < max);
Stack[top++] = msg;
#ifdef SUSP
/* Suspend the handler thread; scheduler will ignore it now */
if (FALSE == pth_suspend(msg->id)) {
perror("pth_suspend failed in depot.c");
exit(-23);
}
--cow_nohands_running;
#endif /* SUSP */
/*
* Otherwise, the message should be a POP (i.e. we just
* got a new request and need to assign it to one of the
* handlers from the pool.
*/
} else if (POP == msg->act) {
/*
* Set the message's action field to 'NONE' -- not really
* necessary anymore, but it should help us catch bugs if we
* start reusing messages we shouldn't be.
*/
msg->act = NONE;
/*
* Prevent handler pool underflow; just send back a 503 error
* if there are no available handlers. An alternative would
* be to spawn a new handler thread here (see farther down).
*/
if (top <= 0) {
reqnode* rn = (reqnode*)msg;
/* Send back 'unavailable' message and close connection */
#ifdef USE_TCPCORK
STATIC_RESPONSE(msg->fd, 503);
cow_close_socket(msg->fd);
#else
STATIC_RESPONSE(msg->buf, 503);
cow_close_socket(msg->buf);
#endif
--cow_cur_conn;
/* Return the message structure to the pmsg pool */
rn->next = pmsgpool_first;
pmsgpool_first = rn;
continue;
}
/**********************************************************************
* Takashi's code for spawning a new handler; he left it commented out
* so I'll do the same for now. -- mdr
*
|
// alternatively, spawn a new hand. void *handler(void*); pth_attr_t attr; attr = pth_attr_new();
|
// alternatively, spawn a new hand. void *handler(void*); pth_attr_t attr; attr = pth_attr_new();
|
new_msg->fd = msg->fd;
|
new_msg->fd = msg->fd;
|
pth_spawn (attr, handler, (void *) ((long) msg->fd));
|
pth_spawn (attr, handler, (void *) ((long) msg->fd));
|
*/
|
**********************************************************************/
|
continue;
}
if (NULL == (new_msg = (pmsg *) malloc (sizeof (pmsg))))
{
printf ("new_req malloc failed\n");
exit (-1);
}
new_msg->fd = msg->fd;
|
|
#ifdef SUSP
|
#ifdef SUSP
|
if (FALSE == pth_resume (Stack[top - 1]->id))
{
perror ("pth_resume in depot fialed\n");
exit (-22);
}
|
/* Put the assigned handler back on the scheduling queue */
if (FALSE == pth_resume(Stack[top - 1]->id)) {
perror("pth_resume in depot failed");
exit(-22);
}
|
|
++cow_nohands_running;
|
#endif
|
#endif
|
--top;
if (TRUE !=
pth_msgport_put (Stack[top]->mp, (pth_message_t *) new_msg))
{
perror ("ERROR: pth_msgport_put failed in depot()");
exit (-2);
}
}
else
{
perror ("ERROR: unknown action in depot()\n");
exit (-2);
|
|
|
|
}
}
} // end of sub for loop
} // end of main for loop
return (void *) NULL;
|
/* Decrement the number of handlers remaining in the pool */
--top;
/* Forward the request to the handler */
if (TRUE != pth_msgport_put(Stack[top]->mp,
|
|
(pth_message_t *)msg)) {
perror(__FILE__ " :: pth_msgport_put");
exit(-2);
}
/*
* We should never get here -- there aren't any other possible
* message types.
*/
} else {
printf("ERROR: unknown message action in depot()\n");
exit(-2);
}
} /* End of inner for loop */
} /* End of main for loop */
return (void *)NULL;
|
}
|
}
|