Browse Source

[common]: Add generic jobqueue

personal/stbuehler/wip
Stefan Bühler 12 years ago
parent
commit
c3dc0d8d7e
  1. 61
      include/lighttpd/jobqueue.h
  2. 1
      src/CMakeLists.txt
  3. 1
      src/common/Makefile.am
  4. 222
      src/common/jobqueue.c
  5. 1
      src/common/wscript

61
include/lighttpd/jobqueue.h

@ -0,0 +1,61 @@
#ifndef _LIGHTTPD_JOBQUEUE_H_
#define _LIGHTTPD_JOBQUEUE_H_
#include <lighttpd/settings.h>
typedef struct liJob liJob;
typedef struct liJobRef liJobRef;
typedef struct liJobQueue liJobQueue;
typedef void (*liJobCB)(liJob *job);
/* All data here is private; use the functions to interact with the job-queue */
struct liJob {
guint generation;
GList link;
liJobCB callback;
liJobRef *ref;
};
struct liJobRef {
gint refcount;
liJob *job;
liJobQueue *queue;
};
struct liJobQueue {
struct ev_loop *loop;
guint generation;
ev_prepare prepare_watcher;
GQueue queue;
ev_timer queue_watcher;
GAsyncQueue *async_queue;
ev_async async_queue_watcher;
};
LI_API void li_job_queue_init(liJobQueue *jq, struct ev_loop *loop);
LI_API void li_job_queue_clear(liJobQueue *jq); /* runs until all jobs are done */
LI_API void li_job_init(liJob *job, liJobCB callback);
LI_API void li_job_reset(liJob *job);
LI_API void li_job_clear(liJob *job);
/* marks the job for later execution */
LI_API void li_job_later(liJobQueue *jq, liJob *job);
LI_API void li_job_later_ref(liJobRef *jobref); /* NOT thread-safe! */
/* if the job didn't run in this generation yet, run it now; otherwise mark it for later execution */
LI_API void li_job_now(liJobQueue *jq, liJob *job);
LI_API void li_job_now_ref(liJobRef *jobref); /* NOT thread-safe! */
LI_API void li_job_async(liJobRef *jobref);
/* marks the job for later execution; this is the only threadsafe way to push a job to the queue */
LI_API liJobRef* li_job_ref(liJobQueue *jq, liJob *job);
LI_API void li_job_ref_release(liJobRef *jobref);
LI_API void li_job_ref_acquire(liJobRef *jobref);
#endif

1
src/CMakeLists.txt

@ -174,6 +174,7 @@ SET(COMMON_SRC
encoding.c
idlist.c
ip_parsers.c
jobqueue.c
memcached.c
mempool.c
module.c

1
src/common/Makefile.am

@ -9,6 +9,7 @@ common_src= \
encoding.c \
idlist.c \
ip_parsers.c \
jobqueue.c \
mempool.c \
module.c \
radix.c \

222
src/common/jobqueue.c

@ -0,0 +1,222 @@
#include <lighttpd/jobqueue.h>
#include <lighttpd/utils.h>
#define INC_GEN(jq) do { jq->generation++; if (0 == jq->generation) jq->generation = 1; } while (0)
static void job_queue_run(liJobQueue* jq, int loops) {
int i;
for (i = 0; i < loops; i++) {
GQueue q = jq->queue;
GList *l;
liJob *job;
INC_GEN(jq);
if (q.length == 0) return;
g_queue_init(&jq->queue); /* reset queue, elements are in q */
while (NULL != (l = g_queue_pop_head_link(&q))) {
job = LI_CONTAINER_OF(l, liJob, link);
job->generation = jq->generation;
job->link.data = NULL;
job->callback(job);
}
}
if (jq->queue.length > 0) {
/* make sure we will run again soon */
ev_timer_start(jq->loop, &jq->queue_watcher);
}
}
static void job_queue_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
liJobQueue* jq = (liJobQueue*) w->data;
UNUSED(loop);
UNUSED(revents);
job_queue_run(jq, 3);
}
static void job_queue_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) {
UNUSED(loop);
UNUSED(revents);
UNUSED(w);
/* just keep loop alive, run jobs in prepare */
}
/* run jobs for async queued jobs */
static void job_async_queue_cb(struct ev_loop *loop, ev_async *w, int revents) {
liJobQueue* jq = (liJobQueue*) w->data;
GAsyncQueue *q = jq->async_queue;
liJobRef *jobref;
UNUSED(loop);
UNUSED(revents);
while (NULL != (jobref = g_async_queue_try_pop(q))) {
li_job_now_ref(jobref);
li_job_ref_release(jobref);
}
}
void li_job_queue_init(liJobQueue* jq, struct ev_loop *loop) {
jq->loop = loop;
ev_init(&jq->prepare_watcher, job_queue_prepare_cb);
jq->prepare_watcher.data = jq;
ev_prepare_start(jq->loop, &jq->prepare_watcher);
ev_unref(jq->loop); /* this watcher shouldn't keep the loop alive */
/* job queue */
g_queue_init(&jq->queue);
ev_timer_init(&jq->queue_watcher, job_queue_watcher_cb, 0, 0);
jq->queue_watcher.data = jq;
jq->async_queue = g_async_queue_new();
ev_async_init(&jq->async_queue_watcher, job_async_queue_cb);
jq->async_queue_watcher.data = jq;
ev_async_start(jq->loop, &jq->async_queue_watcher);
ev_unref(jq->loop); /* this watcher shouldn't keep the loop alive */
}
void li_job_queue_clear(liJobQueue *jq) {
while (jq->queue.length > 0 || g_async_queue_length(jq->async_queue) > 0) {
liJobRef *jobref;
while (NULL != (jobref = g_async_queue_try_pop(jq->async_queue))) {
li_job_now_ref(jobref);
li_job_ref_release(jobref);
}
job_queue_run(jq, 1);
}
g_async_queue_unref(jq->async_queue);
jq->async_queue = NULL;
li_ev_safe_ref_and_stop(ev_async_stop, jq->loop, &jq->async_queue_watcher);
li_ev_safe_ref_and_stop(ev_prepare_stop, jq->loop, &jq->prepare_watcher);
ev_timer_stop(jq->loop, &jq->queue_watcher);
}
void li_job_init(liJob *job, liJobCB callback) {
job->generation = 0;
job->link.prev = job->link.next = job->link.data = 0;
job->callback = callback;
job->ref = 0;
}
void li_job_reset(liJob *job) {
if (NULL != job->link.data) {
liJobQueue *jq = job->link.data;
g_queue_unlink(&jq->queue, &job->link);
job->link.data = NULL;
}
job->generation = 0;
if (NULL != job->ref) {
/* keep it if refcount == 1, as we are the only reference then */
if (1 < g_atomic_int_get(&job->ref->refcount)) {
li_job_ref_release(job->ref);
job->ref = NULL;
}
}
}
void li_job_clear(liJob *job) {
if (NULL != job->link.data) {
liJobQueue *jq = job->link.data;
g_queue_unlink(&jq->queue, &job->link);
job->link.data = NULL;
}
job->generation = 0;
if (NULL != job->ref) {
job->ref->job = NULL;
li_job_ref_release(job->ref);
job->ref = NULL;
}
job->callback = NULL;
}
void li_job_later(liJobQueue *jq, liJob *job) {
if (NULL != job->link.data) return; /* already queued */
job->link.data = jq;
g_queue_push_tail_link(&jq->queue, &job->link);
}
void li_job_later_ref(liJobRef *jobref) {
liJob *job = jobref->job;
if (NULL != job) li_job_later(jobref->queue, job);
}
void li_job_now(liJobQueue *jq, liJob *job) {
if (job->generation != jq->generation) {
job->generation = jq->generation;
/* unqueue if queued */
if (NULL != job->link.data) {
assert(jq == job->link.data);
g_queue_unlink(&jq->queue, &job->link);
job->link.data = NULL;
}
job->callback(job);
} else {
li_job_later(jq, job);
}
}
void li_job_now_ref(liJobRef *jobref) {
liJob *job = jobref->job;
if (NULL != job) li_job_now(jobref->queue, job);
}
void li_job_async(liJobRef *jobref) {
liJobQueue *jq = jobref->queue;
GAsyncQueue *const q = jq->async_queue;
if (NULL == q) return;
li_job_ref_acquire(jobref);
g_async_queue_push(q, jobref);
ev_async_send(jq->loop, &jq->async_queue_watcher);
}
liJobRef* li_job_ref(liJobQueue *jq, liJob *job) {
liJobRef *ref = job->ref;
if (NULL != ref) {
li_job_ref_acquire(ref);
return ref;
}
ref = g_slice_new0(liJobRef);
ref->refcount = 2; /* job->ref + returned ref */
ref->job = job;
ref->queue = jq;
job->ref = ref;
return ref;
}
void li_job_ref_release(liJobRef *jobref) {
g_assert(jobref->refcount > 0);
if (g_atomic_int_dec_and_test(&jobref->refcount)) {
g_slice_free(liJobRef, jobref);
}
}
void li_job_ref_acquire(liJobRef *jobref) {
g_assert(jobref->refcount > 0);
g_atomic_int_inc(&jobref->refcount);
}

1
src/common/wscript

@ -23,6 +23,7 @@ def build(bld):
encoding.c
idlist.c
ip_parsers.rl
jobqueue.c
memcached.c
mempool.c
module.c

Loading…
Cancel
Save