Browse Source

Add the "collect" framework, which collects results from a function executed in each worker context

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent
commit
a88ca5238c
  1. 1
      src/CMakeLists.txt
  2. 110
      src/collect.c
  3. 52
      src/collect.h
  4. 6
      src/server.c
  5. 16
      src/worker.c
  6. 4
      src/worker.h
  7. 1
      src/wscript

1
src/CMakeLists.txt

@ -268,6 +268,7 @@ SET(COMMON_SRC
base.c
chunk.c
chunk_parser.c
collect.c
condition.c
condition_parsers.c
config_parser.c

110
src/collect.c

@ -0,0 +1,110 @@
#include "base.h"
#include "collect.h"
struct collect_job;
typedef struct collect_job collect_job;
struct collect_job {
enum { COLLECT_FUNC, COLLECT_CB } type;
collect_info *ci;
};
static collect_info* collect_info_new(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata) {
collect_info *ci = g_slice_new(collect_info);
ci->wrk = ctx;
ci->counter = ctx->srv->worker_count;
ci->stopped = FALSE;
ci->func = func;
ci->fdata = fdata;
ci->free_fdata = free_fdata;
ci->cb = cb;
ci->cbdata = cbdata;
ci->results = g_ptr_array_sized_new(ctx->srv->worker_count);
g_ptr_array_set_size(ci->results, ctx->srv->worker_count);
return ci;
}
static void collect_info_free(collect_info *ci) {
g_ptr_array_free(ci->results, TRUE);
g_slice_free(collect_info, ci);
}
static void collect_insert_callback(worker *ctx, collect_info *ci) {
if (ctx == ci->wrk) {
/* we are in the destiation context */
ci->cb(ci->cbdata, ci->fdata, ci->results, !ci->stopped);
collect_info_free(ci);
} else {
worker *wrk = ci->wrk;
collect_job *j = g_slice_new(collect_job);
j->type = COLLECT_CB;
j->ci = ci;
g_async_queue_push(wrk->collect_queue, j);
ev_async_send(wrk->loop, &wrk->collect_watcher);
}
}
static void collect_send_result(worker *ctx, collect_info *ci) {
if (!g_atomic_int_dec_and_test(&ci->counter)) return; /* not all workers done yet */
if (g_atomic_int_get(&ctx->srv->exiting)) {
/* cleanup state, just call the callback with complete = FALSE */
ci->cb(ci->cbdata, ci->fdata, ci->results, FALSE);
collect_info_free(ci);
} else {
/* no worker is freed yet */
collect_insert_callback(ctx, ci);
}
}
static void collect_insert_func(worker *ctx, collect_info *ci) {
guint i;
server *srv = ctx->srv;
for (i = 0; i < srv->worker_count; i++) {
worker *wrk;
wrk = g_array_index(srv->workers, worker*, i);
if (wrk == ctx) {
/* we are in the destiation context */
g_ptr_array_index(ci->results, wrk->ndx) = ci->func(wrk, ci->fdata);
collect_send_result(wrk, ci);
} else {
collect_job *j = g_slice_new(collect_job);
j->type = COLLECT_FUNC;
j->ci = ci;
g_async_queue_push(wrk->collect_queue, j);
ev_async_send(wrk->loop, &wrk->collect_watcher);
}
}
}
collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata) {
collect_info *ci = collect_info_new(ctx, func, fdata, free_fdata, cb, cbdata);
collect_insert_func(ctx, ci);
return ci;
}
void collect_break(collect_info* ci) {
ci->stopped = TRUE;
}
void collect_watcher_cb(struct ev_loop *loop, ev_async *w, int revents) {
worker *wrk = (worker*) w->data;
collect_job *j;
UNUSED(loop);
UNUSED(revents);
while (NULL != (j = (collect_job*) g_async_queue_try_pop(wrk->collect_queue))) {
collect_info *ci = j->ci;
switch (j->type) {
case COLLECT_FUNC:
g_ptr_array_index(ci->results, wrk->ndx) = ci->func(wrk, ci->fdata);
collect_send_result(wrk, ci);
break;
case COLLECT_CB:
ci->cb(ci->cbdata, ci->fdata, ci->results, !ci->stopped);
collect_info_free(ci);
break;
}
g_slice_free(collect_job, j);
}
}

52
src/collect.h

@ -0,0 +1,52 @@
#ifndef _LIGHTTPD_COLLECT_H_
#define _LIGHTTPD_COLLECT_H_
/* executes a function in each worker context */
/** CollectFunc: the type of functions to execute in each workers context
* - wrk: the current worker
* - fdata: optional user data
* this data either must be persistent data, i.e. not connection related,
* or you have to specify a CollectFree func for it
* the return value will be placed in the GArray
*/
typedef gpointer (*CollectFunc)(worker *wrk, gpointer fdata);
/** CollectCallback: the type of functions to call after a function was called in each workers context
* - cbdata: optional callback data
* depending on the data you should only use it when complete == TRUE
* - fdata : the data the CollectFunc got
* - result: the return values
* - complete: determines if the function was called in every context or was cancelled
* if this is FALSE, it may be called from another context than collect_start was called
*/
typedef void (*CollectCallback)(gpointer cbdata, gpointer fdata, GPtrArray *result, gboolean complete);
typedef void (*CollectFree)(gpointer data);
struct collect_info;
typedef struct collect_info collect_info;
/* internal structure */
struct collect_info {
worker *wrk;
gint counter;
gboolean stopped;
CollectFunc func;
gpointer fdata;
CollectFree free_fdata;
CollectCallback cb;
gpointer cbdata;
GPtrArray *results;
};
LI_API collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata);
LI_API void collect_break(collect_info* ci); /** this will result in complete == FALSE in the callback */
/* internal functions */
LI_API void collect_watcher_cb(struct ev_loop *loop, ev_async *w, int revents);
#endif

6
src/server.c

@ -32,7 +32,7 @@ static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
server *srv = (server*) w->data;
UNUSED(revents);
if (!g_atomic_int_get(&srv->exiting)) {
if (g_atomic_int_get(&srv->state) != SERVER_STOPPING) {
INFO(srv, "Got signal, shutdown");
server_stop(srv);
} else {
@ -80,6 +80,7 @@ void server_free(server* srv) {
if (!srv) return;
server_stop(srv);
g_atomic_int_set(&srv->exiting, TRUE);
/* join all workers */
{
@ -295,7 +296,6 @@ void server_start(server *srv) {
void server_stop(server *srv) {
guint i;
g_atomic_int_set(&srv->exiting, TRUE);
if (g_atomic_int_get(&srv->state) == SERVER_STOPPING) return;
g_atomic_int_set(&srv->state, SERVER_STOPPING);
@ -318,6 +318,8 @@ void server_stop(server *srv) {
void server_exit(server *srv) {
server_stop(srv);
g_atomic_int_set(&srv->exiting, TRUE);
/* exit all workers */
{
guint i;

16
src/worker.c

@ -2,6 +2,7 @@
#include <sched.h>
#include "base.h"
#include "collect.h"
static connection* worker_con_get(worker *wrk);
void worker_con_put(connection *con);
@ -32,7 +33,7 @@ void worker_add_closing_socket(worker *wrk, int fd) {
worker_closing_socket *scs = g_slice_new0(worker_closing_socket);
shutdown(fd, SHUT_WR);
if (g_atomic_int_get(&wrk->srv->exiting)) {
if (g_atomic_int_get(&wrk->srv->state) == SERVER_STOPPING) {
shutdown(fd, SHUT_RD);
close(fd);
return;
@ -176,7 +177,7 @@ static void worker_stat_watcher_cb(struct ev_loop *loop, ev_timer *w, int revent
wrk->stats.requests_per_sec =
(wrk->stats.requests - wrk->stats.last_requests) / (now - wrk->stats.last_update);
if (wrk->stats.requests_per_sec > 0)
TRACE(wrk->srv, "worker %u: %2f requests per second", wrk->ndx, wrk->stats.requests_per_sec);
TRACE(wrk->srv, "worker %u: %.2f requests per second", wrk->ndx, wrk->stats.requests_per_sec);
}
wrk->stats.last_requests = wrk->stats.requests;
@ -221,6 +222,12 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
ev_timer_start(wrk->loop, &wrk->stat_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&wrk->collect_watcher, collect_watcher_cb);
wrk->collect_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->collect_watcher);
wrk->collect_queue = g_async_queue_new();
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
return wrk;
}
@ -262,6 +269,11 @@ void worker_free(worker *wrk) {
ev_ref(wrk->loop);
ev_timer_stop(wrk->loop, &wrk->stat_watcher);
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->collect_watcher);
collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0);
g_async_queue_unref(wrk->collect_queue);
g_slice_free(worker, wrk);
}

4
src/worker.h

@ -69,6 +69,10 @@ struct worker {
ev_timer stat_watcher;
statistics_t stats;
/* collect framework */
ev_async collect_watcher;
GAsyncQueue *collect_queue;
};
LI_API worker* worker_new(struct server *srv, struct ev_loop *loop);

1
src/wscript

@ -11,6 +11,7 @@ common_source='''
base.c
chunk.c
chunk_parser.c
collect.c
condition.c
condition_parsers.rl
config_parser.rl

Loading…
Cancel
Save