Browse Source

angel/lighty now synchronize the server state

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
56ff7ca32a
  1. 3
      include/lighttpd/angel_server.h
  2. 14
      include/lighttpd/plugin.h
  3. 20
      include/lighttpd/server.h
  4. 4
      include/lighttpd/worker.h
  5. 19
      src/angel/angel_plugin_core.c
  6. 27
      src/angel/angel_server.c
  7. 10
      src/common/angel_connection.c
  8. 3
      src/main/angel.c
  9. 5
      src/main/connection.c
  10. 10
      src/main/lighttpd.c
  11. 20
      src/main/log.c
  12. 37
      src/main/plugin.c
  13. 27
      src/main/plugin_core.c
  14. 283
      src/main/server.c
  15. 31
      src/main/worker.c

3
include/lighttpd/angel_server.h

@ -12,7 +12,7 @@
typedef enum {
LI_INSTANCE_DOWN, /* not started yet */
LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */
LI_INSTANCE_SILENT, /* only accept(), no logging: waiting for another instance to suspend */
LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */
LI_INSTANCE_RUNNING, /* everything running */
LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */
LI_INSTANCE_FINISHED /* not running */
@ -65,6 +65,7 @@ LI_API void li_server_stop(liServer *srv);
LI_API liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic);
LI_API void li_instance_replace(liInstance *oldi, liInstance *newi);
LI_API void li_instance_set_state(liInstance *i, liInstanceState s);
LI_API void li_instance_state_reached(liInstance *i, liInstanceState s);
LI_API liInstanceConf* li_instance_conf_new(gchar **cmd, GString *username, uid_t uid, gid_t gid);
LI_API void li_instance_conf_release(liInstanceConf *ic);

14
include/lighttpd/plugin.h

@ -33,6 +33,8 @@ struct liPlugin {
size_t opt_base_index;
gboolean ready_for_next_state; /**< don't modify this; use li_plugin_ready_for_state() instead */
liPluginFreeCB free; /**< called before plugin is unloaded */
liPluginHandleVRequestCB handle_request_body;
@ -128,6 +130,18 @@ LI_API gboolean li_parse_option(liServer *srv, const char *name, liValue *val, l
LI_API void li_release_option(liServer *srv, liOptionSet *mark); /**< Does not free the option_set memory */
LI_API void li_plugins_prepare_callbacks(liServer *srv);
/* server state machine callbacks */
LI_API void li_plugins_prepare_worker(liWorker *srv); /* blocking callbacks */
LI_API void li_plugins_prepare(liServer *srv); /* "prepare", async */
LI_API void li_plugins_start_listen(liServer *srv); /* "warmup" */
LI_API void li_plugins_stop_listen(liServer *srv); /* "prepare suspend", async */
LI_API void li_plugins_start_log(liServer *srv); /* "run" */
LI_API void li_plugins_stop_log(liServer *srv); /* "suspend now" */
LI_API void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state);
LI_API void li_plugins_handle_close(liConnection *con);
LI_API void li_plugins_handle_vrclose(liVRequest *vr);

20
include/lighttpd/server.h

@ -6,9 +6,14 @@
#endif
typedef enum {
LI_SERVER_STARTING, /** start up: don't write log files, don't accept connections */
LI_SERVER_RUNNING, /** running: write logs, accept connections */
LI_SERVER_STOPPING /** stopping: flush logs, don't accept new connections */
LI_SERVER_INIT, /** start state */
LI_SERVER_LOADING, /** config loaded, prepare listeing sockets/open log files */
LI_SERVER_SUSPENDED, /** ready to go, no logs */
LI_SERVER_WARMUP, /** listen() active, no logs yet, handling remaining connections */
LI_SERVER_RUNNING, /** listen() and logs active */
LI_SERVER_SUSPENDING, /** listen() stopped, logs active, handling remaining connections */
LI_SERVER_STOPPING, /** listen() stopped, no logs, handling remaining connections */
LI_SERVER_DOWN /** exit */
} liServerState;
struct liServerSocket {
@ -21,7 +26,7 @@ struct liServerSocket {
struct liServer {
guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */
liServerState state; /** atomic access */
liServerState state, dest_state; /** atomic access */
liAngelConnection *acon;
liWorker *main_worker;
@ -91,10 +96,6 @@ LI_API gboolean li_server_worker_init(liServer *srv);
LI_API void li_server_listen(liServer *srv, int fd);
/* Start accepting connection, use log files, no new plugins after that */
LI_API void li_server_start(liServer *srv);
/* stop accepting connections, turn keep-alive off, close all shutdown sockets, set exiting = TRUE */
LI_API void li_server_stop(liServer *srv);
/* exit asap with cleanup */
LI_API void li_server_exit(liServer *srv);
@ -107,4 +108,7 @@ LI_API guint li_server_ts_format_add(liServer *srv, GString* format);
LI_API void li_server_socket_release(liServerSocket* sock);
LI_API void li_server_socket_acquire(liServerSocket* sock);
LI_API void li_server_goto_state(liServer *srv, liServerState state);
LI_API void li_server_reached_state(liServer *srv, liServerState state);
#endif

4
include/lighttpd/worker.h

@ -54,7 +54,7 @@ struct liWorker {
struct ev_loop *loop;
ev_prepare loop_prepare;
ev_check loop_check;
ev_async li_worker_stop_watcher, li_worker_exit_watcher;
ev_async li_worker_stop_watcher, li_worker_suspend_watcher, li_worker_exit_watcher;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections
* use with atomic, read direct from local worker context
@ -76,7 +76,6 @@ struct liWorker {
guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */
GArray *timestamps_gmt; /** array of (worker_ts), use only from local worker context and through li_worker_current_timestamp(wrk, LI_GMTIME, ndx) */
GArray *timestamps_local;
@ -106,6 +105,7 @@ LI_API void li_worker_free(liWorker *wrk);
LI_API void li_worker_run(liWorker *wrk);
LI_API void li_worker_stop(liWorker *context, liWorker *wrk);
LI_API void li_worker_suspend(liWorker *context, liWorker *wrk);
LI_API void li_worker_exit(liWorker *context, liWorker *wrk);
LI_API void li_worker_new_con(liWorker *ctx, liWorker *wrk, liSocketAddress remote_addr, int s, liServerSocket *srv_sock);

19
src/angel/angel_plugin_core.c

@ -195,6 +195,8 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS
GError *err = NULL;
gint fd;
GArray *fds;
UNUSED(p);
DEBUG(srv, "core_listen(%i) '%s'", id, data->str);
if (-1 == id) return; /* ignore simple calls */
@ -221,6 +223,22 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS
}
}
static void core_reached_state(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) {
UNUSED(srv);
UNUSED(p);
UNUSED(id);
if (0 == strcmp(data->str, "suspended")) {
li_instance_state_reached(i, LI_INSTANCE_SUSPENDED);
} else if (0 == strcmp(data->str, "warmup")) {
li_instance_state_reached(i, LI_INSTANCE_WARMUP);
} else if (0 == strcmp(data->str, "running")) {
li_instance_state_reached(i, LI_INSTANCE_RUNNING);
} else if (0 == strcmp(data->str, "suspending")) {
li_instance_state_reached(i, LI_INSTANCE_SUSPENDING);
}
}
static void core_clean(liServer *srv, liPlugin *p);
static void core_free(liServer *srv, liPlugin *p) {
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
@ -292,6 +310,7 @@ static gboolean core_init(liServer *srv, liPlugin *p) {
p->handle_activate_config = core_activate;
g_hash_table_insert(p->angel_callbacks, "listen", (gpointer)(intptr_t)core_listen);
g_hash_table_insert(p->angel_callbacks, "reached-state", (gpointer)(intptr_t)core_reached_state);
return TRUE;
}

27
src/angel/angel_server.c

@ -1,8 +1,6 @@
#include <lighttpd/angel_base.h>
static void instance_state_reached(liInstance *i, liInstanceState s);
#define CATCH_SIGNAL(loop, cb, n) do { \
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
@ -70,6 +68,8 @@ static void instance_angel_call_cb(liAngelConnection *acon,
liPlugins *ps = &srv->plugins;
liPlugin *p;
liPluginHandleCallCB cb;
UNUSED(mod_len);
UNUSED(action_len);
p = g_hash_table_lookup(ps->ht_plugins, mod);
if (!p) {
@ -152,7 +152,7 @@ static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
li_angel_connection_free(i->acon);
i->acon = NULL;
ev_child_stop(loop, w);
instance_state_reached(i, news);
li_instance_state_reached(i, news);
li_instance_release(i);
}
@ -205,6 +205,7 @@ liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) {
}
void li_instance_replace(liInstance *oldi, liInstance *newi) {
/* TODO ??? */
}
void li_instance_set_state(liInstance *i, liInstanceState s) {
@ -214,7 +215,7 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
case LI_INSTANCE_SUSPENDING:
ERROR(i->srv, "Invalid destination state %i", s);
return; /* cannot set this */
case LI_INSTANCE_SILENT:
case LI_INSTANCE_WARMUP:
case LI_INSTANCE_SUSPENDED:
case LI_INSTANCE_RUNNING:
case LI_INSTANCE_FINISHED:
@ -230,8 +231,8 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
case LI_INSTANCE_DOWN:
case LI_INSTANCE_SUSPENDING:
break; /* cannot be set */
case LI_INSTANCE_SILENT:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error);
case LI_INSTANCE_WARMUP:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("warmup"), NULL, &error);
break;
case LI_INSTANCE_SUSPENDED:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("suspend"), NULL, &error);
@ -243,7 +244,7 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
li_instance_state_reached(i, LI_INSTANCE_FINISHED);
}
break;
}
@ -254,13 +255,13 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
li_instance_state_reached(i, LI_INSTANCE_FINISHED);
}
}
}
}
static void instance_state_reached(liInstance *i, liInstanceState s) {
void li_instance_state_reached(liInstance *i, liInstanceState s) {
GError *error = NULL;
i->s_cur = s;
@ -279,9 +280,9 @@ static void instance_state_reached(liInstance *i, liInstanceState s) {
break; /* impossible */
case LI_INSTANCE_SUSPENDED:
break;
case LI_INSTANCE_SILENT:
case LI_INSTANCE_WARMUP:
/* make sure we move to SILENT after we spawned the instance */
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error);
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("warmup"), NULL, &error);
break;
case LI_INSTANCE_RUNNING:
/* make sure we move to RUNNING after we spawned the instance */
@ -292,7 +293,7 @@ static void instance_state_reached(liInstance *i, liInstanceState s) {
break; /* nothing to do, instance should already know what to do */
}
break;
case LI_INSTANCE_SILENT:
case LI_INSTANCE_WARMUP:
/* TODO: replace another instance? */
break;
case LI_INSTANCE_RUNNING:
@ -314,7 +315,7 @@ static void instance_state_reached(liInstance *i, liInstanceState s) {
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
li_instance_state_reached(i, LI_INSTANCE_FINISHED);
}
}
}

10
src/common/angel_connection.c

@ -557,17 +557,17 @@ gboolean li_angel_send_simple_call(
goto error;
}
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
if (data && data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, LI_ANGEL_CALL_ERROR, LI_ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error;
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data ? data->len : 0, 0, err)) goto error;
g_mutex_lock(acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
if (data) send_queue_push_string(acon->out, data);
g_mutex_unlock(acon->mutex);
if (queue_was_empty)
@ -629,7 +629,7 @@ gboolean li_angel_send_call(
g_mutex_lock(acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
if (data) send_queue_push_string(acon->out, data);
g_mutex_unlock(acon->mutex);
if (queue_was_empty)
@ -675,7 +675,7 @@ gboolean li_angel_send_result(
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, error);
send_queue_push_string(acon->out, data);
if (data) send_queue_push_string(acon->out, data);
send_queue_push_fds(acon->out, fds);
g_mutex_unlock(acon->mutex);

3
src/main/angel.c

@ -8,6 +8,8 @@ static void angel_call_cb(liAngelConnection *acon,
liServer *srv = acon->data;
liPlugin *p;
const liPluginAngel *acb;
UNUSED(action_len);
UNUSED(mod_len);
if (NULL == (p = g_hash_table_lookup(srv->plugins, mod))) goto not_found;
if (NULL == p->angelcbs) goto not_found;
@ -35,6 +37,7 @@ static void angel_close_cb(liAngelConnection *acon, GError *err) {
void li_angel_setup(liServer *srv) {
srv->acon = li_angel_connection_new(srv->loop, 0, srv, angel_call_cb, angel_close_cb);
srv->dest_state = LI_SERVER_SUSPENDED;
}
static void li_angel_listen_cb(liAngelCall *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds) {

5
src/main/connection.c

@ -55,13 +55,16 @@ static void forward_response_body(liConnection *con) {
static void connection_request_done(liConnection *con) {
liVRequest *vr = con->mainvr;
liServerState s;
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->keep_alive);
}
li_plugins_handle_close(con);
if (con->keep_alive && g_atomic_int_get(&con->srv->state) == LI_SERVER_RUNNING) {
s = g_atomic_int_get(&con->srv->dest_state);
if (con->keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) {
li_connection_reset_keep_alive(con);
} else {
worker_con_put(con);

10
src/main/lighttpd.c

@ -143,14 +143,20 @@ int main(int argc, char *argv[]) {
#endif
}
if (!srv->mainaction) {
ERROR(srv, "%s", "No action handlers defined");
return 1;
}
/* if config should only be tested, exit here */
if (test_config)
return 0;
/* TRACE(srv, "%s", "Test!"); */
li_server_worker_init(srv);
li_server_start(srv);
li_server_reached_state(srv, LI_SERVER_LOADING);
li_worker_run(srv->main_worker);
li_server_reached_state(srv, LI_SERVER_DOWN);
if (!luaconfig)
config_parser_finish(srv, ctx_stack, TRUE);

20
src/main/log.c

@ -14,9 +14,17 @@
void li_log_write(liServer *srv, liLog *log, GString *msg) {
liLogEntry *log_entry;
if (g_atomic_int_get(&srv->state) == LI_SERVER_STARTING) {
switch (g_atomic_int_get(&srv->state)) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDED:
case LI_SERVER_WARMUP:
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
li_angel_log(srv, msg);
return;
default:
break;
}
log_ref(srv, log);
@ -122,10 +130,18 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
g_string_append_len(log_line, CONST_STR_LEN("\r\n"));
if (g_atomic_int_get(&srv->state) == LI_SERVER_STARTING) {
switch (g_atomic_int_get(&srv->state)) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDED:
case LI_SERVER_WARMUP:
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
log_unref(srv, log);
li_angel_log(srv, log_line);
return TRUE;
default:
break;
}
log_entry = g_slice_new(liLogEntry);
log_entry->log = log;

37
src/main/plugin.c

@ -50,9 +50,12 @@ static void li_plugin_free_setups(liServer *srv, liPlugin *p) {
}
void li_plugin_free(liServer *srv, liPlugin *p) {
liServerState s;
if (!p) return;
if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) {
s = g_atomic_int_get(&srv->state);
if (LI_SERVER_INIT != s && LI_SERVER_DOWN != s) {
ERROR(srv, "Cannot free plugin '%s' while server is running", p->name);
return;
}
@ -71,8 +74,10 @@ void li_plugin_free(liServer *srv, liPlugin *p) {
void li_server_plugins_free(liServer *srv) {
gpointer key, val;
GHashTableIter i;
liServerState s;
if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) {
s = g_atomic_int_get(&srv->state);
if (LI_SERVER_INIT != s && LI_SERVER_DOWN != s) {
ERROR(srv, "%s", "Cannot free plugins while server is running");
return;
}
@ -95,13 +100,15 @@ void li_server_plugins_free(liServer *srv) {
liPlugin *li_plugin_register(liServer *srv, const gchar *name, liPluginInitCB init) {
liPlugin *p;
liServerState s;
if (!init) {
ERROR(srv, "Module '%s' needs an init function", name);
return NULL;
}
if (g_atomic_int_get(&srv->state) != LI_SERVER_STARTING) {
s = g_atomic_int_get(&srv->state);
if (LI_SERVER_INIT != s) {
ERROR(srv, "Cannot register plugin '%s' after server was started", name);
return NULL;
}
@ -433,3 +440,27 @@ static void li_plugin_free_default_options(liServer *srv, liPlugin *p) {
g_array_index(srv->option_def_values, liOptionValue, sopt->index) = oempty;
}
}
void li_plugins_prepare_worker(liWorker *srv) { /* blocking callbacks */
/* TODO */
}
void li_plugins_prepare(liServer* srv) { /* "prepare", async */
/* TODO */
}
void li_plugins_start_listen(liServer *srv) { /* "warmup" */
/* TODO */
}
void li_plugins_stop_listen(liServer *srv) { /* "prepare suspend", async */
/* TODO */
}
void li_plugins_start_log(liServer *srv) { /* "run" */
/* TODO */
}
void li_plugins_stop_log(liServer *srv) { /* "suspend now" */
/* TODO */
}
void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state) {
/* TODO */
}

27
src/main/plugin_core.c

@ -1287,6 +1287,29 @@ static liAction* core_throttle_connection(liServer *srv, liPlugin* p, liValue *v
return li_action_new_function(core_handle_throttle_connection, NULL, NULL, GUINT_TO_POINTER((guint) rate));
}
static void core_warmup(liServer *srv, liPlugin *p, gint32 id, GString *data) {
UNUSED(p);
UNUSED(id);
UNUSED(data);
li_server_goto_state(srv, LI_SERVER_WARMUP);
}
static void core_run(liServer *srv, liPlugin *p, gint32 id, GString *data) {
UNUSED(p);
UNUSED(id);
UNUSED(data);
li_server_goto_state(srv, LI_SERVER_RUNNING);
}
static void core_suspend(liServer *srv, liPlugin *p, gint32 id, GString *data) {
UNUSED(p);
UNUSED(id);
UNUSED(data);
li_server_goto_state(srv, LI_SERVER_SUSPENDED);
}
static const liPluginOption options[] = {
{ "debug.log_request_handling", LI_VALUE_BOOLEAN, GINT_TO_POINTER(FALSE), NULL, NULL },
@ -1357,6 +1380,10 @@ static const liPluginSetup setups[] = {
};
static const liPluginAngel angelcbs[] = {
{ "warmup", core_warmup },
{ "run", core_run },
{ "suspend", core_suspend },
{ NULL, NULL }
};

283
src/main/server.c

@ -3,6 +3,7 @@
#include <lighttpd/plugin_core.h>
static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents);
static void li_server_stop(liServer *srv);
static liServerSocket* server_socket_new(int fd) {
liServerSocket *sock = g_slice_new0(liServerSocket);
@ -58,18 +59,15 @@ static void server_setup_free(gpointer _ss) {
static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
liServer *srv = (liServer*) w->data;
UNUSED(loop);
UNUSED(revents);
if (g_atomic_int_get(&srv->state) != LI_SERVER_STOPPING) {
if (g_atomic_int_get(&srv->dest_state) != LI_SERVER_DOWN) {
INFO(srv, "%s", "Got signal, shutdown");
li_server_stop(srv);
li_server_goto_state(srv, LI_SERVER_DOWN);
} else {
INFO(srv, "%s", "Got second signal, force shutdown");
/* reset default behaviour which will kill us the third time */
UNCATCH_SIGNAL(loop, INT);
UNCATCH_SIGNAL(loop, TERM);
UNCATCH_SIGNAL(loop, PIPE);
exit(1);
}
}
@ -82,7 +80,8 @@ liServer* li_server_new(const gchar *module_dir) {
liServer* srv = g_slice_new0(liServer);
srv->magic = LIGHTTPD_SERVER_MAGIC;
srv->state = LI_SERVER_STARTING;
srv->state = LI_SERVER_INIT;
srv->dest_state = LI_SERVER_RUNNING;
srv->workers = g_array_new(FALSE, TRUE, sizeof(liWorker*));
srv->worker_count = 0;
@ -115,6 +114,7 @@ liServer* li_server_new(const gchar *module_dir) {
log_init(srv);
srv->io_timeout = 300; /* default I/O timeout */
srv->keep_alive_queue_timeout = 5;
return srv;
}
@ -310,75 +310,69 @@ static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
}
/* main worker only */
void li_server_listen(liServer *srv, int fd) {
liServerSocket *sock = server_socket_new(fd);
sock->srv = srv;
g_ptr_array_add(srv->sockets, sock);
if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) ev_io_start(srv->main_worker->loop, &sock->watcher);
if (LI_SERVER_RUNNING == srv->state || LI_SERVER_WARMUP == srv->state) ev_io_start(srv->main_worker->loop, &sock->watcher);
}
void li_server_start(liServer *srv) {
static void li_server_start_listen(liServer *srv) {
guint i;
liServerState srvstate = g_atomic_int_get(&srv->state);
if (srvstate == LI_SERVER_STOPPING || srvstate == LI_SERVER_RUNNING) return; /* no restart after stop */
g_atomic_int_set(&srv->state, LI_SERVER_RUNNING);
if (!srv->mainaction) {
ERROR(srv, "%s", "No action handlers defined");
li_server_stop(srv);
return;
}
srv->keep_alive_queue_timeout = 5;
li_plugins_prepare_callbacks(srv);
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_start(srv->main_worker->loop, &sock->watcher);
}
srv->started = ev_now(srv->main_worker->loop);
{
GString *str = li_worker_current_timestamp(srv->main_worker, LI_LOCALTIME, LI_TS_FORMAT_DEFAULT);
srv->started = ev_now(srv->main_worker->loop);
srv->started_str = g_string_new_len(GSTR_LEN(str));
}
log_thread_start(srv);
li_worker_run(srv->main_worker);
}
void li_server_stop(liServer *srv) {
static void li_server_stop_listen(liServer *srv) {
guint i;
if (g_atomic_int_get(&srv->state) == LI_SERVER_STOPPING) return;
g_atomic_int_set(&srv->state, LI_SERVER_STOPPING);
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
if (srv->main_worker) {
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
/* suspend all workers (close keep-alive connections) */
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_suspend(srv->main_worker, wrk);
}
}
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_stop(srv->main_worker, wrk);
}
static void li_server_stop(liServer *srv) {
guint i;
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
log_thread_wakeup(srv);
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_stop(srv->main_worker, wrk);
}
}
void li_server_exit(liServer *srv) {
li_server_stop(srv);
g_atomic_int_set(&srv->exiting, TRUE);
g_atomic_int_set(&srv->state, LI_SERVER_DOWN);
g_atomic_int_set(&srv->dest_state, LI_SERVER_DOWN);
/* exit all workers */
{
@ -444,3 +438,206 @@ guint li_server_ts_format_add(liServer *srv, GString* format) {
g_array_append_val(srv->ts_formats, format);
return i;
}
/* state machine: call this functions only in the main worker context */
/* Note: main worker doesn't need atomic read for state */
#if 0
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDED:
case LI_SERVER_WARMUP:
case LI_SERVER_RUNNING:
case LI_SERVER_SUSPENDING:
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
#endif
static const gchar* li_server_state_string(liServerState state) {
switch (state) {
case LI_SERVER_INIT: return "init";
case LI_SERVER_LOADING: return "loading";
case LI_SERVER_SUSPENDED: return "suspended";
case LI_SERVER_WARMUP: return "warmup";
case LI_SERVER_RUNNING: return "running";
case LI_SERVER_SUSPENDING: return "suspending";
case LI_SERVER_STOPPING: return "stopping";
case LI_SERVER_DOWN: return "down";
}
return "<unkown>";
}
/* next state in the machine we want to reach to reach */
static liServerState li_server_next_state(liServer *srv) {
switch (srv->state) {
case LI_SERVER_INIT:
return LI_SERVER_LOADING;
case LI_SERVER_LOADING:
if (LI_SERVER_DOWN == srv->dest_state) return LI_SERVER_STOPPING;
return LI_SERVER_SUSPENDED;
case LI_SERVER_SUSPENDED:
switch (srv->dest_state) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDED:
return LI_SERVER_SUSPENDED;
case LI_SERVER_WARMUP:
case LI_SERVER_RUNNING:
case LI_SERVER_SUSPENDING:
return LI_SERVER_WARMUP;
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
return LI_SERVER_STOPPING;
}
return LI_SERVER_DOWN;
case LI_SERVER_WARMUP:
if (LI_SERVER_WARMUP == srv->dest_state) return LI_SERVER_WARMUP;
return LI_SERVER_RUNNING;
case LI_SERVER_RUNNING:
if (LI_SERVER_RUNNING == srv->dest_state) return LI_SERVER_RUNNING;
return LI_SERVER_SUSPENDING;
case LI_SERVER_SUSPENDING:
if (LI_SERVER_RUNNING == srv->dest_state) return LI_SERVER_RUNNING;
if (LI_SERVER_SUSPENDING == srv->dest_state) return LI_SERVER_SUSPENDING;
return LI_SERVER_SUSPENDED;
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
return LI_SERVER_DOWN;
}
return LI_SERVER_DOWN;
}
static void li_server_start_transition(liServer *srv, liServerState state) {
guint i;
DEBUG(srv, "Try reaching state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state));
switch (state) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDED:
/* TODO: wait for prepare / suspended */
li_server_reached_state(srv, LI_SERVER_SUSPENDED);
break;
case LI_SERVER_WARMUP:
li_server_start_listen(srv);
li_plugins_start_listen(srv);
li_server_reached_state(srv, LI_SERVER_WARMUP);
break;
case LI_SERVER_RUNNING:
if (LI_SERVER_WARMUP == srv->state) {
li_plugins_start_log(srv);
li_server_reached_state(srv, LI_SERVER_RUNNING);
} else if (LI_SERVER_SUSPENDING == srv->state) {
li_server_start_listen(srv);
li_plugins_start_listen(srv);
li_server_reached_state(srv, LI_SERVER_RUNNING);
}
break;
case LI_SERVER_SUSPENDING:
li_server_stop_listen(srv);
li_plugins_stop_listen(srv);
/* wait for closed connections and plugins */
/* TODO: wait */
li_server_reached_state(srv, LI_SERVER_SUSPENDING);
break;
case LI_SERVER_STOPPING:
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {
liWorker *wrk;
wrk = g_array_index(srv->workers, liWorker*, i);
li_worker_stop(srv->main_worker, wrk);
}
log_thread_wakeup(srv);
li_server_reached_state(srv, LI_SERVER_STOPPING);
break;
case LI_SERVER_DOWN:
/* wait */
break;
}
}
void li_server_goto_state(liServer *srv, liServerState state) {
if (srv->dest_state == LI_SERVER_DOWN || srv->dest_state == state) return; /* cannot undo this */
switch (state) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
case LI_SERVER_SUSPENDING:
case LI_SERVER_STOPPING:
return; /* invalid dest states */
case LI_SERVER_WARMUP:
case LI_SERVER_RUNNING:
case LI_SERVER_SUSPENDED:
case LI_SERVER_DOWN:
break;
}
g_atomic_int_set(&srv->dest_state, state);
if (srv->dest_state != srv->state) {
liServerState want_state = li_server_next_state(srv);
li_server_start_transition(srv, want_state);
}
}
void li_server_reached_state(liServer *srv, liServerState state) {
liServerState want_state = li_server_next_state(srv);
liServerState old_state = srv->state;
if (state != want_state) return;
if (state == srv->state) return;
g_atomic_int_set(&srv->state, state);
DEBUG(srv, "Reached state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state));
switch (srv->state) {
case LI_SERVER_INIT:
break;
case LI_SERVER_LOADING:
li_plugins_prepare_callbacks(srv);
li_server_worker_init(srv);
{
GString *str = li_worker_current_timestamp(srv->main_worker, LI_LOCALTIME, LI_TS_FORMAT_DEFAULT);
srv->started = ev_now(srv->main_worker->loop);
srv->started_str = g_string_new_len(GSTR_LEN(str));
}
log_thread_start(srv);
li_plugins_prepare(srv);
/* wait for plugins to report success */
break;
case LI_SERVER_SUSPENDED:
if (LI_SERVER_SUSPENDING == old_state) {
li_plugins_stop_log(srv);
}
break;
case LI_SERVER_WARMUP:
case LI_SERVER_RUNNING:
break;
case LI_SERVER_SUSPENDING:
case LI_SERVER_STOPPING:
break;
case LI_SERVER_DOWN:
/* li_server_exit(srv); */
return;
}
if (srv->acon) {
GString *data = g_string_new(li_server_state_string(srv->state));
GError *err = NULL;
if (!li_angel_send_simple_call(srv->acon, CONST_STR_LEN("core"), CONST_STR_LEN("reached-state"), data, &err)) {
GERROR(srv, err, "%s", "couldn't send state update to angel");
g_error_free(err);
}
}
if (srv->dest_state != srv->state) {
want_state = li_server_next_state(srv);
li_server_start_transition(srv, want_state);
}
}

31
src/main/worker.c

@ -237,6 +237,15 @@ static void li_worker_stop_cb(struct ev_loop *loop, ev_async *w, int revents) {
li_worker_stop(wrk, wrk);
}
/* stop worker watcher */
static void li_worker_suspend_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
UNUSED(loop);
UNUSED(revents);
li_worker_suspend(wrk, wrk);
}
/* exit worker watcher */
static void li_worker_exit_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
@ -368,6 +377,10 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
wrk->li_worker_stop_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->li_worker_stop_watcher);
ev_init(&wrk->li_worker_suspend_watcher, li_worker_suspend_cb);
wrk->li_worker_suspend_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->li_worker_suspend_watcher);
ev_init(&wrk->new_con_watcher, li_worker_new_con_cb);
wrk->new_con_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->new_con_watcher);
@ -507,6 +520,7 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
guint i;
ev_async_stop(wrk->loop, &wrk->li_worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->li_worker_suspend_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
li_waitqueue_stop(&wrk->io_timeout_queue);
li_waitqueue_stop(&wrk->throttle_queue);
@ -533,6 +547,23 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
}
}
void li_worker_suspend(liWorker *context, liWorker *wrk) {
if (context == wrk) {
guint i;
/* close keep alive connections */
for (i = wrk->connections_active; i-- > 0;) {
liConnection *con = g_array_index(wrk->connections, liConnection*, i);
if (con->state == LI_CON_STATE_KEEP_ALIVE)
worker_con_put(con);
}
li_worker_check_keepalive(wrk);
} else {
ev_async_send(wrk->loop, &wrk->li_worker_suspend_watcher);
}
}
void li_worker_exit(liWorker *context, liWorker *wrk) {
if (context == wrk) {
ev_unloop (wrk->loop, EVUNLOOP_ALL);

Loading…
Cancel
Save