Changed collect api: removed fdata_free (wasn't used) and return NULL from collect_start if collect finished before returning.

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent 171ab51db5
commit d7e53a44de

@ -10,8 +10,6 @@
/** CollectFunc: the type of functions to execute in each workers context
* - wrk: the current worker
* - fdata: optional user data
* this data either must be persistent data, i.e. not connection related,
* or you have to specify a CollectFree func for it
* the return value will be placed in the GArray
*/
typedef gpointer (*CollectFunc)(worker *wrk, gpointer fdata);
@ -19,15 +17,13 @@ typedef gpointer (*CollectFunc)(worker *wrk, gpointer fdata);
/** CollectCallback: the type of functions to call after a function was called in each workers context
* - cbdata: optional callback data
* depending on the data you should only use it when complete == TRUE
* - fdata : the data the CollectFunc got
* - fdata : the data the CollectFunc got (this data must be valid until cb is called)
* - result: the return values
* - complete: determines if the function was called in every context or was cancelled
* - complete: determines if cbdata is still valid
* if this is FALSE, it may be called from another context than collect_start was called
*/
typedef void (*CollectCallback)(gpointer cbdata, gpointer fdata, GPtrArray *result, gboolean complete);
typedef void (*CollectFree)(gpointer data);
struct collect_info;
typedef struct collect_info collect_info;
@ -39,7 +35,6 @@ struct collect_info {
CollectFunc func;
gpointer fdata;
CollectFree free_fdata;
CollectCallback cb;
gpointer cbdata;
@ -47,8 +42,9 @@ struct collect_info {
GPtrArray *results;
};
LI_API collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata);
LI_API void collect_break(collect_info* ci); /** this will result in complete == FALSE in the callback */
/** collect_start returns NULL if the callback was called directly (e.g. for only one worker and ctx = wrk) */
LI_API collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectCallback cb, gpointer cbdata);
LI_API void collect_break(collect_info* ci); /** this will result in complete == FALSE in the callback; call it if cbdata gets invalid */
/* internal functions */
LI_API void collect_watcher_cb(struct ev_loop *loop, ev_async *w, int revents);

@ -8,14 +8,13 @@ struct collect_job {
collect_info *ci;
};
static collect_info* collect_info_new(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata) {
static collect_info* collect_info_new(worker *ctx, CollectFunc func, gpointer fdata, CollectCallback cb, gpointer cbdata) {
collect_info *ci = g_slice_new(collect_info);
ci->wrk = ctx;
ci->counter = ctx->srv->worker_count;
ci->stopped = FALSE;
ci->func = func;
ci->fdata = fdata;
ci->free_fdata = free_fdata;
ci->cb = cb;
ci->cbdata = cbdata;
ci->results = g_ptr_array_sized_new(ctx->srv->worker_count);
@ -28,11 +27,13 @@ static void collect_info_free(collect_info *ci) {
g_slice_free(collect_info, ci);
}
static void collect_insert_callback(worker *ctx, collect_info *ci) {
/* returns true if callback was called directly */
static gboolean collect_insert_callback(worker *ctx, collect_info *ci) {
if (ctx == ci->wrk) {
/* we are in the destiation context */
ci->cb(ci->cbdata, ci->fdata, ci->results, !ci->stopped);
collect_info_free(ci);
return TRUE;
} else {
worker *wrk = ci->wrk;
collect_job *j = g_slice_new(collect_job);
@ -41,21 +42,25 @@ static void collect_insert_callback(worker *ctx, collect_info *ci) {
g_async_queue_push(wrk->collect_queue, j);
ev_async_send(wrk->loop, &wrk->collect_watcher);
}
return FALSE;
}
static void collect_send_result(worker *ctx, collect_info *ci) {
if (!g_atomic_int_dec_and_test(&ci->counter)) return; /* not all workers done yet */
/* returns true if callback was called directly */
static gboolean collect_send_result(worker *ctx, collect_info *ci) {
if (!g_atomic_int_dec_and_test(&ci->counter)) return FALSE; /* not all workers done yet */
if (g_atomic_int_get(&ctx->srv->exiting)) {
/* cleanup state, just call the callback with complete = FALSE */
ci->cb(ci->cbdata, ci->fdata, ci->results, FALSE);
collect_info_free(ci);
return TRUE;
} else {
/* no worker is freed yet */
collect_insert_callback(ctx, ci);
return collect_insert_callback(ctx, ci);
}
}
static void collect_insert_func(worker *ctx, collect_info *ci) {
/* returns true if callback was called directly */
static gboolean collect_insert_func(worker *ctx, collect_info *ci) {
guint i;
server *srv = ctx->srv;
for (i = 0; i < srv->worker_count; i++) {
@ -64,7 +69,7 @@ static void collect_insert_func(worker *ctx, collect_info *ci) {
if (wrk == ctx) {
/* we are in the destiation context */
g_ptr_array_index(ci->results, wrk->ndx) = ci->func(wrk, ci->fdata);
collect_send_result(wrk, ci);
if (collect_send_result(wrk, ci)) return TRUE;
} else {
collect_job *j = g_slice_new(collect_job);
j->type = COLLECT_FUNC;
@ -73,11 +78,12 @@ static void collect_insert_func(worker *ctx, collect_info *ci) {
ev_async_send(wrk->loop, &wrk->collect_watcher);
}
}
return FALSE;
}
collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectFree free_fdata, CollectCallback cb, gpointer cbdata) {
collect_info *ci = collect_info_new(ctx, func, fdata, free_fdata, cb, cbdata);
collect_insert_func(ctx, ci);
collect_info* collect_start(worker *ctx, CollectFunc func, gpointer fdata, CollectCallback cb, gpointer cbdata) {
collect_info *ci = collect_info_new(ctx, func, fdata, cb, cbdata);
if (collect_insert_func(ctx, ci)) return NULL; /* collect info is invalid now */
return ci;
}

Loading…
Cancel
Save