From 30ebce774e923fa2eeb7d132289c39c65f35bf72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Sat, 13 Feb 2010 15:29:21 +0100 Subject: [PATCH] Add "state-wait" api for server state machine --- include/lighttpd/plugin.h | 5 +- include/lighttpd/server.h | 21 ++++++++ include/lighttpd/typedefs.h | 2 + src/main/log.c | 25 +++++++-- src/main/plugin.c | 6 +-- src/main/server.c | 100 ++++++++++++++++++++++++++++++++---- 6 files changed, 138 insertions(+), 21 deletions(-) diff --git a/include/lighttpd/plugin.h b/include/lighttpd/plugin.h index d22bc0b..bf0f3a3 100644 --- a/include/lighttpd/plugin.h +++ b/include/lighttpd/plugin.h @@ -17,6 +17,9 @@ typedef liAction*(*liPluginCreateActionCB) (liServer *srv, liPlugin *p, liValue typedef gboolean (*liPluginSetupCB) (liServer *srv, liPlugin *p, liValue *val, gpointer userdata); typedef void (*liPluginAngelCB) (liServer *srv, liPlugin *p, gint32 id, GString *data); +typedef void (*liPluginServerPrepareWorker)(liServer *srv, liPlugin *p, liWorker *wrk); +typedef void (*liPluginServerPrepare)(liServer *srv, liPlugin *p); + typedef void (*liPluginHandleCloseCB) (liConnection *con, liPlugin *p); typedef liHandlerResult(*liPluginHandleVRequestCB)(liVRequest *vr, liPlugin *p); typedef void (*liPluginHandleVRCloseCB) (liVRequest *vr, liPlugin *p); @@ -165,8 +168,6 @@ LI_API void li_plugins_stop_listen(liServer *srv); /* "prepare suspend", async * LI_API void li_plugins_start_log(liServer *srv); /* "run" */ LI_API void li_plugins_stop_log(liServer *srv); /* "suspend now" */ -LI_API void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state); - LI_API void li_plugins_handle_close(liConnection *con); LI_API void li_plugins_handle_vrclose(liVRequest *vr); diff --git a/include/lighttpd/server.h b/include/lighttpd/server.h index 063763a..e1817e6 100644 --- a/include/lighttpd/server.h +++ b/include/lighttpd/server.h @@ -17,6 +17,8 @@ typedef liNetworkStatus (*liConnectionWriteCB)(liConnection *con, goffset write_ typedef liNetworkStatus (*liConnectionReadCB)(liConnection *con); typedef void (*liServerSocketReleaseCB)(liServerSocket *srv_sock); +typedef void (*liServerStateWaitCancelled)(liServer *srv, liServerStateWait *w); + typedef enum { LI_SERVER_INIT, /** start state */ LI_SERVER_LOADING, /** config loaded, prepare listeing sockets/open log files */ @@ -42,11 +44,24 @@ struct liServerSocket { liServerSocketReleaseCB release_cb; }; +struct liServerStateWait { + GList queue_link; + gboolean active; + liServerStateWaitCancelled cancel_cb; + gpointer data; +}; + struct liServer { guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */ liServerState state, dest_state; /** atomic access */ liAngelConnection *acon; + /* state machine handling */ + GMutex *statelock; + GQueue state_wait_queue; + liServerState state_wait_for; + ev_async state_ready_watcher; + GMutex *lualock; struct lua_State *L; /** NULL if compiled without Lua */ @@ -136,4 +151,10 @@ LI_API void li_server_socket_acquire(liServerSocket* sock); LI_API void li_server_goto_state(liServer *srv, liServerState state); LI_API void li_server_reached_state(liServer *srv, liServerState state); +/** threadsafe */ +LI_API void li_server_state_ready(liServer *srv, liServerStateWait *sw); + +/** only call from server state plugin hooks; push new wait condition to wait queue */ +LI_API void li_server_state_wait(liServer *srv, liServerStateWait *sw); + #endif diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index 16519db..3be671a 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -201,6 +201,8 @@ typedef struct liResponse liResponse; /* server.h */ +typedef struct liServerStateWait liServerStateWait; + typedef struct liServer liServer; typedef struct liServerSocket liServerSocket; diff --git a/src/main/log.c b/src/main/log.c index 8428152..1a77b7f 100644 --- a/src/main/log.c +++ b/src/main/log.c @@ -12,6 +12,8 @@ #include #include +#define DEFAULT_TS_FORMAT "%d/%b/%Y %T %Z" + static void log_free_unlocked(liServer *srv, liLog *log); static void log_thread_stop(liServer *srv); static void log_thread_finish(liServer *srv); @@ -51,8 +53,10 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin ts = CORE_OPTIONPTR(LI_CORE_OPTION_LOG_TS_FORMAT).ptr; } else { - liOptionPtrValue *ologval; - ologval = g_array_index(srv->optionptr_def_values, liOptionPtrValue*, 0 + LI_CORE_OPTION_LOG); + liOptionPtrValue *ologval = NULL; + if (0 + LI_CORE_OPTION_LOG < srv->optionptr_def_values->len) { + ologval = g_array_index(srv->optionptr_def_values, liOptionPtrValue*, 0 + LI_CORE_OPTION_LOG); + } if (ologval != NULL) logs = ologval->data.list; } @@ -62,7 +66,7 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin return TRUE;*/ } - if (!ts) { + if (NULL == ts && 0 < srv->logs.timestamps->len) { ts = g_array_index(srv->logs.timestamps, liLogTimestamp*, 0); } @@ -105,6 +109,9 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin /* for normal error messages, we prepend a timestamp */ if (flags & LOG_FLAG_TIMESTAMP) { time_t cur_ts; + liLogTimestamp fake_ts; + GString fake_ts_format; + GString *tmpstr = NULL; g_mutex_lock(srv->logs.mutex); @@ -114,6 +121,14 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin else cur_ts = time(NULL); + if (NULL == ts) { + ts = &fake_ts; + ts->last_ts = 0; + fake_ts_format = li_const_gstring(CONST_STR_LEN(DEFAULT_TS_FORMAT)); + ts->format = &fake_ts_format; + ts->cached = tmpstr = g_string_sized_new(255); + } + if (cur_ts != ts->last_ts) { gsize s; #ifdef HAVE_LOCALTIME_R @@ -138,6 +153,8 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin g_string_prepend_c(log_line, ' '); g_string_prepend_len(log_line, GSTR_LEN(ts->cached)); + if (NULL != tmpstr) g_string_free(tmpstr, TRUE); + g_mutex_unlock(srv->logs.mutex); } @@ -415,7 +432,7 @@ void li_log_init(liServer *srv) { srv->logs.thread_alive = FALSE; /* first entry in srv->logs.timestamps is the default timestamp */ - li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN("%d/%b/%Y %T %Z"))); + li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN(DEFAULT_TS_FORMAT))); /* first entry in srv->logs.targets is the plain good old stderr */ str = g_string_new_len(CONST_STR_LEN("stderr")); diff --git a/src/main/plugin.c b/src/main/plugin.c index 403f3bf..0799812 100644 --- a/src/main/plugin.c +++ b/src/main/plugin.c @@ -589,7 +589,7 @@ static void li_plugin_free_default_options(liServer *srv, liPlugin *p) { } } -void li_plugins_prepare_worker(liWorker *srv) { /* blocking callbacks */ +void li_plugins_prepare_worker(liWorker *wrk) { /* blocking callbacks */ /* TODO */ } void li_plugins_prepare(liServer* srv) { /* "prepare", async */ @@ -608,7 +608,3 @@ void li_plugins_start_log(liServer *srv) { /* "run" */ void li_plugins_stop_log(liServer *srv) { /* "suspend now" */ /* TODO */ } - -void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state) { - /* TODO */ -} diff --git a/src/main/server.c b/src/main/server.c index 671204a..f4824fe 100644 --- a/src/main/server.c +++ b/src/main/server.c @@ -14,6 +14,7 @@ static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents); static void li_server_stop(liServer *srv); +static void state_ready_cb(struct ev_loop *loop, struct ev_async *w, int revents); static liServerSocket* server_socket_new(int fd) { liServerSocket *sock = g_slice_new0(liServerSocket); @@ -94,6 +95,12 @@ liServer* li_server_new(const gchar *module_dir) { srv->state = LI_SERVER_INIT; srv->dest_state = LI_SERVER_RUNNING; + srv->statelock = g_mutex_new(); + g_queue_init(&srv->state_wait_queue); + srv->state_wait_for = srv->state; + ev_init(&srv->state_ready_watcher, state_ready_cb); + srv->state_ready_watcher.data = srv; + #ifdef HAVE_LUA_H srv->L = luaL_newstate(); luaL_openlibs(srv->L); @@ -190,6 +197,8 @@ void li_server_free(liServer* srv) { li_action_release(srv, srv->mainaction); + li_ev_safe_ref_and_stop(ev_async_stop, srv->loop, &srv->state_ready_watcher); + #ifdef HAVE_LUA_H lua_close(srv->L); srv->L = NULL; @@ -283,6 +292,9 @@ gboolean li_server_loop_init(liServer *srv) { return FALSE; } + ev_async_start(srv->loop, &srv->state_ready_watcher); + ev_unref(srv->loop); /* don't keep loop alive */ + return TRUE; } @@ -629,36 +641,40 @@ static liServerState li_server_next_state(liServer *srv) { static void li_server_start_transition(liServer *srv, liServerState state) { guint i; + liServerStateWait sw_dummy; + DEBUG(srv, "Try reaching state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state)); + srv->state_wait_for = state; + memset(&sw_dummy, 0, sizeof(sw_dummy)); + li_server_state_wait(srv, &sw_dummy); + switch (state) { case LI_SERVER_INIT: case LI_SERVER_LOADING: + li_server_reached_state(srv, state); + break; case LI_SERVER_SUSPENDED: - /* TODO: wait for prepare / suspended */ - li_server_reached_state(srv, LI_SERVER_SUSPENDED); + if (srv->state == LI_SERVER_LOADING) { + li_plugins_prepare(srv); + } break; case LI_SERVER_WARMUP: li_server_start_listen(srv); li_plugins_start_listen(srv); - li_server_reached_state(srv, LI_SERVER_WARMUP); break; case LI_SERVER_RUNNING: if (LI_SERVER_WARMUP == srv->state) { li_plugins_start_log(srv); - li_server_reached_state(srv, LI_SERVER_RUNNING); } else if (LI_SERVER_SUSPENDING == srv->state) { li_server_start_listen(srv); li_plugins_start_listen(srv); - li_server_reached_state(srv, LI_SERVER_RUNNING); } break; case LI_SERVER_SUSPENDING: li_server_stop_listen(srv); li_plugins_stop_listen(srv); /* wait for closed connections and plugins */ - /* TODO: wait */ - li_server_reached_state(srv, LI_SERVER_SUSPENDING); break; case LI_SERVER_STOPPING: /* stop all workers */ @@ -675,6 +691,8 @@ static void li_server_start_transition(liServer *srv, liServerState state) { /* wait */ break; } + + li_server_state_ready(srv, &sw_dummy); } void li_server_goto_state(liServer *srv, liServerState state) { @@ -704,6 +722,7 @@ void li_server_goto_state(liServer *srv, liServerState state) { void li_server_reached_state(liServer *srv, liServerState state) { liServerState want_state = li_server_next_state(srv); liServerState old_state = srv->state; + GList *swlink; if (state != want_state) return; if (state == srv->state) return; @@ -711,6 +730,18 @@ void li_server_reached_state(liServer *srv, liServerState state) { g_atomic_int_set(&srv->state, state); DEBUG(srv, "Reached state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state)); + /* cleanup state_wait_queue */ + g_mutex_lock(srv->statelock); + + while (NULL != (swlink = g_queue_pop_head_link(&srv->state_wait_queue))) { + liServerStateWait *sw = swlink->data; + sw->active = FALSE; + if (sw->cancel_cb) { + sw->cancel_cb(srv, sw); + } + } + g_mutex_unlock(srv->statelock); + switch (srv->state) { case LI_SERVER_INIT: break; @@ -725,9 +756,6 @@ void li_server_reached_state(liServer *srv, liServerState state) { } li_log_thread_start(srv); - - li_plugins_prepare(srv); - /* wait for plugins to report success */ break; case LI_SERVER_SUSPENDED: if (LI_SERVER_SUSPENDING == old_state) { @@ -760,3 +788,55 @@ void li_server_reached_state(liServer *srv, liServerState state) { li_server_start_transition(srv, want_state); } } + +static void state_ready_cb(struct ev_loop *loop, struct ev_async *w, int revents) { + liServer *srv = w->data; + + UNUSED(loop); + UNUSED(revents); + + g_mutex_lock(srv->statelock); + + if (srv->state_wait_queue.length > 0) { + /* not ready - ignore event */ + g_mutex_unlock(srv->statelock); + return; + } + + g_mutex_unlock(srv->statelock); + + if (srv->state_wait_for != li_server_next_state(srv)) { + /* not the state we have been waiting for - ignore */ + return; + } + + /* IMPORTANT: do not call this while statelock is locked */ + li_server_reached_state(srv, srv->state_wait_for); +} + +/** threadsafe */ +void li_server_state_ready(liServer *srv, liServerStateWait *sw) { + g_mutex_lock(srv->statelock); + + if (sw->active) { + g_queue_unlink(&srv->state_wait_queue, &sw->queue_link); + sw->active = FALSE; + + if (srv->state_wait_queue.length == 0) { + ev_async_send(srv->loop, &srv->state_ready_watcher); + } + } + + g_mutex_unlock(srv->statelock); +} + +/** only call from server state plugin hooks; push new wait condition to wait queue */ +void li_server_state_wait(liServer *srv, liServerStateWait *sw) { + g_mutex_lock(srv->statelock); + + sw->queue_link.data = sw; + g_queue_push_tail_link(&srv->state_wait_queue, &sw->queue_link); + sw->active = TRUE; + + g_mutex_unlock(srv->statelock); +}