diff --git a/src/base.h b/src/base.h index 62224e6..418c8ad 100644 --- a/src/base.h +++ b/src/base.h @@ -21,6 +21,8 @@ } while (0) #include "typedefs.h" +#include "utils.h" + #include "module.h" #include "chunk.h" diff --git a/src/connection.c b/src/connection.c index bb3c477..8a6052a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -110,6 +110,7 @@ void connection_internal_error(connection *con) { static gboolean connection_handle_read(connection *con) { vrequest *vr = con->mainvr; + if (con->raw_in->length == 0) return TRUE; if (con->state == CON_STATE_KEEP_ALIVE) { @@ -124,10 +125,7 @@ static gboolean connection_handle_read(connection *con) { con->state = CON_STATE_READ_REQUEST_HEADER; con->ts = CUR_TS(con->wrk); - connection_io_timeout_init(con); } else { - if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk)) - connection_io_timeout_reset(con); if (con->state == CON_STATE_REQUEST_START) con->state = CON_STATE_READ_REQUEST_HEADER; } @@ -325,6 +323,8 @@ connection* connection_new(worker *wrk) { ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb); con->keep_alive_data.watcher.data = con; + con->io_timeout_elem.data = con; + return con; } @@ -373,7 +373,7 @@ void connection_reset(connection *con) { con->stats.last_avg = 0; /* remove from timeout queue */ - connection_io_timeout_remove(con); + waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem); } void server_check_keepalive(server *srv); @@ -425,7 +425,7 @@ void connection_reset_keep_alive(connection *con) { con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); con->stats.last_avg = 0; - connection_io_timeout_remove(con); + waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem); } void connection_free(connection *con) { @@ -477,57 +477,3 @@ gchar *connection_state_str(connection_state_t state) { return (gchar*)states[state]; } - -void connection_io_timeout_init(connection *con) { - worker *wrk = con->wrk; - - if (wrk->io_timeout_queue_tail) - wrk->io_timeout_queue_tail->io_timeout.next = con; - else - /* if there is no tail, it means the queue is empty */ - wrk->io_timeout_queue_head = con; - - wrk->io_timeout_queue_tail = con; - con->io_timeout.last_io = CUR_TS(wrk); - con->io_timeout.next = NULL; -} - -void connection_io_timeout_reset(connection *con) { - /* move con to the end of the timeout queue */ - worker *wrk = con->wrk; - - if (con == wrk->io_timeout_queue_head && con != wrk->io_timeout_queue_tail) - wrk->io_timeout_queue_head = con->io_timeout.next; - - if (con != wrk->io_timeout_queue_tail) - con->io_timeout.prev = wrk->io_timeout_queue_tail; - - if (wrk->io_timeout_queue_tail) - wrk->io_timeout_queue_tail->io_timeout.next = con; - - con->io_timeout.next = NULL; - wrk->io_timeout_queue_tail = con; - con->io_timeout.last_io = CUR_TS(wrk); -} - -void connection_io_timeout_remove(connection *con) { - /* remove con from the timeout queue */ - worker *wrk = con->wrk; - - /* check if connection is in the timeout queue, it might not be the case when it is in keep alive idle state */ - if (con->io_timeout.prev == NULL && con->io_timeout.next == NULL && con != wrk->io_timeout_queue_head) - return; - - if (con == wrk->io_timeout_queue_head) - wrk->io_timeout_queue_head = con->io_timeout.next; - else - con->io_timeout.prev->io_timeout.next = con->io_timeout.next; - - if (con == wrk->io_timeout_queue_tail) - wrk->io_timeout_queue_tail = con->io_timeout.prev; - else - con->io_timeout.next->io_timeout.prev = con->io_timeout.prev; - - con->io_timeout.prev = NULL; - con->io_timeout.next = NULL; -} diff --git a/src/connection.h b/src/connection.h index 2773407..a066d74 100644 --- a/src/connection.h +++ b/src/connection.h @@ -58,11 +58,7 @@ struct connection { } keep_alive_data; /* I/O timeout data */ - struct { - ev_tstamp last_io; - connection *prev; - connection *next; - } io_timeout; + waitqueue_elem io_timeout_elem; time_t ts; @@ -82,10 +78,6 @@ LI_API void connection_reset(connection *con); LI_API void connection_reset_keep_alive(connection *con); LI_API void connection_free(connection *con); -LI_API void connection_io_timeout_init(connection *con); -LI_API void connection_io_timeout_reset(connection *con); -LI_API void connection_io_timeout_remove(connection *con); - LI_API void connection_error(connection *con); LI_API void connection_handle_direct(connection *con); diff --git a/src/network.c b/src/network.c index 3116aad..ae9ca2a 100644 --- a/src/network.c +++ b/src/network.c @@ -37,6 +37,7 @@ ssize_t net_read(int fd, void *buf, ssize_t nbyte) { network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) { network_status_t res; + ev_tstamp now = CUR_TS(vr->con->wrk); #ifdef TCP_CORK int corked = 0; #endif @@ -61,8 +62,9 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) { } #endif - if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk)) - connection_io_timeout_reset(vr->con); + /* only update once a second, the cast is to round the timestamp */ + if ((guint)vr->con->io_timeout_elem.ts != now) + waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem); return res; } @@ -72,8 +74,8 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { const off_t max_read = 16 * blocksize; /* 256k */ ssize_t r; off_t len = 0; - worker *wrk; - ev_tstamp ts; + worker *wrk = vr->con->wrk; + ev_tstamp now = CUR_TS(wrk); do { GString *buf = g_string_sized_new(blocksize); @@ -106,14 +108,17 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { vr->con->stats.bytes_in += r; /* update 5s stats */ - ts = CUR_TS(wrk); - if ((ts - vr->con->stats.last_avg) > 5) { + if ((now - vr->con->stats.last_avg) > 5) { vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s; vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in; - vr->con->stats.last_avg = ts; + vr->con->stats.last_avg = now; } } while (r == blocksize && len < max_read); + /* only update once a second, the cast is to round the timestamp */ + if ((guint)vr->con->io_timeout_elem.ts != now) + waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem); + return NETWORK_STATUS_SUCCESS; } diff --git a/src/server.h b/src/server.h index 1a59b00..2ce41e6 100644 --- a/src/server.h +++ b/src/server.h @@ -72,7 +72,7 @@ struct server { /* keep alive timeout */ guint keep_alive_queue_timeout; - guint io_timeout; + gdouble io_timeout; }; diff --git a/src/typedefs.h b/src/typedefs.h index d40564b..cda39b5 100644 --- a/src/typedefs.h +++ b/src/typedefs.h @@ -154,6 +154,16 @@ typedef struct response response; struct server; typedef struct server server; +/* utils.h */ + +struct waitqueue_elem; +typedef struct waitqueue_elem waitqueue_elem; + +struct waitqueue; +typedef struct waitqueue waitqueue; + +typedef void (*waitqueue_cb) (struct ev_loop *loop, struct ev_timer *w, int revents); + /* value.h */ struct value; diff --git a/src/utils.c b/src/utils.c index ca3e420..772d2b8 100644 --- a/src/utils.c +++ b/src/utils.c @@ -510,3 +510,105 @@ GString *sockaddr_to_string(sock_addr *saddr, GString *dest) { return dest; } + + +void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data) { + ev_timer_init(&queue->timer, callback, delay, delay); + ev_timer_start(loop, &queue->timer); + + queue->timer.data = data; + queue->head = queue->tail = NULL; + queue->loop = loop; + queue->delay = delay; +} + +void waitqueue_stop(waitqueue *queue) { + ev_timer_stop(queue->loop, &queue->timer); +} + +void waitqueue_update(waitqueue *queue) { + ev_tstamp repeat; + + if (queue->head) { + repeat = queue->head->ts + queue->delay - ev_now(queue->loop); + } else + repeat = queue->delay; + + if (queue->timer.repeat != repeat) + { + queue->timer.repeat = repeat; + ev_timer_again(queue->loop, &queue->timer); + } +} + +void waitqueue_push(waitqueue *queue, waitqueue_elem *elem) { + elem->ts = ev_now(queue->loop); + + if (!elem->queued) { + elem->queued = TRUE; + /* not in the queue yet, insert at the end */ + if (!queue->head) { + /* queue is empty */ + queue->head = elem; + queue->tail = elem; + elem->prev = NULL; + elem->next = NULL; + } else { + /* queue not empty */ + elem->prev = queue->tail; + elem->next = NULL; + queue->tail->next = elem; + queue->tail = elem; + } + } else { + /* already queued, move to end */ + if (elem == queue->tail) + return; + + if (elem == queue->head) { + queue->head = elem->next; + if (elem->next) + elem->next->prev = NULL; + } + + elem->prev = queue->tail; + elem->next = NULL; + queue->tail->next = elem; + queue->tail = elem; + } +} + +waitqueue_elem *waitqueue_pop(waitqueue *queue) { + waitqueue_elem *elem = queue->head; + ev_tstamp now = ev_now(queue->loop); + + if (!elem || (elem->ts + queue->delay) >= now) { + return NULL; + } + + if (elem != queue->tail) + elem->next->prev = NULL; + + queue->head = elem->next; + + elem->queued = FALSE; + + return elem; +} + +void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem) { + if (!elem->queued) + return; + + if (elem == queue->head) + queue->head = elem->next; + else + elem->prev->next = elem->next; + + if (elem == queue->tail) + queue->tail = elem->prev; + else + elem->next->prev = elem->prev; + + elem->queued = FALSE; +} diff --git a/src/utils.h b/src/utils.h index 5519c26..e2b733a 100644 --- a/src/utils.h +++ b/src/utils.h @@ -12,6 +12,25 @@ typedef enum { COUNTER_UNITS } counter_type; + +struct waitqueue_elem { + gboolean queued; + ev_tstamp ts; + waitqueue_elem *prev; + waitqueue_elem *next; + gpointer data; +}; + +struct waitqueue { + waitqueue_elem *head; + waitqueue_elem *tail; + ev_timer timer; + struct ev_loop *loop; + gdouble delay; +}; + + + LI_API void fatal(const gchar* msg); /* set O_NONBLOCK and FD_CLOEXEC */ @@ -51,4 +70,24 @@ LI_API GString *mimetype_get(vrequest *vr, GString *filename); /* converts a sock_addr to a human readable string. ipv4 and ipv6 supported. if dest is NULL, a new string will be allocated */ LI_API GString *sockaddr_to_string(sock_addr *saddr, GString *dest); + +/* + * waitqueues are queues used to implement delays for certain tasks in a lightweight, non-blocking way + * they are used for io timeouts or throttling for example + * waitqueue_push, waitqueue_pop and waitqueue_remove have O(1) complexity + */ + +/* initializes a waitqueue by creating and starting the ev_timer. precision is sub-seconds */ +LI_API void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data); +/* stops the waitqueue. to restart it, simply call waitqueue_update */ +LI_API void waitqueue_stop(waitqueue *queue); +/* updates the timeout of the waitqueue, you should allways call this at the end of your callback */ +LI_API void waitqueue_update(waitqueue *queue); +/* moves the element to the end of the queue if already queued, appends it to the end otherwise */ +LI_API void waitqueue_push(waitqueue *queue, waitqueue_elem *elem); +/* pops the first ready! element from the queue or NULL if none ready yet. this should be called in your callback */ +LI_API waitqueue_elem *waitqueue_pop(waitqueue *queue); +/* removes an element from the queue */ +LI_API void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem); + #endif diff --git a/src/worker.c b/src/worker.c index 4b53886..b0a8df3 100644 --- a/src/worker.c +++ b/src/worker.c @@ -101,20 +101,21 @@ static void worker_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { worker *wrk = (worker*) w->data; connection *con; + waitqueue_elem *wqe; + ev_tstamp now = CUR_TS(wrk); UNUSED(loop); UNUSED(revents); - for (con = wrk->io_timeout_queue_head; con != NULL; con = con->io_timeout.next) { - if ((con->io_timeout.last_io + wrk->srv->io_timeout) < CUR_TS(wrk)) { - /* connection has timed out */ - CON_TRACE(con, "connection io-timeout from %s after %u seconds", con->remote_addr_str->str, wrk->srv->io_timeout); - plugins_handle_close(con); - worker_con_put(con); - } else { - break; - } + while ((wqe = waitqueue_pop(&wrk->io_timeout_queue)) != NULL) { + /* connection has timed out */ + con = wqe->data; + CON_TRACE(con, "connection io-timeout from %s after %.2f seconds", con->remote_addr_str->str, now - wqe->ts); + plugins_handle_close(con); + worker_con_put(con); } + + waitqueue_update(&wrk->io_timeout_queue); } /* cache timestamp */ @@ -166,7 +167,7 @@ void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) { ev_io_start(wrk->loop, &con->sock_watcher); con->ts = CUR_TS(con->wrk); sockaddr_to_string(remote_addr, con->remote_addr_str); - connection_io_timeout_init(con); + waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem); } else { worker_new_con_data *d = g_slice_new(worker_new_con_data); d->remote_addr = *remote_addr; @@ -273,9 +274,8 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) { wrk->collect_queue = g_async_queue_new(); ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ - ev_timer_init(&wrk->io_timer, worker_io_timeout_cb, 1, 1); - wrk->io_timer.data = wrk; - ev_timer_start(wrk->loop, &wrk->io_timer); + /* io timeout timer */ + waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk); ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ return wrk; diff --git a/src/worker.h b/src/worker.h index 4a387aa..1853f01 100644 --- a/src/worker.h +++ b/src/worker.h @@ -65,9 +65,9 @@ struct worker { ev_timer keep_alive_timer; GQueue keep_alive_queue; - ev_timer io_timer; - connection *io_timeout_queue_head; - connection *io_timeout_queue_tail; + waitqueue io_timeout_queue; + + waitqueue throttle_queue; guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */