Browse Source

[core]: rework connection/vrequest structs; add liConInfo

personal/stbuehler/wip
Stefan Bühler 12 years ago
parent
commit
b876f8401d
  1. 42
      include/lighttpd/connection.h
  2. 4
      include/lighttpd/core_lua.h
  3. 11
      include/lighttpd/throttle.h
  4. 13
      include/lighttpd/typedefs.h
  5. 17
      include/lighttpd/utils.h
  6. 68
      include/lighttpd/virtualrequest.h
  7. 1
      src/CMakeLists.txt
  8. 1
      src/main/Makefile.am
  9. 32
      src/main/condition.c
  10. 257
      src/main/connection.c
  11. 44
      src/main/connection_lua.c
  12. 2
      src/main/core_lua.c
  13. 17
      src/main/plugin_core.c
  14. 8
      src/main/request.c
  15. 8
      src/main/response.c
  16. 94
      src/main/throttle.c
  17. 48
      src/main/virtualrequest.c
  18. 153
      src/main/virtualrequest_lua.c
  19. 10
      src/main/worker.c
  20. 1
      src/main/wscript
  21. 8
      src/modules/mod_access.c
  22. 32
      src/modules/mod_accesslog.c
  23. 18
      src/modules/mod_debug.c
  24. 30
      src/modules/mod_fastcgi.c
  25. 18
      src/modules/mod_limit.c
  26. 4
      src/modules/mod_openssl.c
  27. 8
      src/modules/mod_progress.c
  28. 8
      src/modules/mod_proxy.c
  29. 10
      src/modules/mod_redirect.c
  30. 10
      src/modules/mod_rewrite.c
  31. 30
      src/modules/mod_scgi.c
  32. 16
      src/modules/mod_status.c

42
include/lighttpd/connection.h

@ -32,6 +32,8 @@ struct liConnection {
liServerSocket *srv_sock;
gpointer srv_sock_data; /** private data for custom sockets (ssl) */
liConInfo info;
liConnectionState state;
gboolean response_headers_sent, expect_100_cont;
@ -40,13 +42,12 @@ struct liConnection {
liBuffer *raw_in_buffer;
ev_io sock_watcher;
liSocketAddress remote_addr, local_addr;
GString *remote_addr_str, *local_addr_str;
gboolean is_ssl, keep_alive;
liVRequest *mainvr;
liHttpRequestCtx req_parser_ctx;
ev_tstamp ts_started; /* when connection was started, not a (v)request */
/* Keep alive timeout data */
struct {
GList *link;
@ -58,38 +59,6 @@ struct liConnection {
/* I/O timeout data */
liWaitQueueElem io_timeout_elem;
/* I/O throttling */
gboolean throttled; /* TRUE if connection is throttled */
struct {
struct {
liThrottlePool *ptr; /* NULL if not in any throttling pool */
GList lnk;
GQueue *queue;
gint magazine;
} pool;
struct {
gchar unused; /* this struct is unused for now */
} ip;
struct {
guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
gint magazine;
ev_tstamp last_update;
} con;
liWaitQueueElem wqueue_elem;
} throttle;
ev_tstamp ts_started;
struct {
guint64 bytes_in; /* total number of bytes received */
guint64 bytes_out; /* total number of bytes sent */
ev_tstamp last_avg;
guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */
guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */
guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */
guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */
} stats;
};
/* Internal functions */
@ -105,4 +74,7 @@ LI_API void li_connection_error(liConnection *con); /* used in worker.c */
/* public function */
LI_API gchar *li_connection_state_str(liConnectionState state);
/* returns NULL if the vrequest doesn't belong to a liConnection* object */
LI_API liConnection* li_connection_from_vrequest(liVRequest *vr);
#endif

4
include/lighttpd/core_lua.h

@ -55,6 +55,10 @@ LI_API void li_lua_init_vrequest_mt(lua_State *L);
LI_API liVRequest* li_lua_get_vrequest(lua_State *L, int ndx);
LI_API int li_lua_push_vrequest(lua_State *L, liVRequest *vr);
LI_API void li_lua_init_coninfo_mt(lua_State *L);
LI_API liConInfo* li_lua_get_coninfo(lua_State *L, int ndx);
LI_API int li_lua_push_coninfo(lua_State *L, liConInfo *vr);
LI_API int li_lua_fixindex(lua_State *L, int ndx);

11
include/lighttpd/throttle.h

@ -3,7 +3,6 @@
#define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */
typedef struct liThrottlePool liThrottlePool;
struct liThrottlePool {
GString *name;
guint rate; /** bytes/s */
@ -16,19 +15,21 @@ struct liThrottlePool {
ev_tstamp *last_con_rearm;
};
typedef struct liThrottleParam liThrottleParam;
struct liThrottleParam {
guint rate;
guint burst;
};
LI_API void li_throttle_reset(liConnection *con);
LI_API void li_throttle_reset(liVRequest *vr);
LI_API void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents);
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate);
LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool);
LI_API void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool);
LI_API void li_throttle_pool_release(liConnection *con);
LI_API void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool);
LI_API void li_throttle_pool_release(liVRequest *vr);
/* update throttle data: notify it that we sent <transferred> bytes, and that we never send more than write_max at once */
LI_API void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max);
#endif

13
include/lighttpd/typedefs.h

@ -119,9 +119,8 @@ typedef enum {
LI_NETWORK_STATUS_SUCCESS, /**< some IO was actually done (read/write) or cq was empty for write */
LI_NETWORK_STATUS_FATAL_ERROR,
LI_NETWORK_STATUS_CONNECTION_CLOSE,
LI_NETWORK_STATUS_WAIT_FOR_EVENT, /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK; no real IO was done
LI_NETWORK_STATUS_WAIT_FOR_EVENT /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK; no real IO was done
internal: some io may be done */
LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT /**< nothing done yet, read/write will be done somewhere else */
} liNetworkStatus;
/* options.h */
@ -207,6 +206,12 @@ typedef struct liServer liServer;
typedef struct liServerSocket liServerSocket;
/* throttle.h */
typedef struct liThrottlePool liThrottlePool;
typedef struct liThrottleParam liThrottleParam;
/* value.h */
typedef struct liValue liValue;
@ -224,6 +229,10 @@ typedef enum {
/* virtualrequest.h */
typedef struct liConCallbacks liConCallbacks;
typedef struct liConInfo liConInfo;
typedef struct liVRequest liVRequest;
typedef struct liVRequestRef liVRequestRef;

17
include/lighttpd/utils.h

@ -9,8 +9,6 @@ typedef enum {
COUNTER_UNITS
} liCounterType;
LI_API void li_fatal(const gchar* msg);
/* set O_NONBLOCK and FD_CLOEXEC */
@ -123,6 +121,21 @@ LI_API GQuark li_sys_error_quark();
LI_API gboolean _li_set_sys_error(GError **error, const gchar *msg, const gchar *file, int lineno);
/* idea from linux kernel container_of: include/linux/kernel.h
* Please note that this "implementation" is not "type-safe"; it doesn't
* check the type of the ptr.
*/
/**
* LI_CONTAINER_OF - cast a member of a structure out to the containing structure
* @ptr: the pointer to the member.
* @type: the type of the container struct this is embedded in.
* @member: the name of the member within the struct.
*
*/
#define LI_CONTAINER_OF(ptr, type, member) \
((type *)( (char *) ptr - offsetof(type, member) ))
/* inline implementations */
INLINE void li_path_append_slash(GString *path) {

68
include/lighttpd/virtualrequest.h

@ -36,6 +36,40 @@ typedef liHandlerResult (*liFilterHandlerCB)(liVRequest *vr, liFilter *f);
typedef void (*liFilterFreeCB)(liVRequest *vr, liFilter *f);
typedef liHandlerResult (*liVRequestHandlerCB)(liVRequest *vr);
typedef liHandlerResult (*liVRequestPluginHandlerCB)(liVRequest *vr, liPlugin *p);
typedef gboolean (*liVRequestCheckIOCB)(liVRequest *vr);
struct liConCallbacks {
liVRequestHandlerCB
handle_request_headers,
handle_response_headers, handle_response_body,
handle_response_error; /* this is _not_ for 500 - internal error */
liVRequestCheckIOCB handle_check_io;
};
/* this data "belongs" to a vrequest, but is updated by the connection code */
struct liConInfo {
const liConCallbacks *callbacks;
liSocketAddress remote_addr, local_addr;
GString *remote_addr_str, *local_addr_str;
gboolean is_ssl;
gboolean keep_alive;
/* bytes in our "raw-io-out-queue" that hasn't be sent yet. (whatever "sent" means - in ssl buffer, kernel, ...) */
goffset out_queue_length;
/* use li_vrequest_update_stats_{in,out} to update this */
struct {
guint64 bytes_in; /* total number of bytes received */
guint64 bytes_out; /* total number of bytes sent */
ev_tstamp last_avg;
guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */
guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */
guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */
guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */
} stats;
};
struct liFilter {
liChunkQueue *in, *out;
@ -58,7 +92,7 @@ struct liVRequestRef {
};
struct liVRequest {
liConnection *con;
liConInfo *coninfo;
liWorker *wrk;
liVRequestRef *ref;
@ -69,11 +103,6 @@ struct liVRequest {
ev_tstamp ts_started;
liVRequestHandlerCB
handle_request_headers,
handle_response_headers, handle_response_body,
handle_response_error; /* this is _not_ for 500 - internal error */
GPtrArray *plugin_ctx;
liPlugin *backend;
@ -98,6 +127,27 @@ struct liVRequest {
GList job_queue_link;
GPtrArray *stat_cache_entries;
/* I/O throttling */
gboolean throttled; /* TRUE if vrequest is throttled */
struct {
gint magazine; /* currently available for use */
struct {
liThrottlePool *ptr; /* NULL if not in any throttling pool */
GList lnk;
GQueue *queue;
gint magazine;
} pool;
struct {
gchar unused; /* this struct is unused for now */
} ip;
struct {
guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
ev_tstamp last_update;
} con;
liWaitQueueElem wqueue_elem;
} throttle;
};
#define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \
@ -110,7 +160,7 @@ struct liVRequest {
} \
} while (0)
LI_API liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_response_headers, liVRequestHandlerCB handle_response_body, liVRequestHandlerCB handle_response_error, liVRequestHandlerCB handle_request_headers);
LI_API liVRequest* li_vrequest_new(liConnection *con, liConInfo *coninfo);
LI_API void li_vrequest_free(liVRequest *vr);
/* if keepalive = TRUE, you either have to reset it later again with FALSE or call li_vrequest_start before reusing the vr;
* keepalive = TRUE doesn't reset the vr->request fields, so mod_status can show the last request data in the keep-alive state
@ -157,4 +207,8 @@ LI_API gboolean li_vrequest_redirect(liVRequest *vr, GString *uri);
LI_API gboolean li_vrequest_redirect_directory(liVRequest *vr);
/* updates worker stats too */
LI_API void li_vrequest_update_stats_in(liVRequest *vr, goffset transferred);
LI_API void li_vrequest_update_stats_out(liVRequest *vr, goffset transferred);
#endif

1
src/CMakeLists.txt

@ -246,7 +246,6 @@ SET(LIGHTTPD_SHARED_SRC ${LIGHTTPD_SHARED_SRC}
chunk_lua.c
core_lua.c
connection_lua.c
environment_lua.c
filters_lua.c
http_headers_lua.c

1
src/main/Makefile.am

@ -49,7 +49,6 @@ lua_src= \
value_lua.c \
\
chunk_lua.c \
connection_lua.c \
core_lua.c \
environment_lua.c \
filters_lua.c \

32
src/main/condition.c

@ -30,7 +30,7 @@ static const liConditionValueType cond_value_hints[] = {
/* uses wrk->tmp_str for temporary (and returned) strings */
liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue, liConditionValue *res, liConditionValueType prefer) {
liConnection *con = vr->con;
liConInfo *coninfo = vr->coninfo;
liHandlerResult r;
struct stat st;
int err;
@ -42,21 +42,21 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue
case LI_COMP_REQUEST_LOCALIP:
if (prefer == LI_COND_VALUE_HINT_STRING) {
res->match_type = LI_COND_VALUE_HINT_STRING;
res->data.str = con->local_addr_str->str;
res->data.str = coninfo->local_addr_str->str;
} else {
res->match_type = LI_COND_VALUE_HINT_SOCKADDR;
res->data.addr = con->local_addr;
res->data.addr = coninfo->local_addr;
}
break;
case LI_COMP_REQUEST_LOCALPORT:
res->match_type = LI_COND_VALUE_HINT_NUMBER;
switch (con->local_addr.addr->plain.sa_family) {
switch (coninfo->local_addr.addr->plain.sa_family) {
case AF_INET:
res->data.number = ntohs(con->local_addr.addr->ipv4.sin_port);
res->data.number = ntohs(coninfo->local_addr.addr->ipv4.sin_port);
break;
#ifdef HAVE_IPV6
case AF_INET6:
res->data.number = ntohs(con->local_addr.addr->ipv6.sin6_port);
res->data.number = ntohs(coninfo->local_addr.addr->ipv6.sin6_port);
break;
#endif
default:
@ -67,21 +67,21 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue
case LI_COMP_REQUEST_REMOTEIP:
if (prefer == LI_COND_VALUE_HINT_STRING) {
res->match_type = LI_COND_VALUE_HINT_STRING;
res->data.str = con->remote_addr_str->str;
res->data.str = coninfo->remote_addr_str->str;
} else {
res->match_type = LI_COND_VALUE_HINT_SOCKADDR;
res->data.addr = con->remote_addr;
res->data.addr = coninfo->remote_addr;
}
break;
case LI_COMP_REQUEST_REMOTEPORT:
res->match_type = LI_COND_VALUE_HINT_NUMBER;
switch (con->remote_addr.addr->plain.sa_family) {
switch (coninfo->remote_addr.addr->plain.sa_family) {
case AF_INET:
res->data.number = ntohs(con->remote_addr.addr->ipv4.sin_port);
res->data.number = ntohs(coninfo->remote_addr.addr->ipv4.sin_port);
break;
#ifdef HAVE_IPV6
case AF_INET6:
res->data.number = ntohs(con->remote_addr.addr->ipv6.sin6_port);
res->data.number = ntohs(coninfo->remote_addr.addr->ipv6.sin6_port);
break;
#endif
default:
@ -99,7 +99,7 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue
break;
case LI_COMP_REQUEST_SCHEME:
res->match_type = LI_COND_VALUE_HINT_STRING;
res->data.str = con->is_ssl ? "https" : "http";
res->data.str = coninfo->is_ssl ? "https" : "http";
break;
case LI_COMP_REQUEST_QUERY_STRING:
res->data.str = vr->request.uri.query->str;
@ -164,14 +164,14 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue
break;
case LI_COMP_REQUEST_HEADER:
res->match_type = LI_COND_VALUE_HINT_STRING;
li_http_header_get_all(con->wrk->tmp_str, vr->request.headers, GSTR_LEN(lvalue->key));
res->data.str = con->wrk->tmp_str->str;
li_http_header_get_all(vr->wrk->tmp_str, vr->request.headers, GSTR_LEN(lvalue->key));
res->data.str = vr->wrk->tmp_str->str;
break;
case LI_COMP_RESPONSE_HEADER:
VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr);
res->match_type = LI_COND_VALUE_HINT_STRING;
li_http_header_get_all(con->wrk->tmp_str, vr->response.headers, GSTR_LEN(lvalue->key));
res->data.str = con->wrk->tmp_str->str;
li_http_header_get_all(vr->wrk->tmp_str, vr->response.headers, GSTR_LEN(lvalue->key));
res->data.str = vr->wrk->tmp_str->str;
break;
case LI_COMP_ENVIRONMENT:
res->match_type = LI_COND_VALUE_HINT_STRING;

257
src/main/connection.c

@ -5,11 +5,26 @@
static void li_connection_reset_keep_alive(liConnection *con);
static void li_connection_internal_error(liConnection *con);
static void update_io_events(liConnection *con) {
int events = 0;
if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) {
events = events | EV_READ;
}
if (con->raw_out->length > 0) {
if (!con->mainvr->throttled || con->mainvr->throttle.magazine > 0) {
events = events | EV_WRITE;
}
}
li_ev_io_set_events(con->wrk->loop, &con->sock_watcher, events);
}
static void parse_request_body(liConnection *con) {
if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) {
goffset newbytes = 0;
li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_READ);
if (con->mainvr->request.content_length == -1) {
/* TODO: parse chunked encoded request body, filters */
/* li_chunkqueue_steal_all(con->in, con->raw_in); */
@ -20,14 +35,11 @@ static void parse_request_body(liConnection *con) {
}
if (con->in->bytes_in == con->mainvr->request.content_length) {
con->in->is_closed = TRUE;
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
}
}
if (newbytes > 0 || con->in->is_closed) {
li_vrequest_handle_request_body(con->mainvr);
}
} else {
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
}
}
@ -56,41 +68,37 @@ static void forward_response_body(liConnection *con) {
li_chunkqueue_steal_all(con->raw_out, con->out);
}
if (con->out->is_closed) con->raw_out->is_closed = TRUE;
con->info.out_queue_length = con->raw_out->length;
}
if (con->raw_out->length > 0) {
li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
} else {
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
}
} else {
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
}
}
/* don't use con afterwards */
static void connection_request_done(liConnection *con) {
liVRequest *vr = con->mainvr;
liServerState s;
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->keep_alive);
VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->info.keep_alive);
}
li_plugins_handle_close(con);
s = g_atomic_int_get(&con->srv->dest_state);
if (con->keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) {
if (con->info.keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) {
li_connection_reset_keep_alive(con);
} else {
li_worker_con_put(con);
}
}
/* return FALSE if you shouldn't use con afterwards */
static gboolean check_response_done(liConnection *con) {
if (con->in->is_closed && con->raw_out->is_closed && 0 == con->raw_out->length) {
connection_request_done(con);
return TRUE;
return FALSE;
}
return FALSE;
return TRUE;
}
static void connection_close(liConnection *con) {
@ -130,7 +138,7 @@ static void li_connection_internal_error(liConnection *con) {
/* We only need the http version from the http request, "keep-alive" reset doesn't reset it */
li_vrequest_reset(con->mainvr, TRUE);
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
con->mainvr->response.http_status = 500;
con->state = LI_CON_STATE_WRITE; /* skips further vrequest handling */
@ -162,7 +170,7 @@ static gboolean connection_handle_read(liConnection *con) {
con->keep_alive_requests++;
/* disable keep alive if limit is reached */
if (con->keep_alive_requests == CORE_OPTION(LI_CORE_OPTION_MAX_KEEP_ALIVE_REQUESTS).number)
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
con->state = LI_CON_STATE_READ_REQUEST_HEADER;
@ -188,7 +196,7 @@ static gboolean connection_handle_read(liConnection *con) {
li_counter_format(vr->request.uri.raw->len, COUNTER_BYTES, vr->wrk->tmp_str)->str
);
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
con->mainvr->response.http_status = 414; /* Request-URI Too Large */
li_vrequest_handle_direct(con->mainvr);
con->state = LI_CON_STATE_WRITE;
@ -210,7 +218,7 @@ static gboolean connection_handle_read(liConnection *con) {
}
con->wrk->stats.requests++;
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
/* set status 400 if not already set to e.g. 413 */
if (con->mainvr->response.http_status == 0)
con->mainvr->response.http_status = 400;
@ -230,7 +238,7 @@ static gboolean connection_handle_read(liConnection *con) {
if (!li_request_validate_header(con)) {
/* skip mainvr handling */
con->state = LI_CON_STATE_WRITE;
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
con->in->is_closed = TRUE;
forward_response_body(con);
} else {
@ -245,7 +253,6 @@ static gboolean connection_handle_read(liConnection *con) {
}
li_chunkqueue_append_mem(con->raw_out, CONST_STR_LEN("HTTP/1.1 100 Continue\r\n\r\n"));
con->expect_100_cont = FALSE;
li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
}
con->state = LI_CON_STATE_HANDLE_MAINVR;
@ -261,8 +268,6 @@ static gboolean connection_handle_read(liConnection *con) {
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
liNetworkStatus res;
goffset write_max;
goffset transferred;
liConnection *con = (liConnection*) w->data;
gboolean update_io_timeout = FALSE;
@ -271,10 +276,8 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
if (revents & EV_READ) {
if (con->in->is_closed) {
/* don't read the next request before current one is done */
li_ev_io_rem_events(loop, w, EV_READ);
} else {
if (!con->in->is_closed) {
goffset transferred;
transferred = con->raw_in->length;
if (con->srv_sock->read_cb) {
@ -284,17 +287,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
transferred = con->raw_in->length - transferred;
con->wrk->stats.bytes_in += transferred;
con->stats.bytes_in += transferred;
update_io_timeout = update_io_timeout || (transferred > 0);
if ((ev_now(loop) - con->stats.last_avg) >= 5.0) {
con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s;
con->stats.bytes_out_5s = con->stats.bytes_out;
con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s;
con->stats.bytes_in_5s = con->stats.bytes_in;
con->stats.last_avg = ev_now(loop);
}
li_vrequest_update_stats_in(con->mainvr, transferred);
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
@ -312,21 +307,20 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
return;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
/* TODO: aio */
li_ev_io_rem_events(loop, w, EV_READ);
_ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio");
break;
}
}
}
if (revents & EV_WRITE) {
if (con->raw_out->length > 0) {
if (con->throttled) {
write_max = MIN(con->throttle.con.magazine, 256*1024);
goffset transferred;
static const goffset WRITE_MAX = 256*1024; /* 256kB */
goffset write_max;
if (con->mainvr->throttled) {
write_max = MIN(con->mainvr->throttle.magazine, WRITE_MAX);
} else {
write_max = 256*1024; /* 256kB */
write_max = WRITE_MAX;
}
if (write_max > 0) {
@ -339,8 +333,7 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
transferred = transferred - con->raw_out->length;
con->wrk->stats.bytes_out += transferred;
con->stats.bytes_out += transferred;
con->info.out_queue_length = con->raw_out->length;
update_io_timeout = update_io_timeout || (transferred > 0);
switch (res) {
@ -357,47 +350,18 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
return;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
/* TODO: aio */
li_ev_io_rem_events(loop, w, EV_WRITE);
_ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio");
break;
}
} else {
transferred = 0;
}
if ((ev_now(loop) - con->stats.last_avg) >= 5.0) {
con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s;
con->stats.bytes_out_5s = con->stats.bytes_out;
con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s;
con->stats.bytes_in_5s = con->stats.bytes_in;
con->stats.last_avg = ev_now(loop);
}
li_vrequest_update_stats_out(con->mainvr, transferred);
if (con->throttled) {
con->throttle.con.magazine -= transferred;
/*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con,
transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/
if (con->throttle.con.magazine <= 0) {
li_ev_io_rem_events(loop, w, EV_WRITE);
li_waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
}
if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queue) {
liThrottlePool *pool = con->throttle.pool.ptr;
g_atomic_int_inc(&pool->num_cons_queued);
con->throttle.pool.queue = pool->queues[con->wrk->ndx];
g_queue_push_tail_link(con->throttle.pool.queue, &con->throttle.pool.lnk);
}
}
if (0 == con->raw_out->length) {
li_ev_io_rem_events(loop, w, EV_WRITE);
if (con->mainvr->throttled) {
li_throttle_update(con->mainvr, transferred, WRITE_MAX);
}
} else {
_DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue");
li_ev_io_rem_events(loop, w, EV_WRITE);
}
}
@ -412,7 +376,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
}
check_response_done(con);
if (!check_response_done(con)) return;
update_io_events(con);
}
static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) {
@ -422,18 +388,19 @@ static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int reven
}
static liHandlerResult mainvr_handle_response_headers(liVRequest *vr) {
liConnection *con = vr->con;
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "%s", "read request/handle response header");
}
parse_request_body(con);
update_io_events(con);
return LI_HANDLER_GO_ON;
}
static liHandlerResult mainvr_handle_response_body(liVRequest *vr) {
liConnection *con = vr->con;
if (check_response_done(con)) return LI_HANDLER_GO_ON;
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
if (!check_response_done(con)) return LI_HANDLER_GO_ON;
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "%s", "write response");
@ -442,22 +409,49 @@ static liHandlerResult mainvr_handle_response_body(liVRequest *vr) {
parse_request_body(con);
forward_response_body(con);
if (check_response_done(con)) return LI_HANDLER_GO_ON;
if (!check_response_done(con)) return LI_HANDLER_GO_ON;
update_io_events(con);
return LI_HANDLER_GO_ON;
}
static liHandlerResult mainvr_handle_response_error(liVRequest *vr) {
li_connection_internal_error(vr->con);
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
li_connection_internal_error(con);
update_io_events(con);
return LI_HANDLER_GO_ON;
}
static liHandlerResult mainvr_handle_request_headers(liVRequest *vr) {
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
/* start reading input */
parse_request_body(vr->con);
parse_request_body(con);
update_io_events(con);
return LI_HANDLER_GO_ON;
}
static gboolean mainvr_handle_check_io(liVRequest *vr) {
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
update_io_events(con);
return TRUE;
}
static const liConCallbacks con_callbacks = {
mainvr_handle_request_headers,
mainvr_handle_response_headers,
mainvr_handle_response_body,
mainvr_handle_response_error,
mainvr_handle_check_io
};
liConnection* li_connection_new(liWorker *wrk) {
liServer *srv = wrk->srv;
liConnection *con = g_slice_new0(liConnection);
@ -471,19 +465,17 @@ liConnection* li_connection_new(liWorker *wrk) {
ev_init(&con->sock_watcher, connection_cb);
ev_io_set(&con->sock_watcher, -1, 0);
con->sock_watcher.data = con;
con->remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->is_ssl = FALSE;
con->keep_alive = TRUE;
con->info.remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->info.local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
con->info.is_ssl = FALSE;
con->info.keep_alive = TRUE;
con->raw_in = li_chunkqueue_new();
con->raw_out = li_chunkqueue_new();
con->mainvr = li_vrequest_new(con,
mainvr_handle_response_headers,
mainvr_handle_response_body,
mainvr_handle_response_error,
mainvr_handle_request_headers);
con->info.callbacks = &con_callbacks;
con->mainvr = li_vrequest_new(con, &con->info);
li_http_request_parser_init(&con->req_parser_ctx, &con->mainvr->request, con->raw_in);
con->in = con->mainvr->vr_in;
@ -502,9 +494,6 @@ liConnection* li_connection_new(liWorker *wrk) {
con->io_timeout_elem.data = con;
con->throttle.wqueue_elem.data = con;
con->throttle.pool.lnk.data = con;
return con;
}
@ -519,7 +508,7 @@ void li_connection_reset(liConnection *con) {
li_server_socket_release(con->srv_sock);
con->srv_sock = NULL;
con->srv_sock_data = NULL;
con->is_ssl = FALSE;
con->info.is_ssl = FALSE;
ev_io_stop(con->wrk->loop, &con->sock_watcher);
if (con->sock_watcher.fd != -1) {
@ -534,9 +523,12 @@ void li_connection_reset(liConnection *con) {
li_chunkqueue_reset(con->raw_in);
li_chunkqueue_reset(con->raw_out);
con->info.out_queue_length = 0;
li_buffer_release(con->raw_in_buffer);
con->raw_in_buffer = NULL;
li_throttle_reset(con->mainvr);
li_vrequest_reset(con->mainvr, FALSE);
/* restore chunkqueue limits */
@ -549,11 +541,11 @@ void li_connection_reset(liConnection *con) {
li_http_request_parser_reset(&con->req_parser_ctx);
g_string_truncate(con->remote_addr_str, 0);
li_sockaddr_clear(&con->remote_addr);
g_string_truncate(con->local_addr_str, 0);
li_sockaddr_clear(&con->local_addr);
con->keep_alive = TRUE;
g_string_truncate(con->info.remote_addr_str, 0);
li_sockaddr_clear(&con->info.remote_addr);
g_string_truncate(con->info.local_addr_str, 0);
li_sockaddr_clear(&con->info.local_addr);
con->info.keep_alive = TRUE;
if (con->keep_alive_data.link) {
g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link);
@ -565,18 +557,16 @@ void li_connection_reset(liConnection *con) {
con->keep_alive_requests = 0;
/* reset stats */
con->stats.bytes_in = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.bytes_out = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
con->info.stats.bytes_in = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->info.stats.last_avg = 0;
/* remove from timeout queue */
li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
li_throttle_reset(con);
}
static void li_connection_reset_keep_alive(liConnection *con) {
@ -617,9 +607,12 @@ static void li_connection_reset_keep_alive(liConnection *con) {
con->expect_100_cont = FALSE;
li_ev_io_set_events(con->wrk->loop, &con->sock_watcher, EV_READ);
con->keep_alive = TRUE;
con->info.keep_alive = TRUE;
con->raw_out->is_closed = FALSE;
con->info.out_queue_length = con->raw_out->length;
li_throttle_reset(con->mainvr);
li_vrequest_reset(con->mainvr, TRUE);
li_http_request_parser_reset(&con->req_parser_ctx);
@ -631,15 +624,13 @@ static void li_connection_reset_keep_alive(liConnection *con) {
li_cqlimit_set_limit(con->raw_out->limit, 512*1024);
/* reset stats */
con->stats.bytes_in = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.bytes_out = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->stats.last_avg = 0;
li_throttle_reset(con);
con->info.stats.bytes_in = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
con->info.stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->info.stats.last_avg = 0;
if (con->raw_in->length != 0) {
/* start handling next request if data is already available */
@ -663,16 +654,18 @@ void li_connection_free(liConnection *con) {
close(con->sock_watcher.fd);
}
ev_io_set(&con->sock_watcher, -1, 0);
g_string_free(con->remote_addr_str, TRUE);
li_sockaddr_clear(&con->remote_addr);
g_string_free(con->local_addr_str, TRUE);
li_sockaddr_clear(&con->local_addr);
con->keep_alive = TRUE;
g_string_free(con->info.remote_addr_str, TRUE);
li_sockaddr_clear(&con->info.remote_addr);
g_string_free(con->info.local_addr_str, TRUE);
li_sockaddr_clear(&con->info.local_addr);
con->info.keep_alive = TRUE;
li_chunkqueue_free(con->raw_in);
li_chunkqueue_free(con->raw_out);
li_buffer_release(con->raw_in_buffer);
li_throttle_reset(con->mainvr);
li_vrequest_free(con->mainvr);
li_http_request_parser_clear(&con->req_parser_ctx);
@ -700,3 +693,13 @@ gchar *li_connection_state_str(liConnectionState state) {
return (gchar*)states[state];
}
liConnection* li_connection_from_vrequest(liVRequest *vr) {
liConnection *con;
if (vr->coninfo->callbacks != &con_callbacks) return NULL;
con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
return con;
}

44
src/main/connection_lua.c

@ -1,44 +0,0 @@
#include <lighttpd/core_lua.h>
#include <lualib.h>
#include <lauxlib.h>
#define LUA_CONNECTION "liConnection*"
static void init_con_mt(lua_State *L) {
/* TODO */
}
void li_lua_init_connection_mt(lua_State *L) {
if (luaL_newmetatable(L, LUA_CONNECTION)) {
init_con_mt(L);
}
lua_pop(L, 1);
}
liConnection* li_lua_get_connection(lua_State *L, int ndx) {
if (!lua_isuserdata(L, ndx)) return NULL;
if (!lua_getmetatable(L, ndx)) return NULL;
luaL_getmetatable(L, LUA_CONNECTION);
if (lua_isnil(L, -1) || lua_isnil(L, -2) || !lua_equal(L, -1, -2)) {
lua_pop(L, 2);
return NULL;
}
lua_pop(L, 2);
return *(liConnection**) lua_touserdata(L, ndx);
}
int li_lua_push_connection(lua_State *L, liConnection *con) {
liConnection **pcon;
pcon = (liConnection**) lua_newuserdata(L, sizeof(liConnection*));
*pcon = con;
if (luaL_newmetatable(L, LUA_CONNECTION)) {
init_con_mt(L);
}
lua_setmetatable(L, -2);
return 1;
}

2
src/main/core_lua.c

@ -243,7 +243,7 @@ static void lua_push_constants(lua_State *L, int ndx) {
void li_lua_init(lua_State *L, liServer *srv, liWorker *wrk) {
li_lua_init_chunk_mt(L);
li_lua_init_connection_mt(L);
li_lua_init_coninfo_mt(L);
li_lua_init_environment_mt(L);
li_lua_init_filter_mt(L);
li_lua_init_physical_mt(L);

17
src/main/plugin_core.c

@ -1418,7 +1418,7 @@ static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param,
UNUSED(context);
li_throttle_pool_acquire(vr->con, pool);
li_throttle_pool_acquire(vr, pool);
return LI_HANDLER_GO_ON;
}
@ -1498,20 +1498,19 @@ static void core_throttle_connection_free(liServer *srv, gpointer param) {
static liHandlerResult core_handle_throttle_connection(liVRequest *vr, gpointer param, gpointer *context) {
liConnection *con = vr->con;
liThrottleParam *throttle_param = param;
UNUSED(context);
con->throttle.con.rate = throttle_param->rate;
con->throttled = TRUE;
vr->throttle.con.rate = throttle_param->rate;
vr->throttled = TRUE;
if (con->throttle.pool.magazine) {
guint supply = MAX(con->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY);
con->throttle.con.magazine += supply;
con->throttle.pool.magazine -= supply;
if (vr->throttle.pool.magazine) {
guint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
} else {
con->throttle.con.magazine += throttle_param->burst;
vr->throttle.magazine += throttle_param->burst;
}
return LI_HANDLER_GO_ON;

8
src/main/request.c

@ -61,7 +61,7 @@ void li_request_clear(liRequest *req) {
/* closes connection after response */
static void bad_request(liConnection *con, int status) {
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
con->mainvr->response.http_status = status;
li_vrequest_handle_direct(con->mainvr);
}
@ -94,7 +94,7 @@ gboolean li_request_validate_header(liConnection *con) {
liHttpHeader *hh;
GList *l;
if (con->is_ssl) {
if (con->info.is_ssl) {
g_string_append_len(req->uri.scheme, CONST_STR_LEN("https"));
} else {
g_string_append_len(req->uri.scheme, CONST_STR_LEN("http"));
@ -103,11 +103,11 @@ gboolean li_request_validate_header(liConnection *con) {
switch (req->http_version) {
case LI_HTTP_VERSION_1_0:
if (!li_http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("keep-alive")))
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
break;
case LI_HTTP_VERSION_1_1:
if (li_http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("close")))
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
break;
case LI_HTTP_VERSION_UNSET:
bad_request(con, 505); /* Version not Supported */

8
src/main/response.c

@ -50,7 +50,7 @@ gboolean li_response_send_headers(liConnection *con) {
g_string_printf(con->wrk->tmp_str, "%"L_GOFFSET_FORMAT, con->out->length);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(con->wrk->tmp_str));
}
} else if (con->keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) {
} else if (con->info.keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) {
/* TODO: maybe someone set a content length header? */
if (!(vr->response.transfer_encoding & LI_HTTP_TRANSFER_ENCODING_CHUNKED)) {
vr->response.transfer_encoding |= LI_HTTP_TRANSFER_ENCODING_CHUNKED;
@ -58,7 +58,7 @@ gboolean li_response_send_headers(liConnection *con) {
}
} else {
/* Unknown content length, no chunked encoding */
con->keep_alive = FALSE;
con->info.keep_alive = FALSE;
}
if (vr->request.http_method == LI_HTTP_METHOD_HEAD) {
@ -71,11 +71,11 @@ gboolean li_response_send_headers(liConnection *con) {
/* Status line */
if (vr->request.http_version == LI_HTTP_VERSION_1_1) {
g_string_append_len(head, CONST_STR_LEN("HTTP/1.1 "));
if (!con->keep_alive)
if (!con->info.keep_alive)
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Connection"), CONST_STR_LEN("close"));
} else {
g_string_append_len(head, CONST_STR_LEN("HTTP/1.0 "));
if (con->keep_alive)
if (con->info.keep_alive)
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Connection"), CONST_STR_LEN("keep-alive"));
}

94
src/main/throttle.c

@ -49,43 +49,43 @@ void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
g_slice_free(liThrottlePool, pool);
}
void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool) {
void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) {
gint magazine;
if (con->throttle.pool.ptr == pool)
if (vr->throttle.pool.ptr == pool)
return;
if (con->throttle.pool.ptr != NULL) {
if (vr->throttle.pool.ptr != NULL) {
/* already in a different pool */
li_throttle_pool_release(con);
li_throttle_pool_release(vr);
}
/* try to steal some initial 4kbytes from the pool */
while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) {
if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) {
con->throttle.pool.magazine = 4*1024;
vr->throttle.pool.magazine = 4*1024;
break;
}
}
con->throttle.pool.ptr = pool;
con->throttled = TRUE;
vr->throttle.pool.ptr = pool;
vr->throttled = TRUE;
}
void li_throttle_pool_release(liConnection *con) {
if (con->throttle.pool.queue == NULL)
void li_throttle_pool_release(liVRequest *vr) {
if (vr->throttle.pool.queue == NULL)
return;
if (con->throttle.pool.queue) {
g_queue_unlink(con->throttle.pool.queue, &con->throttle.pool.lnk);
con->throttle.pool.queue = NULL;
g_atomic_int_add(&con->throttle.pool.ptr->num_cons_queued, -1);
if (vr->throttle.pool.queue) {
g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
vr->throttle.pool.queue = NULL;
g_atomic_int_add(&vr->throttle.pool.ptr->num_cons_queued, -1);
}
/* give back bandwidth */
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
con->throttle.pool.magazine = 0;
con->throttle.pool.ptr = NULL;
g_atomic_int_add(&vr->throttle.pool.ptr->magazine, vr->throttle.pool.magazine);
vr->throttle.pool.magazine = 0;
vr->throttle.pool.ptr = NULL;
}
static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
@ -123,8 +123,8 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
/* rearm connections */
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
((liConnection*)lnk->data)->throttle.pool.magazine += supply;
((liConnection*)lnk->data)->throttle.pool.queue = NULL;
((liVRequest*)lnk->data)->throttle.pool.magazine += supply;
((liVRequest*)lnk->data)->throttle.pool.queue = NULL;
lnk_next = lnk->next;
lnk->next = NULL;
lnk->prev = NULL;
@ -138,23 +138,23 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
}
}
void li_throttle_reset(liConnection *con) {
if (!con->throttled)
void li_throttle_reset(liVRequest *vr) {
if (!vr->throttled)
return;
/* remove from throttle queue */
li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
li_throttle_pool_release(con);
li_waitqueue_remove(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem);
li_throttle_pool_release(vr);
con->throttle.con.rate = 0;
con->throttle.con.magazine = 0;
con->throttled = FALSE;
vr->throttle.con.rate = 0;
vr->throttle.magazine = 0;
vr->throttled = FALSE;
}
void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
liWaitQueueElem *wqe;
liThrottlePool *pool;
liConnection *con;
liVRequest *vr;
liWorker *wrk;
ev_tstamp now;
guint supply;
@ -165,31 +165,49 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
now = ev_now(loop);
while (NULL != (wqe = li_waitqueue_pop(&wrk->throttle_queue))) {
con = wqe->data;
vr = wqe->data;
if (con->throttle.pool.ptr) {
if (vr->throttle.pool.ptr) {
/* throttled by pool */
pool = con->throttle.pool.ptr;
pool = vr->throttle.pool.ptr;
li_throttle_pool_rearm(wrk, pool);
if (con->throttle.con.rate) {
supply = MIN(con->throttle.pool.magazine, con->throttle.con.rate * THROTTLE_GRANULARITY);
con->throttle.con.magazine += supply;
con->throttle.pool.magazine -= supply;
if (vr->throttle.con.rate) {
supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
} else {
con->throttle.con.magazine += con->throttle.pool.magazine;
con->throttle.pool.magazine = 0;
vr->throttle.magazine += vr->throttle.pool.magazine;
vr->throttle.pool.magazine = 0;
}
/* TODO: throttled by ip */
} else {
/* throttled by connection */
if (con->throttle.con.magazine <= con->throttle.con.rate * THROTTLE_GRANULARITY * 4)
con->throttle.con.magazine += con->throttle.con.rate * THROTTLE_GRANULARITY;
if (vr->throttle.magazine <= vr->throttle.con.rate * THROTTLE_GRANULARITY * 4)
vr->throttle.magazine += vr->throttle.con.rate * THROTTLE_GRANULARITY;
}
li_ev_io_add_events(loop, &con->sock_watcher, EV_WRITE);
vr->coninfo->callbacks->handle_check_io(vr);
}
li_waitqueue_update(&wrk->throttle_queue);
}
void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max) {
vr->throttle.magazine -= transferred;
/*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con,
transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/
if (vr->throttle.magazine <= 0) {
li_waitqueue_push(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem);
}
if (vr->throttle.pool.ptr && vr->throttle.pool.magazine <= write_max && !vr->throttle.pool.queue) {
liThrottlePool *pool = vr->throttle.pool.ptr;
g_atomic_int_inc(&pool->num_cons_queued);
vr->throttle.pool.queue = pool->queues[vr->wrk->ndx];
g_queue_push_tail_link(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
}
}

48
src/main/virtualrequest.c

@ -136,11 +136,11 @@ liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_da
return filters_add(&vr->filters_out, handle_data, handle_free, param);
}
liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_response_headers, liVRequestHandlerCB handle_response_body, liVRequestHandlerCB handle_response_error, liVRequestHandlerCB handle_request_headers) {
liVRequest* li_vrequest_new(liConnection *con, liConInfo *coninfo) {
liServer *srv = con->srv;
liVRequest *vr = g_slice_new0(liVRequest);
vr->con = con;
vr->coninfo = coninfo;
vr->wrk = con->wrk;
vr->ref = g_slice_new0(liVRequestRef);
vr->ref->refcount = 1;
@ -148,11 +148,6 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon
vr->ref->wrk = con->wrk;
vr->state = LI_VRS_CLEAN;
vr->handle_response_headers = handle_response_headers;
vr->handle_response_body = handle_response_body;
vr->handle_response_error = handle_response_error;
vr->handle_request_headers = handle_request_headers;
vr->plugin_ctx = g_ptr_array_new();
g_ptr_array_set_size(vr->plugin_ctx, g_hash_table_size(srv->plugins));
vr->options = g_slice_copy(srv->option_def_values->len * sizeof(liOptionValue), srv->option_def_values->data);
@ -194,6 +189,9 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon
li_action_stack_init(&vr->action_stack);
vr->throttle.wqueue_elem.data = vr;
vr->throttle.pool.lnk.data = vr;
return vr;
}