diff --git a/src/connection.c b/src/connection.c index f331585..a6a2b53 100644 --- a/src/connection.c +++ b/src/connection.c @@ -122,6 +122,7 @@ static gboolean connection_handle_read(connection *con) { ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher); con->state = CON_STATE_READ_REQUEST_HEADER; + con->ts = CUR_TS(con->wrk); } else if (con->state == CON_STATE_REQUEST_START) { con->state = CON_STATE_READ_REQUEST_HEADER; } @@ -294,8 +295,8 @@ connection* connection_new(worker *wrk) { ev_init(&con->sock_watcher, connection_cb); ev_io_set(&con->sock_watcher, -1, 0); con->sock_watcher.data = con; - con->remote_addr_str = g_string_sized_new(0); - con->local_addr_str = g_string_sized_new(0); + con->remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); + con->local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); con->keep_alive = TRUE; con->raw_in = chunkqueue_new(); @@ -356,6 +357,15 @@ void connection_reset(connection *con) { con->keep_alive_data.timeout = 0; con->keep_alive_data.max_idle = 0; ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher); + + /* reset stats */ + con->stats.bytes_in = G_GUINT64_CONSTANT(0); + con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0); + con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); + con->stats.bytes_out = G_GUINT64_CONSTANT(0); + con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0); + con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); + con->stats.last_avg = 0; } void server_check_keepalive(server *srv); @@ -387,8 +397,6 @@ void connection_reset_keep_alive(connection *con) { con->expect_100_cont = FALSE; ev_io_set_events(con->wrk->loop, &con->sock_watcher, EV_READ); - g_string_truncate(con->remote_addr_str, 0); - g_string_truncate(con->local_addr_str, 0); con->keep_alive = TRUE; con->raw_out->is_closed = FALSE; @@ -397,6 +405,17 @@ void connection_reset_keep_alive(connection *con) { vrequest_reset(con->mainvr); http_request_parser_reset(&con->req_parser_ctx); + + con->ts = CUR_TS(con->wrk); + + /* reset stats */ + con->stats.bytes_in = G_GUINT64_CONSTANT(0); + con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0); + con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); + con->stats.bytes_out = G_GUINT64_CONSTANT(0); + con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0); + con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); + con->stats.last_avg = 0; } void connection_free(connection *con) { @@ -435,3 +454,16 @@ void connection_free(connection *con) { g_slice_free(connection, con); } + +gchar *connection_state_str(connection_state_t state) { + static const gchar *states[] = { + "dead", + "keep-alive", + "request start", + "read request header", + "handle main vrequest", + "write" + }; + + return (gchar*)states[state]; +} diff --git a/src/connection.h b/src/connection.h index 882eba4..364e96c 100644 --- a/src/connection.h +++ b/src/connection.h @@ -56,6 +56,18 @@ struct connection { guint max_idle; ev_timer watcher; } keep_alive_data; + + time_t ts; + + struct { + guint64 bytes_in; + guint64 bytes_out; + time_t last_avg; + guint64 bytes_in_5s; + guint64 bytes_out_5s; + guint64 bytes_in_5s_diff; + guint64 bytes_out_5s_diff; + } stats; }; LI_API connection* connection_new(worker *wrk); @@ -68,4 +80,6 @@ LI_API void connection_error(connection *con); LI_API void connection_handle_direct(connection *con); LI_API void connection_handle_indirect(connection *con, plugin *p); +LI_API gchar *connection_state_str(connection_state_t state); + #endif diff --git a/src/network.c b/src/network.c index 05c1caa..169b3e0 100644 --- a/src/network.c +++ b/src/network.c @@ -69,6 +69,8 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { const off_t max_read = 16 * blocksize; /* 256k */ ssize_t r; off_t len = 0; + worker *wrk; + ev_tstamp ts; do { GString *buf = g_string_sized_new(blocksize); @@ -94,6 +96,20 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { g_string_truncate(buf, r); chunkqueue_append_string(cq, buf); len += r; + + /* stats */ + wrk = vr->con->wrk; + wrk->stats.bytes_in += r; + vr->con->stats.bytes_in += r; + + /* update 5s stats */ + ts = CUR_TS(wrk); + + if ((ts - vr->con->stats.last_avg) > 5) { + vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s; + vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in; + vr->con->stats.last_avg = ts; + } } while (r == blocksize && len < max_read); return NETWORK_STATUS_SUCCESS; diff --git a/src/network_linux_sendfile.c b/src/network_linux_sendfile.c index 3b733a4..4030947 100644 --- a/src/network_linux_sendfile.c +++ b/src/network_linux_sendfile.c @@ -8,6 +8,8 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq, gboolean did_write_something = FALSE; chunkiter ci; chunk *c; + worker *wrk; + time_t ts; if (0 == cq->length) return NETWORK_STATUS_FATAL_ERROR; @@ -71,6 +73,21 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq, chunkqueue_skip(cq, r); *write_max -= r; did_write_something = TRUE; + + /* stats */ + wrk = vr->con->wrk; + wrk->stats.bytes_out += r; + vr->con->stats.bytes_out += r; + + /* update 5s stats */ + ts = CUR_TS(wrk); + + if ((ts - vr->con->stats.last_avg) > 5) { + vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s; + vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out; + vr->con->stats.last_avg = ts; + } + if (0 == cq->length) return NETWORK_STATUS_SUCCESS; } while (r == toSend && *write_max > 0); diff --git a/src/network_write.c b/src/network_write.c index 6293b01..febb91b 100644 --- a/src/network_write.c +++ b/src/network_write.c @@ -8,6 +8,8 @@ network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, gof ssize_t r; gboolean did_write_something = FALSE; chunkiter ci; + worker *wrk; + time_t ts; do { if (0 == cq->length) @@ -44,6 +46,21 @@ network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, gof chunkqueue_skip(cq, r); did_write_something = TRUE; *write_max -= r; + + /* stats */ + wrk = vr->con->wrk; + vr->con->wrk->stats.bytes_out += r; + vr->con->stats.bytes_out += r; + + /* update 5s stats */ + ts = CUR_TS(wrk); + + if ((ts - vr->con->stats.last_avg) > 5) { + vr->con->stats.bytes_out_5s_diff = vr->con->stats.bytes_out - vr->con->stats.bytes_out_5s; + vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out; + vr->con->stats.last_avg = ts; + } + } while (r == block_len && *write_max > 0); return NETWORK_STATUS_SUCCESS; diff --git a/src/network_writev.c b/src/network_writev.c index 3770c79..0b7bad5 100644 --- a/src/network_writev.c +++ b/src/network_writev.c @@ -31,6 +31,8 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go gboolean did_write_something = FALSE; chunkiter ci; chunk *c; + worker *wrk; + time_t ts; network_status_t res = NETWORK_STATUS_FATAL_ERROR; GArray *chunks = g_array_sized_new(FALSE, TRUE, sizeof(struct iovec), UIO_MAXIOV); @@ -86,6 +88,21 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go } chunkqueue_skip(cq, r); *write_max -= r; + + /* stats */ + wrk = vr->con->wrk; + vr->con->wrk->stats.bytes_out += r; + vr->con->stats.bytes_out += r; + + /* update 5s stats */ + ts = CUR_TS(wrk); + + if ((ts - vr->con->stats.last_avg) > 5) { + vr->con->stats.bytes_out_5s_diff = vr->con->stats.bytes_out - vr->con->stats.bytes_out_5s; + vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out; + vr->con->stats.last_avg = ts; + } + if (r != we_have) { res = NETWORK_STATUS_SUCCESS; goto cleanup; diff --git a/src/worker.c b/src/worker.c index 790362d..6d0ef2d 100644 --- a/src/worker.c +++ b/src/worker.c @@ -143,6 +143,8 @@ void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) { con->remote_addr = *remote_addr; ev_io_set(&con->sock_watcher, s, EV_READ); ev_io_start(wrk->loop, &con->sock_watcher); + con->ts = CUR_TS(con->wrk); + sockaddr_to_string(remote_addr, con->remote_addr_str); } else { worker_new_con_data *d = g_slice_new(worker_new_con_data); d->remote_addr = *remote_addr; @@ -164,8 +166,8 @@ static void worker_new_con_cb(struct ev_loop *loop, ev_async *w, int revents) { } } -/* stat watcher */ -static void worker_stat_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) { +/* stats watcher */ +static void worker_stats_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) { worker *wrk = (worker*) w->data; UNUSED(loop); UNUSED(revents); @@ -179,6 +181,28 @@ static void worker_stat_watcher_cb(struct ev_loop *loop, ev_timer *w, int revent TRACE(wrk->srv, "worker %u: %.2f requests per second", wrk->ndx, wrk->stats.requests_per_sec); } + /* 5s averages */ + if ((now - wrk->stats.last_avg) > 5) { + /* bytes in */ + wrk->stats.bytes_in_5s_diff = wrk->stats.bytes_in - wrk->stats.bytes_in_5s; + wrk->stats.bytes_in_5s = wrk->stats.bytes_in; + + /* bytes out */ + wrk->stats.bytes_out_5s_diff = wrk->stats.bytes_out - wrk->stats.bytes_out_5s; + wrk->stats.bytes_out_5s = wrk->stats.bytes_out; + + /* requests */ + wrk->stats.requests_5s_diff = wrk->stats.requests - wrk->stats.requests_5s; + wrk->stats.requests_5s = wrk->stats.requests; + + /* active connections */ + wrk->stats.active_cons_5s = wrk->connections_active; + + wrk->stats.last_avg = now; + } + + wrk->stats.active_cons_cum += wrk->connections_active; + wrk->stats.last_requests = wrk->stats.requests; wrk->stats.last_update = now; } @@ -216,9 +240,9 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) { ev_async_start(wrk->loop, &wrk->new_con_watcher); wrk->new_con_queue = g_async_queue_new(); - ev_timer_init(&wrk->stat_watcher, worker_stat_watcher_cb, 1, 1); - wrk->stat_watcher.data = wrk; - ev_timer_start(wrk->loop, &wrk->stat_watcher); + ev_timer_init(&wrk->stats_watcher, worker_stats_watcher_cb, 1, 1); + wrk->stats_watcher.data = wrk; + ev_timer_start(wrk->loop, &wrk->stats_watcher); ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ ev_init(&wrk->collect_watcher, collect_watcher_cb); @@ -265,7 +289,7 @@ void worker_free(worker *wrk) { g_async_queue_unref(wrk->new_con_queue); ev_ref(wrk->loop); - ev_timer_stop(wrk->loop, &wrk->stat_watcher); + ev_timer_stop(wrk->loop, &wrk->stats_watcher); ev_ref(wrk->loop); ev_async_stop(wrk->loop, &wrk->collect_watcher); @@ -364,4 +388,3 @@ void worker_con_put(connection *con) { g_array_index(wrk->connections, connection*, tmp->idx) = tmp; } } - diff --git a/src/worker.h b/src/worker.h index 57ff8af..d99643a 100644 --- a/src/worker.h +++ b/src/worker.h @@ -10,12 +10,23 @@ typedef struct statistics_t statistics_t; struct statistics_t { guint64 bytes_out; /** bytes transfered, outgoing */ - guint64 bytes_int; /** bytes transfered, incoming */ + guint64 bytes_in; /** bytes transfered, incoming */ guint64 requests; /** processed requests */ + guint64 active_cons_cum; /** cummulative value of active connections, updated once a second */ guint64 actions_executed; /** actions executed */ + /* 5 seconds frame avg */ + guint64 requests_5s; + guint64 requests_5s_diff; + guint64 bytes_out_5s; + guint64 bytes_out_5s_diff; + guint64 bytes_in_5s; + guint64 bytes_in_5s_diff; + guint active_cons_5s; + time_t last_avg; + /* updated in timer */ guint64 last_requests; double requests_per_sec; @@ -64,7 +75,7 @@ struct worker { ev_async new_con_watcher; GAsyncQueue *new_con_queue; - ev_timer stat_watcher; + ev_timer stats_watcher; statistics_t stats; /* collect framework */