From f8be820c364df06010b0fa702cdc7a94800c0d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Tue, 7 Jul 2009 22:40:44 +0200 Subject: [PATCH] Big lighttpd-angel update; still not complete, but supports fork+exec+setuid/gid+listen (no checks in listen yet) --- include/lighttpd/angel.h | 5 +- include/lighttpd/angel_base.h | 3 + include/lighttpd/angel_connection.h | 20 +- include/lighttpd/angel_plugin.h | 9 +- include/lighttpd/angel_plugin_core.h | 10 + include/lighttpd/angel_server.h | 46 +++- include/lighttpd/base.h | 3 + include/lighttpd/server.h | 3 + src/CMakeLists.txt | 4 +- src/angel.c | 75 ++++++- src/angel_connection.c | 252 ++++++++++++++++------ src/angel_log.c | 3 + src/angel_main.c | 25 ++- src/angel_plugin.c | 37 +++- src/angel_plugin_core.c | 300 +++++++++++++++++++++++++- src/angel_server.c | 305 ++++++++++++++++++++++++++- src/lighttpd.c | 10 +- src/log.c | 3 +- src/plugin_core.c | 8 +- src/server.c | 16 +- 20 files changed, 1020 insertions(+), 117 deletions(-) diff --git a/include/lighttpd/angel.h b/include/lighttpd/angel.h index 2b16edd..a52f27d 100644 --- a/include/lighttpd/angel.h +++ b/include/lighttpd/angel.h @@ -2,12 +2,13 @@ #define _LIGHTTPD_ANGEL_H_ /* interface to the angel; implementation needs to work without angel too */ +LI_API void angel_setup(server *srv); /* listen to a socket */ -LI_API int angel_listen(server *srv, GString *str); +LI_API void angel_listen(server *srv, GString *str); /* send log messages during startup to angel, frees the string */ -LI_API gboolean angel_log(server *srv, GString *str); +LI_API void angel_log(server *srv, GString *str); /* angle_fake definitions, only for internal use */ diff --git a/include/lighttpd/angel_base.h b/include/lighttpd/angel_base.h index d837624..db0b6c9 100644 --- a/include/lighttpd/angel_base.h +++ b/include/lighttpd/angel_base.h @@ -17,6 +17,9 @@ typedef struct server server; struct instance; typedef struct instance instance; +struct instance_conf; +typedef struct instance_conf instance_conf; + #include #include diff --git a/include/lighttpd/angel_connection.h b/include/lighttpd/angel_connection.h index 85d8eb2..2225cca 100644 --- a/include/lighttpd/angel_connection.h +++ b/include/lighttpd/angel_connection.h @@ -11,24 +11,20 @@ typedef struct angel_connection angel_connection; struct angel_call; typedef struct angel_call angel_call; -typedef void (*AngelCallback)(gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds); +/* error, data and fds-array will be freed/closed by the angel api itself; if you want to use the fds set the array size to 0 */ +typedef void (*AngelCallback)(angel_call *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds); typedef void (*AngelReceiveCall)(angel_connection *acon, const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, gint32 id, GString *data); -typedef void (*AngelReceiveResult)(angel_connection *acon, - const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, - gint32 id, - GString *error, GString *data, GArray *fds); - /* gets called after read/write errors */ typedef void (*AngelCloseCallback)(angel_connection *acon, GError *err); struct angel_connection { gpointer data; - GStaticMutex mutex; /* angel itself has no threads */ + GMutex *mutex; struct ev_loop *loop; int fd; idlist *call_id_list; @@ -39,7 +35,6 @@ struct angel_connection { angel_buffer in; AngelReceiveCall recv_call; - AngelReceiveResult recv_result; AngelCloseCallback close_cb; /* parse input */ @@ -84,11 +79,13 @@ typedef enum { } AngelConnectionError; /* create connection */ -LI_API angel_connection* angel_connection_create( +LI_API angel_connection* angel_connection_new( struct ev_loop *loop, int fd, gpointer data, - AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb); + AngelReceiveCall recv_call, AngelCloseCallback close_cb); +LI_API void angel_connection_free(angel_connection *acon); -LI_API angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout); + +LI_API angel_call *angel_call_new(AngelCallback callback, ev_tstamp timeout); /* returns TRUE if a call was cancelled; make sure you don't call free while you're calling send_call */ LI_API gboolean angel_call_free(angel_call *call); @@ -109,7 +106,6 @@ LI_API gboolean angel_send_call( LI_API gboolean angel_send_result( angel_connection *acon, - const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, gint32 id, GString *error, GString *data, GArray *fds, GError **err); diff --git a/include/lighttpd/angel_plugin.h b/include/lighttpd/angel_plugin.h index 428b82e..5892857 100644 --- a/include/lighttpd/angel_plugin.h +++ b/include/lighttpd/angel_plugin.h @@ -18,6 +18,8 @@ typedef gboolean (*PluginCheckConfig) (server *srv, plugin *p); typedef void (*PluginActivateConfig)(server *srv, plugin *p); typedef void (*PluginParseItem) (server *srv, plugin *p, value **options); +typedef void (*PluginHandleCall) (server *srv, instance *i, plugin *p, gint32 id, GString *data); + typedef enum { PLUGIN_ITEM_OPTION_MANDATORY = 1 } plugin_item_option_flags; @@ -32,7 +34,7 @@ struct plugin_item { const gchar *name; PluginParseItem handle_parse_item; - const plugin_item_option options[]; + const plugin_item_option *options; }; struct plugin { @@ -42,8 +44,8 @@ struct plugin { gpointer data; /**< private plugin data */ const plugin_item *items; + GHashTable *angel_callbacks; /**< map (const gchar*) -> PluginHandleCall */ - PluginInit plugin_init_marker; /**< identify plugin; PluginInit must be unique per plugin */ PluginFree handle_free; /**< called before plugin is unloaded */ PluginCleanConfig handle_clean_config; /**< called before the reloading of the config is started or after the reloading failed */ @@ -59,6 +61,7 @@ struct Plugins { struct modules *modules; GHashTable *module_refs, *load_module_refs; /** gchar* -> server_module */ + GHashTable *ht_plugins, *load_ht_plugins; GPtrArray *plugins, *load_plugins; /* plugin* */ }; @@ -69,7 +72,7 @@ void plugins_clear(server *srv); void plugins_config_clean(server *srv); gboolean plugins_config_load(server *srv, const gchar *filename); -gboolean plugins_handle_item(server *srv, GString *itemname, value *hash); +void plugins_handle_item(server *srv, GString *itemname, value *hash); /* "core" is a reserved module name for interal use */ gboolean plugins_load_module(server *srv, const gchar *name); diff --git a/include/lighttpd/angel_plugin_core.h b/include/lighttpd/angel_plugin_core.h index 0cc540e..666a313 100644 --- a/include/lighttpd/angel_plugin_core.h +++ b/include/lighttpd/angel_plugin_core.h @@ -3,6 +3,16 @@ #include +typedef struct { + /* Load */ + instance_conf *load_instconf; + gboolean load_failed; + + /* Running */ + instance_conf *instconf; + instance *inst; +} plugin_core_config_t; + gboolean plugin_core_init(server *srv); #endif diff --git a/include/lighttpd/angel_server.h b/include/lighttpd/angel_server.h index bb2255a..91f2242 100644 --- a/include/lighttpd/angel_server.h +++ b/include/lighttpd/angel_server.h @@ -9,10 +9,38 @@ #define LIGHTTPD_ANGEL_MAGIC ((guint)0x3e14ac65) #endif +typedef enum { + INSTANCE_DOWN, /* not running */ + INSTANCE_LOADING, /* startup */ + INSTANCE_WARMUP, /* running, but logging to files disabled */ + INSTANCE_ACTIVE, /* everything running */ + INSTANCE_SUSPEND /* handle remaining connections, suspend logs+accept() */ +} instance_state_t; + +struct instance_conf { + gint refcount; + + gchar **cmd; + GString *username; + uid_t uid; + gid_t gid; +}; + struct instance { + gint refcount; + + server *srv; + instance_conf *ic; + pid_t pid; + ev_child child_watcher; + + instance_state_t s_cur, s_dest; - angel_connection *con; + instance *replace, *replace_by; + + angel_connection *acon; + gboolean in_jobqueue; }; struct server { @@ -24,6 +52,9 @@ struct server { sig_w_TERM, sig_w_PIPE; + GQueue job_queue; + ev_async job_watcher; + Plugins plugins; log_t log; @@ -32,4 +63,17 @@ struct server { LI_API server* server_new(const gchar *module_dir); LI_API void server_free(server* srv); +LI_API instance* server_new_instance(server *srv, instance_conf *ic); +LI_API void instance_replace(instance *oldi, instance *newi); +LI_API void instance_set_state(instance *i, instance_state_t s); + +LI_API instance_conf* instance_conf_new(gchar **cmd, GString *username, uid_t uid, gid_t gid); +LI_API void instance_conf_release(instance_conf *ic); +LI_API void instance_conf_acquire(instance_conf *ic); + +LI_API void instance_release(instance *i); +LI_API void instance_acquire(instance *i); + +LI_API void instance_job_append(instance *i); + #endif diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index c303dee..bde73b6 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -27,6 +27,9 @@ #include #include +#include +#include + #include #include diff --git a/include/lighttpd/server.h b/include/lighttpd/server.h index 547ed7d..b2033a6 100644 --- a/include/lighttpd/server.h +++ b/include/lighttpd/server.h @@ -22,12 +22,14 @@ struct server_socket { struct server { guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */ server_state state; /** atomic access */ + angel_connection *acon; struct worker *main_worker; guint worker_count; GArray *workers; GArray *ts_formats; /** array of (GString*), add with server_ts_format_add() */ + struct ev_loop *loop; guint loop_flags; ev_signal sig_w_INT, @@ -85,6 +87,7 @@ struct server { LI_API server* server_new(const gchar *module_dir); LI_API void server_free(server* srv); LI_API gboolean server_loop_init(server *srv); +LI_API gboolean server_worker_init(server *srv); LI_API void server_listen(server *srv, int fd); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b52e039..2bcf106 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -377,8 +377,8 @@ ADD_EXECUTABLE(lighttpd-angel utils.c ) -ADD_TARGET_PROPERTIES(lighttpd-angel LINK_FLAGS "${LUA_LDFLAGS} ${EV_LDFLAGS} ${GMODULE_LDFLAGS} ${WARN_FLAGS}") -ADD_TARGET_PROPERTIES(lighttpd-angel COMPILE_FLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GMODULE_CFLAGS} ${WARN_FLAGS}") +ADD_TARGET_PROPERTIES(lighttpd-angel LINK_FLAGS "${LUA_LDFLAGS} ${EV_LDFLAGS} ${GTHREAD_LDFLAGS} ${GMODULE_LDFLAGS} ${WARN_FLAGS}") +ADD_TARGET_PROPERTIES(lighttpd-angel COMPILE_FLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS} ${WARN_FLAGS}") IF(HAVE_PCRE_H) TARGET_LINK_LIBRARIES(lighttpd ${PCRE_LIBRARY}) diff --git a/src/angel.c b/src/angel.c index f9fca1c..5a2d302 100644 --- a/src/angel.c +++ b/src/angel.c @@ -2,12 +2,79 @@ #include #include +static void angel_call_cb(angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gint32 id, GString *data) { + server *srv = acon->data; + ERROR(srv, "received message for %s:%s, not implemented yet", mod, action); + if (-1 != id) angel_send_result(acon, id, g_string_new_len(CONST_STR_LEN("not implemented yet")), NULL, NULL, NULL); +} + +static void angel_close_cb(angel_connection *acon, GError *err) { + server *srv = acon->data; + ERROR(srv, "fatal: angel connection close: %s", err ? err->message : g_strerror(errno)); + if (err) g_error_free(err); + exit(1); +} + +void angel_setup(server *srv) { + srv->acon = angel_connection_new(srv->loop, 0, srv, angel_call_cb, angel_close_cb); +} + +static void angel_listen_cb(angel_call *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds) { + server *srv = ctx; + guint i; + UNUSED(data); + + angel_call_free(acall); + + ERROR(srv, "%s", "listen_cb"); + + if (timeout) { + ERROR(srv, "listen failed: %s", "time out"); + return; + } + + if (error->len > 0) { + ERROR(srv, "listen failed: %s", error->str); + /* TODO: exit? */ + return; + } + + if (fds && fds->len > 0) { + for (i = 0; i < fds->len; i++) { + INFO(srv, "listening on fd %i", g_array_index(fds, int, i)); + server_listen(srv, g_array_index(fds, int, i)); + } + g_array_set_size(fds, 0); + } else { + ERROR(srv, "listen failed: %s", "received no filedescriptors"); + } +} + /* listen to a socket */ -int angel_listen(server *srv, GString *str) { - return angel_fake_listen(srv, str); +void angel_listen(server *srv, GString *str) { + if (srv->acon) { + angel_call *acall = angel_call_new(angel_listen_cb, 3.0); + GError *err = NULL; + + acall->context = srv; + if (!angel_send_call(srv->acon, CONST_STR_LEN("core"), CONST_STR_LEN("listen"), acall, g_string_new_len(GSTR_LEN(str)), &err)) { + ERROR(srv, "couldn't send call: %s", err->message); + g_error_free(err); + } + } else { + int fd = angel_fake_listen(srv, str); + if (-1 == fd) { + ERROR(srv, "listen('%s') failed", str->str); + /* TODO: exit? */ + } else { + server_listen(srv, fd); + } + } } /* send log messages while startup to angel */ -gboolean angel_log(server *srv, GString *str) { - return angel_fake_log(srv, str); +void angel_log(server *srv, GString *str) { + angel_fake_log(srv, str); } diff --git a/src/angel_connection.c b/src/angel_connection.c index 6029bbd..08b6dac 100644 --- a/src/angel_connection.c +++ b/src/angel_connection.c @@ -5,6 +5,8 @@ #define ANGEL_MAGIC ((gint32) 0x8a930a9f) +static void close_fd_array(GArray *fds); + typedef enum { ANGEL_CALL_SEND_SIMPLE = 1, ANGEL_CALL_SEND_CALL = 2, @@ -56,6 +58,7 @@ static void send_queue_item_free(angel_connection_send_item_t *i) { g_string_free(i->value.string.buf, TRUE); break; case ANGEL_CONNECTION_ITEM_FDS: + close_fd_array(i->value.fds.fds); g_array_free(i->value.fds.fds, TRUE); break; } @@ -72,6 +75,7 @@ static void send_queue_clean(GQueue *queue) { break; case ANGEL_CONNECTION_ITEM_FDS: if (i->value.fds.pos < i->value.fds.fds->len) return; + close_fd_array(i->value.fds.fds); g_array_free(i->value.fds.fds, TRUE); break; } @@ -100,7 +104,7 @@ static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **e old_len = acon->in.data->len; g_string_set_size(acon->in.data, need); for ( ; want > 0; ) { - r = read(acon->fd, acon->in.data + old_len, want); + r = read(acon->fd, acon->in.data->str + old_len, want); if (r < 0) { switch (errno) { case EINTR: @@ -131,16 +135,63 @@ static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **e return TRUE; } +static void close_fd_array(GArray *fds) { + guint i; + for (i = 0; i < fds->len; i++) { + close(g_array_index(fds, int, i)); + } + g_array_set_size(fds, 0); +} + static gboolean angel_dispatch(angel_connection *acon, GError **err) { gint32 id = acon->parse.id, type = acon->parse.type; angel_call *call = NULL; + AngelCallback cb = NULL; + gpointer ctx; - if (type != ANGEL_CALL_SEND_SIMPLE) { - g_static_mutex_lock(&acon->mutex); + switch (type) { + case ANGEL_CALL_SEND_SIMPLE: + if (-1 != id) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid id: %i, should be -1 for simple call", (gint) id); + close_fd_array(acon->parse.fds); + return FALSE; + } + + if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Wrong data in call"); + close_fd_array(acon->parse.fds); + return FALSE; + } + acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), + id, acon->parse.data); + break; + case ANGEL_CALL_SEND_CALL: + if (-1 == id) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid id: -1, should be >= 0 for call"); + close_fd_array(acon->parse.fds); + return FALSE; + } + + if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Wrong data in call"); + close_fd_array(acon->parse.fds); + return FALSE; + } + acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), + id, acon->parse.data); + break; + case ANGEL_CALL_SEND_RESULT: + g_printerr("received result: %i\n", id); + g_mutex_lock(acon->mutex); if (!idlist_is_used(acon->call_id_list, id)) { - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, "Invalid id: %i", (gint) id); + close_fd_array(acon->parse.fds); return FALSE; } idlist_put(acon->call_id_list, id); @@ -149,39 +200,23 @@ static gboolean angel_dispatch(angel_connection *acon, GError **err) { g_ptr_array_index(acon->call_table, id) = NULL; if (call) { ev_timer_stop(acon->loop, &call->timeout_watcher); - if (!call->callback) { + ctx = call->context; + if (NULL == (cb = call->callback)) { g_slice_free(angel_call, call); - call = NULL; } } } - - g_static_mutex_unlock(&acon->mutex); - } else if (-1 != id) { - g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, - "Invalid id: %i, should be -1 for simple call", (gint) id); - return FALSE; - } + g_mutex_unlock(acon->mutex); - switch (type) { - case ANGEL_CALL_SEND_SIMPLE: - if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) { - g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, - "Wrong data in call"); - return FALSE; - } - acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), - id, acon->parse.data); - break; - case ANGEL_CALL_SEND_RESULT: - if (call) { - acon->recv_result(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), - id, acon->parse.error, acon->parse.data, acon->parse.fds); + if (cb) { + cb(call, ctx, FALSE, acon->parse.error, acon->parse.data, acon->parse.fds); } + close_fd_array(acon->parse.fds); break; default: g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, "Invalid type: %i", (gint) type); + close_fd_array(acon->parse.fds); return FALSE; } @@ -228,10 +263,12 @@ static gboolean angel_connection_read(angel_connection *acon, GError **err) { "receive fd error: %s", g_strerror(errno)); return FALSE; case -2: + g_printerr("waiting for fds: %i\n", acon->parse.missing_fds); return TRUE; /* need more data */ } } + acon->parse.have_header = FALSE; if (!angel_data_read_mem(&acon->in, &acon->parse.mod, acon->parse.mod_len, err)) return FALSE; if (!angel_data_read_mem(&acon->in, &acon->parse.action, acon->parse.action_len, err)) return FALSE; if (!angel_data_read_mem(&acon->in, &acon->parse.error, acon->parse.error_len, err)) return FALSE; @@ -256,9 +293,9 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents) gboolean out_queue_empty; angel_connection_send_item_t *send_item; - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); send_item = g_queue_peek_head(acon->out); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); for (i = 0; send_item && (i < 10); i++) { /* don't send more than 10 chunks */ switch (send_item->type) { @@ -295,7 +332,9 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents) case ANGEL_CONNECTION_ITEM_FDS: while (send_item->value.fds.pos < send_item->value.fds.fds->len) { switch (send_fd(w->fd, g_array_index(send_item->value.fds.fds, int, send_item->value.fds.pos))) { - case 0: continue; + case 0: + send_item->value.fds.pos++; + continue; case -1: /* Fatal error, connection has to be closed */ ev_async_stop(loop, &acon->out_notify_watcher); ev_io_stop(loop, &acon->fd_watcher); @@ -303,24 +342,23 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents) return; case -2: goto write_eagain; } - send_item->value.fds.pos++; } break; } send_queue_item_free(send_item); - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); g_queue_pop_head(acon->out); send_item = g_queue_peek_head(acon->out); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); } write_eagain: - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); send_queue_clean(acon->out); out_queue_empty = (0 == acon->out->length); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); if (out_queue_empty) ev_io_rem_events(loop, w, EV_WRITE); } @@ -342,30 +380,82 @@ static void angel_connection_out_notify_cb(struct ev_loop *loop, ev_async *w, in } /* create connection */ -angel_connection* angel_connection_create(struct ev_loop *loop, int fd, gpointer data, - AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb) { +angel_connection* angel_connection_new(struct ev_loop *loop, int fd, gpointer data, + AngelReceiveCall recv_call, AngelCloseCallback close_cb) { angel_connection *acon = g_slice_new0(angel_connection); acon->data = data; - g_static_mutex_init(&acon->mutex); + acon->mutex = g_mutex_new(); acon->loop = loop; acon->fd = fd; acon->call_id_list = idlist_new(65535); + acon->call_table = g_ptr_array_new(); ev_io_init(&acon->fd_watcher, angel_connection_io_cb, fd, EV_READ); + ev_io_start(acon->loop, &acon->fd_watcher); acon->fd_watcher.data = acon; ev_async_init(&acon->out_notify_watcher, angel_connection_out_notify_cb); + ev_async_start(acon->loop, &acon->out_notify_watcher); acon->out_notify_watcher.data = acon; acon->out = g_queue_new(); - acon->in.data = g_string_sized_new(0); + acon->in.data = g_string_sized_new(1024); acon->in.pos = 0; + acon->parse.mod = g_string_sized_new(0); + acon->parse.action = g_string_sized_new(0); + acon->parse.error = g_string_sized_new(0); + acon->parse.data = g_string_sized_new(0); + acon->parse.fds = g_array_new(FALSE, FALSE, sizeof(int)); + acon->recv_call = recv_call; - acon->recv_result = recv_result; acon->close_cb = close_cb; return acon; } +void angel_connection_free(angel_connection *acon) { + angel_connection_send_item_t *send_item; + guint i; + + g_printerr("angel_connection_free\n"); + + if (!acon) return; + + close(acon->fd); + acon->fd = -1; + + for (i = 0; i < acon->call_table->len; i++) { + angel_call *acall = g_ptr_array_index(acon->call_table, i); + AngelCallback cb; + if (!acall) continue; + g_ptr_array_index(acon->call_table, i) = NULL; + + cb = acall->callback; + ev_timer_stop(acon->loop, &acall->timeout_watcher); + if (cb) { + cb(acall, acall->context, TRUE, NULL, NULL, NULL); + } else { + g_slice_free(angel_call, acall); + } + } + g_ptr_array_free(acon->call_table, TRUE); + + g_mutex_free(acon->mutex); + acon->mutex = NULL; + + ev_io_stop(acon->loop, &acon->fd_watcher); + ev_async_stop(acon->loop, &acon->out_notify_watcher); + + idlist_free(acon->call_id_list); + while (NULL != (send_item = g_queue_pop_head(acon->out))) { + send_queue_item_free(send_item); + } + g_queue_free(acon->out); + g_string_free(acon->in.data, TRUE); + /* TODO */ + + g_slice_free(angel_connection, acon); +} + static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { angel_call* call = (angel_call*) w->data; angel_connection *acon = call->acon; @@ -373,23 +463,23 @@ static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents gpointer ctx; UNUSED(loop); UNUSED(revents); - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); g_ptr_array_index(acon->call_table, call->id) = NULL; if (NULL == (cb = call->callback)) { g_slice_free(angel_call, call); } ctx = call->context; - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); - if (cb) cb(ctx, TRUE, NULL, NULL, NULL); + if (cb) cb(call, ctx, TRUE, NULL, NULL, NULL); } -angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout) { +angel_call *angel_call_new(AngelCallback callback, ev_tstamp timeout) { angel_call* call = g_slice_new0(angel_call); g_assert(NULL != callback); call->callback = callback; - ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, 0, timeout); + ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, timeout, 0); call->timeout_watcher.data = call; call->id = -1; @@ -402,14 +492,14 @@ gboolean angel_call_free(angel_call *call) { if (call->acon) { angel_connection *acon = call->acon; - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); if (-1 != call->id) { r = TRUE; call->callback = NULL; } else { g_slice_free(angel_call, call); } - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); } else { g_slice_free(angel_call, call); } @@ -425,17 +515,26 @@ static gboolean prepare_call_header(GString **pbuf, buf = g_string_sized_new(8*4 + mod_len + action_len); *pbuf = buf; + g_printerr("Prepare call with id: %i\n", id); + if (!angel_data_write_int32(buf, ANGEL_MAGIC, err)) return FALSE; if (!angel_data_write_int32(buf, type, err)) return FALSE; if (!angel_data_write_int32(buf, id, err)) return FALSE; - if (!angel_data_write_int32(buf, mod_len, err)) return FALSE; - if (!angel_data_write_int32(buf, action_len, err)) return FALSE; + if (type != ANGEL_CALL_SEND_RESULT) { + if (!angel_data_write_int32(buf, mod_len, err)) return FALSE; + if (!angel_data_write_int32(buf, action_len, err)) return FALSE; + } else { + if (!angel_data_write_int32(buf, 0, err)) return FALSE; + if (!angel_data_write_int32(buf, 0, err)) return FALSE; + } if (!angel_data_write_int32(buf, error_len, err)) return FALSE; if (!angel_data_write_int32(buf, data_len, err)) return FALSE; if (!angel_data_write_int32(buf, fd_count, err)) return FALSE; - g_string_append_len(buf, mod, mod_len); - g_string_append_len(buf, action, action_len); + if (type != ANGEL_CALL_SEND_RESULT) { + g_string_append_len(buf, mod, mod_len); + g_string_append_len(buf, action, action_len); + } return TRUE; } @@ -450,6 +549,11 @@ gboolean angel_send_simple_call( if (err && *err) goto error; + if (-1 == acon->fd) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed"); + goto error; + } + if (data->len > ANGEL_CALL_MAX_STR_LEN) { g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); goto error; @@ -457,11 +561,11 @@ gboolean angel_send_simple_call( if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error; - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); send_queue_push_string(acon->out, data); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); if (queue_was_empty) ev_async_send(acon->loop, &acon->out_notify_watcher); @@ -485,38 +589,45 @@ gboolean angel_send_call( if (err && *err) goto error; - g_static_mutex_lock(&acon->mutex); + if (-1 == acon->fd) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed"); + goto error; + } + + g_mutex_lock(acon->mutex); if (-1 != call->id) { - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_ALREADY_RUNNING, "call already running"); goto error_before_new_id; } if (-1 == (call->id = idlist_get(acon->call_id_list))) { - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_OUT_OF_CALL_IDS, "out of call ids"); goto error; } + call->acon = acon; if ((guint) call->id >= acon->call_table->len) { g_ptr_array_set_size(acon->call_table, call->id + 1); } g_ptr_array_index(acon->call_table, call->id) = call; - g_static_mutex_unlock(&acon->mutex); - + g_mutex_unlock(acon->mutex); - if (data->len > ANGEL_CALL_MAX_STR_LEN) { + if (data && data->len > ANGEL_CALL_MAX_STR_LEN) { g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); goto error; } - if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error; + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data ? data->len : 0, 0, err)) goto error; + + ev_timer_start(acon->loop, &call->timeout_watcher); - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); send_queue_push_string(acon->out, data); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); if (queue_was_empty) ev_async_send(acon->loop, &acon->out_notify_watcher); @@ -527,6 +638,7 @@ error: if (-1 != call->id) { idlist_put(acon->call_id_list, call->id); call->id = -1; + call->acon = NULL; } error_before_new_id: if (data) g_string_free(data, TRUE); @@ -536,7 +648,6 @@ error_before_new_id: gboolean angel_send_result( angel_connection *acon, - const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, gint32 id, GString *error, GString *data, GArray *fds, GError **err) { @@ -545,20 +656,25 @@ gboolean angel_send_result( if (err && *err) goto error; - if (data->len > ANGEL_CALL_MAX_STR_LEN) { + if (-1 == acon->fd) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed"); + goto error; + } + + if (data && data->len > ANGEL_CALL_MAX_STR_LEN) { g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); goto error; } - if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, mod, mod_len, action, action_len, error->len, data->len, fds->len, err)) goto error; + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, NULL, 0, NULL, 0, error ? error->len : 0, data ? data->len : 0, fds ? fds->len : 0, err)) goto error; - g_static_mutex_lock(&acon->mutex); + g_mutex_lock(acon->mutex); queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); send_queue_push_string(acon->out, error); send_queue_push_string(acon->out, data); send_queue_push_fds(acon->out, fds); - g_static_mutex_unlock(&acon->mutex); + g_mutex_unlock(acon->mutex); if (queue_was_empty) ev_async_send(acon->loop, &acon->out_notify_watcher); @@ -568,6 +684,8 @@ gboolean angel_send_result( error: if (data) g_string_free(data, TRUE); if (buf) g_string_free(buf, TRUE); + if (error) g_string_free(error, TRUE); + if (fds) close_fd_array(fds); return FALSE; return FALSE; diff --git a/src/angel_log.c b/src/angel_log.c index 107a803..7165d6d 100644 --- a/src/angel_log.c +++ b/src/angel_log.c @@ -18,6 +18,9 @@ void log_init(server *srv) { srv->log.levels[LOG_LEVEL_ERROR] = TRUE; srv->log.levels[LOG_LEVEL_WARNING] = TRUE; + srv->log.levels[LOG_LEVEL_INFO] = TRUE; /* TODO: remove debug levels */ + srv->log.levels[LOG_LEVEL_DEBUG] = TRUE; + srv->log.fd = -1; srv->log.ts_cache = g_string_sized_new(0); srv->log.log_line = g_string_sized_new(0); diff --git a/src/angel_main.c b/src/angel_main.c index 9176010..bab97e9 100644 --- a/src/angel_main.c +++ b/src/angel_main.c @@ -1,6 +1,16 @@ #include #include +#include + +# ifndef HAVE_ISSETUGID + +static int l_issetugid() { + return (geteuid() != getuid() || getegid() != getgid()); +} + +# define issetugid l_issetugid +# endif int main(int argc, char *argv[]) { GError *error = NULL; @@ -51,6 +61,15 @@ int main(int argc, char *argv[]) { goto cleanup; } + if (!(getuid() == 0) && issetugid()) { + g_printerr("Are you nuts ? Don't apply a SUID bit to this binary\n"); + result = -1; + goto cleanup; + } + + /* initialize threading */ + g_thread_init(NULL); + srv = server_new(module_dir); if (!plugins_config_load(srv, config_path)) { @@ -58,7 +77,11 @@ int main(int argc, char *argv[]) { goto cleanup; } - g_printerr("lighttpd-angel: Parsed config file\n"); + INFO(srv, "%s", "parsed config file"); + + ev_loop(srv->loop, 0); + + INFO(srv, "%s", "going down"); cleanup: if (srv) server_free(srv); diff --git a/src/angel_plugin.c b/src/angel_plugin.c index 1daf899..9478c18 100644 --- a/src/angel_plugin.c +++ b/src/angel_plugin.c @@ -38,12 +38,14 @@ static server_item* server_item_new(plugin *p, const plugin_item *p_item) { static void plugin_free(server *srv, plugin *p) { if (p->handle_free) p->handle_free(srv, p); + g_hash_table_destroy(p->angel_callbacks); g_slice_free(plugin, p); } static plugin* plugin_new(const char *name) { plugin *p = g_slice_new0(plugin); p->name = name; + p->angel_callbacks = g_hash_table_new(g_str_hash, g_str_equal); return p; } @@ -89,6 +91,9 @@ void plugins_init(server *srv, const gchar *module_dir) { ps->module_refs = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, _server_module_release); ps->load_module_refs = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, _server_module_release); + ps->ht_plugins = g_hash_table_new(g_str_hash, g_str_equal); + ps->load_ht_plugins = g_hash_table_new(g_str_hash, g_str_equal); + ps->plugins = g_ptr_array_new(); ps->load_plugins = g_ptr_array_new(); } @@ -104,6 +109,9 @@ void plugins_clear(server *srv) { g_hash_table_destroy(ps->module_refs); g_hash_table_destroy(ps->load_module_refs); + g_hash_table_remove_all(ps->ht_plugins); + g_hash_table_remove_all(ps->load_ht_plugins); + g_ptr_array_free(ps->plugins, TRUE); g_ptr_array_free(ps->load_plugins, TRUE); @@ -123,6 +131,7 @@ void plugins_config_clean(server *srv) { g_hash_table_remove_all(ps->load_items); g_hash_table_remove_all(ps->load_module_refs); + g_hash_table_remove_all(ps->load_ht_plugins); g_ptr_array_set_size(ps->load_plugins, 0); } @@ -131,32 +140,44 @@ gboolean plugins_config_load(server *srv, const gchar *filename) { GError *error = NULL; guint i; + if (!plugins_load_module(srv, NULL)) { + ERROR(srv, "%s", "failed loading core plugins"); + plugins_config_clean(srv); + return FALSE; + } + if (!angel_config_parse_file(srv, filename, &error)) { - ERROR(srv, "failed to parse config file: %s\n", error->message); + ERROR(srv, "failed to parse config file: %s", error->message); g_error_free(error); plugins_config_clean(srv); return FALSE; } /* check new config */ - for (i = ps->plugins->len; i-- > 0; ) { + for (i = ps->load_plugins->len; i-- > 0; ) { plugin *p = g_ptr_array_index(ps->load_plugins, i); if (p->handle_check_config) { if (!p->handle_check_config(srv, p)) { + ERROR(srv, "%s", "config check failed"); plugins_config_clean(srv); return FALSE; } } } + ERROR(srv, "%s", "activate"); + /* activate new config */ - for (i = ps->plugins->len; i-- > 0; ) { + for (i = ps->load_plugins->len; i-- > 0; ) { plugin *p = g_ptr_array_index(ps->load_plugins, i); + ERROR(srv, "activate: %s", p->name); if (p->handle_activate_config) { p->handle_activate_config(srv, p); } } + ERROR(srv, "%s", "done"); + { /* swap the arrays */ GPtrArray *tmp = ps->load_plugins; ps->load_plugins = ps->plugins; ps->plugins = tmp; } @@ -164,9 +185,11 @@ gboolean plugins_config_load(server *srv, const gchar *filename) { GHashTable *tmp; tmp = ps->load_items; ps->load_items = ps->items; ps->items = tmp; tmp = ps->load_module_refs; ps->load_module_refs = ps->module_refs; ps->module_refs = tmp; + tmp = ps->load_ht_plugins; ps->load_ht_plugins = ps->ht_plugins; ps->ht_plugins = tmp; } g_hash_table_remove_all(ps->load_items); g_hash_table_remove_all(ps->load_module_refs); + g_hash_table_remove_all(ps->load_ht_plugins); g_ptr_array_set_size(ps->load_plugins, 0); if (!ps->config_filename) { @@ -178,7 +201,7 @@ gboolean plugins_config_load(server *srv, const gchar *filename) { return TRUE; } -gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) { +void plugins_handle_item(server *srv, GString *itemname, value *hash) { Plugins *ps = &srv->plugins; server_item *si; @@ -186,7 +209,7 @@ gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) { /* debug items */ { GString *tmp = value_to_string(hash); - ERROR(srv, "Item '%s': %s\n", itemname->str, tmp->str); + ERROR(srv, "Item '%s': %s", itemname->str, tmp->str); g_string_free(tmp, TRUE); } #endif @@ -239,7 +262,6 @@ gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) { g_slice_free1(sizeof(value*) * si->option_count, optlist); } - return TRUE; } static gboolean plugins_activate_module(server *srv, server_module *sm) { @@ -266,6 +288,8 @@ static gboolean plugins_activate_module(server *srv, server_module *sm) { } } + g_hash_table_insert(ps->load_ht_plugins, (gpointer) p->name, p); + return TRUE; item_collission: @@ -341,6 +365,7 @@ plugin *angel_plugin_register(server *srv, module *mod, const gchar *name, Plugi p = plugin_new(name); if (!init(srv, p)) { + ERROR(srv, "Couldn't load plugin '%s' for module '%s': init failed", name, mod->name->str); plugin_free(srv, p); return NULL; } diff --git a/src/angel_plugin_core.c b/src/angel_plugin_core.c index 83d82c0..865d12a 100644 --- a/src/angel_plugin_core.c +++ b/src/angel_plugin_core.c @@ -1,8 +1,302 @@ #include +#include -gboolean plugin_core_init(server *srv) { - /* load core plugins */ +#include +#include + +static void core_instance_parse(server *srv, plugin *p, value **options) { + GPtrArray *cmd; + gchar **cmdarr; + plugin_core_config_t *config = (plugin_core_config_t*) p->data; + uid_t uid = -1; + gid_t gid = -1; + GString *user = NULL; + + if (config->load_instconf) { + ERROR(srv, "%s", "Already configure the instance"); + config->load_failed = FALSE; + return; + } + + /* set user and group */ + if (options[0]) { + struct passwd *pwd; + user = options[0]->data.string; + if (NULL == (pwd = getpwnam(user->str))) { + ERROR(srv, "can't find username '%s'", user->str); + config->load_failed = FALSE; + return; + } + + uid = pwd->pw_uid; + gid = pwd->pw_gid; + } + + if (options[1]) { + struct group *grp; + GString *group = options[1]->data.string; + if (NULL == (grp = getgrnam(group->str))) { + ERROR(srv, "can't find groupname '%s'", group->str); + config->load_failed = FALSE; + return; + } + + gid = grp->gr_gid; + } + + if (0 == uid) { + ERROR(srv, "%s", "I will not set uid to 0"); + config->load_failed = FALSE; + return; + } + if (0 == gid) { + ERROR(srv, "%s", "I will not set gid to 0"); + config->load_failed = FALSE; + return; + } + + cmd = g_ptr_array_new(); +#if 0 + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/bin/valgrind"))); +#endif + if (options[2]) { + g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[2]->data.string))); + } else { + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/bin/lighttpd"))); + } + + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("--angel"))); + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-c"))); + if (options[3]) { + g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[3]->data.string))); + } else if (options[4]) { + g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[4]->data.string))); + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-l"))); + } else { + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/etc/lighttpd2/lighttpd.conf"))); + } + + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-m"))); + if (options[5]) { + g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[5]->data.string))); + } else { + g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/lib/lighttpd2/"))); + } + + g_ptr_array_add(cmd, NULL); + cmdarr = (gchar**) g_ptr_array_free(cmd, FALSE); + config->load_instconf = instance_conf_new(cmdarr, user, uid, gid); +} + +static const plugin_item_option core_instance_options[] = { + { "user", VALUE_STRING, 0 }, + { "group", VALUE_STRING, 0 }, + { "binary", VALUE_STRING, 0 }, + { "config", VALUE_STRING, 0 }, + { "luaconfig", VALUE_STRING, 0 }, + { "modules", VALUE_STRING, 0 }, + { NULL, 0, 0 } +}; + +static const plugin_item core_items[] = { + { "instance", core_instance_parse, core_instance_options }, + { NULL, NULL, NULL } +}; + +static int do_listen(server *srv, GString *str) { + guint32 ipv4; +#ifdef HAVE_IPV6 + guint8 ipv6[16]; +#endif + guint16 port = 80; + + if (parse_ipv4(str->str, &ipv4, NULL, &port)) { + int s, v; + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ipv4; + addr.sin_port = htons(port); + if (-1 == (s = socket(AF_INET, SOCK_STREAM, 0))) { + ERROR(srv, "Couldn't open socket: %s", g_strerror(errno)); + return -1; + } + v = 1; + if (-1 == setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(v))) { + close(s); + ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno)); + return -1; + } + if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) { + close(s); + ERROR(srv, "Couldn't bind socket to '%s': %s", str->str, g_strerror(errno)); + return -1; + } + if (-1 == listen(s, 1000)) { + close(s); + ERROR(srv, "Couldn't listen on '%s': %s", str->str, g_strerror(errno)); + return -1; + } + DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, port); + return s; +#ifdef HAVE_IPV6 + } else if (parse_ipv6(str->str, ipv6, NULL, &port)) { + GString *ipv6_str = g_string_sized_new(0); + int s, v; + struct sockaddr_in6 addr; + ipv6_tostring(ipv6_str, ipv6); + + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + memcpy(&addr.sin6_addr, ipv6, 16); + addr.sin6_port = htons(port); + if (-1 == (s = socket(AF_INET6, SOCK_STREAM, 0))) { + ERROR(srv, "Couldn't open socket: %s", g_strerror(errno)); + g_string_free(ipv6_str, TRUE); + return -1; + } + v = 1; + if (-1 == setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(v))) { + close(s); + ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno)); + g_string_free(ipv6_str, TRUE); + return -1; + } + if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v, sizeof(v))) { + close(s); + ERROR(srv, "Couldn't setsockopt(IPV6_V6ONLY): %s", g_strerror(errno)); + g_string_free(ipv6_str, TRUE); + return -1; + } + if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) { + close(s); + ERROR(srv, "Couldn't bind socket to '%s': %s", ipv6_str->str, g_strerror(errno)); + g_string_free(ipv6_str, TRUE); + return -1; + } + if (-1 == listen(s, 1000)) { + close(s); + ERROR(srv, "Couldn't listen on '%s': %s", ipv6_str->str, g_strerror(errno)); + g_string_free(ipv6_str, TRUE); + return -1; + } + DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, port); + g_string_free(ipv6_str, TRUE); + return s; +#endif + } else { + ERROR(srv, "Invalid ip: '%s'", str->str); + return -1; + } +} + +static void core_listen(server *srv, instance *i, plugin *p, gint32 id, GString *data) { + GError *err = NULL; + gint fd; + GArray *fds; + DEBUG(srv, "core_listen(%i) '%s'", id, data->str); + + if (-1 == id) return; /* ignore simple calls */ + + fd = do_listen(srv, data); + + if (-1 == fd) { + GString *error = g_string_sized_new(0); + g_string_printf(error, "Couldn't listen to '%s'", data->str); + if (!angel_send_result(i->acon, id, error, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + fds = g_array_new(FALSE, FALSE, sizeof(int)); + g_array_append_val(fds, fd); + + if (!angel_send_result(i->acon, id, NULL, NULL, fds, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + return; + } +} + +static void core_clean(server *srv, plugin *p); +static void core_free(server *srv, plugin *p) { + plugin_core_config_t *config = (plugin_core_config_t*) p->data; + + core_clean(srv, p); + + if (config->instconf) { + instance_conf_release(config->instconf); + config->instconf = NULL; + } + + if (config->inst) { + instance_set_state(config->inst, INSTANCE_DOWN); + instance_release(config->inst); + config->inst = NULL; + } +} + +static void core_clean(server *srv, plugin *p) { + plugin_core_config_t *config = (plugin_core_config_t*) p->data; UNUSED(srv); + + if (config->load_instconf) { + instance_conf_release(config->load_instconf); + config->load_instconf = NULL; + } + + config->load_failed = FALSE; +} + +static gboolean core_check(server *srv, plugin *p) { + plugin_core_config_t *config = (plugin_core_config_t*) p->data; + UNUSED(srv); + return !config->load_failed; +} + +static void core_activate(server *srv, plugin *p) { + plugin_core_config_t *config = (plugin_core_config_t*) p->data; + + if (config->instconf) { + instance_conf_release(config->instconf); + config->instconf = NULL; + } + + if (config->inst) { + instance_set_state(config->inst, INSTANCE_DOWN); + instance_release(config->inst); + config->inst = NULL; + } + + config->instconf = config->load_instconf; + config->load_instconf = NULL; + + if (config->instconf) { + config->inst = server_new_instance(srv, config->instconf); + instance_set_state(config->inst, INSTANCE_ACTIVE); + ERROR(srv, "%s", "Starting instance"); + } +} + +static gboolean core_init(server *srv, plugin *p) { + UNUSED(srv); + p->data = g_slice_new0(plugin_core_config_t); + p->items = core_items; + + p->handle_free = core_free; + p->handle_clean_config = core_clean; + p->handle_check_config = core_check; + p->handle_activate_config = core_activate; + + g_hash_table_insert(p->angel_callbacks, "listen", (gpointer)(intptr_t)core_listen); + return TRUE; -} \ No newline at end of file +} + +gboolean plugin_core_init(server *srv) { + /* load core plugins */ + return NULL != angel_plugin_register(srv, NULL, "core", core_init); +} diff --git a/src/angel_server.c b/src/angel_server.c index 3f210ed..da17765 100644 --- a/src/angel_server.c +++ b/src/angel_server.c @@ -1,12 +1,38 @@ #include +#include + +static void instance_state_machine(instance *i); + +static void jobqueue_callback(struct ev_loop *loop, ev_async *w, int revents) { + server *srv = (server*) w->data; + instance *i; + GQueue todo; + UNUSED(loop); + UNUSED(revents); + + todo = srv->job_queue; + g_queue_init(&srv->job_queue); + + while (NULL != (i = g_queue_pop_head(&todo))) { + i->in_jobqueue = FALSE; + instance_state_machine(i); + instance_release(i); + } +} + server* server_new(const gchar *module_dir) { server *srv = g_slice_new0(server); - /* TODO: handle sinals */ - srv->loop = ev_default_loop(0); + + /* TODO: handle signals */ + ev_async_init(&srv->job_watcher, jobqueue_callback); + ev_async_start(srv->loop, &srv->job_watcher); + ev_unref(srv->loop); + srv->job_watcher.data = srv; + log_init(srv); plugins_init(srv, module_dir); return srv; @@ -18,3 +44,278 @@ void server_free(server* srv) { log_clean(srv); g_slice_free(server, srv); } + +static void instance_angel_call_cb(angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gint32 id, + GString *data) { + + instance *i = (instance*) acon->data; + server *srv = i->srv; + Plugins *ps = &srv->plugins; + plugin *p; + PluginHandleCall cb; + + p = g_hash_table_lookup(ps->ht_plugins, mod); + if (!p) { + GString *errstr = g_string_sized_new(0); + GError *err = NULL; + g_string_printf(errstr, "Plugin '%s' not available in lighttpd-angel", mod); + if (!angel_send_result(acon, id, errstr, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + cb = (PluginHandleCall)(intptr_t) g_hash_table_lookup(p->angel_callbacks, action); + if (!cb) { + GString *errstr = g_string_sized_new(0); + GError *err = NULL; + g_string_printf(errstr, "Action '%s' not available in plugin '%s' of lighttpd-angel", action, mod); + if (!angel_send_result(acon, id, errstr, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + cb(srv, i, p, id, data); +} + +static void instance_angel_close_cb(angel_connection *acon, GError *err) { + instance *i = (instance*) acon->data; + server *srv = i->srv; + + ERROR(srv, "angel connection closed: %s", err ? err->message : g_strerror(errno)); + if (err) g_error_free(err); + + i->acon = NULL; + angel_connection_free(acon); +} + +static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) { + instance *i = (instance*) w->data; + + if (i->s_cur == INSTANCE_LOADING) { + ERROR(i->srv, "spawning child %i failed, not restarting", i->pid); + i->s_dest = i->s_cur = INSTANCE_DOWN; /* TODO: retry spawn later? */ + } else { + ERROR(i->srv, "child %i died", i->pid); + i->s_cur = INSTANCE_DOWN; + } + i->pid = -1; + angel_connection_free(i->acon); + i->acon = NULL; + ev_child_stop(loop, w); + instance_job_append(i); + instance_release(i); +} + +static void instance_spawn(instance *i) { + int confd[2]; + if (-1 == socketpair(AF_UNIX, SOCK_STREAM, 0, confd)) { + ERROR(i->srv, "socketpair error, cannot spawn instance: %s", g_strerror(errno)); + return; + } + fd_init(confd[0]); + fd_no_block(confd[1]); + + i->acon = angel_connection_new(i->srv->loop, confd[0], i, instance_angel_call_cb, instance_angel_close_cb); + i->pid = fork(); + switch (i->pid) { + case 0: { + gchar **args; + setsid(); /* lead session, so we don't recieve the signals for the angel */ + if (getuid() == 0 && (i->ic->uid != (uid_t) -1) && (i->ic->gid != (gid_t) -1)) { + setgid(i->ic->gid); + setgroups(0, NULL); + initgroups(i->ic->username->str, i->ic->gid); + setuid(i->ic->uid); + } + + if (confd[1] != 0) { + dup2(confd[1], 0); + close(confd[1]); + } + /* TODO: close stdout/stderr ? */ + execvp(i->ic->cmd[0], i->ic->cmd); + g_printerr("exec('%s') failed: %s\n", i->ic->cmd[0], g_strerror(errno)); + exit(-1); + } + case -1: + break; + default: + close(confd[1]); + ev_child_set(&i->child_watcher, i->pid, 0); + ev_child_start(i->srv->loop, &i->child_watcher); + i->s_cur = INSTANCE_LOADING; + instance_acquire(i); + ERROR(i->srv, "Instance (%i) spawned: %s", i->pid, i->ic->cmd[0]); + break; + } +} + +instance* server_new_instance(server *srv, instance_conf *ic) { + instance *i; + + i = g_slice_new0(instance); + i->refcount = 1; + i->srv = srv; + instance_conf_acquire(ic); + i->ic = ic; + i->pid = -1; + i->s_cur = i->s_dest = INSTANCE_DOWN; + ev_child_init(&i->child_watcher, instance_child_cb, -1, 0); + i->child_watcher.data = i; + + return i; +} + +void instance_replace(instance *oldi, instance *newi) { +} + +void instance_set_state(instance *i, instance_state_t s) { + if (i->s_dest == s) return; + switch (s) { + case INSTANCE_DOWN: + break; + case INSTANCE_LOADING: + case INSTANCE_WARMUP: + return; /* These cannot be set */ + case INSTANCE_ACTIVE: + case INSTANCE_SUSPEND: + break; + } + i->s_dest = s; + if (s == INSTANCE_DOWN) { + if (i->s_cur != INSTANCE_DOWN) { + kill(i->pid, SIGTERM); + } + } else { + if (i->pid == (pid_t) -1) { + instance_spawn(i); + return; + } else { + GError *error = NULL; + GString *buf = g_string_sized_new(0); + + switch (s) { + case INSTANCE_DOWN: + case INSTANCE_LOADING: + case INSTANCE_WARMUP: + break; + case INSTANCE_ACTIVE: + angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run"), buf, &error); + break; + case INSTANCE_SUSPEND: + angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("suspend"), buf, &error); + break; + } + } + } +} + +static void instance_state_machine(instance *i) { + instance_state_t olds = i->s_dest; + while (i->s_cur != i->s_dest && i->s_cur != olds) { + olds = i->s_cur; + switch (i->s_dest) { + case INSTANCE_DOWN: + if (i->pid == (pid_t) -1) { + i->s_cur = INSTANCE_DOWN; + break; + } + kill(i->pid, SIGINT); + return; + case INSTANCE_LOADING: + break; + case INSTANCE_WARMUP: + if (i->pid == (pid_t) -1) { + instance_spawn(i); + return; + } + break; + case INSTANCE_ACTIVE: + if (i->pid == (pid_t) -1) { + instance_spawn(i); + return; + } + break; + case INSTANCE_SUSPEND: + if (i->pid == (pid_t) -1) { + instance_spawn(i); + return; + } + break; + } + } +} + +void instance_release(instance *i) { + server *srv; + instance *t; + if (!i) return; + assert(g_atomic_int_get(&i->refcount) > 0); + if (!g_atomic_int_dec_and_test(&i->refcount)) return; + srv = i->srv; + if (i->pid != (pid_t) -1) { + ev_child_stop(srv->loop, &i->child_watcher); + kill(i->pid, SIGTERM); + i->pid = -1; + i->s_cur = INSTANCE_DOWN; + angel_connection_free(i->acon); + i->acon = NULL; + } + + instance_conf_release(i->ic); + i->ic = NULL; + + t = i->replace; i->replace = NULL; + instance_release(t); + + t = i->replace_by; i->replace_by = NULL; + instance_release(t); + + g_slice_free(instance, i); +} + +void instance_acquire(instance *i) { + assert(g_atomic_int_get(&i->refcount) > 0); + g_atomic_int_inc(&i->refcount); +} + +instance_conf* instance_conf_new(gchar **cmd, GString *username, uid_t uid, gid_t gid) { + instance_conf *ic = g_slice_new0(instance_conf); + ic->refcount = 1; + ic->cmd = cmd; + if (username) { + ic->username = g_string_new_len(GSTR_LEN(username)); + } + ic->uid = uid; + ic->gid = gid; + return ic; +} + +void instance_conf_release(instance_conf *ic) { + if (!ic) return; + assert(g_atomic_int_get(&ic->refcount) > 0); + if (!g_atomic_int_dec_and_test(&ic->refcount)) return; + g_strfreev(ic->cmd); + g_slice_free(instance_conf, ic); +} + +void instance_conf_acquire(instance_conf *ic) { + assert(g_atomic_int_get(&ic->refcount) > 0); + g_atomic_int_inc(&ic->refcount); +} + +void instance_job_append(instance *i) { + server *srv = i->srv; + if (!i->in_jobqueue) { + instance_acquire(i); + i->in_jobqueue = TRUE; + g_queue_push_tail(&srv->job_queue, i); + ev_async_send(srv->loop, &srv->job_watcher); + } +} diff --git a/src/lighttpd.c b/src/lighttpd.c index 8129478..942da4e 100644 --- a/src/lighttpd.c +++ b/src/lighttpd.c @@ -22,6 +22,7 @@ int main(int argc, char *argv[]) { gboolean luaconfig = FALSE; gboolean test_config = FALSE; gboolean show_version = FALSE; + gboolean use_angel = FALSE; GList *ctx_stack = NULL; @@ -31,6 +32,7 @@ int main(int argc, char *argv[]) { { "test", 't', 0, G_OPTION_ARG_NONE, &test_config, "test config and exit", NULL }, { "module-dir", 'm', 0, G_OPTION_ARG_STRING, &module_dir, "module directory", "PATH" }, { "version", 'v', 0, G_OPTION_ARG_NONE, &show_version, "show version and exit", NULL }, + { "angel", 0, G_OPTION_FLAG_HIDDEN, G_OPTION_ARG_NONE, &use_angel, "spawned by angel", NULL }, { NULL, 0, 0, 0, NULL, NULL, NULL } }; @@ -72,9 +74,13 @@ int main(int argc, char *argv[]) { g_thread_init(NULL); srv = server_new(module_dir); + server_loop_init(srv); /* load core plugin */ srv->core_plugin = plugin_register(srv, "core", plugin_core_init); + if (use_angel) { + angel_setup(srv); + } /* if no path is specified for the config, read lighttpd.conf from current directory */ if (config_path == NULL) { @@ -139,12 +145,14 @@ int main(int argc, char *argv[]) { /* TRACE(srv, "%s", "Test!"); */ - server_loop_init(srv); + server_worker_init(srv); server_start(srv); if (!luaconfig) config_parser_finish(srv, ctx_stack, TRUE); + INFO(srv, "%s", "going down"); + server_free(srv); if (module_dir != def_module_dir) diff --git a/src/log.c b/src/log.c index 1de1e28..c408f2b 100644 --- a/src/log.c +++ b/src/log.c @@ -128,7 +128,8 @@ gboolean log_write_(server *srv, vrequest *vr, log_level_t log_level, guint flag if (g_atomic_int_get(&srv->state) == SERVER_STARTING) { log_unref(srv, log); - return angel_log(srv, log_line); + angel_log(srv, log_line); + return TRUE; } log_entry = g_slice_new(log_entry_t); log_entry->log = log; diff --git a/src/plugin_core.c b/src/plugin_core.c index 8bde3ca..875db18 100644 --- a/src/plugin_core.c +++ b/src/plugin_core.c @@ -503,7 +503,6 @@ static action* core_profile_mem(server *srv, plugin* p, value *val) { static gboolean core_listen(server *srv, plugin* p, value *val) { GString *ipstr; - int s; UNUSED(p); if (val->type != VALUE_STRING) { @@ -512,12 +511,7 @@ static gboolean core_listen(server *srv, plugin* p, value *val) { } ipstr = val->data.string; - if (-1 == (s = angel_listen(srv, ipstr))) { - ERROR(srv, "%s", "angel_listen failed"); - return FALSE; - } - - server_listen(srv, s); + angel_listen(srv, ipstr); return TRUE; } diff --git a/src/server.c b/src/server.c index 90a7f86..35f1882 100644 --- a/src/server.c +++ b/src/server.c @@ -206,14 +206,20 @@ static gpointer server_worker_cb(gpointer data) { } gboolean server_loop_init(server *srv) { - guint i; - struct ev_loop *loop = ev_default_loop(srv->loop_flags); + srv->loop = ev_default_loop(srv->loop_flags); - if (!loop) { + if (!srv->loop) { fatal ("could not initialise libev, bad $LIBEV_FLAGS in environment?"); return FALSE; } + return TRUE; +} + +gboolean server_worker_init(server *srv) { + struct ev_loop *loop = srv->loop; + guint i; + CATCH_SIGNAL(loop, sigint_cb, INT); CATCH_SIGNAL(loop, sigint_cb, TERM); CATCH_SIGNAL(loop, sigpipe_cb, PIPE); @@ -310,9 +316,9 @@ void server_listen(server *srv, int fd) { server_socket *sock = server_socket_new(fd); sock->srv = srv; - if (g_atomic_int_get(&srv->state) == SERVER_RUNNING) ev_io_start(srv->main_worker->loop, &sock->watcher); - g_ptr_array_add(srv->sockets, sock); + + if (g_atomic_int_get(&srv->state) == SERVER_RUNNING) ev_io_start(srv->main_worker->loop, &sock->watcher); } void server_start(server *srv) {