Move the connection tables into the workers, so no locking needed for them.

personal/stbuehler/wip
Stefan Bühler 15 years ago
parent 2b9147ba50
commit 00143835d4

@ -5,7 +5,7 @@
#include "plugin_core.h"
/* only call it from the worker context the con belongs to */
void con_put(connection *con); /* server.c */
void worker_con_put(connection *con); /* worker.c */
void internal_error(connection *con) {
if (con->response_headers_sent) {
@ -130,11 +130,13 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) {
connection *con = (connection*) w->data;
UNUSED(loop); UNUSED(revents);
con_put(con);
worker_con_put(con);
}
connection* connection_new(server *srv) {
connection* connection_new(worker *wrk) {
server *srv = wrk->srv;
connection *con = g_slice_new0(connection);
con->wrk = wrk;
con->srv = srv;
con->state = CON_STATE_DEAD;
@ -217,7 +219,7 @@ void connection_reset_keep_alive(connection *con) {
{
con->keep_alive_data.max_idle = GPOINTER_TO_INT(CORE_OPTION(CORE_OPTION_MAX_KEEP_ALIVE_IDLE));
if (con->keep_alive_data.max_idle == 0) {
con_put(con);
worker_con_put(con);
return;
}
if (con->keep_alive_data.max_idle >= con->srv->keep_alive_queue_timeout) {
@ -463,7 +465,7 @@ void connection_state_machine(connection *con) {
if (con->keep_alive) {
connection_reset_keep_alive(con);
} else {
con_put(con);
worker_con_put(con);
done = TRUE;
}
break;
@ -475,7 +477,7 @@ void connection_state_machine(connection *con) {
plugins_handle_close(con);
con_put(con);
worker_con_put(con);
done = TRUE;
break;
@ -486,7 +488,7 @@ void connection_state_machine(connection *con) {
plugins_handle_close(con);
con_put(con);
worker_con_put(con);
done = TRUE;
break;
}

@ -99,7 +99,7 @@ struct connection {
} keep_alive_data;
};
LI_API connection* connection_new(server *srv);
LI_API connection* connection_new(worker *wrk);
LI_API void connection_reset(connection *con);
LI_API void connection_free(connection *con);

@ -3,8 +3,6 @@
#include "utils.h"
#include "plugin_core.h"
void con_put(connection *con);
static void server_option_free(gpointer _so) {
g_slice_free(server_option, _so);
}
@ -60,9 +58,6 @@ server* server_new() {
srv->workers = g_array_new(FALSE, TRUE, sizeof(worker*));
g_static_rec_mutex_init(&srv->lock_con);
srv->connections_active = 0;
srv->connections = g_array_new(FALSE, TRUE, sizeof(connection*));
srv->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket*));
srv->plugins = g_hash_table_new(g_str_hash, g_str_equal);
@ -97,22 +92,6 @@ void server_free(server* srv) {
}
}
{ /* close connections */
guint i;
if (srv->connections_active > 0) {
ERROR(srv, "Server shutdown with unclosed connections: %u", srv->connections_active);
for (i = srv->connections_active; i-- > 0;) {
connection *con = g_array_index(srv->connections, connection*, i);
connection_set_state(con, CON_STATE_ERROR);
connection_state_machine(con); /* cleanup plugins */
}
}
for (i = 0; i < srv->connections->len; i++) {
connection_free(g_array_index(srv->connections, connection*, i));
}
g_array_free(srv->connections, TRUE);
}
/* free all workers */
{
guint i;
@ -147,6 +126,7 @@ void server_free(server* srv) {
g_array_free(srv->plugins_handle_close, TRUE);
action_release(srv, srv->mainaction);
g_slice_free1(srv->option_count * sizeof(*srv->option_def_values), srv->option_def_values);
/* free logs */
g_thread_join(srv->log_thread);
@ -207,43 +187,6 @@ gboolean server_loop_init(server *srv) {
return TRUE;
}
static connection* con_get(server *srv) {
connection *con;
WORKER_LOCK(srv, &srv->lock_con);
if (srv->connections_active >= srv->connections->len) {
con = connection_new(srv);
con->idx = srv->connections_active;
g_array_append_val(srv->connections, con);
} else {
con = g_array_index(srv->connections, connection*, srv->connections_active);
}
g_atomic_int_inc((gint*) &srv->connections_active);
WORKER_UNLOCK(srv, &srv->lock_con);
return con;
}
void con_put(connection *con) {
server *srv = con->srv;
connection_reset(con);
g_atomic_int_add((gint*) &con->wrk->connection_load, -1);
WORKER_LOCK(srv, &srv->lock_con);
con->wrk = NULL;
g_atomic_int_add((gint*) &srv->connections_active, -1);
if (con->idx != srv->connections_active) {
/* Swap [con->idx] and [srv->connections_active] */
connection *tmp;
assert(con->idx < srv->connections_active); /* con must be an active connection) */
tmp = g_array_index(srv->connections, connection*, srv->connections_active);
tmp->idx = con->idx;
con->idx = srv->connections_active;
g_array_index(srv->connections, connection*, con->idx) = con;
g_array_index(srv->connections, connection*, tmp->idx) = tmp;
}
WORKER_UNLOCK(srv, &srv->lock_con);
}
static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
server_socket *sock = (server_socket*) w->data;
server *srv = sock->srv;
@ -254,7 +197,6 @@ static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
UNUSED(revents);
while (-1 != (s = accept(w->fd, (struct sockaddr*) &remote_addr, &l))) {
connection *con = con_get(srv);
worker *wrk = srv->main_worker;
guint i, min_load = g_atomic_int_get(&wrk->connection_load), sel = 0;
@ -269,12 +211,8 @@ static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
g_atomic_int_inc((gint*) &wrk->connection_load);
/* TRACE(srv, "selected worker %u with load %u", sel, min_load); */
con->wrk = wrk;
con->state = CON_STATE_REQUEST_START;
con->remote_addr = remote_addr;
ev_io_set(&con->sock_watcher, s, EV_READ);
worker_new_con(srv->main_worker, con);
TRACE(srv, "selected worker %u with load %u", sel, min_load);
worker_new_con(srv->main_worker, wrk, &remote_addr, s);
}
#ifdef _WIN32

@ -49,15 +49,6 @@ struct server {
ev_prepare srv_prepare;
ev_check srv_check;
/** this lock protects: (atomic access means here: normal read + atomic write with lock, atomic read without)
* srv->connections_active (atomic access)
* srv->connections
* wrk->connection_load (atomic access)
*/
GStaticRecMutex lock_con;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections */
GArray *connections; /** array of (connection*) */
GArray *sockets; /** array of (server_socket*) */
GHashTable *plugins; /**< const gchar* => (plugin*) */

@ -3,7 +3,8 @@
#include "base.h"
void con_put(connection *con);
static connection* worker_con_get(worker *wrk);
void worker_con_put(connection *con);
/* closing sockets - wait for proper shutdown */
@ -83,7 +84,7 @@ static void worker_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents)
ev_timer_start(wrk->loop, &con->keep_alive_data.watcher);
} else {
/* close it */
con_put(con);
worker_con_put(con);
}
}
@ -126,25 +127,40 @@ static void worker_exit_cb(struct ev_loop *loop, ev_async *w, int revents) {
worker_exit(wrk, wrk);
}
struct worker_new_con_data;
typedef struct worker_new_con_data worker_new_con_data;
struct worker_new_con_data {
sock_addr remote_addr;
int s;
};
/* new con watcher */
void worker_new_con(worker *wrk, connection *con) {
if (wrk == con->wrk) {
void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) {
if (ctx == wrk) {
connection *con = worker_con_get(wrk);
con->state = CON_STATE_REQUEST_START;
con->remote_addr = *remote_addr;
ev_io_set(&con->sock_watcher, s, EV_READ);
ev_io_start(wrk->loop, &con->sock_watcher);
} else {
wrk = con->wrk;
g_async_queue_push(wrk->new_con_queue, con);
worker_new_con_data *d = g_slice_new(worker_new_con_data);
d->remote_addr = *remote_addr;
d->s = s;
g_async_queue_push(wrk->new_con_queue, d);
ev_async_send(wrk->loop, &wrk->new_con_watcher);
}
}
static void worker_new_con_cb(struct ev_loop *loop, ev_async *w, int revents) {
worker *wrk = (worker*) w->data;
connection *con;
worker_new_con_data *d;
UNUSED(loop);
UNUSED(revents);
while (NULL != (con = g_async_queue_try_pop(wrk->new_con_queue))) {
worker_new_con(wrk, con);
while (NULL != (d = g_async_queue_try_pop(wrk->new_con_queue))) {
worker_new_con(wrk, wrk, &d->remote_addr, d->s);
g_slice_free(worker_new_con_data, d);
}
}
@ -159,6 +175,9 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
ev_init(&wrk->keep_alive_timer, worker_keepalive_cb);
wrk->keep_alive_timer.data = wrk;
wrk->connections_active = 0;
wrk->connections = g_array_new(FALSE, TRUE, sizeof(connection*));
wrk->tmp_str = g_string_sized_new(255);
wrk->last_generated_date_ts = 0;
@ -184,6 +203,22 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
void worker_free(worker *wrk) {
if (!wrk) return;
{ /* close connections */
guint i;
if (wrk->connections_active > 0) {
ERROR(wrk->srv, "Server shutdown with unclosed connections: %u", wrk->connections_active);
for (i = wrk->connections_active; i-- > 0;) {
connection *con = g_array_index(wrk->connections, connection*, i);
connection_set_state(con, CON_STATE_ERROR);
connection_state_machine(con); /* cleanup plugins */
}
}
for (i = 0; i < wrk->connections->len; i++) {
connection_free(g_array_index(wrk->connections, connection*, i));
}
g_array_free(wrk->connections, TRUE);
}
{ /* force closing sockets */
GList *iter;
for (iter = g_queue_peek_head_link(&wrk->closing_sockets); iter; iter = g_list_next(iter)) {
@ -225,18 +260,18 @@ void worker_run(worker *wrk) {
void worker_stop(worker *context, worker *wrk) {
if (context == wrk) {
guint i;
server *srv = wrk->srv;
ev_async_stop(wrk->loop, &wrk->worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
worker_new_con_cb(wrk->loop, &wrk->new_con_watcher, 0); /* handle remaining new connections */
WORKER_LOCK(srv, &srv->lock_con);
for (i = srv->connections_active; i-- > 0;) {
connection *con = g_array_index(srv->connections, connection*, i);
if (con->wrk == wrk && con->state == CON_STATE_KEEP_ALIVE)
con_put(con);
/* close keep alive connections */
for (i = wrk->connections_active; i-- > 0;) {
connection *con = g_array_index(wrk->connections, connection*, i);
if (con->state == CON_STATE_KEEP_ALIVE)
worker_con_put(con);
}
WORKER_UNLOCK(srv, &srv->lock_con);
worker_check_keepalive(wrk);
{ /* force closing sockets */
@ -257,3 +292,37 @@ void worker_exit(worker *context, worker *wrk) {
ev_async_send(wrk->loop, &wrk->worker_exit_watcher);
}
}
static connection* worker_con_get(worker *wrk) {
connection *con;
if (wrk->connections_active >= wrk->connections->len) {
con = connection_new(wrk);
con->idx = wrk->connections_active;
g_array_append_val(wrk->connections, con);
} else {
con = g_array_index(wrk->connections, connection*, wrk->connections_active);
}
g_atomic_int_inc((gint*) &wrk->connections_active);
return con;
}
void worker_con_put(connection *con) {
worker *wrk = con->wrk;
connection_reset(con);
g_atomic_int_add((gint*) &wrk->connection_load, -1);
g_atomic_int_add((gint*) &wrk->connections_active, -1);
if (con->idx != wrk->connections_active) {
/* Swap [con->idx] and [wrk->connections_active] */
connection *tmp;
assert(con->idx < wrk->connections_active); /* con must be an active connection */
tmp = g_array_index(wrk->connections, connection*, wrk->connections_active);
tmp->idx = con->idx;
con->idx = wrk->connections_active;
g_array_index(wrk->connections, connection*, con->idx) = con;
g_array_index(wrk->connections, connection*, tmp->idx) = tmp;
}
}

@ -27,6 +27,11 @@ struct worker {
ev_check loop_check;
ev_async worker_stop_watcher, worker_exit_watcher;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections
* use with atomic, read direct from local worker context
*/
GArray *connections; /** array of (connection*), use only from local worker context */
GQueue closing_sockets; /** wait for EOF before shutdown(SHUT_RD) and close() */
GString *tmp_str; /**< can be used everywhere for local temporary needed strings */
@ -35,7 +40,7 @@ struct worker {
ev_timer keep_alive_timer;
GQueue keep_alive_queue;
guint connection_load;
guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */
time_t last_generated_date_ts;
GString *ts_date_str; /**< use server_current_timestamp(srv) */
@ -53,7 +58,7 @@ LI_API void worker_run(worker *wrk);
LI_API void worker_stop(worker *context, worker *wrk);
LI_API void worker_exit(worker *context, worker *wrk);
LI_API void worker_new_con(worker *wrk, connection *con);
LI_API void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s);
LI_API void worker_check_keepalive(worker *wrk);

Loading…
Cancel
Save