diff --git a/include/lighttpd/angel_plugin.h b/include/lighttpd/angel_plugin.h index b139102..63cda55 100644 --- a/include/lighttpd/angel_plugin.h +++ b/include/lighttpd/angel_plugin.h @@ -10,15 +10,18 @@ typedef struct liPluginItemOption liPluginItemOption; typedef struct liPlugin liPlugin; typedef struct liPlugins liPlugins; -typedef gboolean (*liPluginInitCB) (liServer *srv, liPlugin *p); -typedef void (*liPluginFreeCB) (liServer *srv, liPlugin *p); +typedef gboolean (*liPluginInitCB) (liServer *srv, liPlugin *p); +typedef void (*liPluginFreeCB) (liServer *srv, liPlugin *p); -typedef void (*liPluginCleanConfigCB) (liServer *srv, liPlugin *p); -typedef gboolean (*liPluginCheckConfigCB) (liServer *srv, liPlugin *p); -typedef void (*liPluginActivateConfigCB)(liServer *srv, liPlugin *p); -typedef void (*liPluginParseItemCB) (liServer *srv, liPlugin *p, liValue **options); +typedef void (*liPluginCleanConfigCB) (liServer *srv, liPlugin *p); +typedef gboolean (*liPluginCheckConfigCB) (liServer *srv, liPlugin *p); +typedef void (*liPluginActivateConfigCB) (liServer *srv, liPlugin *p); +typedef void (*liPluginParseItemCB) (liServer *srv, liPlugin *p, liValue **options); -typedef void (*liPluginHandleCallCB) (liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data); +typedef void (*liPluginHandleCallCB) (liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data); + +typedef void (*liPluginInstanceReplacedCB) (liServer *srv, liPlugin *p, liInstance *oldi, liInstance *newi); +typedef void (*liPluginInstanceReachedStateCB)(liServer *srv, liPlugin *p, liInstance *i, liInstanceState s); typedef enum { LI_PLUGIN_ITEM_OPTION_MANDATORY = 1 @@ -51,6 +54,9 @@ struct liPlugin { liPluginCleanConfigCB handle_clean_config; /**< called before the reloading of the config is started or after the reloading failed */ liPluginCheckConfigCB handle_check_config; /**< called before activating a config to ensure everything works */ liPluginActivateConfigCB handle_activate_config; /**< called to activate a config after successful loading it. this cannot fail */ + + liPluginInstanceReplacedCB handle_instance_replaced; + liPluginInstanceReachedStateCB handle_instance_reached_state; }; struct liPlugins { @@ -78,5 +84,16 @@ LI_API void li_plugins_handle_item(liServer *srv, GString *itemname, liValue *ha LI_API gboolean li_plugins_load_module(liServer *srv, const gchar *name); /* Needed by modules to register their plugin(s) */ LI_API liPlugin *li_angel_plugin_register(liServer *srv, liModule *mod, const gchar *name, liPluginInitCB init); +INLINE void li_angel_plugin_add_angel_cb(liPlugin *p, const gchar *name, liPluginHandleCallCB cb); + +/* called when replace was successful or failed - check states to find out */ +LI_API void li_angel_plugin_replaced_instance(liServer *srv, liInstance *oldi, liInstance *newi); +LI_API void li_angel_plugin_instance_reached_state(liServer *srv, liInstance *i, liInstanceState s); + +/* inline implementations */ + +INLINE void li_angel_plugin_add_angel_cb(liPlugin *p, const gchar *name, liPluginHandleCallCB cb) { + g_hash_table_insert(p->angel_callbacks, (gchar*) name, (gpointer)(intptr_t) cb); +} #endif diff --git a/include/lighttpd/angel_plugin_core.h b/include/lighttpd/angel_plugin_core.h index 914288d..9a6d119 100644 --- a/include/lighttpd/angel_plugin_core.h +++ b/include/lighttpd/angel_plugin_core.h @@ -6,14 +6,18 @@ typedef struct liPluginCoreConfig liPluginCoreConfig; struct liPluginCoreConfig { /* Load */ - liInstanceConf *load_instconf; gboolean load_failed; + liInstanceConf *load_instconf; GPtrArray *load_listen_masks; /* Running */ liInstanceConf *instconf; - liInstance *inst; GPtrArray *listen_masks; + + liInstance *inst; + GHashTable *listen_sockets; + + ev_signal sig_hup; }; typedef struct liPluginCoreListenMask liPluginCoreListenMask; diff --git a/include/lighttpd/angel_server.h b/include/lighttpd/angel_server.h index b3f69cb..9237ee8 100644 --- a/include/lighttpd/angel_server.h +++ b/include/lighttpd/angel_server.h @@ -9,14 +9,7 @@ #define LIGHTTPD_ANGEL_MAGIC ((guint)0x3e14ac65) #endif -typedef enum { - LI_INSTANCE_DOWN, /* not started yet */ - LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */ - LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */ - LI_INSTANCE_RUNNING, /* everything running */ - LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */ - LI_INSTANCE_FINISHED /* not running */ -} liInstanceState; +typedef void (*liInstanceResourceFreeCB) (liServer *srv, liInstance *i, liPlugin *p, liInstanceResource *res); struct liInstanceConf { gint refcount; @@ -44,6 +37,8 @@ struct liInstance { liInstance *replace, *replace_by; liAngelConnection *acon; + + GPtrArray *resources; }; struct liServer { @@ -60,13 +55,21 @@ struct liServer { liLog log; }; +struct liInstanceResource { + liInstanceResourceFreeCB free_cb; + liPlugin *plugin; /* may be NULL - we don't care about that */ + guint ndx; /* internal array index */ + + gpointer data; +}; + LI_API liServer* li_server_new(const gchar *module_dir); LI_API void li_server_free(liServer* srv); LI_API void li_server_stop(liServer *srv); LI_API liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic); -LI_API void li_instance_replace(liInstance *oldi, liInstance *newi); +LI_API gboolean li_instance_replace(liInstance *oldi, liInstance *newi); LI_API void li_instance_set_state(liInstance *i, liInstanceState s); LI_API void li_instance_state_reached(liInstance *i, liInstanceState s); @@ -77,4 +80,7 @@ LI_API void li_instance_conf_acquire(liInstanceConf *ic); LI_API void li_instance_release(liInstance *i); LI_API void li_instance_acquire(liInstance *i); +LI_API void li_instance_add_resource(liInstance *i, liInstanceResource *res, liInstanceResourceFreeCB free_cb, liPlugin *p, gpointer data); +LI_API void li_instance_rem_resource(liInstance *i, liInstanceResource *res); + #endif diff --git a/include/lighttpd/angel_typedefs.h b/include/lighttpd/angel_typedefs.h index bb73b31..b8d779d 100644 --- a/include/lighttpd/angel_typedefs.h +++ b/include/lighttpd/angel_typedefs.h @@ -8,8 +8,18 @@ typedef struct liProc liProc; /* angel_server.h */ +typedef enum { + LI_INSTANCE_DOWN, /* not started yet */ + LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */ + LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */ + LI_INSTANCE_RUNNING, /* everything running */ + LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */ + LI_INSTANCE_FINISHED /* not running */ +} liInstanceState; + typedef struct liServer liServer; typedef struct liInstance liInstance; typedef struct liInstanceConf liInstanceConf; +typedef struct liInstanceResource liInstanceResource; #endif diff --git a/src/angel/angel_plugin.c b/src/angel/angel_plugin.c index c13d3dc..78a273c 100644 --- a/src/angel/angel_plugin.c +++ b/src/angel/angel_plugin.c @@ -109,8 +109,8 @@ void li_plugins_clear(liServer *srv) { g_hash_table_destroy(ps->module_refs); g_hash_table_destroy(ps->load_module_refs); - g_hash_table_remove_all(ps->ht_plugins); - g_hash_table_remove_all(ps->load_ht_plugins); + g_hash_table_destroy(ps->ht_plugins); + g_hash_table_destroy(ps->load_ht_plugins); g_ptr_array_free(ps->plugins, TRUE); g_ptr_array_free(ps->load_plugins, TRUE); @@ -374,3 +374,23 @@ liPlugin *li_angel_plugin_register(liServer *srv, liModule *mod, const gchar *na return p; } + +void li_angel_plugin_replaced_instance(liServer *srv, liInstance *oldi, liInstance *newi) { + liPlugins *ps = &srv->plugins; + guint i; + + for (i = 0; i < ps->plugins->len; i++) { + liPlugin *p = g_ptr_array_index(ps->plugins, i); + if (p->handle_instance_replaced) p->handle_instance_replaced(srv, p, oldi, newi); + } +} + +void li_angel_plugin_instance_reached_state(liServer *srv, liInstance *inst, liInstanceState s) { + liPlugins *ps = &srv->plugins; + guint i; + + for (i = 0; i < ps->plugins->len; i++) { + liPlugin *p = g_ptr_array_index(ps->plugins, i); + if (p->handle_instance_reached_state) p->handle_instance_reached_state(srv, p, inst, s); + } +} diff --git a/src/angel/angel_plugin_core.c b/src/angel/angel_plugin_core.c index 966c711..3123fbb 100644 --- a/src/angel/angel_plugin_core.c +++ b/src/angel/angel_plugin_core.c @@ -2,6 +2,22 @@ #include #include +typedef struct listen_socket listen_socket; +typedef struct listen_ref_resource listen_ref_resource; + +struct listen_socket { + gint refcount; + + liSocketAddress addr; + int fd; +}; + +struct listen_ref_resource { + liInstanceResource ires; + + listen_socket *sock; +}; + #include #include @@ -255,48 +271,134 @@ static const liPluginItem core_items[] = { { NULL, NULL, NULL } }; -static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) { - guint32 ipv4; -#ifdef HAVE_IPV6 - guint8 ipv6[16]; -#endif - guint16 port; +static listen_socket* listen_new_socket(liSocketAddress *addr, int fd) { + listen_socket *sock = g_slice_new0(listen_socket); + + sock->refcount = 0; + + sock->addr = *addr; + sock->fd = fd; + + return sock; +} + +static void listen_socket_acquire(listen_socket *sock) { + g_atomic_int_inc(&sock->refcount); +} + +static void listen_ref_release(liServer *srv, liInstance *i, liPlugin *p, liInstanceResource *res) { + listen_ref_resource *ref = res->data; + listen_socket *sock = ref->sock; + UNUSED(i); + UNUSED(srv); + + assert(g_atomic_int_get(&sock->refcount) > 0); + if (g_atomic_int_dec_and_test(&sock->refcount)) { + liPluginCoreConfig *config = (liPluginCoreConfig*) p->data; + + g_hash_table_remove(config->listen_sockets, &sock->addr); + } + + g_slice_free(listen_ref_resource, ref); +} + +static void _listen_socket_free(gpointer ptr) { + listen_socket *sock = ptr; + + li_sockaddr_clear(&sock->addr); + close(sock->fd); + + g_slice_free(listen_socket, sock); +} + +static void listen_socket_add(liInstance *i, liPlugin *p, listen_socket *sock) { + listen_ref_resource *ref = g_slice_new0(listen_ref_resource); + + listen_socket_acquire(sock); + ref->sock = sock; + + li_instance_add_resource(i, &ref->ires, listen_ref_release, p, ref); +} + +static gboolean listen_check_acl(liServer *srv, liPluginCoreConfig *config, liSocketAddress *addr) { guint i; liPluginCoreListenMask *mask; - if (li_parse_ipv4(str->str, &ipv4, NULL, &port)) { - int s, v; - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - - if (!port) port = 80; + switch (addr->addr->plain.sa_family) { + case AF_INET: { + struct sockaddr_in *ipv4 = &addr->addr->ipv4; + guint port = ntohs(ipv4->sin_port); if (config->listen_masks->len) { for (i = 0; i < config->listen_masks->len; i++) { mask = g_ptr_array_index(config->listen_masks, i); switch (mask->type) { case LI_PLUGIN_CORE_LISTEN_MASK_IPV4: - if (!li_ipv4_in_ipv4_net(ipv4, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue; + if (!li_ipv4_in_ipv4_net(ipv4->sin_addr.s_addr, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue; if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue; - break; + return TRUE; case LI_PLUGIN_CORE_LISTEN_MASK_IPV6: - if (!li_ipv4_in_ipv6_net(ipv4, mask->value.ipv6.addr, mask->value.ipv6.network)) continue; + if (!li_ipv4_in_ipv6_net(ipv4->sin_addr.s_addr, mask->value.ipv6.addr, mask->value.ipv6.network)) continue; if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue; - break; - case LI_PLUGIN_CORE_LISTEN_MASK_UNIX: + return TRUE; + default: continue; } - break; } - if (i == config->listen_masks->len) { - ERROR(srv, "listen to socket '%s' not allowed", str->str); - return -1; + return FALSE; + } else { + return (ipv4->sin_port == 80 || ipv4->sin_port == 443); + } + } break; +#ifdef HAVE_IPV6 + case AF_INET6: { + struct sockaddr_in6 *ipv6 = &addr->addr->ipv6; + guint port = ntohs(ipv6->sin6_port); + + if (config->listen_masks->len) { + for (i = 0; i < config->listen_masks->len; i++) { + mask = g_ptr_array_index(config->listen_masks, i); + switch (mask->type) { + case LI_PLUGIN_CORE_LISTEN_MASK_IPV4: + if (!li_ipv6_in_ipv4_net(ipv6->sin6_addr.s6_addr, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue; + if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue; + return TRUE; + case LI_PLUGIN_CORE_LISTEN_MASK_IPV6: + if (!li_ipv6_in_ipv6_net(ipv6->sin6_addr.s6_addr, mask->value.ipv6.addr, mask->value.ipv6.network)) continue; + if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue; + return TRUE; + default: + continue; + } } + return FALSE; + } else { + return (ipv6->sin6_port == 80 || ipv6->sin6_port == 443); + } + } break; +#endif +#ifdef HAVE_SYS_UN_H + case AF_UNIX: { + if (config->listen_masks->len) { + /* TODO: support unix addresses */ + } else { + return FALSE; /* don't allow unix by default */ } + } break; +#endif + default: + ERROR(srv, "Address family %i not supported", addr->addr->plain.sa_family); + break; + } + return FALSE; +} + +static int do_listen(liServer *srv, liSocketAddress *addr, GString *str) { + int s, v; + GString *ipv6_str; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = ipv4; - addr.sin_port = htons(port); + switch (addr->addr->plain.sa_family) { + case AF_INET: if (-1 == (s = socket(AF_INET, SOCK_STREAM, 0))) { ERROR(srv, "Couldn't open socket: %s", g_strerror(errno)); return -1; @@ -307,7 +409,7 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) { ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno)); return -1; } - if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) { + if (-1 == bind(s, &addr->addr->plain, addr->len)) { close(s); ERROR(srv, "Couldn't bind socket to '%s': %s", str->str, g_strerror(errno)); return -1; @@ -317,43 +419,13 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) { ERROR(srv, "Couldn't listen on '%s': %s", str->str, g_strerror(errno)); return -1; } - DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, port); + DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, addr->addr->ipv4.sin_port); return s; #ifdef HAVE_IPV6 - } else if (li_parse_ipv6(str->str, ipv6, NULL, &port)) { - GString *ipv6_str = g_string_sized_new(0); - int s, v; - struct sockaddr_in6 addr; - li_ipv6_tostring(ipv6_str, ipv6); - if (!port) port = 80; + case AF_INET6: + ipv6_str = g_string_sized_new(0); + li_ipv6_tostring(ipv6_str, addr->addr->ipv6.sin6_addr.s6_addr); - if (config->listen_masks->len) { - for (i = 0; i < config->listen_masks->len; i++) { - mask = g_ptr_array_index(config->listen_masks, i); - switch (mask->type) { - case LI_PLUGIN_CORE_LISTEN_MASK_IPV4: - if (!li_ipv6_in_ipv4_net(ipv6, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue; - if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue; - break; - case LI_PLUGIN_CORE_LISTEN_MASK_IPV6: - if (!li_ipv6_in_ipv6_net(ipv6, mask->value.ipv6.addr, mask->value.ipv6.network)) continue; - if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue; - break; - case LI_PLUGIN_CORE_LISTEN_MASK_UNIX: - continue; - } - break; - } - if (i == config->listen_masks->len) { - ERROR(srv, "listen to socket '%s' not allowed", str->str); - return -1; - } - } - - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - memcpy(&addr.sin6_addr, ipv6, 16); - addr.sin6_port = htons(port); if (-1 == (s = socket(AF_INET6, SOCK_STREAM, 0))) { ERROR(srv, "Couldn't open socket: %s", g_strerror(errno)); g_string_free(ipv6_str, TRUE); @@ -372,7 +444,7 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) { g_string_free(ipv6_str, TRUE); return -1; } - if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) { + if (-1 == bind(s, &addr->addr->plain, addr->len)) { close(s); ERROR(srv, "Couldn't bind socket to '%s': %s", ipv6_str->str, g_strerror(errno)); g_string_free(ipv6_str, TRUE); @@ -384,32 +456,86 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) { g_string_free(ipv6_str, TRUE); return -1; } - DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, port); + DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, addr->addr->ipv6.sin6_port); g_string_free(ipv6_str, TRUE); return s; #endif -/* TODO: listen unix socket */ - } else { - ERROR(srv, "Invalid ip: '%s'", str->str); - return -1; +#ifdef HAVE_SYS_UN_H + case AF_UNIX: + ERROR(srv, "Unix sockets not supported: %s", str->str); + /* TODO: support unix addresses */ + break; +#endif + default: + ERROR(srv, "Address family %i not supported", addr->addr->plain.sa_family); + break; } + return -1; } -static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) { +static void core_listen(liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data) { GError *err = NULL; gint fd; GArray *fds; liPluginCoreConfig *config = (liPluginCoreConfig*) p->data; + liSocketAddress addr; + listen_socket *sock; DEBUG(srv, "core_listen(%i) '%s'", id, data->str); if (-1 == id) return; /* ignore simple calls */ - fd = do_listen(srv, config, data); + addr = li_sockaddr_from_string(data, 80); + if (!addr.addr) { + GString *error = g_string_sized_new(0); + g_string_printf(error, "Invalid socket address: '%s'", data->str); + if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + if (!listen_check_acl(srv, config, &addr)) { + GString *error = g_string_sized_new(0); + li_sockaddr_clear(&addr); + g_string_printf(error, "Socket address not allowed: '%s'", data->str); + if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + if (NULL == (sock = g_hash_table_lookup(config->listen_sockets, &addr))) { + fd = do_listen(srv, &addr, data); + + if (-1 == fd) { + GString *error = g_string_sized_new(0); + li_sockaddr_clear(&addr); + g_string_printf(error, "Couldn't listen to '%s'", data->str); + if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) { + ERROR(srv, "Couldn't send result: %s", err->message); + g_error_free(err); + } + return; + } + + li_fd_init(fd); + sock = listen_new_socket(&addr, fd); + g_hash_table_insert(config->listen_sockets, &sock->addr, sock); + } else { + li_sockaddr_clear(&addr); + } + + listen_socket_add(i, p, sock); + + fd = dup(sock->fd); if (-1 == fd) { + /* socket ref will be released when instance is released */ GString *error = g_string_sized_new(0); - g_string_printf(error, "Couldn't listen to '%s'", data->str); + g_string_printf(error, "Couldn't duplicate fd"); if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) { ERROR(srv, "Couldn't send result: %s", err->message); g_error_free(err); @@ -427,7 +553,7 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS } } -static void core_reached_state(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) { +static void core_reached_state(liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data) { UNUSED(srv); UNUSED(p); UNUSED(id); @@ -448,6 +574,8 @@ static void core_free(liServer *srv, liPlugin *p) { liPluginCoreConfig *config = (liPluginCoreConfig*) p->data; guint i; + li_ev_safe_ref_and_stop(ev_signal_stop, srv->loop, &config->sig_hup); + core_clean(srv, p); if (config->instconf) { @@ -456,7 +584,7 @@ static void core_free(liServer *srv, liPlugin *p) { } if (config->inst) { - li_instance_set_state(config->inst, LI_INSTANCE_DOWN); + li_instance_set_state(config->inst, LI_INSTANCE_FINISHED); li_instance_release(config->inst); config->inst = NULL; } @@ -466,8 +594,11 @@ static void core_free(liServer *srv, liPlugin *p) { } g_ptr_array_free(config->listen_masks, TRUE); g_ptr_array_free(config->load_listen_masks, TRUE); + g_hash_table_destroy(config->listen_sockets); config->listen_masks = NULL; config->load_listen_masks = NULL; + + g_slice_free(liPluginCoreConfig, config); } static void core_clean(liServer *srv, liPlugin *p) { @@ -527,6 +658,31 @@ static void core_activate(liServer *srv, liPlugin *p) { } } +static void core_instance_replaced(liServer *srv, liPlugin *p, liInstance *oldi, liInstance *newi) { + liPluginCoreConfig *config = (liPluginCoreConfig*) p->data; + UNUSED(srv); + + if (oldi == config->inst && LI_INSTANCE_FINISHED == oldi->s_cur) { + li_instance_acquire(newi); + config->inst = newi; + li_instance_release(oldi); + } +} + +static void core_handle_sig_hup(struct ev_loop *loop, ev_signal *w, int revents) { + liPluginCoreConfig *config = w->data; + liInstance *oldi, *newi; + UNUSED(loop); + UNUSED(revents); + + if (NULL == (oldi = config->inst)) return; + + if (oldi->replace_by) return; + newi = li_server_new_instance(oldi->srv, config->instconf); + li_instance_replace(oldi, newi); + li_instance_release(newi); +} + static gboolean core_init(liServer *srv, liPlugin *p) { liPluginCoreConfig *config; UNUSED(srv); @@ -537,12 +693,19 @@ static gboolean core_init(liServer *srv, liPlugin *p) { p->handle_clean_config = core_clean; p->handle_check_config = core_check; p->handle_activate_config = core_activate; + p->handle_instance_replaced = core_instance_replaced; config->listen_masks = g_ptr_array_new(); config->load_listen_masks = g_ptr_array_new(); + config->listen_sockets = g_hash_table_new_full(li_hash_sockaddr, li_equal_sockaddr, NULL, _listen_socket_free); + + li_angel_plugin_add_angel_cb(p, "listen", core_listen); + li_angel_plugin_add_angel_cb(p, "reached-state", core_reached_state); - g_hash_table_insert(p->angel_callbacks, "listen", (gpointer)(intptr_t)core_listen); - g_hash_table_insert(p->angel_callbacks, "reached-state", (gpointer)(intptr_t)core_reached_state); + ev_signal_init(&config->sig_hup, core_handle_sig_hup, SIGHUP); + config->sig_hup.data = config; + ev_signal_start(srv->loop, &config->sig_hup); + ev_unref(srv->loop); return TRUE; } diff --git a/src/angel/angel_proc.c b/src/angel/angel_proc.c index 0611ef6..c06a644 100644 --- a/src/angel/angel_proc.c +++ b/src/angel/angel_proc.c @@ -9,7 +9,7 @@ static void read_pipe(liServer *srv, liErrorPipe *epipe, gboolean flush) { const ssize_t max_read = 8192; - ssize_t r, toread; + ssize_t r, toread = 0; GString *buf; int count = 10; @@ -104,8 +104,8 @@ void li_error_pipe_free(liErrorPipe *epipe) { ev_io_stop(srv->loop, &epipe->fd_watcher); li_error_pipe_flush(epipe); - if (-1 != epipe->fds[0]) close(epipe->fds[0]); - if (-1 != epipe->fds[1]) close(epipe->fds[1]); + if (-1 != epipe->fds[0]) { close(epipe->fds[0]); epipe->fds[0] = -1; } + if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; } g_slice_free(liErrorPipe, epipe); } @@ -114,7 +114,7 @@ void li_error_pipe_free(liErrorPipe *epipe) { void li_error_pipe_activate(liErrorPipe *epipe) { liServer *srv = epipe->srv; - if (-1 != epipe->fds[1]) close(epipe->fds[1]); + if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; } ev_io_start(srv->loop, &epipe->fd_watcher); } diff --git a/src/angel/angel_server.c b/src/angel/angel_server.c index 80f24ca..3d63e6a 100644 --- a/src/angel/angel_server.c +++ b/src/angel/angel_server.c @@ -95,7 +95,7 @@ static void instance_angel_call_cb(liAngelConnection *acon, return; } - cb(srv, i, p, id, data); + cb(srv, p, i, id, data); } static void instance_angel_close_cb(liAngelConnection *acon, GError *err) { @@ -200,12 +200,31 @@ liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) { i->s_cur = i->s_dest = LI_INSTANCE_DOWN; ev_child_init(&i->child_watcher, instance_child_cb, -1, 0); i->child_watcher.data = i; + i->resources = g_ptr_array_new(); return i; } -void li_instance_replace(liInstance *oldi, liInstance *newi) { - /* TODO ??? */ +gboolean li_instance_replace(liInstance *oldi, liInstance *newi) { + if (oldi->replace_by || newi->replace) return FALSE; + oldi->replace_by = newi; + newi->replace = oldi; + li_instance_acquire(oldi); + li_instance_acquire(newi); + + li_instance_set_state(newi, LI_INSTANCE_WARMUP); + + return TRUE; +} + +static void li_instance_unset_replace(liInstance *oldi, liInstance *newi) { + g_assert(newi == oldi->replace_by); oldi->replace_by = NULL; + g_assert(oldi == newi->replace); newi->replace = NULL; + + li_angel_plugin_replaced_instance(oldi->srv, oldi, newi); + + li_instance_release(oldi); + li_instance_release(newi); } void li_instance_set_state(liInstance *i, liInstanceState s) { @@ -275,6 +294,9 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) { } break; case LI_INSTANCE_SUSPENDED: + if (i->replace_by && i->replace_by->s_dest == LI_INSTANCE_WARMUP) { + li_instance_set_state(i->replace_by, LI_INSTANCE_RUNNING); + } switch (i->s_dest) { case LI_INSTANCE_DOWN: break; /* impossible */ @@ -294,7 +316,10 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) { } break; case LI_INSTANCE_WARMUP: - /* TODO: replace another instance? */ + if (i->replace) { + /* stop old instance */ + li_instance_set_state(i->replace, LI_INSTANCE_FINISHED); + } break; case LI_INSTANCE_RUNNING: /* nothing to do, instance should already know what to do */ @@ -304,7 +329,19 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) { break; case LI_INSTANCE_FINISHED: if (i->s_dest != LI_INSTANCE_FINISHED) { - /* TODO: replacing another instance failed? */ + if (i->replace) { + ERROR(i->srv, "%s", "Replacing instance failed, continue old instance"); + li_instance_set_state(i->replace, LI_INSTANCE_RUNNING); + + li_instance_unset_replace(i->replace, i); + } + } else { + if (i->replace_by) { + if (i->replace_by->s_dest == LI_INSTANCE_WARMUP) { + li_instance_set_state(i->replace_by, LI_INSTANCE_RUNNING); + } + li_instance_unset_replace(i, i->replace_by); + } } break; } @@ -317,12 +354,16 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) { } else { li_instance_state_reached(i, LI_INSTANCE_FINISHED); } + } else { + li_angel_plugin_instance_reached_state(i->srv, i, s); } } void li_instance_release(liInstance *i) { liServer *srv; liInstance *t; + guint j; + if (!i) return; srv = i->srv; @@ -342,6 +383,14 @@ void li_instance_release(liInstance *i) { t = i->replace_by; i->replace_by = NULL; li_instance_release(t); + for (j = 0; j < i->resources->len; j++) { + liInstanceResource *res = g_ptr_array_index(i->resources, j); + res->ndx = -1; + res->free_cb(srv, i, res->plugin, res); + } + + g_ptr_array_free(i->resources, TRUE); + g_slice_free(liInstance, i); } @@ -369,6 +418,8 @@ void li_instance_conf_release(liInstanceConf *ic) { if (!ic) return; assert(g_atomic_int_get(&ic->refcount) > 0); if (!g_atomic_int_dec_and_test(&ic->refcount)) return; + + if (ic->username) g_string_free(ic->username, TRUE); g_strfreev(ic->cmd); g_strfreev(ic->env); g_slice_free(liInstanceConf, ic); @@ -378,3 +429,21 @@ void li_instance_conf_acquire(liInstanceConf *ic) { assert(g_atomic_int_get(&ic->refcount) > 0); g_atomic_int_inc(&ic->refcount); } + +void li_instance_add_resource(liInstance *i, liInstanceResource *res, liInstanceResourceFreeCB free_cb, liPlugin *p, gpointer data) { + res->free_cb = free_cb; + res->data = data; + res->plugin = p; + res->ndx = i->resources->len; + + g_ptr_array_add(i->resources, res); +} + +void li_instance_rem_resource(liInstance *i, liInstanceResource *res) { + liInstanceResource *res2; + g_assert(res == g_ptr_array_index(i->resources, res->ndx)); + + g_ptr_array_remove_index_fast(i->resources, res->ndx); + res2 = g_ptr_array_index(i->resources, res->ndx); + res2->ndx = res->ndx; +} diff --git a/src/common/utils.c b/src/common/utils.c index 0c626ed..4074a40 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -101,12 +101,15 @@ gint li_send_fd(gint s, gint fd) { /* write fd to unix socket s */ struct iovec iov; #ifdef CMSG_FIRSTHDR struct cmsghdr *cmsg; -#ifndef CMSG_SPACE -#define CMSG_SPACE(x) x+100 -#endif +# ifndef CMSG_SPACE +# define CMSG_SPACE(x) x+100 +# endif gchar buf[CMSG_SPACE(sizeof(gint))]; #endif + memset(&msg, 0, sizeof(msg)); + memset(&iov, 0, sizeof(iov)); + iov.iov_len = 1; iov.iov_base = "x"; msg.msg_iov = &iov; @@ -161,6 +164,9 @@ gint li_receive_fd(gint s, gint *fd) { /* read fd from unix socket s */ gchar x[100]; gchar name[100]; + memset(&msg, 0, sizeof(msg)); + memset(&iov, 0, sizeof(iov)); + iov.iov_base = x; iov.iov_len = 100; msg.msg_name = name;