Browse Source

implement throttling, change some time_t to ev_tstamp

personal/stbuehler/wip
Thomas Porzelt 13 years ago
parent
commit
358b9f95f1
  1. 3
      src/connection.c
  2. 11
      src/connection.h
  3. 2
      src/log.c
  4. 2
      src/log.h
  5. 4
      src/modules/mod_status.c
  6. 6
      src/network.c
  7. 33
      src/network_linux_sendfile.c
  8. 2
      src/network_write.c
  9. 2
      src/network_writev.c
  10. 2
      src/plugin_core.c
  11. 4
      src/plugin_core.h
  12. 2
      src/server.c
  13. 23
      src/worker.c
  14. 4
      src/worker.h

3
src/connection.c

@ -324,6 +324,7 @@ connection* connection_new(worker *wrk) {
con->keep_alive_data.watcher.data = con;
con->io_timeout_elem.data = con;
con->throttle.queue_elem.data = con;
return con;
}
@ -374,6 +375,8 @@ void connection_reset(connection *con) {
/* remove from timeout queue */
waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
/* remove from throttle queue */
waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.queue_elem);
}
void server_check_keepalive(server *srv);

11
src/connection.h

@ -60,12 +60,19 @@ struct connection {
/* I/O timeout data */
waitqueue_elem io_timeout_elem;
time_t ts;
/* I/O throttling */
struct {
waitqueue_elem queue_elem;
guint magazine;
ev_tstamp ts;
} throttle;
ev_tstamp ts;
struct {
guint64 bytes_in;
guint64 bytes_out;
time_t last_avg;
ev_tstamp last_avg;
guint64 bytes_in_5s;
guint64 bytes_out_5s;
guint64 bytes_in_5s_diff;

2
src/log.c

@ -98,7 +98,7 @@ gboolean log_write_(server *srv, connection *con, log_level_t log_level, guint f
/* if we have a worker context, we can use its timestamp to save us a call to time() */
if (con != NULL)
cur_ts = CUR_TS(con->wrk);
cur_ts = (time_t)CUR_TS(con->wrk);
else
cur_ts = time(NULL);

2
src/log.h

@ -141,7 +141,7 @@ struct log_t {
struct log_timestamp_t {
gint refcount;
time_t last_ts;
ev_tstamp last_ts;
GString *format;
GString *cached;
};

4
src/modules/mod_status.c

@ -146,7 +146,7 @@ struct mod_status_con_data {
GString *remote_addr_str, *local_addr_str;
gboolean is_ssl, keep_alive;
GString *host, *path;
time_t ts;
ev_tstamp ts;
guint64 bytes_in;
guint64 bytes_out;
guint64 bytes_in_5s_diff;
@ -394,7 +394,7 @@ static void status_collect_cb(gpointer cbdata, gpointer fdata, GPtrArray *result
for (guint j = 0; j < sd->connections->len; j++) {
mod_status_con_data *cd = &g_array_index(sd->connections, mod_status_con_data, j);
ts = counter_format2(CUR_TS(vr->con->wrk) - cd->ts, COUNTER_TIME, -1);
ts = counter_format2((guint64)(CUR_TS(vr->con->wrk) - cd->ts), COUNTER_TIME, -1);
bytes_in = counter_format2(cd->bytes_in, COUNTER_BYTES, 1);
bytes_in_5s = counter_format2(cd->bytes_in_5s_diff / G_GUINT64_CONSTANT(5), COUNTER_BYTES, 1);
bytes_out = counter_format2(cd->bytes_out, COUNTER_BYTES, 1);

6
src/network.c

@ -63,7 +63,7 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
#endif
/* only update once a second, the cast is to round the timestamp */
if ((guint)vr->con->io_timeout_elem.ts != now)
if ((vr->con->io_timeout_elem.ts + 1.) < now)
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
return res;
@ -109,7 +109,7 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
/* update 5s stats */
if ((now - vr->con->stats.last_avg) > 5) {
if ((now - vr->con->stats.last_avg) >= 5.0) {
vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s;
vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in;
vr->con->stats.last_avg = now;
@ -117,7 +117,7 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
} while (r == blocksize && len < max_read);
/* only update once a second, the cast is to round the timestamp */
if ((guint)vr->con->io_timeout_elem.ts != now)
if ((vr->con->io_timeout_elem.ts + 1.) < now)
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
return NETWORK_STATUS_SUCCESS;

33
src/network_linux_sendfile.c

@ -1,5 +1,6 @@
#include "base.h"
#include "plugin_core.h"
/* first chunk must be a FILE_CHUNK ! */
network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max) {
@ -9,7 +10,7 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
chunkiter ci;
chunk *c;
worker *wrk;
time_t ts;
ev_tstamp ts;
if (0 == cq->length) return NETWORK_STATUS_FATAL_ERROR;
@ -82,7 +83,7 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) > 5) {
if ((ts - vr->con->stats.last_avg) >= 5.0) {
vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s;
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
vr->con->stats.last_avg = ts;
@ -95,8 +96,24 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
}
network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq) {
goffset write_max = 256*1024; // 256kB //;
goffset write_max;
if (CORE_OPTION(CORE_OPTION_THROTTLE).number) {
/* throttling is enabled */
ev_tstamp now = CUR_TS(vr->con->wrk);
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->con->wrk->throttle_queue.delay)) {
vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts);
if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number)
vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number;
vr->con->throttle.ts = now;
/*g_print("throttle magazine: %u kbytes\n", vr->con->throttle.magazine / 1024);*/
}
write_max = vr->con->throttle.magazine;
} else
write_max = 256*1024; /* 256kB */
if (cq->length == 0) return NETWORK_STATUS_FATAL_ERROR;
do {
switch (chunkqueue_first_chunk(cq)->type) {
case MEM_CHUNK:
@ -108,6 +125,16 @@ network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq) {
default:
return NETWORK_STATUS_FATAL_ERROR;
}
/* check if throttle magazine is empty */
if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_max == 0) {
/* remove EV_WRITE from sockwatcher for now */
vr->con->throttle.magazine = 0;
ev_io_rem_events(vr->con->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
waitqueue_push(&vr->con->wrk->throttle_queue, &vr->con->throttle.queue_elem);
return NETWORK_STATUS_WAIT_FOR_EVENT;
}
if (cq->length == 0) return NETWORK_STATUS_SUCCESS;
} while (write_max > 0);
return NETWORK_STATUS_SUCCESS;

2
src/network_write.c

@ -9,7 +9,7 @@ network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, gof
gboolean did_write_something = FALSE;
chunkiter ci;
worker *wrk;
time_t ts;
ev_tstamp ts;
do {
if (0 == cq->length)

2
src/network_writev.c

@ -32,7 +32,7 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
chunkiter ci;
chunk *c;
worker *wrk;
time_t ts;
ev_tstamp ts;
network_status_t res = NETWORK_STATUS_FATAL_ERROR;
GArray *chunks = g_array_sized_new(FALSE, TRUE, sizeof(struct iovec), UIO_MAXIOV);

2
src/plugin_core.c

@ -801,6 +801,8 @@ static const plugin_option options[] = {
{ "docroot", VALUE_STRING, NULL, NULL, NULL },
{ "throttle", VALUE_NUMBER, GINT_TO_POINTER(0), NULL, NULL },
{ NULL, 0, NULL, NULL, NULL }
};

4
src/plugin_core.h

@ -16,7 +16,9 @@ enum core_options_t {
CORE_OPTION_MIME_TYPES,
CORE_OPTION_DOCROOT
CORE_OPTION_DOCROOT,
CORE_OPTION_THROTTLE
};
/* the core plugin always has base index 0, as it is the first plugin loaded */

2
src/server.c

@ -346,7 +346,7 @@ GString *server_current_timestamp() {
g_string_set_size(ts_str, 255);
s = strftime(ts_str->str, ts_str->allocated_len,
"%a, %d %b %Y %H:%M:%S GMT", gmtime(&(cur_ts)));
"%a, %d %b %Y %H:%M:%S GMT", gmtime(&cur_ts));
g_string_set_size(ts_str, s);
*last_ts = cur_ts;
}

23
src/worker.c

@ -118,9 +118,26 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents)
waitqueue_update(&wrk->io_timeout_queue);
}
/* check for throttled connections */
static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
connection *con;
waitqueue_elem *wqe;
UNUSED(revents);
while ((wqe = waitqueue_pop(&wrk->throttle_queue)) != NULL) {
/* connection waited long enough to reenable sending of data again */
con = wqe->data;
ev_io_add_events(loop, &con->sock_watcher, EV_WRITE);
}
waitqueue_update(&wrk->throttle_queue);
}
/* cache timestamp */
GString *worker_current_timestamp(worker *wrk) {
time_t cur_ts = CUR_TS(wrk);
time_t cur_ts = (time_t)CUR_TS(wrk);
if (cur_ts != wrk->last_generated_date_ts) {
g_string_set_size(wrk->ts_date_str, 255);
strftime(wrk->ts_date_str->str, wrk->ts_date_str->allocated_len,
@ -278,6 +295,10 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
/* throttling */
waitqueue_init(&wrk->throttle_queue, wrk->loop, worker_throttle_cb, 0.5, wrk);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
return wrk;
}

4
src/worker.h

@ -25,7 +25,7 @@ struct statistics_t {
guint64 bytes_in_5s;
guint64 bytes_in_5s_diff;
guint active_cons_5s;
time_t last_avg;
ev_tstamp last_avg;
/* updated in timer */
guint64 last_requests;
@ -71,7 +71,7 @@ struct worker {
guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */
time_t last_generated_date_ts;
ev_tstamp last_generated_date_ts;
GString *ts_date_str; /**< use worker_current_timestamp(wrk) */
/* incoming queues */

Loading…
Cancel
Save