Revision 1.3 |
Revision 1.9 |
Line 3 |
Line 3 |
#include <stdlib.h> #include "pth.h" 
|
#include <stdlib.h> #include "pth.h" 
|
|
#include "cow.h"
|
#include "config.h" #include "request.h" #include "http_message.h"
|
#include "config.h" #include "request.h" #include "http_message.h"
|
#include "pmsg.h"
#include "defines.h" // refers to #define MAXREQLINE 1024
|
#include "defines.h" // refers to #define MAXREQLINE 1024
|
|
//#define HAND_ACCOUNTING
|
/* * The HTTP request handler */
|
/* * The HTTP request handler */
|
// defined in cow.c
|
void *hand(void *_arg) {
|
extern pth_barrier_t init_bar;
extern pth_t tid_depot;
extern int cow_max_conn;
extern int cow_cur_conn;
void *hand(void *_arg)
{
|
|
// pth related vars pth_t my_id = (pth_t) NULL;
|
// pth related vars pth_t my_id = (pth_t) NULL;
|
char my_name[16]; // 64-bit machine use 16 here? 8 is fine on 32bit pc
pth_msgport_t depot;
|
char my_name[16]; // 64-bit machine use 16 here? 8 is fine on 32bit pc
pth_msgport_t depot = NULL;
|
pth_msgport_t mp; pth_event_t ev; pmsg pm_msg;
|
pth_msgport_t mp; pth_event_t ev; pmsg pm_msg;
|
Line 32 |
Line 27 |
// httpd related vars int ret_val;
|
// httpd related vars int ret_val;
|
int fd = (int)NULL;
|
|
msg req_msg;
|
msg req_msg;
|
pm_msg.id = my_id = pth_self();
sprintf(my_name, "%x", (int)my_id);
/* The return value on failure is undocumented; on success it is a pth_msgport_t. */
pm_msg.mp = mp = pth_msgport_create(my_name);
|
/*
* Setup a message port for the current thread so the depot can dispatch
* requests to us.
*/
|
|
my_id = pth_self();
sprintf(my_name, "hand%d", (int)_arg);
mp = pth_msgport_create(my_name);
if (mp == NULL) {
perror("Failed to create message port for handler");
return NULL;
}
/* Wait for the depot's message port to become available */
depot = pth_msgport_find(DEPOT);
while (NULL == depot) {
pth_yield(NULL);
depot = pth_msgport_find(DEPOT);
}
/*
* Send an initial "request finished" message to the depot. This will
* push this handler onto the stack of idle handlers and allow the depot
* to dispatch requests to us.
*/
pm_msg.id = my_id;
pm_msg.mp = mp;
|
pm_msg.act = PUSH;
|
pm_msg.act = PUSH;
|
|
if (TRUE != pth_msgport_put(depot, (pth_message_t*) &pm_msg)) {
perror("msgport_put failed in hand initialization");
exit(1);
}
/* Wait for all threads to finish initializing */
pth_barrier_reach(&init_bar);
/*
* Main handler loop; wait for a message on our message port, process the
* associated HTTP request(s), and then notify the depot that we're done.
*/
|
ev = pth_event(PTH_EVENT_MSG, mp);
|
ev = pth_event(PTH_EVENT_MSG, mp);
|
|
for (;;) {
reqnode* rn;
#ifdef HAND_ACCOUNTING
struct timeval t1, t2;
#endif
|
|
|
//initialization
for (;;) {
|
/* Wait for a request to be dispatched to us */
pth_wait(ev);
|
|
|
if (NULL != (depot = pth_msgport_find(DEPOT))) {
if (TRUE != pth_msgport_put(depot, (pth_message_t *) & pm_msg)) {
printf
("ERROR: msgport_put fialed in hand() initialization. \n");
exit(-1);
}
break;
}
pth_yield(NULL);
|
#ifdef HAND_ACCOUNTING
gettimeofday(&t1, NULL);
printf("%s starting back up after %d ms delay.\n", my_name, (t1.tv_sec - t2.tv_sec)*1000 + (t1.tv_usec - t2.tv_usec)/1000);
#endif
/* Get the request message from our message port */
pp_msg = (pmsg *) pth_msgport_get(mp);
assert(pp_msg != NULL);
|
|
/* Extract the file descriptor / transaction buffer */
#ifdef USE_TCPCORK
req_msg.fdp = pp_msg->fd;
#else
req_msg.fdp = pp_msg->buf;
#endif
/* We're done with the pmsg structure now; return it to the pool */
rn = (reqnode*)pp_msg;
rn->next = pmsgpool_first;
pmsgpool_first = rn;
/* Read and process the request */
ret_val = read_request(&req_msg);
cow_tot_served++;
#ifdef HAND_ACCOUNTING
gettimeofday(&t2, NULL);
printf("%s finished after %d ms delay.\n", my_name, (t2.tv_sec - t1.tv_sec)*1000 + (t2.tv_usec - t1.tv_usec)/1000);
#endif
/*
* Send a message to the depot to put this thread back on the 'idle
* handler' stack.
*/
if (TRUE != pth_msgport_push(depot, (pth_message_t *) & pm_msg)) {
printf("ERROR: msgport_push fialed in hand() initialization. \n");
exit(-1);
}
|
}
|
}
|
pth_barrier_reach(&init_bar);
// end of initialization
|
|
|
|
for (;;) {
if (1 != pth_wait(ev)) // returned value is 0 or 1.
|
/* unreachable */
return _arg;
|
{
printf("possibly error on pth_wait in hand() \n");
continue;
}
printf("(DEBUG) Handler %d got a message\n", my_id);
/*
++cow_cur_conn;
cow_max_conn = cow_max_conn<cow_cur_conn?cow_cur_conn:cow_max_conn;
*/
pp_msg = (pmsg *) pth_msgport_get(mp);
if (NULL == pp_msg)
exit(-726578); // ascii dec HAN
fd = pp_msg->fd;
req_msg.per_thread = 0;
req_msg.fdp = fd;
ret_val = read_request(&req_msg);
/*
if( ret_val ) printf("error? hand.c retval from read_request = %d",ret_val);
*/
// then put myself on the queue
if (TRUE != pth_msgport_push(depot, (pth_message_t *) & pm_msg)) {
printf("ERROR: msgport_push fialed in hand() initialization. \n");
exit(-1);
}
} /* end of for loop */
// unreachable
return (void *)NULL;
|
|
}
|
}
|
/*
 * handler() - thread-per-request entry point.
 *
 * One handler thread is spawned for each request. It is kept in the tree
 * only for reference and debugging purposes.
 *
 * _arg: the request's file descriptor, carried as an integer value inside
 *       the pointer argument (it is not dereferenced).
 *
 * The thread terminates itself through Pth once the request has been read;
 * the trailing return statement only satisfies the function signature.
 */
void *handler(void *_arg)
{
    /* Recover the descriptor that the spawner packed into the pointer. */
    int fd = (int)((long)_arg);
    int msgsize;
    msg req_msg;

    /* Mark the request as being served by a dedicated per-request thread. */
    req_msg.per_thread = 1;
    req_msg.fdp = fd;

    msgsize = read_request(&req_msg);
    printf("read_request() returned %d\n", msgsize);

    /*
     * Disabled explicit close of the request descriptors, kept for
     * reference:
     *
     *   { extern int cow_noof;
     *     COW_CLOSE(req_msg.fdp)
     *     COW_CLOSE(req_msg.fdd)
     *   }
     */

    /*
     * NOTE(review): aborting the calling thread right before pth_exit()
     * looks redundant -- confirm against the Pth manual before removing.
     */
    pth_abort(pth_self());
    pth_exit(NULL);
    return (void *)NULL;
}
|
|