implement waitqueues (utils), change connection io timeouts to use those
This commit is contained in:
parent
8161336538
commit
d56e5c2ea9
|
@ -21,6 +21,8 @@
|
|||
} while (0)
|
||||
|
||||
#include "typedefs.h"
|
||||
#include "utils.h"
|
||||
|
||||
#include "module.h"
|
||||
|
||||
#include "chunk.h"
|
||||
|
|
|
@ -110,6 +110,7 @@ void connection_internal_error(connection *con) {
|
|||
|
||||
static gboolean connection_handle_read(connection *con) {
|
||||
vrequest *vr = con->mainvr;
|
||||
|
||||
if (con->raw_in->length == 0) return TRUE;
|
||||
|
||||
if (con->state == CON_STATE_KEEP_ALIVE) {
|
||||
|
@ -124,10 +125,7 @@ static gboolean connection_handle_read(connection *con) {
|
|||
con->state = CON_STATE_READ_REQUEST_HEADER;
|
||||
con->ts = CUR_TS(con->wrk);
|
||||
|
||||
connection_io_timeout_init(con);
|
||||
} else {
|
||||
if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk))
|
||||
connection_io_timeout_reset(con);
|
||||
if (con->state == CON_STATE_REQUEST_START)
|
||||
con->state = CON_STATE_READ_REQUEST_HEADER;
|
||||
}
|
||||
|
@ -325,6 +323,8 @@ connection* connection_new(worker *wrk) {
|
|||
ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb);
|
||||
con->keep_alive_data.watcher.data = con;
|
||||
|
||||
con->io_timeout_elem.data = con;
|
||||
|
||||
return con;
|
||||
}
|
||||
|
||||
|
@ -373,7 +373,7 @@ void connection_reset(connection *con) {
|
|||
con->stats.last_avg = 0;
|
||||
|
||||
/* remove from timeout queue */
|
||||
connection_io_timeout_remove(con);
|
||||
waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
|
||||
}
|
||||
|
||||
void server_check_keepalive(server *srv);
|
||||
|
@ -425,7 +425,7 @@ void connection_reset_keep_alive(connection *con) {
|
|||
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
|
||||
con->stats.last_avg = 0;
|
||||
|
||||
connection_io_timeout_remove(con);
|
||||
waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
|
||||
}
|
||||
|
||||
void connection_free(connection *con) {
|
||||
|
@ -477,57 +477,3 @@ gchar *connection_state_str(connection_state_t state) {
|
|||
|
||||
return (gchar*)states[state];
|
||||
}
|
||||
|
||||
void connection_io_timeout_init(connection *con) {
|
||||
worker *wrk = con->wrk;
|
||||
|
||||
if (wrk->io_timeout_queue_tail)
|
||||
wrk->io_timeout_queue_tail->io_timeout.next = con;
|
||||
else
|
||||
/* if there is no tail, it means the queue is empty */
|
||||
wrk->io_timeout_queue_head = con;
|
||||
|
||||
wrk->io_timeout_queue_tail = con;
|
||||
con->io_timeout.last_io = CUR_TS(wrk);
|
||||
con->io_timeout.next = NULL;
|
||||
}
|
||||
|
||||
void connection_io_timeout_reset(connection *con) {
|
||||
/* move con to the end of the timeout queue */
|
||||
worker *wrk = con->wrk;
|
||||
|
||||
if (con == wrk->io_timeout_queue_head && con != wrk->io_timeout_queue_tail)
|
||||
wrk->io_timeout_queue_head = con->io_timeout.next;
|
||||
|
||||
if (con != wrk->io_timeout_queue_tail)
|
||||
con->io_timeout.prev = wrk->io_timeout_queue_tail;
|
||||
|
||||
if (wrk->io_timeout_queue_tail)
|
||||
wrk->io_timeout_queue_tail->io_timeout.next = con;
|
||||
|
||||
con->io_timeout.next = NULL;
|
||||
wrk->io_timeout_queue_tail = con;
|
||||
con->io_timeout.last_io = CUR_TS(wrk);
|
||||
}
|
||||
|
||||
void connection_io_timeout_remove(connection *con) {
|
||||
/* remove con from the timeout queue */
|
||||
worker *wrk = con->wrk;
|
||||
|
||||
/* check if connection is in the timeout queue, it might not be the case when it is in keep alive idle state */
|
||||
if (con->io_timeout.prev == NULL && con->io_timeout.next == NULL && con != wrk->io_timeout_queue_head)
|
||||
return;
|
||||
|
||||
if (con == wrk->io_timeout_queue_head)
|
||||
wrk->io_timeout_queue_head = con->io_timeout.next;
|
||||
else
|
||||
con->io_timeout.prev->io_timeout.next = con->io_timeout.next;
|
||||
|
||||
if (con == wrk->io_timeout_queue_tail)
|
||||
wrk->io_timeout_queue_tail = con->io_timeout.prev;
|
||||
else
|
||||
con->io_timeout.next->io_timeout.prev = con->io_timeout.prev;
|
||||
|
||||
con->io_timeout.prev = NULL;
|
||||
con->io_timeout.next = NULL;
|
||||
}
|
||||
|
|
|
@ -58,11 +58,7 @@ struct connection {
|
|||
} keep_alive_data;
|
||||
|
||||
/* I/O timeout data */
|
||||
struct {
|
||||
ev_tstamp last_io;
|
||||
connection *prev;
|
||||
connection *next;
|
||||
} io_timeout;
|
||||
waitqueue_elem io_timeout_elem;
|
||||
|
||||
time_t ts;
|
||||
|
||||
|
@ -82,10 +78,6 @@ LI_API void connection_reset(connection *con);
|
|||
LI_API void connection_reset_keep_alive(connection *con);
|
||||
LI_API void connection_free(connection *con);
|
||||
|
||||
LI_API void connection_io_timeout_init(connection *con);
|
||||
LI_API void connection_io_timeout_reset(connection *con);
|
||||
LI_API void connection_io_timeout_remove(connection *con);
|
||||
|
||||
LI_API void connection_error(connection *con);
|
||||
|
||||
LI_API void connection_handle_direct(connection *con);
|
||||
|
|
|
@ -37,6 +37,7 @@ ssize_t net_read(int fd, void *buf, ssize_t nbyte) {
|
|||
|
||||
network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
||||
network_status_t res;
|
||||
ev_tstamp now = CUR_TS(vr->con->wrk);
|
||||
#ifdef TCP_CORK
|
||||
int corked = 0;
|
||||
#endif
|
||||
|
@ -61,8 +62,9 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
}
|
||||
#endif
|
||||
|
||||
if (vr->con->io_timeout.last_io != CUR_TS(vr->con->wrk))
|
||||
connection_io_timeout_reset(vr->con);
|
||||
/* only update once a second, the cast is to round the timestamp */
|
||||
if ((guint)vr->con->io_timeout_elem.ts != now)
|
||||
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -72,8 +74,8 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
const off_t max_read = 16 * blocksize; /* 256k */
|
||||
ssize_t r;
|
||||
off_t len = 0;
|
||||
worker *wrk;
|
||||
ev_tstamp ts;
|
||||
worker *wrk = vr->con->wrk;
|
||||
ev_tstamp now = CUR_TS(wrk);
|
||||
|
||||
do {
|
||||
GString *buf = g_string_sized_new(blocksize);
|
||||
|
@ -106,14 +108,17 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
vr->con->stats.bytes_in += r;
|
||||
|
||||
/* update 5s stats */
|
||||
ts = CUR_TS(wrk);
|
||||
|
||||
if ((ts - vr->con->stats.last_avg) > 5) {
|
||||
if ((now - vr->con->stats.last_avg) > 5) {
|
||||
vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s;
|
||||
vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in;
|
||||
vr->con->stats.last_avg = ts;
|
||||
vr->con->stats.last_avg = now;
|
||||
}
|
||||
} while (r == blocksize && len < max_read);
|
||||
|
||||
/* only update once a second, the cast is to round the timestamp */
|
||||
if ((guint)vr->con->io_timeout_elem.ts != now)
|
||||
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
||||
|
||||
return NETWORK_STATUS_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ struct server {
|
|||
/* keep alive timeout */
|
||||
guint keep_alive_queue_timeout;
|
||||
|
||||
guint io_timeout;
|
||||
gdouble io_timeout;
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -154,6 +154,16 @@ typedef struct response response;
|
|||
struct server;
|
||||
typedef struct server server;
|
||||
|
||||
/* utils.h */
|
||||
|
||||
struct waitqueue_elem;
|
||||
typedef struct waitqueue_elem waitqueue_elem;
|
||||
|
||||
struct waitqueue;
|
||||
typedef struct waitqueue waitqueue;
|
||||
|
||||
typedef void (*waitqueue_cb) (struct ev_loop *loop, struct ev_timer *w, int revents);
|
||||
|
||||
/* value.h */
|
||||
|
||||
struct value;
|
||||
|
|
102
src/utils.c
102
src/utils.c
|
@ -510,3 +510,105 @@ GString *sockaddr_to_string(sock_addr *saddr, GString *dest) {
|
|||
|
||||
return dest;
|
||||
}
|
||||
|
||||
|
||||
void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data) {
|
||||
ev_timer_init(&queue->timer, callback, delay, delay);
|
||||
ev_timer_start(loop, &queue->timer);
|
||||
|
||||
queue->timer.data = data;
|
||||
queue->head = queue->tail = NULL;
|
||||
queue->loop = loop;
|
||||
queue->delay = delay;
|
||||
}
|
||||
|
||||
void waitqueue_stop(waitqueue *queue) {
|
||||
ev_timer_stop(queue->loop, &queue->timer);
|
||||
}
|
||||
|
||||
void waitqueue_update(waitqueue *queue) {
|
||||
ev_tstamp repeat;
|
||||
|
||||
if (queue->head) {
|
||||
repeat = queue->head->ts + queue->delay - ev_now(queue->loop);
|
||||
} else
|
||||
repeat = queue->delay;
|
||||
|
||||
if (queue->timer.repeat != repeat)
|
||||
{
|
||||
queue->timer.repeat = repeat;
|
||||
ev_timer_again(queue->loop, &queue->timer);
|
||||
}
|
||||
}
|
||||
|
||||
void waitqueue_push(waitqueue *queue, waitqueue_elem *elem) {
|
||||
elem->ts = ev_now(queue->loop);
|
||||
|
||||
if (!elem->queued) {
|
||||
elem->queued = TRUE;
|
||||
/* not in the queue yet, insert at the end */
|
||||
if (!queue->head) {
|
||||
/* queue is empty */
|
||||
queue->head = elem;
|
||||
queue->tail = elem;
|
||||
elem->prev = NULL;
|
||||
elem->next = NULL;
|
||||
} else {
|
||||
/* queue not empty */
|
||||
elem->prev = queue->tail;
|
||||
elem->next = NULL;
|
||||
queue->tail->next = elem;
|
||||
queue->tail = elem;
|
||||
}
|
||||
} else {
|
||||
/* already queued, move to end */
|
||||
if (elem == queue->tail)
|
||||
return;
|
||||
|
||||
if (elem == queue->head) {
|
||||
queue->head = elem->next;
|
||||
if (elem->next)
|
||||
elem->next->prev = NULL;
|
||||
}
|
||||
|
||||
elem->prev = queue->tail;
|
||||
elem->next = NULL;
|
||||
queue->tail->next = elem;
|
||||
queue->tail = elem;
|
||||
}
|
||||
}
|
||||
|
||||
waitqueue_elem *waitqueue_pop(waitqueue *queue) {
|
||||
waitqueue_elem *elem = queue->head;
|
||||
ev_tstamp now = ev_now(queue->loop);
|
||||
|
||||
if (!elem || (elem->ts + queue->delay) >= now) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (elem != queue->tail)
|
||||
elem->next->prev = NULL;
|
||||
|
||||
queue->head = elem->next;
|
||||
|
||||
elem->queued = FALSE;
|
||||
|
||||
return elem;
|
||||
}
|
||||
|
||||
void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem) {
|
||||
if (!elem->queued)
|
||||
return;
|
||||
|
||||
if (elem == queue->head)
|
||||
queue->head = elem->next;
|
||||
else
|
||||
elem->prev->next = elem->next;
|
||||
|
||||
if (elem == queue->tail)
|
||||
queue->tail = elem->prev;
|
||||
else
|
||||
elem->next->prev = elem->prev;
|
||||
|
||||
elem->queued = FALSE;
|
||||
}
|
||||
|
|
39
src/utils.h
39
src/utils.h
|
@ -12,6 +12,25 @@ typedef enum {
|
|||
COUNTER_UNITS
|
||||
} counter_type;
|
||||
|
||||
|
||||
struct waitqueue_elem {
|
||||
gboolean queued;
|
||||
ev_tstamp ts;
|
||||
waitqueue_elem *prev;
|
||||
waitqueue_elem *next;
|
||||
gpointer data;
|
||||
};
|
||||
|
||||
struct waitqueue {
|
||||
waitqueue_elem *head;
|
||||
waitqueue_elem *tail;
|
||||
ev_timer timer;
|
||||
struct ev_loop *loop;
|
||||
gdouble delay;
|
||||
};
|
||||
|
||||
|
||||
|
||||
LI_API void fatal(const gchar* msg);
|
||||
|
||||
/* set O_NONBLOCK and FD_CLOEXEC */
|
||||
|
@ -51,4 +70,24 @@ LI_API GString *mimetype_get(vrequest *vr, GString *filename);
|
|||
/* converts a sock_addr to a human readable string. ipv4 and ipv6 supported. if dest is NULL, a new string will be allocated */
|
||||
LI_API GString *sockaddr_to_string(sock_addr *saddr, GString *dest);
|
||||
|
||||
|
||||
/*
|
||||
* waitqueues are queues used to implement delays for certain tasks in a lightweight, non-blocking way
|
||||
* they are used for io timeouts or throttling for example
|
||||
* waitqueue_push, waitqueue_pop and waitqueue_remove have O(1) complexity
|
||||
*/
|
||||
|
||||
/* initializes a waitqueue by creating and starting the ev_timer. precision is sub-seconds */
|
||||
LI_API void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data);
|
||||
/* stops the waitqueue. to restart it, simply call waitqueue_update */
|
||||
LI_API void waitqueue_stop(waitqueue *queue);
|
||||
/* updates the timeout of the waitqueue, you should allways call this at the end of your callback */
|
||||
LI_API void waitqueue_update(waitqueue *queue);
|
||||
/* moves the element to the end of the queue if already queued, appends it to the end otherwise */
|
||||
LI_API void waitqueue_push(waitqueue *queue, waitqueue_elem *elem);
|
||||
/* pops the first ready! element from the queue or NULL if none ready yet. this should be called in your callback */
|
||||
LI_API waitqueue_elem *waitqueue_pop(waitqueue *queue);
|
||||
/* removes an element from the queue */
|
||||
LI_API void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem);
|
||||
|
||||
#endif
|
||||
|
|
26
src/worker.c
26
src/worker.c
|
@ -101,20 +101,21 @@ static void worker_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents)
|
|||
static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
||||
worker *wrk = (worker*) w->data;
|
||||
connection *con;
|
||||
waitqueue_elem *wqe;
|
||||
ev_tstamp now = CUR_TS(wrk);
|
||||
|
||||
UNUSED(loop);
|
||||
UNUSED(revents);
|
||||
|
||||
for (con = wrk->io_timeout_queue_head; con != NULL; con = con->io_timeout.next) {
|
||||
if ((con->io_timeout.last_io + wrk->srv->io_timeout) < CUR_TS(wrk)) {
|
||||
/* connection has timed out */
|
||||
CON_TRACE(con, "connection io-timeout from %s after %u seconds", con->remote_addr_str->str, wrk->srv->io_timeout);
|
||||
plugins_handle_close(con);
|
||||
worker_con_put(con);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
while ((wqe = waitqueue_pop(&wrk->io_timeout_queue)) != NULL) {
|
||||
/* connection has timed out */
|
||||
con = wqe->data;
|
||||
CON_TRACE(con, "connection io-timeout from %s after %.2f seconds", con->remote_addr_str->str, now - wqe->ts);
|
||||
plugins_handle_close(con);
|
||||
worker_con_put(con);
|
||||
}
|
||||
|
||||
waitqueue_update(&wrk->io_timeout_queue);
|
||||
}
|
||||
|
||||
/* cache timestamp */
|
||||
|
@ -166,7 +167,7 @@ void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) {
|
|||
ev_io_start(wrk->loop, &con->sock_watcher);
|
||||
con->ts = CUR_TS(con->wrk);
|
||||
sockaddr_to_string(remote_addr, con->remote_addr_str);
|
||||
connection_io_timeout_init(con);
|
||||
waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem);
|
||||
} else {
|
||||
worker_new_con_data *d = g_slice_new(worker_new_con_data);
|
||||
d->remote_addr = *remote_addr;
|
||||
|
@ -273,9 +274,8 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
|
|||
wrk->collect_queue = g_async_queue_new();
|
||||
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
|
||||
|
||||
ev_timer_init(&wrk->io_timer, worker_io_timeout_cb, 1, 1);
|
||||
wrk->io_timer.data = wrk;
|
||||
ev_timer_start(wrk->loop, &wrk->io_timer);
|
||||
/* io timeout timer */
|
||||
waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk);
|
||||
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
|
||||
|
||||
return wrk;
|
||||
|
|
|
@ -65,9 +65,9 @@ struct worker {
|
|||
ev_timer keep_alive_timer;
|
||||
GQueue keep_alive_queue;
|
||||
|
||||
ev_timer io_timer;
|
||||
connection *io_timeout_queue_head;
|
||||
connection *io_timeout_queue_tail;
|
||||
waitqueue io_timeout_queue;
|
||||
|
||||
waitqueue throttle_queue;
|
||||
|
||||
guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */
|
||||
|
||||
|
|
Loading…
Reference in New Issue