Browse Source

waitqueue: modify callback

personal/stbuehler/wip
Stefan Bühler 12 years ago
parent
commit
be65208143
  1. 2
      include/lighttpd/throttle.h
  2. 5
      include/lighttpd/waitqueue.h
  3. 14
      src/common/waitqueue.c
  4. 11
      src/main/log.c
  5. 13
      src/main/stat_cache.c
  6. 12
      src/main/throttle.c
  7. 11
      src/main/worker.c
  8. 8
      src/modules/mod_limit.c
  9. 9
      src/modules/mod_progress.c

2
include/lighttpd/throttle.h

@ -21,7 +21,7 @@ struct liThrottleParam {
};
LI_API void li_throttle_reset(liVRequest *vr);
LI_API void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents);
LI_API void li_throttle_cb(liWaitQueue *wq, gpointer data);
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate);
LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool);

5
include/lighttpd/waitqueue.h

@ -5,7 +5,7 @@
typedef struct liWaitQueueElem liWaitQueueElem;
typedef struct liWaitQueue liWaitQueue;
typedef void (*liWaitQueueCB) (struct ev_loop *loop, struct ev_timer *w, int revents);
typedef void (*liWaitQueueCB) (liWaitQueue *wq, gpointer data);
struct liWaitQueueElem {
gboolean queued;
@ -21,6 +21,9 @@ struct liWaitQueue {
ev_timer timer;
struct ev_loop *loop;
gdouble delay;
liWaitQueueCB callback;
gpointer data;
};
/*

14
src/common/waitqueue.c

@ -1,13 +1,23 @@
#include <lighttpd/waitqueue.h>
static void wq_cb(struct ev_loop *loop, ev_timer *w, int revents) {
liWaitQueue *queue = w->data;
UNUSED(loop); UNUSED(revents);
queue->callback(queue, queue->data);
}
void li_waitqueue_init(liWaitQueue *queue, struct ev_loop *loop, liWaitQueueCB callback, gdouble delay, gpointer data) {
ev_timer_init(&queue->timer, callback, delay, delay);
ev_timer_init(&queue->timer, wq_cb, delay, delay);
queue->timer.data = data;
queue->timer.data = queue;
queue->head = queue->tail = NULL;
queue->loop = loop;
queue->delay = delay;
queue->callback = callback;
queue->data = data;
}
void li_waitqueue_stop(liWaitQueue *queue) {

11
src/main/log.c

@ -104,19 +104,16 @@ static void log_close(liServer *srv, liLog *log) {
g_slice_free(liLog, log);
}
static void log_close_cb(struct ev_loop *loop, struct ev_timer *w, int revents) {
static void log_close_cb(liWaitQueue *wq, gpointer data) {
/* callback for the close queue */
liServer *srv = (liServer*) w->data;
liServer *srv = (liServer*) data;
liWaitQueueElem *wqe;
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(&srv->logs.close_queue)) != NULL) {
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
log_close(srv, wqe->data);
}
li_waitqueue_update(&srv->logs.close_queue);
li_waitqueue_update(wq);
}
void li_log_init(liServer *srv) {

13
src/main/stat_cache.c

@ -4,7 +4,7 @@
#include <lighttpd/plugin_core.h>
static void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents);
static void stat_cache_delete_cb(liWaitQueue *wq, gpointer daa);
static void stat_cache_entry_release(liStatCacheEntry *sce);
static void stat_cache_entry_acquire(liStatCacheEntry *sce);
@ -62,21 +62,18 @@ void li_stat_cache_free(liStatCache *sc) {
g_slice_free(liStatCache, sc);
}
static void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents) {
liStatCache *sc = (liStatCache*) w->data;
static void stat_cache_delete_cb(liWaitQueue *wq, gpointer data) {
liStatCache *sc = data;
liWaitQueueElem *wqe;
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(&sc->delete_queue)) != NULL) {
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
liStatCacheEntry *sce = wqe->data;
/* stat cache entry TTL over */
stat_cache_remove_from_cache(sc, sce);
}
li_waitqueue_update(&sc->delete_queue);
li_waitqueue_update(wq);
}
static void stat_cache_finished(gpointer data) {

12
src/main/throttle.c

@ -151,7 +151,7 @@ void li_throttle_reset(liVRequest *vr) {
vr->throttled = FALSE;
}
void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
void li_throttle_cb(liWaitQueue *wq, gpointer data) {
liWaitQueueElem *wqe;
liThrottlePool *pool;
liVRequest *vr;
@ -159,12 +159,10 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
ev_tstamp now;
guint supply;
UNUSED(revents);
wrk = data;
now = ev_now(wrk->loop);
wrk = w->data;
now = ev_now(loop);
while (NULL != (wqe = li_waitqueue_pop(&wrk->throttle_queue))) {
while (NULL != (wqe = li_waitqueue_pop(wq))) {
vr = wqe->data;
if (vr->throttle.pool.ptr) {
@ -191,7 +189,7 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
vr->coninfo->callbacks->handle_check_io(vr);
}
li_waitqueue_update(&wrk->throttle_queue);
li_waitqueue_update(wq);
}
void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max) {

11
src/main/worker.c

@ -144,16 +144,13 @@ static void worker_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents)
}
/* check for timeouted connections */
static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
static void worker_io_timeout_cb(liWaitQueue *wq, gpointer data) {
liWorker *wrk = data;
liConnection *con;
liWaitQueueElem *wqe;
ev_tstamp now = CUR_TS(wrk);
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(&wrk->io_timeout_queue)) != NULL) {
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
liVRequest *vr;
/* connection has timed out */
con = wqe->data;
@ -165,7 +162,7 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents)
li_worker_con_put(con);
}
li_waitqueue_update(&wrk->io_timeout_queue);
li_waitqueue_update(wq);
}
/* cache timestamp */

8
src/modules/mod_limit.c

@ -148,13 +148,11 @@ static void mod_limit_context_free(liServer *srv, mod_limit_context *ctx) {
g_slice_free(mod_limit_context, ctx);
}
static void mod_limit_timeout_callback(struct ev_loop *loop, ev_async *w, int revents) {
liWaitQueue *wq = (liWaitQueue*) w->data;
static void mod_limit_timeout_callback(liWaitQueue *wq, gpointer data) {
liWaitQueueElem *wqe;
mod_limit_req_ip_data *rid;
UNUSED(loop);
UNUSED(revents);
UNUSED(data);
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
rid = wqe->data;
@ -411,7 +409,7 @@ static void mod_limit_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk)
mld = p->data;
}
li_waitqueue_init(&(mld->timeout_queues[wrk->ndx]), wrk->loop, (liWaitQueueCB)mod_limit_timeout_callback, 1.0, &(mld->timeout_queues[wrk->ndx]));
li_waitqueue_init(&(mld->timeout_queues[wrk->ndx]), wrk->loop, mod_limit_timeout_callback, 1.0, NULL);
}
static void plugin_limit_free(liServer *srv, liPlugin *p) {

9
src/modules/mod_progress.c

@ -112,13 +112,10 @@ typedef struct mod_progress_job mod_progress_job;
/* global data */
static mod_progress_data progress_data;
static void progress_timeout_callback(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
liWaitQueue *wq = &progress_data.timeout_queues[wrk->ndx];
static void progress_timeout_callback(liWaitQueue *wq, gpointer data) {
liWorker *wrk = data;
liWaitQueueElem *wqe;
mod_progress_node *node;
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
node = wqe->data;
@ -494,7 +491,7 @@ static void progress_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk) {
}
progress_data.hash_tables[wrk->ndx] = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, progress_hashtable_free_callback);
li_waitqueue_init(&(progress_data.timeout_queues[wrk->ndx]), wrk->loop, (liWaitQueueCB)progress_timeout_callback, progress_data.ttl, wrk);
li_waitqueue_init(&(progress_data.timeout_queues[wrk->ndx]), wrk->loop, progress_timeout_callback, progress_data.ttl, wrk);
}

Loading…
Cancel
Save