Browse Source

[core] fix connection timeout handling

personal/stbuehler/wip
Stefan Bühler 9 years ago
parent
commit
64cabac477
  1. 3
      include/lighttpd/connection.h
  2. 5
      include/lighttpd/worker.h
  3. 68
      src/main/connection.c
  4. 45
      src/main/worker.c
  5. 15
      src/modules/mod_status.c

3
include/lighttpd/connection.h

@ -91,6 +91,9 @@ LI_API void li_connection_free(liConnection *con);
/* close connection (for worker keep-alive timeout) */
LI_API void li_connection_reset(liConnection *con);
/* update whether we're waiting for io timeouts */
LI_API void li_connection_update_io_wait(liConnection *con);
/** aborts an active connection, calls all plugin cleanup handlers */
LI_API void li_connection_error(liConnection *con); /* used in worker.c */

5
include/lighttpd/worker.h

@ -111,8 +111,13 @@ LI_API liWorker* li_worker_new(liServer *srv, struct ev_loop *loop);
LI_API struct ev_loop* li_worker_free(liWorker *wrk);
LI_API void li_worker_run(liWorker *wrk);
/* stopped now, all connections down. exit loop soon */
LI_API void li_worker_stop(liWorker *context, liWorker *wrk);
/* start stopping. don't stop loop yet, connection handling on other workers might need all workers (mod_status) */
LI_API void li_worker_stopping(liWorker *context, liWorker *wrk);
LI_API void li_worker_suspend(liWorker *context, liWorker *wrk);
LI_API void li_worker_exit(liWorker *context, liWorker *wrk);

68
src/main/connection.c

@ -234,10 +234,6 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
con->keep_alive_data.timeout = 0;
li_event_stop(&con->keep_alive_data.watcher);
/* put back in io timeout queue */
if (!con->io_timeout_elem.queued)
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
con->keep_alive_requests++;
/* disable keep alive if limit is reached */
if (con->keep_alive_requests == CORE_OPTION(LI_CORE_OPTION_MAX_KEEP_ALIVE_REQUESTS).number)
@ -249,8 +245,12 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
li_vrequest_start(con->mainvr);
con->state = LI_CON_STATE_READ_REQUEST_HEADER;
/* put back in io timeout queue */
li_connection_update_io_wait(con);
} else if (con->state == LI_CON_STATE_REQUEST_START) {
con->state = LI_CON_STATE_READ_REQUEST_HEADER;
li_connection_update_io_wait(con);
}
if (con->state == LI_CON_STATE_READ_REQUEST_HEADER) {
@ -273,6 +273,7 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
con->info.keep_alive = FALSE;
vr->response.http_status = 414; /* Request-URI Too Large */
con->state = LI_CON_STATE_WRITE;
li_connection_update_io_wait(con);
li_stream_again(&con->out);
return;
}
@ -295,6 +296,7 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
if (vr->response.http_status == 0)
vr->response.http_status = 400;
con->state = LI_CON_STATE_WRITE;
li_connection_update_io_wait(con);
li_stream_again(&con->out);
return;
}
@ -311,6 +313,7 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
vr->response.http_status = 400;
con->state = LI_CON_STATE_WRITE;
con->info.keep_alive = FALSE;
li_connection_update_io_wait(con);
li_stream_again(&con->out);
return;
}
@ -331,6 +334,7 @@ static void _connection_http_in_cb(liStream *stream, liStreamEvent event) {
}
con->state = LI_CON_STATE_HANDLE_MAINVR;
li_connection_update_io_wait(con);
li_action_enter(vr, con->srv->mainaction);
li_vrequest_handle_request_headers(vr);
@ -427,7 +431,10 @@ static void _connection_http_out_cb(liStream *stream, liStreamEvent event) {
raw_out->is_closed = FALSE;
}
if (con->out_has_all_data) {
if (con->state < LI_CON_STATE_WRITE) con->state = LI_CON_STATE_WRITE;
if (con->state < LI_CON_STATE_WRITE) {
con->state = LI_CON_STATE_WRITE;
li_connection_update_io_wait(con);
}
if (NULL != out) {
out = NULL;
li_stream_disconnect(stream);
@ -443,11 +450,51 @@ static void _connection_http_out_cb(liStream *stream, liStreamEvent event) {
void li_connection_update_io_timeout(liConnection *con) {
liWorker *wrk = con->wrk;
if ((con->io_timeout_elem.ts + 1.0) < li_cur_ts(wrk)) {
if (con->io_timeout_elem.queued && (con->io_timeout_elem.ts + 1.0) < li_cur_ts(wrk)) {
li_waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem);
}
}
void li_connection_update_io_wait(liConnection *con) {
liWorker *wrk = con->wrk;
gboolean want_timeout = FALSE;
gboolean stopping = wrk->wait_for_stop_connections.active;
switch (con->state) {
case LI_CON_STATE_DEAD:
case LI_CON_STATE_CLOSE: /* only a temporary state before DEAD */
want_timeout = FALSE;
break;
case LI_CON_STATE_KEEP_ALIVE:
want_timeout = stopping;
break;
case LI_CON_STATE_REQUEST_START:
want_timeout = TRUE;
break;
case LI_CON_STATE_READ_REQUEST_HEADER:
want_timeout = TRUE;
break;
case LI_CON_STATE_HANDLE_MAINVR:
/* want timeout while we're still reading request body */
want_timeout = stopping || !con->in.out->is_closed;
break;
case LI_CON_STATE_WRITE:
want_timeout = TRUE;
break;
case LI_CON_STATE_UPGRADED:
want_timeout = stopping;
break;
}
if (want_timeout == con->io_timeout_elem.queued) return;
if (want_timeout) {
li_waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem);
} else {
li_waitqueue_remove(&wrk->io_timeout_queue, &con->io_timeout_elem);
}
}
void li_connection_start(liConnection *con, liSocketAddress remote_addr, int s, liServerSocket *srv_sock) {
assert(NULL == con->con_sock.data);
@ -469,7 +516,7 @@ void li_connection_start(liConnection *con, liSocketAddress remote_addr, int s,
con->info.req = &con->in;
con->info.resp = &con->out;
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
li_connection_update_io_wait(con);
if (srv_sock->new_cb) {
if (!srv_sock->new_cb(con, s)) {
@ -594,6 +641,7 @@ static void mainvr_connection_upgrade(liVRequest *vr, liStream *backend_drain, l
li_response_send_headers(vr, con->out.out, NULL, TRUE);
con->state = LI_CON_STATE_UPGRADED;
vr->response.transfer_encoding = 0;
li_connection_update_io_wait(con);
li_stream_disconnect_dest(&con->in);
con->in.out->is_closed = FALSE;
@ -679,6 +727,7 @@ void li_connection_reset(liConnection *con) {
con->keep_alive_requests = 0;
}
li_connection_update_io_wait(con);
li_job_later(&con->wrk->loop.jobqueue, &con->job_reset);
}
@ -766,9 +815,6 @@ static void li_connection_reset_keep_alive(liConnection *con) {
li_event_timer_once(&con->keep_alive_data.watcher, con->keep_alive_data.max_idle);
}
}
/* remove from timeout queue */
li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
} else {
li_stream_again_later(&con->in);
}
@ -780,6 +826,8 @@ static void li_connection_reset_keep_alive(liConnection *con) {
con->info.keep_alive = TRUE;
li_connection_update_io_wait(con);
li_vrequest_reset(con->mainvr, TRUE);
li_http_request_parser_reset(&con->req_parser_ctx);

45
src/main/worker.c

@ -86,6 +86,25 @@ static void worker_io_timeout_cb(liWaitQueue *wq, gpointer data) {
li_waitqueue_update(wq);
}
static void worker_close_idle_connections(liWorker *wrk) {
guint i;
for (i = wrk->connections_active; i-- > 0;) {
liConnection *con = g_array_index(wrk->connections, liConnection*, i);
switch (con->state) {
case LI_CON_STATE_KEEP_ALIVE:
li_connection_reset(con);
break;
default:
/* update if wrk->wait_for_stop_connections.active changed */
li_connection_update_io_wait(con);
break;
}
}
li_worker_check_keepalive(wrk);
}
/* cache timestamp */
GString *li_worker_current_timestamp(liWorker *wrk, liTimeFunc timefunc, guint format_ndx) {
gsize len;
@ -428,14 +447,13 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
if (wrk->stat_cache)
li_waitqueue_stop(&wrk->stat_cache->delete_queue);
li_worker_new_con_cb(&wrk->new_con_watcher.base, 0); /* handle remaining new connections */
/* handle remaining new connections. there shouldn't be any, we'll kill them soon anyway */
li_worker_new_con_cb(&wrk->new_con_watcher.base, 0);
/* close keep alive connections */
/* connections should already be done on "normal" shutdowns. otherwise force it now */
for (i = wrk->connections_active; i-- > 0;) {
liConnection *con = g_array_index(wrk->connections, liConnection*, i);
if (con->state == LI_CON_STATE_KEEP_ALIVE) {
li_connection_reset(con);
}
li_connection_reset(con);
}
li_worker_check_keepalive(wrk);
@ -448,7 +466,6 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
void li_worker_stopping(liWorker *context, liWorker *wrk) {
liServer *srv = context->srv;
guint i;
if (context == srv->main_worker) {
li_server_state_wait(srv, &wrk->wait_for_stop_connections);
@ -457,22 +474,18 @@ void li_worker_stopping(liWorker *context, liWorker *wrk) {
if (context == wrk) {
/* li_plugins_worker_stopping(wrk); ??? */
/* close keep alive connections */
for (i = wrk->connections_active; i-- > 0;) {
liConnection *con = g_array_index(wrk->connections, liConnection*, i);
if (con->state == LI_CON_STATE_KEEP_ALIVE) {
li_connection_reset(con);
}
}
/* connections are killed after 3 seconds without IO */
li_waitqueue_set_delay(&wrk->io_timeout_queue, 3);
li_worker_check_keepalive(wrk);
worker_close_idle_connections(wrk);
li_worker_new_con_cb(&wrk->new_con_watcher.base, 0); /* handle remaining new connections */
li_event_loop_force_close_sockets(&wrk->loop);
if (0 == g_atomic_int_get(&wrk->connection_load) && wrk->wait_for_stop_connections.active) {
li_server_state_ready(srv, &wrk->wait_for_stop_connections);
}
li_event_loop_force_close_sockets(&wrk->loop);
} else {
li_event_async_send(&wrk->worker_stopping_watcher);
}

15
src/modules/mod_status.c

@ -373,7 +373,7 @@ struct mod_status_con_data {
goffset request_size;
goffset response_size;
guint64 ts_started;
guint64 ts_timeout;
gint64 ts_timeout;
guint64 bytes_in;
guint64 bytes_out;
guint64 bytes_in_5s_diff;
@ -398,6 +398,7 @@ struct mod_status_job {
/* the CollectFunc */
static gpointer status_collect_func(liWorker *wrk, gpointer fdata) {
mod_status_wrk_data *sd = g_slice_new0(mod_status_wrk_data);
li_tstamp now = li_cur_ts(wrk);
UNUSED(fdata);
sd->stats = wrk->stats;
@ -424,12 +425,14 @@ static gpointer status_collect_func(liWorker *wrk, gpointer fdata) {
cd->bytes_in_5s_diff = c->info.stats.bytes_in_5s_diff;
cd->bytes_out_5s_diff = c->info.stats.bytes_out_5s_diff;
cd->ts_started = (guint64)(li_cur_ts(wrk) - c->ts_started);
cd->ts_started = (guint64)(now - c->ts_started);
if (c->state == LI_CON_STATE_KEEP_ALIVE) {
cd->ts_timeout = (guint64)(c->keep_alive_data.timeout - li_cur_ts(wrk));
if (c->io_timeout_elem.queued) {
cd->ts_timeout = MAX(0, wrk->srv->io_timeout - (now - c->io_timeout_elem.ts));
} else if (LI_CON_STATE_KEEP_ALIVE == c->state) {
cd->ts_timeout = MAX(0, c->keep_alive_data.timeout - now);
} else {
cd->ts_timeout = (guint64)(wrk->srv->io_timeout - (li_cur_ts(wrk) - c->io_timeout_elem.ts));
cd->ts_timeout = -1;
}
sd->connection_count[c->state]++;
@ -792,7 +795,7 @@ static GString *status_info_full(liVRequest *vr, liPlugin *p, gboolean short_inf
cd->query->len ? "?":"",
cd->query->len ? cd->query->str : "",
cd->ts_started, ts_started->str,
cd->ts_timeout, ts_timeout->str,
cd->ts_timeout, cd->ts_timeout < 0 ? "" : ts_timeout->str,
cd->bytes_in, bytes_in->str,
cd->bytes_out, bytes_out->str,
cd->bytes_in_5s_diff / G_GUINT64_CONSTANT(5), bytes_in_5s->str,

Loading…
Cancel
Save