Use ev_now for current timestamp, clean usage of shutdown (wait for eof before close)

personal/stbuehler/wip
Stefan Bühler 15 years ago
parent 4a9ed5e8a3
commit 2546ce9259

@ -68,10 +68,13 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
dojoblist = TRUE;
break;
case NETWORK_STATUS_FATAL_ERROR:
CON_ERROR(srv, con, "%s", "network read fatal error");
connection_set_state(srv, con, CON_STATE_ERROR);
dojoblist = TRUE;
break;
case NETWORK_STATUS_CONNECTION_CLOSE:
con->raw_in->is_closed = TRUE;
shutdown(w->fd, SHUT_RD);
connection_set_state(srv, con, CON_STATE_CLOSE);
dojoblist = TRUE;
break;
@ -96,6 +99,7 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
dojoblist = TRUE;
break;
case NETWORK_STATUS_FATAL_ERROR:
CON_ERROR(srv, con, "%s", "network write fatal error");
connection_set_state(srv, con, CON_STATE_ERROR);
dojoblist = TRUE;
break;
@ -114,10 +118,10 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
ev_io_rem_events(loop, w, EV_WRITE);
break;
}
// CON_TRACE(srv, con, "cq->len: raw_out=%i, out=%i", (int) con->raw_out->length, (int) con->out->length);
CON_TRACE(srv, con, "cq->len: raw_out=%i, out=%i", (int) con->raw_out->length, (int) con->out->length);
}
if (con->raw_out->length == 0) {
// CON_TRACE(srv, con, "%s", "stop write");
CON_TRACE(srv, con, "%s", "stop write");
ev_io_rem_events(loop, w, EV_WRITE);
dojoblist = TRUE;
}
@ -135,17 +139,17 @@ connection* connection_new(server *srv) {
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
con->raw_in = chunkqueue_new();
con->raw_out = chunkqueue_new();
con->in = chunkqueue_new();
con->out = chunkqueue_new();
ev_io_init(&con->sock.watcher, connection_cb, -1, 0);
con->sock.srv = srv; con->sock.con = con; con->sock.watcher.data = &con->sock;
con->remote_addr_str = g_string_sized_new(0);
con->local_addr_str = g_string_sized_new(0);
con->keep_alive = TRUE;
con->raw_in = chunkqueue_new();
con->raw_out = chunkqueue_new();
con->in = chunkqueue_new();
con->out = chunkqueue_new();
action_stack_init(&con->action_stack);
request_init(&con->request, con->raw_in);
@ -160,21 +164,25 @@ void connection_reset(server *srv, connection *con) {
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
chunkqueue_reset(con->raw_in);
chunkqueue_reset(con->raw_out);
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
ev_io_stop(srv->loop, &con->sock.watcher);
if (con->sock.watcher.fd != -1) {
shutdown(con->sock.watcher.fd, SHUT_RDWR);
close(con->sock.watcher.fd);
if (con->raw_in->is_closed) { /* read already shutdown */
shutdown(con->sock.watcher.fd, SHUT_WR);
close(con->sock.watcher.fd);
} else {
server_add_closing_socket(srv, con->sock.watcher.fd);
}
}
ev_io_set(&con->sock.watcher, -1, 0);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;
chunkqueue_reset(con->raw_in);
chunkqueue_reset(con->raw_out);
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
action_stack_reset(srv, &con->action_stack);
request_reset(&con->request);
@ -187,15 +195,15 @@ void connection_reset_keep_alive(server *srv, connection *con) {
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
con->raw_out->is_closed = FALSE;
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
ev_io_set_events(srv->loop, &con->sock.watcher, EV_READ);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;
con->raw_out->is_closed = FALSE;
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
action_stack_reset(srv, &con->action_stack);
request_reset(&con->request);
@ -208,21 +216,25 @@ void connection_free(server *srv, connection *con) {
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
chunkqueue_free(con->raw_in);
chunkqueue_free(con->raw_out);
chunkqueue_free(con->in);
chunkqueue_free(con->out);
ev_io_stop(srv->loop, &con->sock.watcher);
if (con->sock.watcher.fd != -1) {
shutdown(con->sock.watcher.fd, SHUT_RDWR);
close(con->sock.watcher.fd);
if (con->raw_in->is_closed) { /* read already shutdown */
shutdown(con->sock.watcher.fd, SHUT_WR);
close(con->sock.watcher.fd);
} else {
server_add_closing_socket(srv, con->sock.watcher.fd);
}
}
ev_io_set(&con->sock.watcher, -1, 0);
g_string_free(con->remote_addr_str, TRUE);
g_string_free(con->local_addr_str, TRUE);
con->keep_alive = TRUE;
chunkqueue_free(con->raw_in);
chunkqueue_free(con->raw_out);
chunkqueue_free(con->in);
chunkqueue_free(con->out);
action_stack_clear(srv, &con->action_stack);
request_clear(&con->request);
@ -249,7 +261,7 @@ void connection_state_machine(server *srv, connection *con) {
action_enter(con, srv->mainaction);
break;
case CON_STATE_READ_REQUEST_HEADER:
// TRACE(srv, "%s", "reading request header");
TRACE(srv, "%s", "reading request header");
switch(http_request_parse(srv, con, &con->request.parser_ctx)) {
case HANDLER_FINISHED:
case HANDLER_GO_ON:
@ -270,12 +282,12 @@ void connection_state_machine(server *srv, connection *con) {
}
break;
case CON_STATE_VALIDATE_REQUEST_HEADER:
// TRACE(srv, "%s", "validating request header");
TRACE(srv, "%s", "validating request header");
connection_set_state(srv, con, CON_STATE_HANDLE_REQUEST_HEADER);
request_validate_header(srv, con);
break;
case CON_STATE_HANDLE_REQUEST_HEADER:
// TRACE(srv, "%s", "handle request header");
TRACE(srv, "%s", "handle request header");
switch (action_execute(srv, con)) {
case ACTION_WAIT_FOR_EVENT:
done = TRUE;
@ -294,7 +306,7 @@ void connection_state_machine(server *srv, connection *con) {
break;
case CON_STATE_READ_REQUEST_CONTENT:
case CON_STATE_HANDLE_RESPONSE_HEADER:
// TRACE(srv, "%s", "read request/handle response header");
TRACE(srv, "%s", "read request/handle response header");
parse_request_body(srv, con);
/* TODO: call plugin content_handler */
switch (action_execute(srv, con)) {
@ -318,10 +330,10 @@ void connection_state_machine(server *srv, connection *con) {
if (!con->response_headers_sent) {
con->response_headers_sent = TRUE;
// TRACE(srv, "%s", "write response headers");
TRACE(srv, "%s", "write response headers");
response_send_headers(srv, con);
}
// TRACE(srv, "%s", "write response");
TRACE(srv, "%s", "write response");
parse_request_body(srv, con);
/* TODO: call plugin content_handler */
forward_response_body(srv, con);
@ -333,7 +345,7 @@ void connection_state_machine(server *srv, connection *con) {
if (con->state == CON_STATE_WRITE_RESPONSE) done = TRUE;
break;
case CON_STATE_RESPONSE_END:
// TRACE(srv, "%s", "response end");
TRACE(srv, "response end (keep_alive = %i)", con->keep_alive);
/* TODO: call plugin callbacks */
if (con->keep_alive) {
connection_reset_keep_alive(srv, con);
@ -343,13 +355,13 @@ void connection_state_machine(server *srv, connection *con) {
}
break;
case CON_STATE_CLOSE:
// TRACE(srv, "%s", "connection closed");
TRACE(srv, "%s", "connection closed");
/* TODO: call plugin callbacks */
con_put(srv, con);
done = TRUE;
break;
case CON_STATE_ERROR:
// TRACE(srv, "%s", "connection closed (error)");
TRACE(srv, "%s", "connection closed (error)");
/* TODO: call plugin callbacks */
con_put(srv, con);
done = TRUE;

@ -28,6 +28,8 @@ network_status_t network_write(server *srv, connection *con, int fd, chunkqueue
chunkiter ci;
do {
if (0 == cq->length) return NETWORK_STATUS_SUCCESS;
ci = chunkqueue_iter(cq);
switch (chunkiter_read(srv, con, ci, 0, blocksize, &block_data, &block_len)) {
case HANDLER_GO_ON:

@ -82,11 +82,11 @@ void request_validate_header(server *srv, connection *con) {
switch (req->http_version) {
case HTTP_VERSION_1_0:
if (!http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("keep-alive")))
con->keep_alive = 0;
con->keep_alive = FALSE;
break;
case HTTP_VERSION_1_1:
if (http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("close")))
con->keep_alive = 0;
con->keep_alive = FALSE;
break;
case HTTP_VERSION_UNSET:
bad_request(srv, con, 505); /* Version not Supported */

@ -2,6 +2,44 @@
#include "base.h"
#include "utils.h"
struct server_closing_socket;
typedef struct server_closing_socket server_closing_socket;
struct server_closing_socket {
server *srv;
GList *link;
int fd;
};
static void server_closing_socket_cb(int revents, void* arg) {
server_closing_socket *scs = (server_closing_socket*) arg;
UNUSED(revents);
/* Whatever happend: we just close the socket */
shutdown(scs->fd, SHUT_RD);
close(scs->fd);
g_queue_delete_link(&scs->srv->closing_sockets, scs->link);
g_slice_free(server_closing_socket, scs);
}
void server_add_closing_socket(server *srv, int fd) {
server_closing_socket *scs = g_slice_new(server_closing_socket);
shutdown(fd, SHUT_WR);
scs->srv = srv;
scs->fd = fd;
g_queue_push_tail(&srv->closing_sockets, scs);
scs->link = g_queue_peek_tail_link(&srv->closing_sockets);
ev_once(srv->loop, fd, EV_READ, 10.0, server_closing_socket_cb, scs);
}
/* Kill it - frees fd */
static void server_rem_closing_socket(server *srv, server_closing_socket *scs) {
ev_feed_fd_event(srv->loop, scs->fd, EV_READ);
}
void con_put(server *srv, connection *con);
static void server_option_free(gpointer _so) {
@ -30,6 +68,7 @@ server* server_new() {
srv->connections_active = 0;
srv->connections = g_array_new(FALSE, TRUE, sizeof(connection*));
srv->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket*));
g_queue_init(&srv->closing_sockets);
srv->plugins = g_hash_table_new(g_str_hash, g_str_equal);
srv->options = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, server_option_free);
@ -41,7 +80,6 @@ server* server_new() {
srv->exiting = FALSE;
srv->tmp_str = g_string_sized_new(255);
srv->cur_ts = time(0);
srv->last_generated_date_ts = 0;
srv->ts_date_str = g_string_sized_new(255);
@ -71,6 +109,20 @@ void server_free(server* srv) {
g_array_free(srv->connections, TRUE);
}
{ guint i; for (i = 0; i < srv->sockets->len; i++) {
server_socket *sock = g_array_index(srv->sockets, server_socket*, i);
close(sock->watcher.fd);
g_slice_free(server_socket, sock);
g_array_free(srv->sockets, TRUE);
}}
{ /* force closing sockets */
GList *iter;
for (iter = g_queue_peek_head_link(&srv->closing_sockets); iter; iter = g_list_next(iter)) {
server_closing_socket_cb(EV_TIMEOUT, (server_closing_socket*) iter->data);
}
g_queue_clear(&srv->closing_sockets);
}
g_hash_table_destroy(srv->plugins);
g_hash_table_destroy(srv->options);
g_hash_table_destroy(srv->actions);
@ -231,20 +283,32 @@ void server_stop(server *srv) {
}
}
void server_exit(server *srv) {
srv->exiting = TRUE;
server_stop(srv);
{ /* force closing sockets */
GList *iter;
for (iter = g_queue_peek_head_link(&srv->closing_sockets); iter; iter = g_list_next(iter)) {
server_rem_closing_socket(srv, (server_closing_socket*) iter->data);
}
}
}
void joblist_append(server *srv, connection *con) {
connection_state_machine(srv, con);
}
GString *server_current_timestamp(server *srv) {
srv->cur_ts = time(0); /* TODO: update cur_ts somewhere else */
if (srv->cur_ts != srv->last_generated_date_ts) {
time_t cur_ts = CUR_TS(srv);
if (cur_ts != srv->last_generated_date_ts) {
g_string_set_size(srv->ts_date_str, 255);
strftime(srv->ts_date_str->str, srv->ts_date_str->allocated_len,
"%a, %d %b %Y %H:%M:%S GMT", gmtime(&(srv->cur_ts)));
"%a, %d %b %Y %H:%M:%S GMT", gmtime(&(cur_ts)));
g_string_set_size(srv->ts_date_str, strlen(srv->ts_date_str->str));
srv->last_generated_date_ts = srv->cur_ts;
srv->last_generated_date_ts = cur_ts;
}
return srv->ts_date_str;
}

@ -5,6 +5,8 @@
#define LIGHTTPD_SERVER_MAGIC ((guint)0x12AB34CD)
#endif
#define CUR_TS(srv) ((time_t)ev_now((srv)->loop))
typedef enum {
SERVER_STARTING, /** start up: don't write log files, don't accept connections */
SERVER_RUNNING, /** running: write logs, accept connections */
@ -28,6 +30,7 @@ struct server {
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections */
GArray *connections; /** array of (connection*) */
GArray *sockets; /** array of (server_socket*) */
GQueue closing_sockets; /** wait for EOF before shutdown(SHUT_RD) and close() */
GHashTable *plugins; /**< const gchar* => (plugin*) */
@ -43,7 +46,7 @@ struct server {
GString *tmp_str; /**< can be used everywhere for local temporary needed strings */
time_t cur_ts, last_generated_date_ts;
time_t last_generated_date_ts;
GString *ts_date_str; /**< use server_current_timestamp(srv) */
/* logs */
@ -69,4 +72,7 @@ LI_API void joblist_append(server *srv, connection *con);
LI_API GString *server_current_timestamp(server *srv);
/* shutdown write and wait for eof before shutdown read and close */
LI_API void server_add_closing_socket(server *srv, int fd);
#endif

Loading…
Cancel
Save