Browse Source

add io timeouts

personal/stbuehler/wip
Thomas Porzelt 14 years ago
parent
commit
b491e91c68
  1. 68
      src/connection.c
  2. 11
      src/connection.h
  3. 3
      src/network.c
  4. 14
      src/plugin_core.c
  5. 2
      src/server.c
  6. 2
      src/server.h
  7. 26
      src/worker.c
  8. 4
      src/worker.h

68
src/connection.c

@ -123,8 +123,13 @@ static gboolean connection_handle_read(connection *con) {
con->state = CON_STATE_READ_REQUEST_HEADER;
con->ts = CUR_TS(con->wrk);
} else if (con->state == CON_STATE_REQUEST_START) {
con->state = CON_STATE_READ_REQUEST_HEADER;
connection_io_timeout_init(con);
} else {
if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk))
connection_io_timeout_reset(con);
if (con->state == CON_STATE_REQUEST_START)
con->state = CON_STATE_READ_REQUEST_HEADER;
}
if (con->state == CON_STATE_READ_REQUEST_HEADER && con->mainvr->state == VRS_CLEAN) {
@ -366,6 +371,9 @@ void connection_reset(connection *con) {
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
/* remove from timeout queue */
connection_io_timeout_remove(con);
}
void server_check_keepalive(server *srv);
@ -416,6 +424,8 @@ void connection_reset_keep_alive(connection *con) {
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
connection_io_timeout_remove(con);
}
void connection_free(connection *con) {
@ -467,3 +477,57 @@ gchar *connection_state_str(connection_state_t state) {
return (gchar*)states[state];
}
void connection_io_timeout_init(connection *con) {
worker *wrk = con->wrk;
if (wrk->io_timeout_queue_tail)
wrk->io_timeout_queue_tail->io_timeout.next = con;
else
/* if there is no tail, it means the queue is empty */
wrk->io_timeout_queue_head = con;
wrk->io_timeout_queue_tail = con;
con->io_timeout.last_io = CUR_TS(wrk);
con->io_timeout.next = NULL;
}
void connection_io_timeout_reset(connection *con) {
/* move con to the end of the timeout queue */
worker *wrk = con->wrk;
if (con == wrk->io_timeout_queue_head && con != wrk->io_timeout_queue_tail)
wrk->io_timeout_queue_head = con->io_timeout.next;
if (con != wrk->io_timeout_queue_tail)
con->io_timeout.prev = wrk->io_timeout_queue_tail;
if (wrk->io_timeout_queue_tail)
wrk->io_timeout_queue_tail->io_timeout.next = con;
con->io_timeout.next = NULL;
wrk->io_timeout_queue_tail = con;
con->io_timeout.last_io = CUR_TS(wrk);
}
void connection_io_timeout_remove(connection *con) {
/* remove con from the timeout queue */
worker *wrk = con->wrk;
/* check if connection is in the timeout queue, it might not be the case when it is in keep alive idle state */
if (con->io_timeout.prev == NULL && con->io_timeout.next == NULL && con != wrk->io_timeout_queue_head)
return;
if (con == wrk->io_timeout_queue_head)
wrk->io_timeout_queue_head = con->io_timeout.next;
else
con->io_timeout.prev->io_timeout.next = con->io_timeout.next;
if (con == wrk->io_timeout_queue_tail)
wrk->io_timeout_queue_tail = con->io_timeout.prev;
else
con->io_timeout.next->io_timeout.prev = con->io_timeout.prev;
con->io_timeout.prev = NULL;
con->io_timeout.next = NULL;
}

11
src/connection.h

@ -57,6 +57,13 @@ struct connection {
ev_timer watcher;
} keep_alive_data;
/* I/O timeout data */
struct {
ev_tstamp last_io;
connection *prev;
connection *next;
} io_timeout;
time_t ts;
struct {
@ -75,6 +82,10 @@ LI_API void connection_reset(connection *con);
LI_API void connection_reset_keep_alive(connection *con);
LI_API void connection_free(connection *con);
LI_API void connection_io_timeout_init(connection *con);
LI_API void connection_io_timeout_reset(connection *con);
LI_API void connection_io_timeout_remove(connection *con);
LI_API void connection_error(connection *con);
LI_API void connection_handle_direct(connection *con);

3
src/network.c

@ -61,6 +61,9 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
}
#endif
if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk))
connection_io_timeout_reset(vr->con);
return res;
}

14
src/plugin_core.c

@ -518,6 +518,19 @@ static gboolean core_module_load(server *srv, plugin* p, value *val) {
return TRUE;
}
static gboolean core_io_timeout(server *srv, plugin* p, value *val) {
UNUSED(p);
if (!val || val->type != VALUE_NUMBER || val->data.number < 1) {
ERROR(srv, "%s", "io_timeout expects a positive number as parameter");
return FALSE;
}
srv->io_timeout = value_extract(val).number;
return TRUE;
}
/*
* OPTIONS
*/
@ -814,6 +827,7 @@ static const plugin_setup setups[] = {
{ "event_handler", core_event_handler },
{ "workers", core_workers },
{ "module_load", core_module_load },
{ "io_timeout", core_io_timeout },
{ NULL, NULL }
};

2
src/server.c

@ -75,6 +75,8 @@ server* server_new(const gchar *module_dir) {
log_init(srv);
srv->io_timeout = 30; /* default I/O timeout */
return srv;
}

2
src/server.h

@ -71,6 +71,8 @@ struct server {
/* keep alive timeout */
guint keep_alive_queue_timeout;
guint io_timeout;
};

26
src/worker.c

@ -97,6 +97,26 @@ static void worker_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents)
}
}
/* check for timeouted connections */
static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
connection *con;
UNUSED(loop);
UNUSED(revents);
for (con = wrk->io_timeout_queue_head; con != NULL; con = con->io_timeout.next) {
if ((con->io_timeout.last_io + wrk->srv->io_timeout) < CUR_TS(wrk)) {
/* connection has timed out */
CON_TRACE(con, "connection io-timeout from %s after %u seconds", con->remote_addr_str->str, wrk->srv->io_timeout);
plugins_handle_close(con);
worker_con_put(con);
} else {
break;
}
}
}
/* cache timestamp */
GString *worker_current_timestamp(worker *wrk) {
time_t cur_ts = CUR_TS(wrk);
@ -146,6 +166,7 @@ void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) {
ev_io_start(wrk->loop, &con->sock_watcher);
con->ts = CUR_TS(con->wrk);
sockaddr_to_string(remote_addr, con->remote_addr_str);
connection_io_timeout_init(con);
} else {
worker_new_con_data *d = g_slice_new(worker_new_con_data);
d->remote_addr = *remote_addr;
@ -252,6 +273,11 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
wrk->collect_queue = g_async_queue_new();
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_timer_init(&wrk->io_timer, worker_io_timeout_cb, 1, 1);
wrk->io_timer.data = wrk;
ev_timer_start(wrk->loop, &wrk->io_timer);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
return wrk;
}

4
src/worker.h

@ -65,6 +65,10 @@ struct worker {
ev_timer keep_alive_timer;
GQueue keep_alive_queue;
ev_timer io_timer;
connection *io_timeout_queue_head;
connection *io_timeout_queue_tail;
guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */
time_t last_generated_date_ts;

Loading…
Cancel
Save