Revision 1.10 |
Revision 1.19 |
Line 6 |
Line 6 |
#include "mime.h" #include "http_message.h"  
|
#include "mime.h" #include "http_message.h"  
|
#define ERROR_LOG stdout
|
|
/* * The HTTP request handler */
|
/* * The HTTP request handler */
|
void fixup_server_root();
|
static void fixup_server_root(void);
|
|
static void spawn_threads(void);
|
/* from hand.c */ void *handler(void *);
|
/* from hand.c */ void *handler(void *);
|
Line 32 |
Line 31 |
pth_barrier_t init_bar; pth_t tid_depot; pth_t tid_main;
|
pth_barrier_t init_bar; pth_t tid_depot; pth_t tid_main;
|
pmsg new_req;
|
reqnode* new_req;
|
|
int zero = 0;
int one = 1;
|
/* * And the server main procedure */
|
/* * And the server main procedure */
|
// COW_NOHAND = max # hands
#define COW_NOHAND_X COW_NOHAND*3
pmsg new_req_list[COW_NOHAND_X]; // new request to send as a msg to the depot
|
/* pool of pmsg's used to avoid malloc overhead */
reqnode* pmsgpool;
reqnode* pmsgpool_first = NULL;
|
static int new_req_x = 0; // index of new_req_list
static int cow_noacc = 0; // number of req pending at accept
|
|
int cow_nohands_running = 0; // number of handler running
|
int cow_nohands_running = 0; // number of handler running
|
int cow_nohands = COW_NOHAND; // max numbef or handler (see defines.h for val)
|
int cow_nohands = COW_NOHAND; // max number of handlers (see defines.h for val)
|
|
int server_root_len = 0;
|
static int s_socket; pth_attr_t attr;
|
static int s_socket; pth_attr_t attr;
|
/* these values are "number of hands running at the same time" */
|
|
unsigned long cow_max_conn = 0; /* max simultaneous connection of session */ unsigned long cow_cur_conn = 0; /* current simultaneous connection */
|
unsigned long cow_max_conn = 0; /* max simultaneous connection of session */ unsigned long cow_cur_conn = 0; /* current simultaneous connection */
|
|
unsigned long cow_tot_conn = 0; /* total connections accepted */
unsigned long cow_tot_parsed = 0; /* total completed parses */
unsigned long cow_tot_served = 0; /* total completed serves */
|
char* cfgfile = NULL; char cwd[400];
|
char* cfgfile = NULL; char cwd[400];
|
|
/****
* myexit()
*
* Signal handler used to catch SIGINT (i.e. Ctrl+C) or SIGTERM (i.e. kill)
****/
|
static void myexit(int sig) { if (sig != SIGINT && sig != SIGTERM)
|
static void myexit(int sig) { if (sig != SIGINT && sig != SIGTERM)
|
fprintf(ERROR_LOG, "myexit() called without SIGINT or SIGTERM!?!?\n");
|
fprintf(ELOG, "myexit() called without SIGINT or SIGTERM!?!?\n");
|
COW_CLOSE(s_socket); pth_attr_destroy(attr);
|
COW_CLOSE(s_socket); pth_attr_destroy(attr);
|
Line 68 |
Line 77 |
if (verbose) { printf("cow_max_conn = %lu\n", cow_max_conn); printf("cow_cur_conn = %lu\n", cow_cur_conn);
|
if (verbose) { printf("cow_max_conn = %lu\n", cow_max_conn); printf("cow_cur_conn = %lu\n", cow_cur_conn);
|
|
printf("cow_tot_conn = %lu\n", cow_tot_conn);
printf("cow_tot_parsed = %lu\n", cow_tot_parsed);
printf("cow_tot_served = %lu\n", cow_tot_served);
|
printf("cow_noof = %d\n", cow_noof); }
|
printf("cow_noof = %d\n", cow_noof); }
|
Line 82 |
Line 94 |
} int main(int argc, char *argv[]) {
|
} int main(int argc, char *argv[]) {
|
|
int stay_in_foreground = 0;
|
struct sockaddr_in sar; struct protoent *pe; struct sockaddr_in peer_addr;
|
struct sockaddr_in sar; struct protoent *pe; struct sockaddr_in peer_addr;
|
socklen_t peer_len;
|
socklen_t peer_len = sizeof(peer_addr);
|
int sr; int c; /* command line arg */ FILE *mime_fp; pth_msgport_t mp_depot; // port to the depot
|
int sr; int c; /* command line arg */ FILE *mime_fp; pth_msgport_t mp_depot; // port to the depot
|
|
int i;
int one = 1;
#ifdef SENDBUFSIZE
int bufsize = SENDBUFSIZE;
#endif
int backlog;
int numforks = 1;
|
/* * Takashi had hardcoded MAX_OPEN_FD into defines.h to 1024, but that
|
/* * Takashi had hardcoded MAX_OPEN_FD into defines.h to 1024, but that
|
Line 111 |
Line 131 |
} }
|
} }
|
while (-1 != (c = getopt(argc, argv, "c:r:p:v"))) {
|
/* Parse command line options */
|
|
while (-1 != (c = getopt(argc, argv, "b:c:df:r:p:v"))) {
|
switch (c) {
|
switch (c) {
|
|
case 'b':
backlog = atoi(optarg);
break;
|
case 'c': cfgfile = optarg;
|
case 'c': cfgfile = optarg;
|
|
break;
case 'd':
stay_in_foreground = 1;
break;
case 'f':
numforks = atoi(optarg);
|
break; case 'r': server_root = strdup(optarg);
|
break; case 'r': server_root = strdup(optarg);
|
Line 122 |
Line 152 |
case 'p': server_port = atoi(optarg); if (server_port <= 0 || server_port >= 65535) {
|
case 'p': server_port = atoi(optarg); if (server_port <= 0 || server_port >= 65535) {
|
fprintf(ERROR_LOG, "Illegal port: %d\n", server_port);
|
fprintf(stderr, "Illegal port: %d\n", server_port);
|
exit(1); } break;
|
exit(1); } break;
|
Line 130 |
Line 160 |
verbose = 1; break; default:
|
verbose = 1; break; default:
|
fprintf(stderr, "Usage: %s [-c <cfg file>] [-p <port>] [-r <dir>]\n",
argv[0]);
|
fprintf(stderr, "Usage: %s [-c <cfg file>] [-d] [-p <port>] "
"[-r <dir>] [-f <numprocs>] [-b <backlog>]\n", argv[0]);
|
exit(1); } }
|
exit(1); } }
|
|
/* Background this process */
if (!stay_in_foreground) {
switch(fork()) {
case -1:
/* non-fatal failure */
perror("Warning: could not daemon-ize CoW");
break;
case 0:
/* Child; continue running CoW */
break;
default:
/* Parent; exit */
exit(0);
}
}
/* Read cow.conf */
|
read_config_files(cfgfile);
|
read_config_files(cfgfile);
|
mime_fp = fopen("/etc/mime.types", "r");
if (NULL == mime_fp) {
fprintf(stderr, "FATAL: Unable to open /etc/mime.types\n");
exit(-1);
}
mime_init(mime_fp);
if (fclose(mime_fp)) {
perror("fclose");
|
/* Register mime types if we're not using a single, global mime type */
if (!global_mime) {
mime_fp = fopen("/etc/mime.types", "r");
if (NULL == mime_fp) {
fprintf(stderr, "FATAL: Unable to open /etc/mime.types\n");
exit(-1);
}
mime_init(mime_fp);
|
|
if (fclose(mime_fp)) {
perror("fclose");
}
|
} /* setting up the server root */
|
} /* setting up the server root */
|
Line 155 |
Line 205 |
printf("\n" PACKAGE_STRING "\n"); printf("Send feedback and bugreports to " PACKAGE_BUGREPORT "\n");
|
printf("\n" PACKAGE_STRING "\n"); printf("Send feedback and bugreports to " PACKAGE_BUGREPORT "\n");
|
printf("Hit CTRL-C to stop the server.\n\n");
|
if (numforks > 1)
|
|
printf("Forked %d CoW processes.\n", numforks);
if (stay_in_foreground)
printf("Hit CTRL-C to stop the foreground server.\n\n");
|
if (verbose) {
|
if (verbose) {
|
fprintf(ERROR_LOG, "Using server port = %d\n", server_port);
fprintf(ERROR_LOG, "Using server root = %s\n", server_root);
fprintf(ERROR_LOG, "PthVersion 0x V RR T LL is %lX\n", pth_version());
|
printf("Using server port = %d\n", server_port);
printf("Using server root = %s\n", server_root);
printf("PthVersion 0x V RR T LL is %lX\n", pth_version());
|
|
#ifdef USE_TCPCORK
printf("TCP corking enabled\n");
#else
printf("Network buffering code enabled\n");
#endif
|
} /* initialize scheduler */
|
} /* initialize scheduler */
|
Line 171 |
Line 229 |
signal(SIGINT, myexit); signal(SIGTERM, myexit);
|
signal(SIGINT, myexit); signal(SIGTERM, myexit);
|
/* for now, they are dummy functions */
|
/* If logging was compiled in, open the logs */
|
open_logs();
|
open_logs();
|
|
/* for now, this is a dummy function */
|
create_common_env(); /* create TCP socket */
|
create_common_env(); /* create TCP socket */
|
Line 186 |
Line 246 |
exit(1); }
|
exit(1); }
|
/* Blank the message list */
memset(new_req_list, 0, sizeof(new_req_list));
|
/*
* Reuse the server socket; this way we don't have to wait for a
|
|
* couple minutes after shutdown to get rid of the "Address in Use"
* messages. Failure here is not fatal.
*/
setsockopt(s_socket, SOL_SOCKET, SO_REUSEADDR,
(void*)&one, sizeof(one));
/*
* Allocate a pool of pmsg structures; we'll keep re-using these in
* order to avoid the overhead of malloc(). We technically only need
* one message per handler thread, but we'll allocate a multiple of that
* in order to let the depot queue up some requests. POOL_SIZE_FACT
* is set in defines.h.
*/
pmsgpool = (reqnode*)malloc(POOL_SIZE_FACT*cow_nohands*sizeof(reqnode));
if (pmsgpool == NULL) {
perror("Failed to allocate request message pool");
exit(1);
}
memset(pmsgpool, 0, POOL_SIZE_FACT * cow_nohands * sizeof(reqnode));
for (i = 0; i < cow_nohands - 1; ++i)
pmsgpool[i].next = &pmsgpool[i+1];
pmsgpool[cow_nohands - 1].next = NULL;
pmsgpool_first = pmsgpool;
|
/* bind socket to port */ sar.sin_family = AF_INET;
|
/* bind socket to port */ sar.sin_family = AF_INET;
|
Line 199 |
Line 282 |
exit(1); }
|
exit(1); }
|
/* start listening on the socket; queue up a backlog of cow_nohands */
if (listen(s_socket, cow_nohands) == -1) {
|
/* start listening on the socket */
//if (listen(s_socket, SOMAXCONN) == -1) {
|
|
//if (listen(s_socket, cow_nohands) == -1) {
if (listen(s_socket, backlog)) {
|
perror("listen"); exit(1); }
|
perror("listen"); exit(1); }
|
/* initialize static reply message */
|
/* initialize static reply messages */
|
if (response_init()) { perror("response_init"); exit(1); }
|
if (response_init()) { perror("response_init"); exit(1); }
|
#include "schedule.h" // defines use POOL or not
#ifdef USE_POOL
cow_noacc = 2 * cow_nohands;
/* pool of threads -> future handler */
attr = pth_attr_new();
pth_attr_set(attr, PTH_ATTR_NAME, "depot");
pth_attr_set(attr, PTH_ATTR_JOINABLE, FALSE);
pth_attr_set(attr, PTH_ATTR_STACK_SIZE, COW_STACK_SIZE);
{
int h = cow_nohands; // number of hands to spawn
int i = 0;
|
spawn_threads();
/* Get a handle for the depot's message port */
mp_depot = pth_msgport_find(DEPOT);
if (mp_depot == NULL) {
perror("Failed to get depot's message port");
exit(1);
}
/* Fork off extra processes so that we can handle more connections (FD's) */
while (--numforks > 0) {
int pid;
if ((pid = fork()) < 0) {
perror("fork");
|
if (verbose)
printf("using POOL of hands (thread) of size %d\n", h);
tid_depot = pth_spawn(attr, depot, (void *)h);
if (tid_depot == NULL) {
perror("Failed to spawn depot");
|
|
exit(1);
|
exit(1);
|
}
tid_main = pth_self();
|
} else if (pid == 0) {
break;
}
|
pth_barrier_init(&init_bar, (h + 2)); // +2 for main and depot
pth_attr_set(attr, PTH_ATTR_NAME, "hand");
pth_attr_set(attr, PTH_ATTR_PRIO, PTH_PRIO_MAX);
{
int x;
for (i = 0; i < h; i++) {
x = (int)pth_spawn(attr, hand, (void *)NULL);
if (x < 0) {
perror("pth_spawn in main");
exit(errno);
}
++cow_nohands_running;
}
}
pth_attr_destroy(attr);
for (;;) {
if (NULL != (mp_depot = pth_msgport_find(DEPOT)))
break;
pth_yield(NULL);
}
pth_barrier_reach(&init_bar);
|
|
} /* loop for requests */ for (;;) {
|
} /* loop for requests */ for (;;) {
|
|
#ifndef ENABLE_503
/* Wait until we actually have a free handler thread */
while (cow_cur_conn >= cow_nohands)
pth_yield(NULL);
#endif
|
/* accept next connection */
|
/* accept next connection */
|
peer_len = sizeof(peer_addr);
|
|
if (-1 == (sr = pth_accept(s_socket, (struct sockaddr *)&peer_addr, &peer_len))) { if (EMFILE == errno) { perror("EMFILE in accept"); continue; } else { perror("accept");
|
if (-1 == (sr = pth_accept(s_socket, (struct sockaddr *)&peer_addr, &peer_len))) { if (EMFILE == errno) { perror("EMFILE in accept"); continue; } else { perror("accept");
|
|
printf("errno = %d\n", errno);
|
myexit(sr); }
|
myexit(sr); }
|
} else {
++cow_noof;
++cow_cur_conn;
cow_max_conn = cow_max_conn < cow_cur_conn ?
cow_cur_conn : cow_max_conn;
/* Optional: Disable Nagle algorithm */
if (disable_nagle) {
int one = 1;
setsockopt(sr, IPPROTO_TCP, TCP_NODELAY,
(void*)&one, sizeof(one));
}
}
|
}
++cow_noof;
++cow_cur_conn;
++cow_tot_conn;
cow_max_conn = cow_max_conn < cow_cur_conn ?
cow_cur_conn : cow_max_conn;
/*
* We can't use TCP_CORK together with TCP_NODELAY (wouldn't make any
* sense). If we have TCP_CORK, use it; if not, turn off Nagle and
* use our own buffering scheme (stolen from Apache).
*/
#ifdef USE_TCPCORK
|
//while (cow_cur_conn >= cow_nohands ) pth_yield(NULL);
//while (cow_cur_conn > COW_NOHAND/2 ) pth_yield(NULL);
// got to be better way to tune this ...
//while ( pth_msgport_pending(mp_depot) > COW_NOHAND ) pth_yield(NULL);
//while ( cow_cur_conn > COW_NOHAND/2 ) pth_yield(NULL);
//while ( ( cow_cur_conn > (COW_NOHAND - COW_NOHAND/8) ) ||
//while ((cow_cur_conn > (COW_NOHAND - COW_NOHAND / 4)) //||
// // ((1024 - 128 - 4) - pth_msgport_pending(mp_depot) -
// // 3 * (long)cow_cur_conn / 2 < 0)
// //( pth_msgport_pending(mp_depot) > MAX_DEPOT )
// ) {
// //printf("Current connections: %lu, pending=%d, hands running=%d\n", cow_cur_conn, pth_msgport_pending(mp_depot), cow_nohands_running);
// pth_yield(NULL);
//}
|
|
{
|
{
|
/* verbose
fprintf (ERROR_LOG,
"connection established (fd: %d, ip: %s, port: %d)\n",
sr,
inet_ntoa (peer_addr.sin_addr), ntohs (peer_addr.sin_port));
*/
}
/* redirect the request msg to a handler (via depot) */
// first, make a msg packet
|
int one = 1;
setsockopt(sr, IPPROTO_TCP, TCP_CORK, &one, sizeof(one));
}
#else
/* Optional: Disable Nagle algorithm */
if (disable_nagle) {
int one = 1;
setsockopt(sr, IPPROTO_TCP, TCP_NODELAY,
(void*)&one, sizeof(one));
}
|
|
#endif
/* Optional: Change outgoing buffer size */
#ifdef SENDBUFSIZE
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF,
&bufsize, sizeof(bufsize)) < 0) {
perror("Error setting send buffer size");
}
#endif
|
/*
|
/*
|
* If the depot has a full message queue, wait for it to
* do some processing. If we don't, we'll overwrite one
* of the pmsg's that it hasn't processed yet and that
* message will get stuck on the same message port twice
* which causes all kinds of problems in the internal
* pth_ring_t structure.
|
* Stick the information about the incoming connection in a pmsg
* structure and ship it to the depot so that it can be dispatched
* to a handler thread. If the pmsg pool is empty, then it means
* the depot has a backlog of requests. We'll wait here (and not
* accept any new connections) until some pmsg structures are freed
* up.
|
*/
|
*/
|
while (new_req_list[new_req_x].act != NONE)
|
while (pmsgpool_first == NULL)
|
pth_yield(NULL);
|
pth_yield(NULL);
|
new_req_list[new_req_x].fd = sr;
new_req_list[new_req_x].act = POP;
if (TRUE !=
pth_msgport_put(mp_depot,
(pth_message_t *) & new_req_list[new_req_x])) {
fprintf(stderr, "msgport_put failed\n");
|
/* Just grab the first pmsg off the stack */
new_req = pmsgpool_first;
pmsgpool_first = pmsgpool_first->next;
new_req->next = NULL;
/* Fill in the pmsg fields and then send it */
|
|
#ifdef USE_TCPCORK
/* TCP_CORK will take care of all buffering needs; just pass the FD */
(new_req->m).fd = sr;
#else
/* Create a buffer wrapper around the file descriptor and pass that */
(new_req->m).buf = ap_bcreate(B_RDWR);
ap_bpushfd((new_req->m).buf, sr, sr);
#endif
(new_req->m).act = POP;
if (TRUE != pth_msgport_put(mp_depot, (pth_message_t*)new_req)) {
fprintf(stderr, "Couldn't send request to depot (msgport_put)\n");
|
exit(-2); }
|
exit(-2); }
|
new_req_x++;
|
}
|
new_req_x %= COW_NOHAND_X;
}
#else /* NON POOL VERSION */
#ifdef NON_POOL_SINGLE
if (verbose)
printf("using NON POOL && NO SPAWNING THREAD.\n");
for (;;) {
if ((sr =
pth_accept(s_socket, (struct sockaddr *)&peer_addr,
&peer_len)) == -1) {
perror("accept");
continue;
}
{
msg req;
req.fdp = sr;
++cow_noof;
read_request(&req);
if (cow_noof > 100 && verbose)
printf("--- cow_noof == %d ---\n", cow_noof);
DEB_FD}
}
#else
if (verbose)
printf("using NON POOL (i.e. spawn a new thread per request.\n");
/* finally loop for requests */
pth_attr_set(attr, PTH_ATTR_NAME, "handler");
pth_attr_set(attr, PTH_ATTR_JOINABLE, FALSE);
if (verbose)
printf("listening on port %d (max %d simultaneous connections)\n",
server_port, cow_nohands);
/* to increase|decrease the priority of the main thread
{
pth_attr_t this_attr = NULL;
pth_t this_id = pth_self();
int *prio;
this_attr = pth_attr_of(this_id);
// pth_attr_get(this_id, PTH_ATTR_PRIO, prio);
// printf("prio = %d\n", *prio);
pth_attr_set(this_id, PTH_ATTR_PRIO, 100);
}
*/
for (;;) {
/* accept next connection */
peer_len = sizeof(peer_addr);
if ((sr =
pth_accept(s_socket, (struct sockaddr *)&peer_addr,
&peer_len)) == -1) {
if (EMFILE == errno) {
// simply yield or sleep is no good.
// so let the hands process some, then yield.
while (pth_ctrl
(PTH_CTRL_GETTHREADS_READY
| PTH_CTRL_GETTHREADS_WAITING) > cow_nohands / 4)
pth_yield(NULL);
// tune this number
continue;
}
else {
// really bad. (not simply out of fd).
perror("accept");
exit(errno);
}
}
/* debug :
{
int cthreads = pth_ctrl (PTH_CTRL_GETTHREADS);
int cnew = pth_ctrl (PTH_CTRL_GETTHREADS_NEW);
int cready = pth_ctrl (PTH_CTRL_GETTHREADS_READY);
int cwaiting = pth_ctrl (PTH_CTRL_GETTHREADS_WAITING);
int crunning = pth_ctrl (PTH_CTRL_GETTHREADS_RUNNING); // always 1
int csuspend = pth_ctrl (PTH_CTRL_GETTHREADS_SUSPENDED); // should be 0
int cdead = pth_ctrl (PTH_CTRL_GETTHREADS_DEAD);
printf ("[ <new %d><ready %d><waiting %d><dead %d> ]\n",
cnew, cready, cwaiting, cdead);
}
* end of debug */
if (pth_ctrl
(PTH_CTRL_GETTHREADS_READY
| PTH_CTRL_GETTHREADS_NEW
| PTH_CTRL_GETTHREADS_WAITING
| PTH_CTRL_GETTHREADS_RUNNING
| PTH_CTRL_GETTHREADS_SUSPENDED) >= cow_nohands) {
fprintf(ERROR_LOG, "currently no more connections acceptable\n");
COW_CLOSE(sr);
pth_yield(NULL);
continue;
}
fprintf(ERROR_LOG,
"connection established (fd: %d, ip: %s, port: %d)\n",
sr, inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port));
/* spawn new handling thread for connection */
{
pth_t tid;
tid = pth_spawn(attr, handler, (void *)((long)sr));
if (0 > tid) {
perror("pth_spawn (non-pool version in cow.c)");
}
pth_yield(NULL);
}
}
# endif /* NON_POOL_SINGLE */
# endif /* USE_POOL */
|
|
pth_exit(0); }
|
pth_exit(0); }
|
Line 453 |
Line 408 |
* * Description: Makes sure the server root is valid. */
|
* * Description: Makes sure the server root is valid. */
|
void fixup_server_root() {
|
static void fixup_server_root() {
|
char *dirbuf; int dirbuf_size;
|
char *dirbuf; int dirbuf_size;
|
Line 475 |
Line 430 |
exit(1); }
|
exit(1); }
|
if ('/' == server_root[0])
|
if ('/' == server_root[0]) {
|
/* Absolute path; no problem. */
|
/* Absolute path; no problem. */
|
|
server_root_len = strlen(server_root);
|
return;
|
return;
|
|
}
|
/* * If server_root is a relative path, then we need to make it absolute
|
/* * If server_root is a relative path, then we need to make it absolute
|
Line 509 |
Line 466 |
free(server_root); server_root = dirbuf;
|
free(server_root); server_root = dirbuf;
|
|
server_root_len = strlen(server_root);
|
}
|
}
|
|
/****
 * spawn_threads()
 *
 * Pre-spawns the depot thread and a pool of cow_nohands handler threads,
 * then blocks on the init_bar barrier together with them.
 *
 * Side effects: initializes init_bar, records tid_main and tid_depot, and
 * increments cow_nohands_running once per handler that was spawned.
 *
 * Any failure (attribute allocation, depot spawn, handler spawn) is fatal:
 * a diagnostic is printed with perror() and the process exits.
 ****/
static void spawn_threads(void) {
    int i;
    /* Named distinctly so it does not shadow the file-scope `attr`. */
    pth_attr_t thr_attr;

    /*
     * Create a barrier which will wait for the main thread, the depot thread,
     * and all handler threads.
     */
    pth_barrier_init(&init_bar, (cow_nohands + 2));

    if (verbose) {
        printf("using POOL of hands (thread) of size %d\n", cow_nohands);
    }

    tid_main = pth_self();

    thr_attr = pth_attr_new();
    if (thr_attr == NULL) {
        perror("Failed to allocate thread attributes");
        exit(1);
    }

    /* Spawn the depot thread; it receives the pool size as its argument. */
    pth_attr_set(thr_attr, PTH_ATTR_NAME, "depot");
    pth_attr_set(thr_attr, PTH_ATTR_JOINABLE, FALSE);
    pth_attr_set(thr_attr, PTH_ATTR_STACK_SIZE, COW_STACK_SIZE);
    tid_depot = pth_spawn(thr_attr, depot, (void *)(long)cow_nohands);
    if (tid_depot == NULL) {
        perror("Failed to spawn depot");
        exit(1);
    }

    /* Spawn all the handler threads; each receives its index as argument. */
    pth_attr_set(thr_attr, PTH_ATTR_NAME, "hand");
    pth_attr_set(thr_attr, PTH_ATTR_PRIO, PTH_PRIO_MAX);
    for (i = 0; i < cow_nohands; i++) {
        /*
         * pth_spawn() returns a thread handle, not an integer status, so
         * failure must be detected by comparing against NULL.  Casting the
         * handle to int would miss a NULL return and could misreport a
         * valid handle whose truncated value happens to be negative.
         */
        if (pth_spawn(thr_attr, hand, (void *)(long)i) == NULL) {
            /* Capture errno first: perror() is allowed to modify it. */
            int saved_errno = errno;

            perror("Failed to spawn handler");
            /* Never report success (status 0) for a failed startup. */
            exit(saved_errno != 0 ? saved_errno : 1);
        }
        ++cow_nohands_running;
    }

    pth_attr_destroy(thr_attr);

    /* Wait for all threads to start up */
    pth_barrier_reach(&init_bar);
}
|