Added keep-alive and dead state for connection, added timeout for keep-alive.

With "server.max-keep-alive-idle" = 5 (default) you get O(1), other values need O(log n)
personal/stbuehler/wip
Stefan Bühler 15 years ago
parent 7bdce8aeb4
commit 486f64bd7d

@ -130,11 +130,17 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
joblist_append(srv, con);
}
static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) {
connection *con = (connection*) w->data;
UNUSED(loop); UNUSED(revents);
con_put(con->sock.srv, con);
}
connection* connection_new(server *srv) {
connection *con = g_slice_new0(connection);
UNUSED(srv);
con->state = CON_STATE_REQUEST_START;
con->state = CON_STATE_DEAD;
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
@ -158,11 +164,16 @@ connection* connection_new(server *srv) {
physical_init(&con->physical);
response_init(&con->response);
con->keep_alive_data.link = NULL;
con->keep_alive_data.timeout = 0;
my_ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb);
con->keep_alive_data.watcher.data = con;
return con;
}
void connection_reset(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
con->state = CON_STATE_DEAD;
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
@ -192,10 +203,39 @@ void connection_reset(server *srv, connection *con) {
request_reset(&con->request);
physical_reset(&con->physical);
response_reset(&con->response);
if (con->keep_alive_data.link) {
g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link);
con->keep_alive_data.link = NULL;
}
con->keep_alive_data.timeout = 0;
ev_timer_stop(srv->loop, &con->keep_alive_data.watcher);
}
void server_check_keepalive(server *srv);
void connection_reset_keep_alive(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
ev_timer_stop(srv->loop, &con->keep_alive_data.watcher);
{
guint timeout = GPOINTER_TO_INT(CORE_OPTION(CORE_OPTION_MAX_KEEP_ALIVE_IDLE));
if (timeout == 0) {
con_put(srv, con);
return;
}
if (timeout >= srv->keep_alive_queue_timeout) {
/* queue is sorted by con->keep_alive_data.timeout */
gboolean need_start = (0 == srv->keep_alive_queue.length);
con->keep_alive_data.timeout = ev_now((srv)->loop) + srv->keep_alive_queue_timeout;
g_queue_push_tail(&srv->keep_alive_queue, con);
con->keep_alive_data.link = g_queue_peek_tail_link(&srv->keep_alive_queue);
if (need_start)
server_check_keepalive(srv);
} else {
ev_timer_set(&con->keep_alive_data.watcher, timeout, 0);
ev_timer_start(srv->loop, &con->keep_alive_data.watcher);
}
}
con->state = CON_STATE_KEEP_ALIVE;
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
@ -218,7 +258,7 @@ void connection_reset_keep_alive(server *srv, connection *con) {
}
void connection_free(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
con->state = CON_STATE_DEAD;
con->response_headers_sent = FALSE;
con->expect_100_cont = FALSE;
@ -249,6 +289,13 @@ void connection_free(server *srv, connection *con) {
physical_clear(&con->physical);
response_clear(&con->response);
if (con->keep_alive_data.link) {
g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link);
con->keep_alive_data.link = NULL;
}
con->keep_alive_data.timeout = 0;
ev_timer_stop(srv->loop, &con->keep_alive_data.watcher);
g_slice_free(connection, con);
}
@ -264,6 +311,26 @@ void connection_state_machine(server *srv, connection *con) {
gboolean done = FALSE;
do {
switch (con->state) {
case CON_STATE_DEAD:
done = TRUE;
break;
case CON_STATE_KEEP_ALIVE:
if (con->raw_in->length > 0) {
/* stop keep alive timeout watchers */
if (con->keep_alive_data.link) {
g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link);
con->keep_alive_data.link = NULL;
}
con->keep_alive_data.timeout = 0;
ev_timer_stop(srv->loop, &con->keep_alive_data.watcher);
connection_set_state(srv, con, CON_STATE_REQUEST_START);
} else
done = TRUE;
break;
case CON_STATE_REQUEST_START:
connection_set_state(srv, con, CON_STATE_READ_REQUEST_HEADER);
action_enter(con, srv->mainaction);

@ -4,6 +4,12 @@
#include "base.h"
typedef enum {
/** unused */
CON_STATE_DEAD,
/** waiting for new input after first request */
CON_STATE_KEEP_ALIVE,
/** after the connect, the request is initialized, keep-alive starts here again */
CON_STATE_REQUEST_START,
@ -89,6 +95,13 @@ struct connection {
struct log_t *log;
gint log_level;
/* Keep alive timeout data */
struct {
GList *link;
ev_tstamp timeout;
ev_timer watcher;
} keep_alive_data;
};
LI_API connection* connection_new(server *srv);

@ -433,6 +433,7 @@ static plugin_option options[] = {
{ "static-file.exclude", OPTION_LIST, NULL, NULL, NULL },
{ "server.tag", OPTION_STRING, NULL, NULL, NULL },
{ "server.max_keep_alive_idle", OPTION_INT, GINT_TO_POINTER(5), NULL, NULL },
{ NULL, 0, NULL, NULL, NULL }
};

@ -9,7 +9,8 @@ enum core_options_t {
CORE_OPTION_STATIC_FILE_EXCLUDE = 3,
CORE_OPTION_SERVER_TAG = 4
CORE_OPTION_SERVER_TAG = 4,
CORE_OPTION_MAX_KEEP_ALIVE_IDLE = 5
};
/* the core plugin always has base index 0, as it is the first plugin loaded */

@ -1,6 +1,7 @@
#include "base.h"
#include "utils.h"
#include "plugin_core.h"
struct server_closing_socket;
typedef struct server_closing_socket server_closing_socket;
@ -85,13 +86,54 @@ static void sigpipe_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
ev_unref(loop); /* Signal watchers shouldn't keep loop alive */ \
} while (0)
void server_check_keepalive(server *srv) {
ev_tstamp now = ev_now((srv)->loop);
if (0 == srv->keep_alive_queue.length) {
ev_timer_stop(srv->loop, &srv->keep_alive_timer);
} else {
srv->keep_alive_timer.repeat = ((connection*)g_queue_peek_head(&srv->keep_alive_queue))->keep_alive_data.timeout - now + 1;
ev_timer_again(srv->loop, &srv->keep_alive_timer);
}
}
static void server_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) {
server *srv = (server*) w->data;
ev_tstamp now = ev_now((srv)->loop);
GQueue *q = &srv->keep_alive_queue;
GList *l;
connection *con;
UNUSED(loop);
UNUSED(revents);
while ( NULL != (l = g_queue_peek_head_link(q)) &&
(con = (connection*) l->data)->keep_alive_data.timeout <= now ) {
guint timeout = GPOINTER_TO_INT(CORE_OPTION(CORE_OPTION_MAX_KEEP_ALIVE_IDLE));
ev_tstamp remaining = timeout - srv->keep_alive_queue_timeout - (now - con->keep_alive_data.timeout);
if (remaining > 0) {
ev_timer_set(&con->keep_alive_data.watcher, remaining, 0);
ev_timer_start(srv->loop, &con->keep_alive_data.watcher);
} else {
/* close it */
con_put(srv, con);
}
}
if (NULL == l) {
ev_timer_stop(srv->loop, &srv->keep_alive_timer);
} else {
srv->keep_alive_timer.repeat = con->keep_alive_data.timeout - now + 1;
ev_timer_again(srv->loop, &srv->keep_alive_timer);
}
}
server* server_new() {
server* srv = g_slice_new0(server);
srv->magic = LIGHTTPD_SERVER_MAGIC;
srv->state = SERVER_STARTING;
srv->connections_active = 0;
srv->connections = g_array_new(FALSE, TRUE, sizeof(connection*));
srv->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket*));
@ -114,6 +156,10 @@ server* server_new() {
log_init(srv);
g_queue_init(&srv->keep_alive_queue);
my_ev_init(&srv->keep_alive_timer, server_keepalive_cb);
srv->keep_alive_timer.data = srv;
return srv;
}
@ -183,6 +229,9 @@ void server_free(server* srv) {
g_mutex_free(srv->log_mutex);
g_async_queue_unref(srv->log_queue);
g_queue_clear(&srv->keep_alive_queue);
ev_timer_stop(srv->loop, &srv->keep_alive_timer);
g_slice_free(server, srv);
}
@ -239,6 +288,7 @@ static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
while (-1 != (s = accept(w->fd, (struct sockaddr*) &remote_addr, &l))) {
connection *con = con_get(srv);
con->state = CON_STATE_REQUEST_START;
con->remote_addr = remote_addr;
ev_io_set(&con->sock.watcher, s, EV_READ);
ev_io_start(srv->loop, &con->sock.watcher);
@ -295,6 +345,8 @@ void server_start(server *srv) {
return;
}
srv->keep_alive_queue_timeout = 5;
srv->option_count = g_hash_table_size(srv->options);
srv->option_def_values = g_slice_alloc0(srv->option_count * sizeof(*srv->option_def_values));
@ -328,6 +380,12 @@ void server_stop(server *srv) {
server_socket *sock = g_array_index(srv->sockets, server_socket*, i);
ev_io_stop(srv->loop, &sock->watcher);
}
for (i = srv->connections_active; i-- > 0;) {
connection *con = g_array_index(srv->connections, connection*, i);
if (con->state == CON_STATE_KEEP_ALIVE)
con_put(srv, con);
}
}
void server_exit(server *srv) {
@ -344,7 +402,6 @@ void server_exit(server *srv) {
log_thread_wakeup(srv);
}
void joblist_append(server *srv, connection *con) {
connection_state_machine(srv, con);
}

@ -39,6 +39,7 @@ struct server {
guint loop_flags;
struct ev_loop *loop;
ev_timer keep_alive_timer;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections */
GArray *connections; /** array of (connection*) */
@ -76,6 +77,10 @@ struct server {
ev_tstamp started;
statistics_t stats;
/* keep alive timeout queue */
guint keep_alive_queue_timeout;
GQueue keep_alive_queue;
};

Loading…
Cancel
Save