2
0
Fork 0

Add max-connections check, set to max-fds/4

personal/stbuehler/wip
Stefan Bühler 2009-10-04 14:25:59 +02:00
parent 1d6a2b3d6f
commit a14a709237
6 changed files with 98 additions and 5 deletions

View File

@ -30,6 +30,7 @@ AC_HEADER_SYS_WAIT
AC_CHECK_HEADERS([ \
stddef.h \
sys/mman.h \
sys/resource.h \
sys/sendfile.h \
sys/types.h \
sys/uio.h \
@ -114,6 +115,7 @@ AC_SEARCH_LIBS([inet_addr],[nsl socket])
# Checks for library functions.
AC_CHECK_FUNCS([ \
chroot \
getrlimit \
gmtime_r \
inet_aton \
inet_ntop \

View File

@ -61,8 +61,7 @@ struct liServer {
sig_w_INT,
sig_w_TERM,
sig_w_PIPE;
ev_prepare srv_prepare;
ev_check srv_check;
ev_timer srv_1sec_timer;
GPtrArray *sockets; /** array of (server_socket*) */
@ -99,6 +98,9 @@ struct liServer {
ev_tstamp started;
GString *started_str;
guint connection_load, max_connections;
gboolean connection_limit_hit; /** true if limit was hit and the sockets are disabled */
/* keep alive timeout */
guint keep_alive_queue_timeout;

View File

@ -35,6 +35,7 @@ CHECK_INCLUDE_FILES(inttypes.h HAVE_INTTYPES_H)
CHECK_INCLUDE_FILES(stddef.h HAVE_STDDEF_H)
CHECK_INCLUDE_FILES(stdint.h HAVE_STDINT_H)
CHECK_INCLUDE_FILES(sys/mman.h HAVE_SYS_MMAN_H)
CHECK_INCLUDE_FILES(sys/resource.h HAVE_SYS_RESOURCE_H)
CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
CHECK_INCLUDE_FILES(sys/types.h HAVE_SYS_TYPES_H)
CHECK_INCLUDE_FILES(sys/uio.h HAVE_SYS_UIO_H)
@ -53,6 +54,7 @@ CHECK_TYPE_SIZE(long SIZEOF_LONG)
CHECK_TYPE_SIZE(off_t SIZEOF_OFF_T)
CHECK_FUNCTION_EXISTS(chroot HAVE_CHROOT)
CHECK_FUNCTION_EXISTS(getrlimit HAVE_GETRLIMIT)
CHECK_FUNCTION_EXISTS(gmtime_r HAVE_GMTIME_R)
CHECK_FUNCTION_EXISTS(inet_aton HAVE_INET_ATON)
CHECK_FUNCTION_EXISTS(inet_ntop HAVE_INET_NTOP)

View File

@ -8,6 +8,9 @@
# include <lauxlib.h>
#endif
#ifdef HAVE_SYS_RESOURCE_H
# include <sys/resource.h>
#endif
static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents);
static void li_server_stop(liServer *srv);
@ -127,6 +130,36 @@ liServer* li_server_new(const gchar *module_dir) {
log_init(srv);
srv->connection_load = 0;
srv->max_connections = 256; /* assume max-fds = 1024 */
srv->connection_limit_hit = FALSE;
#ifdef HAVE_GETRLIMIT
{
struct rlimit rlim;
rlim_t max_fds = 1024;
if (0 != getrlimit(RLIMIT_NOFILE, &rlim)) {
ERROR(srv, "couldn't get 'max filedescriptors': %s", g_strerror(errno));
} else {
max_fds = rlim.rlim_cur;
if (rlim.rlim_cur < rlim.rlim_max) {
/* go for maximum */
rlim.rlim_cur = rlim.rlim_max;
if (0 != setrlimit(RLIMIT_NOFILE, &rlim)) {
ERROR(srv, "couldn't set 'max filedescriptors': %s", g_strerror(errno));
return -1;
} else {
max_fds = rlim.rlim_cur;
}
}
}
if (max_fds / 4 > G_MAXUINT32) {
srv->max_connections = G_MAXUINT32;
} else {
srv->max_connections = max_fds / 4;
}
}
#endif
srv->io_timeout = 300; /* default I/O timeout */
srv->keep_alive_queue_timeout = 5;
@ -241,6 +274,26 @@ gboolean li_server_loop_init(liServer *srv) {
return TRUE;
}
static void li_server_1sec_timer(struct ev_loop *loop, ev_timer *w, int revents) {
liServer *srv = w->data;
UNUSED(loop);
UNUSED(revents);
if (srv->connection_limit_hit) {
guint srv_cur_load = g_atomic_int_get(&srv->connection_load);
guint srv_max_load = g_atomic_int_get(&srv->max_connections);
if (srv_cur_load <= (srv_max_load - srv_max_load/8)) { /* cur_load <= 7/8 * max_load */
guint i;
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_start(srv->main_worker->loop, &sock->watcher);
}
srv->connection_limit_hit = FALSE;
}
}
}
gboolean li_server_worker_init(liServer *srv) {
struct ev_loop *loop = srv->loop;
guint i;
@ -249,6 +302,11 @@ gboolean li_server_worker_init(liServer *srv) {
CATCH_SIGNAL(loop, sigint_cb, TERM);
CATCH_SIGNAL(loop, sigpipe_cb, PIPE);
ev_timer_init(&srv->srv_1sec_timer, li_server_1sec_timer, 1.0, 1.0);
srv->srv_1sec_timer.data = srv;
ev_timer_start(loop, &srv->srv_1sec_timer);
ev_unref(loop); /* don't keep loop alive */
if (srv->worker_count < 1) srv->worker_count = 1;
g_array_set_size(srv->workers, srv->worker_count);
srv->main_worker = g_array_index(srv->workers, liWorker*, 0) = li_worker_new(srv, loop);
@ -271,6 +329,17 @@ gboolean li_server_worker_init(liServer *srv) {
return TRUE;
}
static void server_connection_limit_hit(liServer *srv) {
guint i;
for (i = 0; i < srv->sockets->len; i++) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
srv->connection_limit_hit = TRUE;
}
static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
liServerSocket *sock = (liServerSocket*) w->data;
liServer *srv = sock->srv;
@ -281,9 +350,21 @@ static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
UNUSED(loop);
UNUSED(revents);
while (-1 != (s = accept(w->fd, &sa, &l))) {
liWorker *wrk = srv->main_worker;
guint i, min_load = g_atomic_int_get(&wrk->connection_load);
for ( ;; ) {
liWorker *wrk;
guint i, min_load, srv_cur_load, srv_max_load;
srv_cur_load = g_atomic_int_get(&srv->connection_load);
srv_max_load = g_atomic_int_get(&srv->max_connections);
if (srv_cur_load >= srv_max_load) {
server_connection_limit_hit(srv);
return;
}
if (-1 == (s = accept(w->fd, &sa, &l))) break;
wrk = srv->main_worker;
min_load = g_atomic_int_get(&wrk->connection_load);
if (l <= sizeof(sa)) {
remote_addr.addr = g_slice_alloc(l);
@ -306,6 +387,7 @@ static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
g_atomic_int_inc((gint*) &wrk->connection_load);
g_atomic_int_inc((gint*) &srv->connection_load);
li_server_socket_acquire(sock);
li_worker_new_con(srv->main_worker, wrk, remote_addr, s, sock);
}
@ -363,6 +445,7 @@ static void li_server_stop_listen(liServer *srv) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
srv->connection_limit_hit = FALSE; /* reset flag */
/* suspend all workers (close keep-alive connections) */
for (i = 0; i < srv->worker_count; i++) {
@ -379,6 +462,7 @@ static void li_server_stop(liServer *srv) {
liServerSocket *sock = g_ptr_array_index(srv->sockets, i);
ev_io_stop(srv->main_worker->loop, &sock->watcher);
}
srv->connection_limit_hit = FALSE; /* reset flag */
/* stop all workers */
for (i = 0; i < srv->worker_count; i++) {

View File

@ -635,6 +635,7 @@ void worker_con_put(liConnection *con) {
/* already disconnected */
return;
g_atomic_int_add((gint*) &wrk->srv->connection_load, -1);
g_atomic_int_add((gint*) &wrk->connection_load, -1);
g_atomic_int_add((gint*) &wrk->connections_active, -1);

View File

@ -151,6 +151,7 @@ def configure(conf):
conf.check(header_name='arpa/inet.h')
conf.check(header_name='sys/uio.h')
conf.check(header_name='sys/mman.h')
conf.check(header_name='sys/resource.h')
conf.check(header_name='sys/sendfile.h')
conf.check(header_name='sys/un.h')
@ -160,6 +161,7 @@ def configure(conf):
conf.check(function_name='sendfile64', header_name='sys/sendfile.h', define_name='HAVE_SENDFILE64')
else:
conf.check(function_name='sendfile', header_name=['sys/types.h','sys/socket.h','sys/uio.h'], define_name='HAVE_SENDFILE')
conf.check(function_name='getrlimit', header_name='sys/resource.h', define_name='HAVE_GETRLIMIT')
conf.check(function_name='writev', header_name='sys/uio.h', define_name='HAVE_WRITEV')
conf.check(function_name='inet_aton', header_name='arpa/inet.h', define_name='HAVE_INET_ATON')
conf.check(function_name='inet_atop', define_name='HAVE_IPV6')