[core]: wait for connections before shutdown

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent 1790afc37f
commit 823b9d13c2

@ -69,7 +69,7 @@ struct liWorker {
struct ev_loop *loop;
ev_prepare loop_prepare;
/* ev_check loop_check; */
ev_async worker_stop_watcher, worker_suspend_watcher, worker_exit_watcher;
ev_async worker_stop_watcher, worker_stopping_watcher, worker_suspend_watcher, worker_exit_watcher;
GQueue log_queue;
@ -101,6 +101,8 @@ struct liWorker {
ev_async new_con_watcher;
GAsyncQueue *new_con_queue;
liServerStateWait wait_for_stop_connections;
ev_timer stats_watcher;
liStatistics stats;
@ -122,6 +124,7 @@ LI_API void li_worker_free(liWorker *wrk);
LI_API void li_worker_run(liWorker *wrk);
LI_API void li_worker_stop(liWorker *context, liWorker *wrk);
LI_API void li_worker_stopping(liWorker *context, liWorker *wrk);
LI_API void li_worker_suspend(liWorker *context, liWorker *wrk);
LI_API void li_worker_exit(liWorker *context, liWorker *wrk);

@ -731,11 +731,8 @@ static void li_server_start_transition(liServer *srv, liServerState state) {
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_stop(srv->main_worker, wrk);
li_worker_stopping(srv->main_worker, wrk);
}
li_log_thread_wakeup(srv);
li_server_reached_state(srv, LI_SERVER_STOPPING);
break;
case LI_SERVER_DOWN:
/* wait */
@ -773,6 +770,7 @@ void li_server_reached_state(liServer *srv, liServerState state) {
liServerState want_state = li_server_next_state(srv);
liServerState old_state = srv->state;
GList *swlink;
guint i;
if (state != want_state) return;
if (state == srv->state) return;
@ -818,7 +816,16 @@ void li_server_reached_state(liServer *srv, liServerState state) {
case LI_SERVER_RUNNING:
break;
case LI_SERVER_SUSPENDING:
break;
case LI_SERVER_STOPPING:
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_stop(srv->main_worker, wrk);
}
li_log_thread_wakeup(srv);
break;
case LI_SERVER_DOWN:
/* li_server_exit(srv); */

@ -231,7 +231,16 @@ static void li_worker_stop_cb(struct ev_loop *loop, ev_async *w, int revents) {
li_worker_stop(wrk, wrk);
}
/* stop worker watcher */
/* stopping worker watcher */
static void li_worker_stopping_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
UNUSED(loop);
UNUSED(revents);
li_worker_stopping(wrk, wrk);
}
/* suspend worker watcher */
static void li_worker_suspend_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
UNUSED(loop);
@ -383,6 +392,10 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
wrk->worker_stop_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_stop_watcher);
ev_init(&wrk->worker_stopping_watcher, li_worker_stopping_cb);
wrk->worker_stopping_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_stopping_watcher);
ev_init(&wrk->worker_suspend_watcher, li_worker_suspend_cb);
wrk->worker_suspend_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_suspend_watcher);
@ -516,6 +529,7 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
li_plugins_worker_stop(wrk);
ev_async_stop(wrk->loop, &wrk->worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->worker_stopping_watcher);
ev_async_stop(wrk->loop, &wrk->worker_suspend_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
li_waitqueue_stop(&wrk->io_timeout_queue);
@ -545,6 +559,36 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
}
}
void li_worker_stopping(liWorker *context, liWorker *wrk) {
liServer *srv = context->srv;
guint i;
if (context == srv->main_worker) {
li_server_state_wait(srv, &wrk->wait_for_stop_connections);
}
if (context == wrk) {
/* li_plugins_worker_stopping(wrk); ??? */
/* close keep alive connections */
for (i = wrk->connections_active; i-- > 0;) {
liConnection *con = g_array_index(wrk->connections, liConnection*, i);
if (con->state == LI_CON_STATE_KEEP_ALIVE) {
li_worker_con_put(con);
}
}
li_worker_check_keepalive(wrk);
li_worker_new_con_cb(wrk->loop, &wrk->new_con_watcher, 0); /* handle remaining new connections */
if (0 == g_atomic_int_get(&wrk->connection_load) && wrk->wait_for_stop_connections.active) {
li_server_state_ready(srv, &wrk->wait_for_stop_connections);
}
} else {
ev_async_send(wrk->loop, &wrk->worker_stopping_watcher);
}
}
void li_worker_suspend(liWorker *context, liWorker *wrk) {
if (context == wrk) {
guint i;
@ -645,4 +689,8 @@ void li_worker_con_put(liConnection *con) {
/* above treshold, update timestamp */
wrk->connections_gc_ts = now;
}
if (wrk->wait_for_stop_connections.active && 0 == g_atomic_int_get((gint*) &wrk->connection_load)) {
li_server_state_ready(wrk->srv, &wrk->wait_for_stop_connections);
}
}

Loading…
Cancel
Save