Browse Source

[core] Use liJobQueue for vrequest job queue

personal/stbuehler/wip
Stefan Bühler 12 years ago
parent
commit
79d98f5991
  1. 2
      include/lighttpd/typedefs.h
  2. 18
      include/lighttpd/virtualrequest.h
  3. 11
      include/lighttpd/worker.h
  4. 8
      src/main/subrequest_lua.c
  5. 80
      src/main/virtualrequest.c
  6. 98
      src/main/worker.c
  7. 8
      src/modules/mod_balancer.c
  8. 6
      src/modules/mod_memcached.c

2
include/lighttpd/typedefs.h

@ -234,8 +234,6 @@ typedef struct liConInfo liConInfo;
typedef struct liVRequest liVRequest;
typedef struct liVRequestRef liVRequestRef;
typedef struct liFilter liFilter;
typedef struct liFilters liFilters;

18
include/lighttpd/virtualrequest.h

@ -5,6 +5,8 @@
#error Please include <lighttpd/base.h> instead of this file
#endif
#include <lighttpd/jobqueue.h>
typedef enum {
/* waiting for request headers */
LI_VRS_CLEAN,
@ -85,16 +87,9 @@ struct liFilters {
liChunkQueue *in, *out;
};
struct liVRequestRef {
gint refcount;
liWorker *wrk;
liVRequest *vr; /* This is only accesible by the worker thread the vrequest belongs to, and it may be NULL if the vrequest is already reset */
};
struct liVRequest {
liConInfo *coninfo;
liWorker *wrk;
liVRequestRef *ref;
liOptionValue *options;
liOptionPtrValue **optionptrs;
@ -123,8 +118,7 @@ struct liVRequest {
liActionStack action_stack;
gboolean actions_wait_for_response;
gint queued;
GList job_queue_link;
liJob job;
GPtrArray *stat_cache_entries;
@ -167,10 +161,6 @@ LI_API void li_vrequest_free(liVRequest *vr);
*/
LI_API void li_vrequest_reset(liVRequest *vr, gboolean keepalive);
LI_API liVRequestRef* li_vrequest_get_ref(liVRequest *vr);
LI_API void li_vrequest_ref_acquire(liVRequestRef *vr_ref);
LI_API liVRequest* li_vrequest_ref_release(liVRequestRef *vr_ref);
LI_API liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
LI_API liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
@ -201,7 +191,7 @@ LI_API gboolean li_vrequest_is_handled(liVRequest *vr);
LI_API void li_vrequest_state_machine(liVRequest *vr);
LI_API void li_vrequest_joblist_append(liVRequest *vr);
LI_API void li_vrequest_joblist_append_async(liVRequestRef *vr_ref);
LI_API liJobRef* li_vrequest_get_ref(liVRequest *vr);
LI_API gboolean li_vrequest_redirect(liVRequest *vr, GString *uri);

11
include/lighttpd/worker.h

@ -6,6 +6,7 @@
#endif
#include <lighttpd/tasklet.h>
#include <lighttpd/jobqueue.h>
struct lua_State;
@ -66,8 +67,8 @@ struct liWorker {
struct lua_State *L; /** NULL if compiled without Lua */
struct ev_loop *loop;
ev_prepare loop_prepare;
ev_check loop_check;
/* ev_prepare loop_prepare; */
/* ev_check loop_check; */
ev_async worker_stop_watcher, worker_suspend_watcher, worker_exit_watcher;
GQueue log_queue;
@ -107,11 +108,7 @@ struct liWorker {
ev_async collect_watcher;
GAsyncQueue *collect_queue;
GQueue job_queue;
ev_timer job_queue_watcher;
GAsyncQueue *job_async_queue;
ev_async job_async_queue_watcher;
liJobQueue jobqueue;
liTaskletPool *tasklets;

8
src/main/subrequest_lua.c

@ -13,7 +13,7 @@ struct liSubrequest {
int func_notify_ref, func_error_ref;
liVRequest *vr;
liVRequestRef *parentvr_ref;
liJobRef *parentvr_ref;
liConInfo coninfo;
@ -260,7 +260,7 @@ static liHandlerResult subvr_check(liVRequest *vr) {
sr->notified_response_headers = sr->have_response_headers;
if (sr->notified_out_closed) { /* reques done */
li_vrequest_joblist_append_async(sr->parentvr_ref);
li_job_async(sr->parentvr_ref);
}
return LI_HANDLER_GO_ON;
@ -357,8 +357,8 @@ static int lua_subrequest_gc(lua_State *L) {
g_slice_free(liSubrequest, sr);
li_vrequest_joblist_append_async(sr->parentvr_ref);
li_vrequest_ref_release(sr->parentvr_ref);
li_job_async(sr->parentvr_ref);
li_job_ref_release(sr->parentvr_ref);
if (dolock) li_lua_lock(srv);

80
src/main/virtualrequest.c

@ -136,16 +136,17 @@ liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_da
return filters_add(&vr->filters_out, handle_data, handle_free, param);
}
static void vrequest_job_cb(liJob *job) {
liVRequest *vr = LI_CONTAINER_OF(job, liVRequest, job);
li_vrequest_state_machine(vr);
}
liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) {
liServer *srv = wrk->srv;
liVRequest *vr = g_slice_new0(liVRequest);
vr->coninfo = coninfo;
vr->wrk = wrk;
vr->ref = g_slice_new0(liVRequestRef);
vr->ref->refcount = 1;
vr->ref->vr = vr;
vr->ref->wrk = wrk;
vr->state = LI_VRS_CLEAN;
vr->plugin_ctx = g_ptr_array_new();
@ -183,9 +184,9 @@ liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) {
vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */
vr->in_buffer_state.split_on_file_chunks = FALSE;
vr->stat_cache_entries = g_ptr_array_sized_new(2);
li_job_init(&vr->job, vrequest_job_cb);
vr->job_queue_link.data = vr;
vr->stat_cache_entries = g_ptr_array_sized_new(2);
li_action_stack_init(&vr->action_stack);
@ -214,10 +215,7 @@ void li_vrequest_free(liVRequest* vr) {
li_chunkqueue_free(vr->in);
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link);
g_atomic_int_set(&vr->queued, 0);
}
li_job_clear(&vr->job);
g_slice_free1(srv->option_def_values->len * sizeof(liOptionValue), vr->options);
{
@ -235,11 +233,6 @@ void li_vrequest_free(liVRequest* vr) {
}
g_ptr_array_free(vr->stat_cache_entries, TRUE);
vr->ref->vr = NULL;
if (g_atomic_int_dec_and_test(&vr->ref->refcount)) {
g_slice_free(liVRequestRef, vr->ref);
}
g_slice_free(liVRequest, vr);
}
@ -279,10 +272,7 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) {
li_chunkqueue_use_limit(vr->out, vr);
li_chunkqueue_set_limit(vr->vr_out, vr->out->limit);
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link);
g_atomic_int_set(&vr->queued, 0);
}
li_job_reset(&vr->job);
while (vr->stat_cache_entries->len > 0 ) {
liStatCacheEntry *sce = g_ptr_array_index(vr->stat_cache_entries, 0);
@ -302,40 +292,6 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) {
}
}
}
if (1 != g_atomic_int_get(&vr->ref->refcount)) {
/* If we are not the only user of vr->ref we have to get a new one and detach the old */
vr->ref->vr = NULL;
if (g_atomic_int_dec_and_test(&vr->ref->refcount)) {
g_slice_free(liVRequestRef, vr->ref);
}
vr->ref = g_slice_new0(liVRequestRef);
vr->ref->refcount = 1;
vr->ref->vr = vr;
vr->ref->wrk = vr->wrk;
}
}
liVRequestRef* li_vrequest_get_ref(liVRequest *vr) {
liVRequestRef* vr_ref = vr->ref;
g_assert(vr_ref->refcount > 0);
g_atomic_int_inc(&vr_ref->refcount);
return vr_ref;
}
void li_vrequest_ref_acquire(liVRequestRef *vr_ref) {
g_assert(vr_ref->refcount > 0);
g_atomic_int_inc(&vr_ref->refcount);
}
liVRequest* li_vrequest_ref_release(liVRequestRef *vr_ref) {
liVRequest *vr = vr_ref->vr;
g_assert(vr_ref->refcount > 0);
if (g_atomic_int_dec_and_test(&vr_ref->refcount)) {
g_assert(vr == NULL); /* we are the last user, and the ref holded by vr itself is handled extra, so the vr was already reset */
g_slice_free(liVRequestRef, vr_ref);
}
return vr;
}
void li_vrequest_error(liVRequest *vr) {
@ -647,19 +603,11 @@ void li_vrequest_state_machine(liVRequest *vr) {
}
void li_vrequest_joblist_append(liVRequest *vr) {
liWorker *wrk = vr->wrk;
GQueue *const q = &wrk->job_queue;
if (!g_atomic_int_compare_and_exchange(&vr->queued, 0, 1)) return; /* already in queue */
g_queue_push_tail_link(q, &vr->job_queue_link);
}
void li_vrequest_joblist_append_async(liVRequestRef *vr_ref) {
liWorker *wrk = vr_ref->wrk;
GAsyncQueue *const q = wrk->job_async_queue;
if (NULL == q) return;
li_vrequest_ref_acquire(vr_ref);
g_async_queue_push(q, vr_ref);
ev_async_send(wrk->loop, &wrk->job_async_queue_watcher);
li_job_later(&vr->wrk->jobqueue, &vr->job);
}
liJobRef* li_vrequest_get_ref(liVRequest *vr) {
return li_job_ref(&vr->wrk->jobqueue, &vr->job);
}
gboolean li_vrequest_redirect(liVRequest *vr, GString *uri) {

98
src/main/worker.c

@ -168,65 +168,6 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents)
li_waitqueue_update(&wrk->io_timeout_queue);
}
static void worker_job_queue(liWorker *wrk, int loops) {
int i;
for (i = 0; i < loops; i++) {
GQueue q = wrk->job_queue;
GList *l;
liVRequest *vr;
if (q.length == 0) return;
g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */
while (NULL != (l = g_queue_pop_head_link(&q))) {
vr = l->data;
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
li_vrequest_state_machine(vr);
}
}
if (wrk->job_queue.length > 0) {
/* make sure we will run again soon */
ev_timer_start(wrk->loop, &wrk->job_queue_watcher);
}
}
/* run vreqest state machine */
static void li_worker_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
UNUSED(loop);
UNUSED(revents);
worker_job_queue(wrk, 3);
}
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
UNUSED(loop);
UNUSED(revents);
UNUSED(w);
/* just kept loop alive, call state machines in prepare */
}
/* run vreqest state machine for async queued jobs */
static void worker_job_async_queue_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
GAsyncQueue *q = wrk->job_async_queue;
liVRequestRef *vr_ref;
liVRequest *vr;
UNUSED(loop);
UNUSED(revents);
while (NULL != (vr_ref = g_async_queue_try_pop(q))) {
if (NULL != (vr = li_vrequest_ref_release(vr_ref))) {
li_vrequest_state_machine(vr);
}
}
}
/* cache timestamp */
GString *li_worker_current_timestamp(liWorker *wrk, liTimeFunc timefunc, guint format_ndx) {
gsize len;
@ -444,11 +385,6 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
g_array_index(wrk->timestamps_local, liWorkerTS, i).str = g_string_sized_new(255);
}
ev_init(&wrk->loop_prepare, li_worker_prepare_cb);
wrk->loop_prepare.data = wrk;
ev_prepare_start(wrk->loop, &wrk->loop_prepare);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&wrk->worker_exit_watcher, li_worker_exit_cb);
wrk->worker_exit_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_exit_watcher);
@ -484,16 +420,7 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
/* throttling */
li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, THROTTLE_GRANULARITY, wrk);
/* job queue */
g_queue_init(&wrk->job_queue);
ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0);
wrk->job_queue_watcher.data = wrk;
wrk->job_async_queue = g_async_queue_new();
ev_async_init(&wrk->job_async_queue_watcher, worker_job_async_queue_cb);
wrk->job_async_queue_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->job_async_queue_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
li_job_queue_init(&wrk->jobqueue, wrk->loop);
wrk->tasklets = li_tasklet_pool_new(wrk->loop, srv->tasklet_pool_threads);
@ -505,6 +432,8 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
void li_worker_free(liWorker *wrk) {
if (!wrk) return;
li_job_queue_clear(&wrk->jobqueue);
{ /* close connections */
guint i;
if (wrk->connections_active > 0) {
@ -528,8 +457,6 @@ void li_worker_free(liWorker *wrk) {
g_queue_clear(&wrk->closing_sockets);
}
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->job_async_queue_watcher);
{ /* free timestamps */
guint i;
for (i = 0; i < wrk->timestamps_gmt->len; i++) {
@ -542,23 +469,6 @@ void li_worker_free(liWorker *wrk) {
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->worker_exit_watcher);
{
GAsyncQueue *q = wrk->job_async_queue;
liVRequestRef *vr_ref;
liVRequest *vr;
while (NULL != (vr_ref = g_async_queue_try_pop(q))) {
if (NULL != (vr = li_vrequest_ref_release(vr_ref))) {
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
li_vrequest_state_machine(vr);
}
}
g_async_queue_unref(q);
wrk->job_async_queue = NULL;
}
g_async_queue_unref(wrk->new_con_queue);
li_ev_safe_ref_and_stop(ev_timer_stop, wrk->loop, &wrk->stats_watcher);
@ -567,8 +477,6 @@ void li_worker_free(liWorker *wrk) {
li_collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0);
g_async_queue_unref(wrk->collect_queue);
li_ev_safe_ref_and_stop(ev_prepare_stop, wrk->loop, &wrk->loop_prepare);
g_string_free(wrk->tmp_str, TRUE);
li_stat_cache_free(wrk->stat_cache);

8
src/modules/mod_balancer.c

@ -83,7 +83,7 @@ struct bcontext { /* context for a balancer in a vrequest */
gint selected; /* selected backend */
GList backlog_link;
liVRequestRef *ref;
liJobRef *ref;
gboolean scheduled;
};
@ -164,7 +164,7 @@ static gboolean balancer_fill_backends(balancer *b, liServer *srv, liValue *val)
static void _balancer_context_backlog_unlink(balancer *b, bcontext *bc) {
if (NULL != bc->backlog_link.data) {
g_queue_unlink(&b->backlog, &bc->backlog_link);
li_vrequest_ref_release(bc->ref);
li_job_ref_release(bc->ref);
bc->backlog_link.data = NULL;
bc->backlog_link.next = bc->backlog_link.prev = NULL;
}
@ -235,10 +235,10 @@ static gboolean _balancer_backlog_schedule(liWorker *wrk, balancer *b) {
bc = it->data;
bc->scheduled = 1;
li_vrequest_joblist_append_async(bc->ref);
li_job_async(bc->ref);
g_queue_unlink(&b->backlog, it);
li_vrequest_ref_release(bc->ref);
li_job_ref_release(bc->ref);
it->data = NULL;
it->next = it->prev = NULL;
}

6
src/modules/mod_memcached.c

@ -616,7 +616,7 @@ static const liPluginSetup setups[] = {
typedef struct {
liMemcachedRequest *req;
int result_ref; /* table if vr_ref == NULL, callback function otherwise */
liVRequestRef *vr_ref;
liJobRef *vr_ref;
lua_State *L;
} mc_lua_request;
@ -677,7 +677,7 @@ static void lua_memcache_callback(liMemcachedRequest *request, liMemcachedResult
if (mreq->vr_ref) {
lua_pop(L, 1);
li_vrequest_joblist_append_async(mreq->vr_ref);
li_job_async(mreq->vr_ref);
} else {
liServer *srv;
int errfunc;
@ -1028,7 +1028,7 @@ static int lua_memcached_req_gc(lua_State *L) {
if (!preq || !*preq) return 0;
req = *preq;
li_vrequest_ref_release(req->vr_ref);
li_job_ref_release(req->vr_ref);
if (req->req) {
req->req->callback = NULL;

Loading…
Cancel
Save