Browse Source

add client addr string and traffic stats for connections and workers

personal/stbuehler/wip
Thomas Porzelt 14 years ago
parent
commit
0a63fc8058
  1. 40
      src/connection.c
  2. 14
      src/connection.h
  3. 16
      src/network.c
  4. 17
      src/network_linux_sendfile.c
  5. 17
      src/network_write.c
  6. 17
      src/network_writev.c
  7. 37
      src/worker.c
  8. 15
      src/worker.h

40
src/connection.c

@ -122,6 +122,7 @@ static gboolean connection_handle_read(connection *con) {
ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);
con->state = CON_STATE_READ_REQUEST_HEADER;
con->ts = CUR_TS(con->wrk);
} else if (con->state == CON_STATE_REQUEST_START) {
con->state = CON_STATE_READ_REQUEST_HEADER;
}
@ -294,8 +295,8 @@ connection* connection_new(worker *wrk) {
ev_init(&con->sock_watcher, connection_cb);
ev_io_set(&con->sock_watcher, -1, 0);
con->sock_watcher.data = con;
con->remote_addr_str = g_string_sized_new(0);
con->local_addr_str = g_string_sized_new(0);
con->remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->keep_alive = TRUE;
con->raw_in = chunkqueue_new();
@ -356,6 +357,15 @@ void connection_reset(connection *con) {
con->keep_alive_data.timeout = 0;
con->keep_alive_data.max_idle = 0;
ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);
/* reset stats */
con->stats.bytes_in = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.bytes_out = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
}
void server_check_keepalive(server *srv);
@ -387,8 +397,6 @@ void connection_reset_keep_alive(connection *con) {
con->expect_100_cont = FALSE;
ev_io_set_events(con->wrk->loop, &con->sock_watcher, EV_READ);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;
con->raw_out->is_closed = FALSE;
@ -397,6 +405,17 @@ void connection_reset_keep_alive(connection *con) {
vrequest_reset(con->mainvr);
http_request_parser_reset(&con->req_parser_ctx);
con->ts = CUR_TS(con->wrk);
/* reset stats */
con->stats.bytes_in = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.bytes_out = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
}
void connection_free(connection *con) {
@ -435,3 +454,16 @@ void connection_free(connection *con) {
g_slice_free(connection, con);
}
gchar *connection_state_str(connection_state_t state) {
static const gchar *states[] = {
"dead",
"keep-alive",
"request start",
"read request header",
"handle main vrequest",
"write"
};
return (gchar*)states[state];
}

14
src/connection.h

@ -56,6 +56,18 @@ struct connection {
guint max_idle;
ev_timer watcher;
} keep_alive_data;
time_t ts;
struct {
guint64 bytes_in;
guint64 bytes_out;
time_t last_avg;
guint64 bytes_in_5s;
guint64 bytes_out_5s;
guint64 bytes_in_5s_diff;
guint64 bytes_out_5s_diff;
} stats;
};
LI_API connection* connection_new(worker *wrk);
@ -68,4 +80,6 @@ LI_API void connection_error(connection *con);
LI_API void connection_handle_direct(connection *con);
LI_API void connection_handle_indirect(connection *con, plugin *p);
LI_API gchar *connection_state_str(connection_state_t state);
#endif

16
src/network.c

@ -69,6 +69,8 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
const off_t max_read = 16 * blocksize; /* 256k */
ssize_t r;
off_t len = 0;
worker *wrk;
ev_tstamp ts;
do {
GString *buf = g_string_sized_new(blocksize);
@ -94,6 +96,20 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
g_string_truncate(buf, r);
chunkqueue_append_string(cq, buf);
len += r;
/* stats */
wrk = vr->con->wrk;
wrk->stats.bytes_in += r;
vr->con->stats.bytes_in += r;
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) > 5) {
vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s;
vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in;
vr->con->stats.last_avg = ts;
}
} while (r == blocksize && len < max_read);
return NETWORK_STATUS_SUCCESS;

17
src/network_linux_sendfile.c

@ -8,6 +8,8 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
gboolean did_write_something = FALSE;
chunkiter ci;
chunk *c;
worker *wrk;
time_t ts;
if (0 == cq->length) return NETWORK_STATUS_FATAL_ERROR;
@ -71,6 +73,21 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
chunkqueue_skip(cq, r);
*write_max -= r;
did_write_something = TRUE;
/* stats */
wrk = vr->con->wrk;
wrk->stats.bytes_out += r;
vr->con->stats.bytes_out += r;
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) > 5) {
vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s;
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
vr->con->stats.last_avg = ts;
}
if (0 == cq->length) return NETWORK_STATUS_SUCCESS;
} while (r == toSend && *write_max > 0);

17
src/network_write.c

@ -8,6 +8,8 @@ network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, gof
ssize_t r;
gboolean did_write_something = FALSE;
chunkiter ci;
worker *wrk;
time_t ts;
do {
if (0 == cq->length)
@ -44,6 +46,21 @@ network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, gof
chunkqueue_skip(cq, r);
did_write_something = TRUE;
*write_max -= r;
/* stats */
wrk = vr->con->wrk;
vr->con->wrk->stats.bytes_out += r;
vr->con->stats.bytes_out += r;
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) > 5) {
vr->con->stats.bytes_out_5s_diff = vr->con->stats.bytes_out - vr->con->stats.bytes_out_5s;
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
vr->con->stats.last_avg = ts;
}
} while (r == block_len && *write_max > 0);
return NETWORK_STATUS_SUCCESS;

17
src/network_writev.c

@ -31,6 +31,8 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
gboolean did_write_something = FALSE;
chunkiter ci;
chunk *c;
worker *wrk;
time_t ts;
network_status_t res = NETWORK_STATUS_FATAL_ERROR;
GArray *chunks = g_array_sized_new(FALSE, TRUE, sizeof(struct iovec), UIO_MAXIOV);
@ -86,6 +88,21 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
}
chunkqueue_skip(cq, r);
*write_max -= r;
/* stats */
wrk = vr->con->wrk;
vr->con->wrk->stats.bytes_out += r;
vr->con->stats.bytes_out += r;
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) > 5) {
vr->con->stats.bytes_out_5s_diff = vr->con->stats.bytes_out - vr->con->stats.bytes_out_5s;
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
vr->con->stats.last_avg = ts;
}
if (r != we_have) {
res = NETWORK_STATUS_SUCCESS;
goto cleanup;

37
src/worker.c

@ -143,6 +143,8 @@ void worker_new_con(worker *ctx, worker *wrk, sock_addr *remote_addr, int s) {
con->remote_addr = *remote_addr;
ev_io_set(&con->sock_watcher, s, EV_READ);
ev_io_start(wrk->loop, &con->sock_watcher);
con->ts = CUR_TS(con->wrk);
sockaddr_to_string(remote_addr, con->remote_addr_str);
} else {
worker_new_con_data *d = g_slice_new(worker_new_con_data);
d->remote_addr = *remote_addr;
@ -164,8 +166,8 @@ static void worker_new_con_cb(struct ev_loop *loop, ev_async *w, int revents) {
}
}
/* stat watcher */
static void worker_stat_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) {
/* stats watcher */
static void worker_stats_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
UNUSED(loop);
UNUSED(revents);
@ -179,6 +181,28 @@ static void worker_stat_watcher_cb(struct ev_loop *loop, ev_timer *w, int revent
TRACE(wrk->srv, "worker %u: %.2f requests per second", wrk->ndx, wrk->stats.requests_per_sec);
}
/* 5s averages */
if ((now - wrk->stats.last_avg) > 5) {
/* bytes in */
wrk->stats.bytes_in_5s_diff = wrk->stats.bytes_in - wrk->stats.bytes_in_5s;
wrk->stats.bytes_in_5s = wrk->stats.bytes_in;
/* bytes out */
wrk->stats.bytes_out_5s_diff = wrk->stats.bytes_out - wrk->stats.bytes_out_5s;
wrk->stats.bytes_out_5s = wrk->stats.bytes_out;
/* requests */
wrk->stats.requests_5s_diff = wrk->stats.requests - wrk->stats.requests_5s;
wrk->stats.requests_5s = wrk->stats.requests;
/* active connections */
wrk->stats.active_cons_5s = wrk->connections_active;
wrk->stats.last_avg = now;
}
wrk->stats.active_cons_cum += wrk->connections_active;
wrk->stats.last_requests = wrk->stats.requests;
wrk->stats.last_update = now;
}
@ -216,9 +240,9 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
ev_async_start(wrk->loop, &wrk->new_con_watcher);
wrk->new_con_queue = g_async_queue_new();
ev_timer_init(&wrk->stat_watcher, worker_stat_watcher_cb, 1, 1);
wrk->stat_watcher.data = wrk;
ev_timer_start(wrk->loop, &wrk->stat_watcher);
ev_timer_init(&wrk->stats_watcher, worker_stats_watcher_cb, 1, 1);
wrk->stats_watcher.data = wrk;
ev_timer_start(wrk->loop, &wrk->stats_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&wrk->collect_watcher, collect_watcher_cb);
@ -265,7 +289,7 @@ void worker_free(worker *wrk) {
g_async_queue_unref(wrk->new_con_queue);
ev_ref(wrk->loop);
ev_timer_stop(wrk->loop, &wrk->stat_watcher);
ev_timer_stop(wrk->loop, &wrk->stats_watcher);
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->collect_watcher);
@ -364,4 +388,3 @@ void worker_con_put(connection *con) {
g_array_index(wrk->connections, connection*, tmp->idx) = tmp;
}
}

15
src/worker.h

@ -10,12 +10,23 @@ typedef struct statistics_t statistics_t;
struct statistics_t {
guint64 bytes_out; /** bytes transfered, outgoing */
guint64 bytes_int; /** bytes transfered, incoming */
guint64 bytes_in; /** bytes transfered, incoming */
guint64 requests; /** processed requests */
guint64 active_cons_cum; /** cummulative value of active connections, updated once a second */
guint64 actions_executed; /** actions executed */
/* 5 seconds frame avg */
guint64 requests_5s;
guint64 requests_5s_diff;
guint64 bytes_out_5s;
guint64 bytes_out_5s_diff;
guint64 bytes_in_5s;
guint64 bytes_in_5s_diff;
guint active_cons_5s;
time_t last_avg;
/* updated in timer */
guint64 last_requests;
double requests_per_sec;
@ -64,7 +75,7 @@ struct worker {
ev_async new_con_watcher;
GAsyncQueue *new_con_queue;
ev_timer stat_watcher;
ev_timer stats_watcher;
statistics_t stats;
/* collect framework */

Loading…
Cancel
Save