Added basic notify + locking for workers

personal/stbuehler/wip
Stefan Bühler 15 years ago
parent ee59334e9d
commit 0a4230cced

@ -4,6 +4,7 @@
#include "utils.h"
#include "plugin_core.h"
/* only call it from the worker context the con belongs to */
void con_put(connection *con); /* server.c */
void internal_error(connection *con) {

@ -17,16 +17,33 @@ static void server_setup_free(gpointer _ss) {
g_slice_free(server_setup, _ss);
}
#define CATCH_SIGNAL(loop, cb, n) do {\
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
ev_unref(loop); /* Signal watchers shouldn't keep loop alive */ \
} while (0)
#define UNCATCH_SIGNAL(loop, n) do {\
ev_ref(loop); \
ev_signal_stop(loop, &srv->sig_w_##n); \
} while (0)
static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
server *srv = (server*) w->data;
UNUSED(revents);
if (!srv->exiting) {
INFO(srv, "Got signal, shutdown");
server_exit(srv);
server_stop(srv);
} else {
INFO(srv, "Got second signal, force shutdown");
ev_unloop (loop, EVUNLOOP_ALL);
/* reset default behaviour which will kill us the third time */
UNCATCH_SIGNAL(loop, INT);
UNCATCH_SIGNAL(loop, TERM);
UNCATCH_SIGNAL(loop, PIPE);
}
}
@ -35,20 +52,15 @@ static void sigpipe_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
UNUSED(loop); UNUSED(w); UNUSED(revents);
}
#define CATCH_SIGNAL(loop, cb, n) do {\
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
ev_unref(loop); /* Signal watchers shouldn't keep loop alive */ \
} while (0)
server* server_new() {
server* srv = g_slice_new0(server);
srv->magic = LIGHTTPD_SERVER_MAGIC;
srv->state = SERVER_STARTING;
srv->workers = g_array_new(FALSE, TRUE, sizeof(worker*));
g_static_rec_mutex_init(&srv->lock_con);
srv->connections_active = 0;
srv->connections = g_array_new(FALSE, TRUE, sizeof(connection*));
srv->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket*));
@ -75,7 +87,16 @@ void server_free(server* srv) {
srv->exiting = TRUE;
server_stop(srv);
worker_free(srv->main_worker);
/* join all workers */
{
guint i;
for (i = 1; i < srv->worker_count; i++) {
worker *wrk;
wrk = g_array_index(srv->workers, worker*, i);
worker_exit(srv->main_worker, wrk);
g_thread_join(wrk->thread);
}
}
{ /* close connections */
guint i;
@ -93,6 +114,16 @@ void server_free(server* srv) {
g_array_free(srv->connections, TRUE);
}
/* free all workers */
{
guint i;
for (i = 0; i < srv->worker_count; i++) {
worker *wrk;
wrk = g_array_index(srv->workers, worker*, i);
worker_free(wrk);
}
}
{
guint i; for (i = 0; i < srv->sockets->len; i++) {
server_socket *sock = g_array_index(srv->sockets, server_socket*, i);
@ -129,7 +160,14 @@ void server_free(server* srv) {
g_slice_free(server, srv);
}
static gpointer server_worker_cb(gpointer data) {
worker *wrk = (worker*) data;
worker_run(wrk);
return NULL;
}
gboolean server_loop_init(server *srv) {
guint i;
struct ev_loop *loop = ev_default_loop(srv->loop_flags);
if (!loop) {
@ -141,13 +179,27 @@ gboolean server_loop_init(server *srv) {
CATCH_SIGNAL(loop, sigint_cb, TERM);
CATCH_SIGNAL(loop, sigpipe_cb, PIPE);
srv->main_worker = worker_new(srv, loop);
if (srv->worker_count < 1) srv->worker_count = 1;
g_array_set_size(srv->workers, srv->worker_count);
srv->main_worker = g_array_index(srv->workers, worker*, 0) = worker_new(srv, loop);
for (i = 1; i < srv->worker_count; i++) {
GError *error = NULL;
worker *wrk;
loop = ev_loop_new(srv->loop_flags);
wrk = g_array_index(srv->workers, worker*, i) = worker_new(srv, loop);
if (NULL == (wrk->thread = g_thread_create(server_worker_cb, wrk, TRUE, &error))) {
g_error ( "g_thread_create failed: %s", error->message );
return FALSE;
}
}
return TRUE;
}
static connection* con_get(server *srv) {
connection *con;
WORKER_LOCK(srv, &srv->lock_con);
if (srv->connections_active >= srv->connections->len) {
con = connection_new(srv);
con->idx = srv->connections_active++;
@ -155,6 +207,7 @@ static connection* con_get(server *srv) {
} else {
con = g_array_index(srv->connections, connection*, srv->connections_active++);
}
WORKER_UNLOCK(srv, &srv->lock_con);
return con;
}
@ -162,6 +215,8 @@ void con_put(connection *con) {
server *srv = con->srv;
connection_reset(con);
g_atomic_int_add((gint*) &con->wrk->connection_load, -1);
WORKER_LOCK(srv, &srv->lock_con);
con->wrk = NULL;
srv->connections_active--;
if (con->idx != srv->connections_active) {
@ -174,6 +229,7 @@ void con_put(connection *con) {
g_array_index(srv->connections, connection*, con->idx) = con;
g_array_index(srv->connections, connection*, tmp->idx) = tmp;
}
WORKER_UNLOCK(srv, &srv->lock_con);
}
static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
@ -187,11 +243,24 @@ static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
while (-1 != (s = accept(w->fd, (struct sockaddr*) &remote_addr, &l))) {
connection *con = con_get(srv);
con->wrk = srv->main_worker; /* TODO: balance workers; push con in a queue for the worker */
worker *wrk = srv->main_worker;
guint i, min_load = g_atomic_int_get(&wrk->connection_load);
for (i = 1; i < srv->worker_count; i++) {
worker *wt = g_array_index(srv->workers, worker*, i);
guint load = g_atomic_int_get(&wt->connection_load);
if (load < min_load) {
wrk = wt;
min_load = load;
}
}
g_atomic_int_inc((gint*) &wrk->connection_load);
con->wrk = wrk;
con->state = CON_STATE_REQUEST_START;
con->remote_addr = remote_addr;
ev_io_set(&con->sock_watcher, s, EV_READ);
ev_io_start(con->wrk->loop, &con->sock_watcher);
worker_new_con(srv->main_worker, con);
}
#ifdef _WIN32
@ -274,26 +343,38 @@ void server_start(server *srv) {
void server_stop(server *srv) {
guint i;
if (srv->state == SERVER_STOPPING) return;
srv->state = SERVER_STOPPING;
g_atomic_int_set(&srv->exiting, TRUE);
if (g_atomic_int_get(&srv->state) == SERVER_STOPPING) return;
g_atomic_int_set(&srv->state, SERVER_STOPPING);
for (i = 0; i < srv->sockets->len; i++) {
server_socket *sock = g_array_index(srv->sockets, server_socket*, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
for (i = srv->connections_active; i-- > 0;) {
connection *con = g_array_index(srv->connections, connection*, i);
if (con->state == CON_STATE_KEEP_ALIVE)
con_put(con);
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {
worker *wrk;
wrk = g_array_index(srv->workers, worker*, i);
worker_stop(srv->main_worker, wrk);
}
log_thread_wakeup(srv);
}
void server_exit(server *srv) {
g_atomic_int_set(&srv->exiting, TRUE);
server_stop(srv);
log_thread_wakeup(srv);
/* exit all workers */
{
guint i;
for (i = 0; i < srv->worker_count; i++) {
worker *wrk;
wrk = g_array_index(srv->workers, worker*, i);
worker_exit(srv->main_worker, wrk);
}
}
}
void joblist_append(connection *con) {

@ -38,6 +38,9 @@ struct server {
server_state state;
struct worker *main_worker;
guint worker_count;
GArray *workers;
GThreadPool *worker_pool;
guint loop_flags;
ev_signal
@ -47,8 +50,15 @@ struct server {
ev_prepare srv_prepare;
ev_check srv_check;
/** this lock protects:
* srv->connections_active (read with g_atomic_get)
* srv->connections
* wkr->connection_load (read with g_atomic_get)
*/
GStaticRecMutex lock_con;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections */
GArray *connections; /** array of (connection*) */
GArray *sockets; /** array of (server_socket*) */
GHashTable *plugins; /**< const gchar* => (plugin*) */
@ -91,9 +101,9 @@ LI_API void server_listen(server *srv, int fd);
/* Start accepting connection, use log files, no new plugins after that */
LI_API void server_start(server *srv);
/* stop accepting connections, turn keep-alive off */
/* stop accepting connections, turn keep-alive off, close all shutdown sockets, set exiting = TRUE */
LI_API void server_stop(server *srv);
/* close connections, close logs, stop log-thread */
/* exit asap with cleanup */
LI_API void server_exit(server *srv);
LI_API void joblist_append(connection *con);

@ -39,11 +39,9 @@ void worker_add_closing_socket(worker *wrk, int fd) {
}
/* Kill it - frees fd */
/*
static void worker_rem_closing_socket(worker *wrk, worker_closing_socket *scs) {
ev_feed_fd_event(wrk->loop, scs->fd, EV_READ);
}
*/
/* Keep alive */
@ -105,6 +103,44 @@ GString *worker_current_timestamp(worker *wrk) {
return wrk->ts_date_str;
}
/* stop worker watcher */
static void worker_stop_cb(struct ev_loop *loop, ev_async *w, int revents) {
UNUSED(loop);
UNUSED(revents);
worker *wrk = (worker*) w->data;
worker_stop(wrk, wrk);
}
/* exit worker watcher */
static void worker_exit_cb(struct ev_loop *loop, ev_async *w, int revents) {
UNUSED(loop);
UNUSED(revents);
worker *wrk = (worker*) w->data;
worker_exit(wrk, wrk);
}
/* new con watcher */
void worker_new_con(worker *wrk, connection *con) {
if (wrk == con->wrk) {
ev_io_start(wrk->loop, &con->sock_watcher);
} else {
wrk = con->wrk;
g_async_queue_push(wrk->new_con_queue, con);
ev_async_send(wrk->loop, &wrk->new_con_watcher);
}
}
static void worker_new_con_cb(struct ev_loop *loop, ev_async *w, int revents) {
worker *wrk = (worker*) w->data;
connection *con;
UNUSED(loop);
UNUSED(revents);
while (NULL != (con = g_async_queue_try_pop(wrk->new_con_queue))) {
worker_new_con(wrk, con);
}
}
/* init */
worker* worker_new(struct server *srv, struct ev_loop *loop) {
@ -121,6 +157,20 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
wrk->last_generated_date_ts = 0;
wrk->ts_date_str = g_string_sized_new(255);
ev_init(&wrk->worker_exit_watcher, worker_exit_cb);
wrk->worker_exit_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_exit_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive; it is never stopped */
ev_init(&wrk->worker_stop_watcher, worker_stop_cb);
wrk->worker_stop_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_stop_watcher);
ev_init(&wrk->new_con_watcher, worker_new_con_cb);
wrk->new_con_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->new_con_watcher);
wrk->new_con_queue = g_async_queue_new();
return wrk;
}
@ -144,3 +194,39 @@ void worker_free(worker *wrk) {
void worker_run(worker *wrk) {
ev_loop(wrk->loop, 0);
}
void worker_stop(worker *context, worker *wrk) {
if (context == wrk) {
guint i;
server *srv = wrk->srv;
ev_async_stop(wrk->loop, &wrk->worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
WORKER_LOCK(srv, &srv->lock_con);
for (i = srv->connections_active; i-- > 0;) {
connection *con = g_array_index(srv->connections, connection*, i);
if (con->wrk == wrk && con->state == CON_STATE_KEEP_ALIVE)
con_put(con);
}
WORKER_UNLOCK(srv, &srv->lock_con);
worker_check_keepalive(wrk);
{ /* force closing sockets */
GList *iter;
for (iter = g_queue_peek_head_link(&wrk->closing_sockets); iter; iter = g_list_next(iter)) {
worker_rem_closing_socket(wrk, (worker_closing_socket*) iter->data);
}
}
} else {
ev_async_send(wrk->loop, &wrk->worker_stop_watcher);
}
}
void worker_exit(worker *context, worker *wrk) {
if (context == wrk) {
ev_unloop (wrk->loop, EVUNLOOP_ALL);
} else {
ev_async_send(wrk->loop, &wrk->worker_exit_watcher);
}
}

@ -10,12 +10,21 @@ struct server;
#define CUR_TS(wrk) ((time_t)ev_now((wrk)->loop))
/* only locks if there is more than one worker */
#define WORKER_LOCK(srv, lock) \
if ((srv)->worker_count > 1) g_static_rec_mutex_lock(lock)
#define WORKER_UNLOCK(srv, lock) \
if ((srv)->worker_count > 1) g_static_rec_mutex_unlock(lock)
struct worker {
struct server *srv;
GThread *thread; /* managed by server.c */
struct ev_loop *loop;
ev_prepare loop_prepare;
ev_check loop_check;
ev_async worker_stop_watcher, worker_exit_watcher;
GQueue closing_sockets; /** wait for EOF before shutdown(SHUT_RD) and close() */
@ -29,12 +38,21 @@ struct worker {
time_t last_generated_date_ts;
GString *ts_date_str; /**< use server_current_timestamp(srv) */
/* incoming queues */
/* - new connections (after accept) */
ev_async new_con_watcher;
GAsyncQueue *new_con_queue;
};
LI_API worker* worker_new(struct server *srv, struct ev_loop *loop);
LI_API void worker_free(worker *wrk);
LI_API void worker_run(worker *wrk);
LI_API void worker_stop(worker *context, worker *wrk);
LI_API void worker_exit(worker *context, worker *wrk);
LI_API void worker_new_con(worker *wrk, connection *con);
LI_API void worker_check_keepalive(worker *wrk);

Loading…
Cancel
Save