Browse Source

New instance state machine

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
534a0f2d3a
  1. 19
      include/lighttpd/angel_server.h
  2. 4
      src/angel/angel_plugin.c
  3. 4
      src/angel/angel_plugin_core.c
  4. 233
      src/angel/angel_server.c

19
include/lighttpd/angel_server.h

@ -10,11 +10,12 @@
#endif
typedef enum {
LI_INSTANCE_DOWN, /* not running */
LI_INSTANCE_LOADING, /* startup */
LI_INSTANCE_WARMUP, /* running, but logging to files disabled */
LI_INSTANCE_ACTIVE, /* everything running */
LI_INSTANCE_SUSPEND /* handle remaining connections, suspend logs+accept() */
LI_INSTANCE_DOWN, /* not started yet */
LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */
LI_INSTANCE_SILENT, /* only accept(), no logging: waiting for another instance to suspend */
LI_INSTANCE_RUNNING, /* everything running */
LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */
LI_INSTANCE_FINISHED /* not running */
} liInstanceState;
struct liInstanceConf {
@ -40,7 +41,6 @@ struct liInstance {
liInstance *replace, *replace_by;
liAngelConnection *acon;
gboolean in_jobqueue;
};
struct liServer {
@ -52,9 +52,6 @@ struct liServer {
sig_w_TERM,
sig_w_PIPE;
GQueue job_queue;
ev_async job_watcher;
liPlugins plugins;
liLog log;
@ -63,6 +60,8 @@ struct liServer {
LI_API liServer* li_server_new(const gchar *module_dir);
LI_API void li_server_free(liServer* srv);
LI_API void li_server_stop(liServer *srv);
LI_API liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic);
LI_API void li_instance_replace(liInstance *oldi, liInstance *newi);
LI_API void li_instance_set_state(liInstance *i, liInstanceState s);
@ -74,6 +73,4 @@ LI_API void li_instance_conf_acquire(liInstanceConf *ic);
LI_API void li_instance_release(liInstance *i);
LI_API void li_instance_acquire(liInstance *i);
LI_API void li_instance_job_append(liInstance *i);
#endif

4
src/angel/angel_plugin.c

@ -146,7 +146,7 @@ gboolean plugins_config_load(liServer *srv, const gchar *filename) {
return FALSE;
}
if (!li_angel_config_parse_file(srv, filename, &error)) {
if (filename && !li_angel_config_parse_file(srv, filename, &error)) {
ERROR(srv, "failed to parse config file: %s", error->message);
g_error_free(error);
plugins_config_clean(srv);
@ -195,7 +195,7 @@ gboolean plugins_config_load(liServer *srv, const gchar *filename) {
if (!ps->config_filename) {
ps->config_filename = g_string_new(filename);
} else {
g_string_assign(ps->config_filename, filename);
g_string_assign(ps->config_filename, filename ? filename : "");
}
return TRUE;

4
src/angel/angel_plugin_core.c

@ -266,7 +266,7 @@ static void core_activate(liServer *srv, liPlugin *p) {
}
if (config->inst) {
li_instance_set_state(config->inst, LI_INSTANCE_DOWN);
li_instance_set_state(config->inst, LI_INSTANCE_FINISHED);
li_instance_release(config->inst);
config->inst = NULL;
}
@ -276,7 +276,7 @@ static void core_activate(liServer *srv, liPlugin *p) {
if (config->instconf) {
config->inst = li_server_new_instance(srv, config->instconf);
li_instance_set_state(config->inst, LI_INSTANCE_ACTIVE);
li_instance_set_state(config->inst, LI_INSTANCE_RUNNING);
ERROR(srv, "%s", "Starting instance");
}
}

233
src/angel/angel_server.c

@ -1,23 +1,30 @@
#include <lighttpd/angel_base.h>
static void instance_state_machine(liInstance *i);
static void instance_state_reached(liInstance *i, liInstanceState s);
static void jobqueue_callback(struct ev_loop *loop, ev_async *w, int revents) {
#define CATCH_SIGNAL(loop, cb, n) do { \
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
/* Signal watchers shouldn't keep loop alive */ \
ev_unref(loop); \
} while (0)
#define UNCATCH_SIGNAL(loop, n) li_ev_safe_ref_and_stop(ev_signal_stop, loop, &srv->sig_w_##n)
static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
liServer *srv = (liServer*) w->data;
liInstance *i;
GQueue todo;
UNUSED(loop);
UNUSED(revents);
todo = srv->job_queue;
g_queue_init(&srv->job_queue);
li_server_stop(srv);
}
while (NULL != (i = g_queue_pop_head(&todo))) {
i->in_jobqueue = FALSE;
instance_state_machine(i);
li_instance_release(i);
}
static void sigpipe_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
/* ignore */
UNUSED(loop); UNUSED(w); UNUSED(revents);
}
liServer* li_server_new(const gchar *module_dir) {
@ -25,11 +32,9 @@ liServer* li_server_new(const gchar *module_dir) {
srv->loop = ev_default_loop(0);
/* TODO: handle signals */
ev_async_init(&srv->job_watcher, jobqueue_callback);
ev_async_start(srv->loop, &srv->job_watcher);
ev_unref(srv->loop);
srv->job_watcher.data = srv;
CATCH_SIGNAL(srv->loop, sigint_cb, INT);
CATCH_SIGNAL(srv->loop, sigint_cb, TERM);
CATCH_SIGNAL(srv->loop, sigpipe_cb, PIPE);
log_init(srv);
plugins_init(srv, module_dir);
@ -40,9 +45,21 @@ void li_server_free(liServer* srv) {
plugins_clear(srv);
log_clean(srv);
UNCATCH_SIGNAL(srv->loop, INT);
UNCATCH_SIGNAL(srv->loop, TERM);
UNCATCH_SIGNAL(srv->loop, PIPE);
g_slice_free(liServer, srv);
}
void li_server_stop(liServer *srv) {
UNCATCH_SIGNAL(srv->loop, INT);
UNCATCH_SIGNAL(srv->loop, TERM);
plugins_config_load(srv, NULL);
}
static void instance_angel_call_cb(liAngelConnection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
@ -94,20 +111,21 @@ static void instance_angel_close_cb(liAngelConnection *acon, GError *err) {
static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
liInstance *i = (liInstance*) w->data;
liInstanceState news;
if (i->s_cur == LI_INSTANCE_LOADING) {
if (i->s_cur == LI_INSTANCE_DOWN) {
ERROR(i->srv, "spawning child %i failed, not restarting", i->proc->child_pid);
i->s_dest = i->s_cur = LI_INSTANCE_DOWN; /* TODO: retry spawn later? */
news = i->s_dest = LI_INSTANCE_FINISHED; /* TODO: retry spawn later? */
} else {
ERROR(i->srv, "child %i died", i->proc->child_pid);
i->s_cur = LI_INSTANCE_DOWN;
news = LI_INSTANCE_DOWN;
}
li_proc_free(i->proc);
i->proc = NULL;
li_angel_connection_free(i->acon);
i->acon = NULL;
ev_child_stop(loop, w);
li_instance_job_append(i);
instance_state_reached(i, news);
li_instance_release(i);
}
@ -139,7 +157,7 @@ static void instance_spawn(liInstance *i) {
close(confd[1]);
ev_child_set(&i->child_watcher, i->proc->child_pid, 0);
ev_child_start(i->srv->loop, &i->child_watcher);
i->s_cur = LI_INSTANCE_LOADING;
i->s_cur = LI_INSTANCE_DOWN;
li_instance_acquire(i);
DEBUG(i->srv, "Instance (%i) spawned: %s", i->proc->child_pid, i->ic->cmd[0]);
}
@ -166,75 +184,110 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
if (i->s_dest == s) return;
switch (s) {
case LI_INSTANCE_DOWN:
break;
case LI_INSTANCE_LOADING:
case LI_INSTANCE_WARMUP:
return; /* These cannot be set */
case LI_INSTANCE_ACTIVE:
case LI_INSTANCE_SUSPEND:
case LI_INSTANCE_SUSPENDING:
ERROR(i->srv, "Invalid destination state %i", s);
return; /* cannot set this */
case LI_INSTANCE_SILENT:
case LI_INSTANCE_SUSPENDED:
case LI_INSTANCE_RUNNING:
case LI_INSTANCE_FINISHED:
break;
}
i->s_dest = s;
if (s == LI_INSTANCE_DOWN) {
if (i->s_cur != LI_INSTANCE_DOWN) {
kill(i->proc->child_pid, SIGTERM);
}
if (!i->proc && LI_INSTANCE_FINISHED != s) {
instance_spawn(i);
} else {
if (!i->proc) {
instance_spawn(i);
return;
} else {
GError *error = NULL;
GString *buf = g_string_sized_new(0);
switch (s) {
case LI_INSTANCE_DOWN:
case LI_INSTANCE_LOADING:
case LI_INSTANCE_WARMUP:
break;
case LI_INSTANCE_ACTIVE:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run"), buf, &error);
break;
case LI_INSTANCE_SUSPEND:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("suspend"), buf, &error);
break;
GError *error = NULL;
switch (s) {
case LI_INSTANCE_DOWN:
case LI_INSTANCE_SUSPENDING:
break; /* cannot be set */
case LI_INSTANCE_SILENT:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error);
break;
case LI_INSTANCE_SUSPENDED:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("suspend"), NULL, &error);
break;
case LI_INSTANCE_RUNNING:
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run"), NULL, &error);
break;
case LI_INSTANCE_FINISHED:
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
}
break;
}
if (error) {
GERROR(i->srv, error, "set state %i failed, killing instance:", s);
g_error_free(error);
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
}
}
}
}
static void instance_state_machine(liInstance *i) {
liInstanceState olds = i->s_dest;
while (i->s_cur != i->s_dest && i->s_cur != olds) {
olds = i->s_cur;
static void instance_state_reached(liInstance *i, liInstanceState s) {
GError *error = NULL;
i->s_cur = s;
switch (s) {
case LI_INSTANCE_DOWN:
/* last child died */
if (i->s_dest == LI_INSTANCE_FINISHED) {
i->s_cur = LI_INSTANCE_FINISHED;
} else {
instance_spawn(i);
}
break;
case LI_INSTANCE_SUSPENDED:
switch (i->s_dest) {
case LI_INSTANCE_DOWN:
if (!i->proc) {
i->s_cur = LI_INSTANCE_DOWN;
break;
}
kill(i->proc->child_pid, SIGINT);
return;
case LI_INSTANCE_LOADING:
break;
case LI_INSTANCE_WARMUP:
if (!i->proc) {
instance_spawn(i);
return;
}
break; /* impossible */
case LI_INSTANCE_SUSPENDED:
break;
case LI_INSTANCE_ACTIVE:
if (!i->proc) {
instance_spawn(i);
return;
}
case LI_INSTANCE_SILENT:
/* make sure we move to SILENT after we spawned the instance */
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error);
break;
case LI_INSTANCE_SUSPEND:
if (!i->proc) {
instance_spawn(i);
return;
}
case LI_INSTANCE_RUNNING:
/* make sure we move to RUNNING after we spawned the instance */
li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run"), NULL, &error);
break;
case LI_INSTANCE_SUSPENDING:
case LI_INSTANCE_FINISHED:
break; /* nothing to do, instance should already know what to do */
}
break;
case LI_INSTANCE_SILENT:
/* TODO: replace another instance? */
break;
case LI_INSTANCE_RUNNING:
/* nothing to do, instance should already know what to do */
break;
case LI_INSTANCE_SUSPENDING:
/* nothing to do, instance should already know what to do */
break;
case LI_INSTANCE_FINISHED:
if (i->s_dest != LI_INSTANCE_FINISHED) {
/* TODO: replacing another instance failed? */
}
break;
}
if (error) {
GERROR(i->srv, error, "reaching state %i failed, killing instance:", s);
g_error_free(error);
if (i->proc) {
kill(i->proc->child_pid, SIGTERM);
} else {
instance_state_reached(i, LI_INSTANCE_FINISHED);
}
}
}
@ -243,18 +296,14 @@ void li_instance_release(liInstance *i) {
liServer *srv;
liInstance *t;
if (!i) return;
assert(g_atomic_int_get(&i->refcount) > 0);
if (!g_atomic_int_dec_and_test(&i->refcount)) return;
srv = i->srv;
if (i->proc) {
ev_child_stop(srv->loop, &i->child_watcher);
kill(i->proc->child_pid, SIGTERM);
li_proc_free(i->proc);
i->proc = NULL;
i->s_cur = LI_INSTANCE_DOWN;
li_angel_connection_free(i->acon);
i->acon = NULL;
}
g_assert(g_atomic_int_get(&i->refcount) > 0);
if (!g_atomic_int_dec_and_test(&i->refcount)) return;
g_assert(!i->proc);
DEBUG(srv, "%s", "instance released");
li_instance_conf_release(i->ic);
i->ic = NULL;
@ -297,13 +346,3 @@ void li_instance_conf_acquire(liInstanceConf *ic) {
assert(g_atomic_int_get(&ic->refcount) > 0);
g_atomic_int_inc(&ic->refcount);
}
void li_instance_job_append(liInstance *i) {
liServer *srv = i->srv;
if (!i->in_jobqueue) {
li_instance_acquire(i);
i->in_jobqueue = TRUE;
g_queue_push_tail(&srv->job_queue, i);
ev_async_send(srv->loop, &srv->job_watcher);
}
}

Loading…
Cancel
Save