diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index 276c33d..74d50ff 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -234,8 +234,6 @@ typedef struct liConInfo liConInfo; typedef struct liVRequest liVRequest; -typedef struct liVRequestRef liVRequestRef; - typedef struct liFilter liFilter; typedef struct liFilters liFilters; diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index 4d36544..e01b979 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -5,6 +5,8 @@ #error Please include instead of this file #endif +#include + typedef enum { /* waiting for request headers */ LI_VRS_CLEAN, @@ -85,16 +87,9 @@ struct liFilters { liChunkQueue *in, *out; }; -struct liVRequestRef { - gint refcount; - liWorker *wrk; - liVRequest *vr; /* This is only accesible by the worker thread the vrequest belongs to, and it may be NULL if the vrequest is already reset */ -}; - struct liVRequest { liConInfo *coninfo; liWorker *wrk; - liVRequestRef *ref; liOptionValue *options; liOptionPtrValue **optionptrs; @@ -123,8 +118,7 @@ struct liVRequest { liActionStack action_stack; gboolean actions_wait_for_response; - gint queued; - GList job_queue_link; + liJob job; GPtrArray *stat_cache_entries; @@ -167,10 +161,6 @@ LI_API void li_vrequest_free(liVRequest *vr); */ LI_API void li_vrequest_reset(liVRequest *vr, gboolean keepalive); -LI_API liVRequestRef* li_vrequest_get_ref(liVRequest *vr); -LI_API void li_vrequest_ref_acquire(liVRequestRef *vr_ref); -LI_API liVRequest* li_vrequest_ref_release(liVRequestRef *vr_ref); - LI_API liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param); LI_API liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param); @@ -201,7 +191,7 @@ LI_API gboolean li_vrequest_is_handled(liVRequest *vr); LI_API void li_vrequest_state_machine(liVRequest *vr); LI_API void li_vrequest_joblist_append(liVRequest *vr); -LI_API void li_vrequest_joblist_append_async(liVRequestRef *vr_ref); +LI_API liJobRef* li_vrequest_get_ref(liVRequest *vr); LI_API gboolean li_vrequest_redirect(liVRequest *vr, GString *uri); diff --git a/include/lighttpd/worker.h b/include/lighttpd/worker.h index ef753df..20c85b1 100644 --- a/include/lighttpd/worker.h +++ b/include/lighttpd/worker.h @@ -6,6 +6,7 @@ #endif #include +#include struct lua_State; @@ -66,8 +67,8 @@ struct liWorker { struct lua_State *L; /** NULL if compiled without Lua */ struct ev_loop *loop; - ev_prepare loop_prepare; - ev_check loop_check; + /* ev_prepare loop_prepare; */ + /* ev_check loop_check; */ ev_async worker_stop_watcher, worker_suspend_watcher, worker_exit_watcher; GQueue log_queue; @@ -107,11 +108,7 @@ struct liWorker { ev_async collect_watcher; GAsyncQueue *collect_queue; - GQueue job_queue; - ev_timer job_queue_watcher; - - GAsyncQueue *job_async_queue; - ev_async job_async_queue_watcher; + liJobQueue jobqueue; liTaskletPool *tasklets; diff --git a/src/main/subrequest_lua.c b/src/main/subrequest_lua.c index 4881ffd..eedeaac 100644 --- a/src/main/subrequest_lua.c +++ b/src/main/subrequest_lua.c @@ -13,7 +13,7 @@ struct liSubrequest { int func_notify_ref, func_error_ref; liVRequest *vr; - liVRequestRef *parentvr_ref; + liJobRef *parentvr_ref; liConInfo coninfo; @@ -260,7 +260,7 @@ static liHandlerResult subvr_check(liVRequest *vr) { sr->notified_response_headers = sr->have_response_headers; if (sr->notified_out_closed) { /* reques done */ - li_vrequest_joblist_append_async(sr->parentvr_ref); + li_job_async(sr->parentvr_ref); } return LI_HANDLER_GO_ON; @@ -357,8 +357,8 @@ static int lua_subrequest_gc(lua_State *L) { g_slice_free(liSubrequest, sr); - li_vrequest_joblist_append_async(sr->parentvr_ref); - li_vrequest_ref_release(sr->parentvr_ref); + li_job_async(sr->parentvr_ref); + li_job_ref_release(sr->parentvr_ref); if (dolock) li_lua_lock(srv); diff --git a/src/main/virtualrequest.c b/src/main/virtualrequest.c index e4f2c5b..6823023 100644 --- a/src/main/virtualrequest.c +++ b/src/main/virtualrequest.c @@ -136,16 +136,17 @@ liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_da return filters_add(&vr->filters_out, handle_data, handle_free, param); } +static void vrequest_job_cb(liJob *job) { + liVRequest *vr = LI_CONTAINER_OF(job, liVRequest, job); + li_vrequest_state_machine(vr); +} + liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) { liServer *srv = wrk->srv; liVRequest *vr = g_slice_new0(liVRequest); vr->coninfo = coninfo; vr->wrk = wrk; - vr->ref = g_slice_new0(liVRequestRef); - vr->ref->refcount = 1; - vr->ref->vr = vr; - vr->ref->wrk = wrk; vr->state = LI_VRS_CLEAN; vr->plugin_ctx = g_ptr_array_new(); @@ -183,9 +184,9 @@ liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) { vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */ vr->in_buffer_state.split_on_file_chunks = FALSE; - vr->stat_cache_entries = g_ptr_array_sized_new(2); + li_job_init(&vr->job, vrequest_job_cb); - vr->job_queue_link.data = vr; + vr->stat_cache_entries = g_ptr_array_sized_new(2); li_action_stack_init(&vr->action_stack); @@ -214,10 +215,7 @@ void li_vrequest_free(liVRequest* vr) { li_chunkqueue_free(vr->in); li_filter_buffer_on_disk_reset(&vr->in_buffer_state); - if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */ - g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link); - g_atomic_int_set(&vr->queued, 0); - } + li_job_clear(&vr->job); g_slice_free1(srv->option_def_values->len * sizeof(liOptionValue), vr->options); { @@ -235,11 +233,6 @@ void li_vrequest_free(liVRequest* vr) { } g_ptr_array_free(vr->stat_cache_entries, TRUE); - vr->ref->vr = NULL; - if (g_atomic_int_dec_and_test(&vr->ref->refcount)) { - g_slice_free(liVRequestRef, vr->ref); - } - g_slice_free(liVRequest, vr); } @@ -279,10 +272,7 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) { li_chunkqueue_use_limit(vr->out, vr); li_chunkqueue_set_limit(vr->vr_out, vr->out->limit); - if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */ - g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link); - g_atomic_int_set(&vr->queued, 0); - } + li_job_reset(&vr->job); while (vr->stat_cache_entries->len > 0 ) { liStatCacheEntry *sce = g_ptr_array_index(vr->stat_cache_entries, 0); @@ -302,40 +292,6 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) { } } } - - if (1 != g_atomic_int_get(&vr->ref->refcount)) { - /* If we are not the only user of vr->ref we have to get a new one and detach the old */ - vr->ref->vr = NULL; - if (g_atomic_int_dec_and_test(&vr->ref->refcount)) { - g_slice_free(liVRequestRef, vr->ref); - } - vr->ref = g_slice_new0(liVRequestRef); - vr->ref->refcount = 1; - vr->ref->vr = vr; - vr->ref->wrk = vr->wrk; - } -} - -liVRequestRef* li_vrequest_get_ref(liVRequest *vr) { - liVRequestRef* vr_ref = vr->ref; - g_assert(vr_ref->refcount > 0); - g_atomic_int_inc(&vr_ref->refcount); - return vr_ref; -} - -void li_vrequest_ref_acquire(liVRequestRef *vr_ref) { - g_assert(vr_ref->refcount > 0); - g_atomic_int_inc(&vr_ref->refcount); -} - -liVRequest* li_vrequest_ref_release(liVRequestRef *vr_ref) { - liVRequest *vr = vr_ref->vr; - g_assert(vr_ref->refcount > 0); - if (g_atomic_int_dec_and_test(&vr_ref->refcount)) { - g_assert(vr == NULL); /* we are the last user, and the ref holded by vr itself is handled extra, so the vr was already reset */ - g_slice_free(liVRequestRef, vr_ref); - } - return vr; } void li_vrequest_error(liVRequest *vr) { @@ -647,19 +603,11 @@ void li_vrequest_state_machine(liVRequest *vr) { } void li_vrequest_joblist_append(liVRequest *vr) { - liWorker *wrk = vr->wrk; - GQueue *const q = &wrk->job_queue; - if (!g_atomic_int_compare_and_exchange(&vr->queued, 0, 1)) return; /* already in queue */ - g_queue_push_tail_link(q, &vr->job_queue_link); + li_job_later(&vr->wrk->jobqueue, &vr->job); } -void li_vrequest_joblist_append_async(liVRequestRef *vr_ref) { - liWorker *wrk = vr_ref->wrk; - GAsyncQueue *const q = wrk->job_async_queue; - if (NULL == q) return; - li_vrequest_ref_acquire(vr_ref); - g_async_queue_push(q, vr_ref); - ev_async_send(wrk->loop, &wrk->job_async_queue_watcher); +liJobRef* li_vrequest_get_ref(liVRequest *vr) { + return li_job_ref(&vr->wrk->jobqueue, &vr->job); } gboolean li_vrequest_redirect(liVRequest *vr, GString *uri) { diff --git a/src/main/worker.c b/src/main/worker.c index 8644129..4525012 100644 --- a/src/main/worker.c +++ b/src/main/worker.c @@ -168,65 +168,6 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) li_waitqueue_update(&wrk->io_timeout_queue); } -static void worker_job_queue(liWorker *wrk, int loops) { - int i; - - for (i = 0; i < loops; i++) { - GQueue q = wrk->job_queue; - GList *l; - liVRequest *vr; - - if (q.length == 0) return; - - g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */ - - while (NULL != (l = g_queue_pop_head_link(&q))) { - vr = l->data; - g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0)); - li_vrequest_state_machine(vr); - } - } - - if (wrk->job_queue.length > 0) { - /* make sure we will run again soon */ - ev_timer_start(wrk->loop, &wrk->job_queue_watcher); - } -} - -/* run vreqest state machine */ -static void li_worker_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) { - liWorker *wrk = (liWorker*) w->data; - UNUSED(loop); - UNUSED(revents); - - worker_job_queue(wrk, 3); -} - -static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) { - UNUSED(loop); - UNUSED(revents); - UNUSED(w); - - /* just kept loop alive, call state machines in prepare */ -} - -/* run vreqest state machine for async queued jobs */ -static void worker_job_async_queue_cb(struct ev_loop *loop, ev_async *w, int revents) { - liWorker *wrk = (liWorker*) w->data; - GAsyncQueue *q = wrk->job_async_queue; - liVRequestRef *vr_ref; - liVRequest *vr; - UNUSED(loop); - UNUSED(revents); - - while (NULL != (vr_ref = g_async_queue_try_pop(q))) { - if (NULL != (vr = li_vrequest_ref_release(vr_ref))) { - li_vrequest_state_machine(vr); - } - } -} - - /* cache timestamp */ GString *li_worker_current_timestamp(liWorker *wrk, liTimeFunc timefunc, guint format_ndx) { gsize len; @@ -444,11 +385,6 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) { g_array_index(wrk->timestamps_local, liWorkerTS, i).str = g_string_sized_new(255); } - ev_init(&wrk->loop_prepare, li_worker_prepare_cb); - wrk->loop_prepare.data = wrk; - ev_prepare_start(wrk->loop, &wrk->loop_prepare); - ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ - ev_init(&wrk->worker_exit_watcher, li_worker_exit_cb); wrk->worker_exit_watcher.data = wrk; ev_async_start(wrk->loop, &wrk->worker_exit_watcher); @@ -484,16 +420,7 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) { /* throttling */ li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, THROTTLE_GRANULARITY, wrk); - /* job queue */ - g_queue_init(&wrk->job_queue); - ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0); - wrk->job_queue_watcher.data = wrk; - - wrk->job_async_queue = g_async_queue_new(); - ev_async_init(&wrk->job_async_queue_watcher, worker_job_async_queue_cb); - wrk->job_async_queue_watcher.data = wrk; - ev_async_start(wrk->loop, &wrk->job_async_queue_watcher); - ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ + li_job_queue_init(&wrk->jobqueue, wrk->loop); wrk->tasklets = li_tasklet_pool_new(wrk->loop, srv->tasklet_pool_threads); @@ -505,6 +432,8 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) { void li_worker_free(liWorker *wrk) { if (!wrk) return; + li_job_queue_clear(&wrk->jobqueue); + { /* close connections */ guint i; if (wrk->connections_active > 0) { @@ -528,8 +457,6 @@ void li_worker_free(liWorker *wrk) { g_queue_clear(&wrk->closing_sockets); } - li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->job_async_queue_watcher); - { /* free timestamps */ guint i; for (i = 0; i < wrk->timestamps_gmt->len; i++) { @@ -542,23 +469,6 @@ void li_worker_free(liWorker *wrk) { li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->worker_exit_watcher); - { - GAsyncQueue *q = wrk->job_async_queue; - liVRequestRef *vr_ref; - liVRequest *vr; - - while (NULL != (vr_ref = g_async_queue_try_pop(q))) { - if (NULL != (vr = li_vrequest_ref_release(vr_ref))) { - g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0)); - li_vrequest_state_machine(vr); - } - } - - g_async_queue_unref(q); - wrk->job_async_queue = NULL; - } - - g_async_queue_unref(wrk->new_con_queue); li_ev_safe_ref_and_stop(ev_timer_stop, wrk->loop, &wrk->stats_watcher); @@ -567,8 +477,6 @@ void li_worker_free(liWorker *wrk) { li_collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0); g_async_queue_unref(wrk->collect_queue); - li_ev_safe_ref_and_stop(ev_prepare_stop, wrk->loop, &wrk->loop_prepare); - g_string_free(wrk->tmp_str, TRUE); li_stat_cache_free(wrk->stat_cache); diff --git a/src/modules/mod_balancer.c b/src/modules/mod_balancer.c index 342b1da..1076e2e 100644 --- a/src/modules/mod_balancer.c +++ b/src/modules/mod_balancer.c @@ -83,7 +83,7 @@ struct bcontext { /* context for a balancer in a vrequest */ gint selected; /* selected backend */ GList backlog_link; - liVRequestRef *ref; + liJobRef *ref; gboolean scheduled; }; @@ -164,7 +164,7 @@ static gboolean balancer_fill_backends(balancer *b, liServer *srv, liValue *val) static void _balancer_context_backlog_unlink(balancer *b, bcontext *bc) { if (NULL != bc->backlog_link.data) { g_queue_unlink(&b->backlog, &bc->backlog_link); - li_vrequest_ref_release(bc->ref); + li_job_ref_release(bc->ref); bc->backlog_link.data = NULL; bc->backlog_link.next = bc->backlog_link.prev = NULL; } @@ -235,10 +235,10 @@ static gboolean _balancer_backlog_schedule(liWorker *wrk, balancer *b) { bc = it->data; bc->scheduled = 1; - li_vrequest_joblist_append_async(bc->ref); + li_job_async(bc->ref); g_queue_unlink(&b->backlog, it); - li_vrequest_ref_release(bc->ref); + li_job_ref_release(bc->ref); it->data = NULL; it->next = it->prev = NULL; } diff --git a/src/modules/mod_memcached.c b/src/modules/mod_memcached.c index 6926c1a..0ea8f85 100644 --- a/src/modules/mod_memcached.c +++ b/src/modules/mod_memcached.c @@ -616,7 +616,7 @@ static const liPluginSetup setups[] = { typedef struct { liMemcachedRequest *req; int result_ref; /* table if vr_ref == NULL, callback function otherwise */ - liVRequestRef *vr_ref; + liJobRef *vr_ref; lua_State *L; } mc_lua_request; @@ -677,7 +677,7 @@ static void lua_memcache_callback(liMemcachedRequest *request, liMemcachedResult if (mreq->vr_ref) { lua_pop(L, 1); - li_vrequest_joblist_append_async(mreq->vr_ref); + li_job_async(mreq->vr_ref); } else { liServer *srv; int errfunc; @@ -1028,7 +1028,7 @@ static int lua_memcached_req_gc(lua_State *L) { if (!preq || !*preq) return 0; req = *preq; - li_vrequest_ref_release(req->vr_ref); + li_job_ref_release(req->vr_ref); if (req->req) { req->req->callback = NULL;