2
0
Fork 0

Use chunkqueue-limit (setup in plugin_core)

This commit is contained in:
Stefan Bühler 2009-03-01 21:23:24 +01:00
parent 6ebc331215
commit 2f9e612c99
5 changed files with 135 additions and 9 deletions

View File

@ -108,8 +108,10 @@ INLINE goffset chunk_length(chunk *c);
******************/
LI_API cqlimit* cqlimit_new(vrequest *vr);
LI_API void cqlimit_reset(cqlimit *cql);
LI_API void cqlimit_acquire(cqlimit *cql);
LI_API void cqlimit_release(cqlimit *cql);
LI_API void cqlimit_set_limit(cqlimit *cql, goffset limit);
/******************
@ -120,6 +122,9 @@ LI_API chunkqueue* chunkqueue_new();
LI_API void chunkqueue_reset(chunkqueue *cq);
LI_API void chunkqueue_free(chunkqueue *cq);
LI_API void chunkqueue_use_limit(chunkqueue *cq, vrequest *vr);
LI_API void chunkqueue_set_limit(chunkqueue *cq, cqlimit* cql);
/* pass ownership of str to chunkqueue, do not free/modify it afterwards
* you may modify the data (not the length) if you are sure it isn't sent before.
*/

View File

@ -317,6 +317,16 @@ cqlimit* cqlimit_new(vrequest *vr) {
return cql;
}
void cqlimit_reset(cqlimit *cql) {
assert(cql->current == 0);
assert(cql->io_watcher == NULL);
assert(cql->notify == NULL);
cql->current = 0;
cql->limit = -1;
cql->io_watcher = NULL;
cql->notify = NULL;
}
void cqlimit_acquire(cqlimit *cql) {
assert(g_atomic_int_get(&cql->refcount) > 0);
g_atomic_int_inc(&cql->refcount);
@ -332,7 +342,7 @@ void cqlimit_release(cqlimit *cql) {
static void cqlimit_lock(cqlimit *cql) {
cql->locked = TRUE;
if (cql->io_watcher) {
if (cql->io_watcher && cql->io_watcher->fd != -1) {
ev_io_rem_events(cql->vr->con->wrk->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
@ -342,7 +352,7 @@ static void cqlimit_lock(cqlimit *cql) {
static void cqlimit_unlock(cqlimit *cql) {
cql->locked = FALSE;
if (cql->io_watcher) {
if (cql->io_watcher && cql->io_watcher->fd != -1) {
ev_io_add_events(cql->vr->con->wrk->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
@ -357,12 +367,14 @@ static void cqlimit_update(chunkqueue *cq, goffset d) {
cq->mem_usage += d;
assert(cq->mem_usage >= 0);
cql = cq->limit;
fprintf(stderr, "cqlimit_update: cq->mem_usage: %"L_GOFFSET_FORMAT"\n", cq->mem_usage);
if (!cql) return;
cql->current += d;
assert(cql->current >= 0);
fprintf(stderr, "cqlimit_update: cql->current: %"L_GOFFSET_FORMAT", cql->limit: %"L_GOFFSET_FORMAT"\n", cql->current, cql->limit);
if (cql->locked) {
if (cql->current < cql->limit) {
if (cql->limit <= 0 || cql->current < cql->limit) {
cqlimit_unlock(cql);
}
} else {
@ -372,6 +384,22 @@ static void cqlimit_update(chunkqueue *cq, goffset d) {
}
}
void cqlimit_set_limit(cqlimit *cql, goffset limit) {
if (!cql) return;
cql->limit = limit;
if (cql->locked) {
if (cql->limit <= 0 || cql->current < cql->limit) {
cqlimit_unlock(cql);
}
} else {
if (cql->limit > 0 && cql->current >= cql->limit) {
cqlimit_lock(cql);
}
}
}
/******************
* chunkqueue *
******************/
@ -394,8 +422,6 @@ void chunkqueue_reset(chunkqueue *cq) {
cq->is_closed = FALSE;
cq->bytes_in = cq->bytes_out = cq->length = 0;
g_queue_foreach(cq->queue, __chunk_free, cq);
cqlimit_release(cq->limit);
cq->limit = NULL;
assert(cq->mem_usage == 0);
cq->mem_usage = 0;
g_queue_clear(cq->queue);
@ -413,6 +439,17 @@ void chunkqueue_free(chunkqueue *cq) {
g_slice_free(chunkqueue, cq);
}
void chunkqueue_use_limit(chunkqueue *cq, vrequest *vr) {
if (cq->limit) return;
cq->limit = cqlimit_new(vr);
}
void chunkqueue_set_limit(chunkqueue *cq, cqlimit* cql) {
if (cql) cqlimit_acquire(cql);
cqlimit_release(cq->limit);
cq->limit = cql;
}
/* pass ownership of str to chunkqueue, do not free/modify it afterwards
* you may modify the data (not the length) if you are sure it isn't sent before.
*/
@ -547,8 +584,8 @@ goffset chunkqueue_steal_all(chunkqueue *out, chunkqueue *in) {
if (!in->length) return 0;
if (in->limit != out->limit) {
cqlimit_update(in, -in->mem_usage);
cqlimit_update(out, in->mem_usage);
cqlimit_update(in, -in->mem_usage);
} else {
out->mem_usage += in->mem_usage;
in->mem_usage = 0;

View File

@ -337,6 +337,13 @@ connection* connection_new(worker *wrk) {
con->in = con->mainvr->vr_in;
con->out = con->mainvr->vr_out;
chunkqueue_use_limit(con->raw_in, con->mainvr);
chunkqueue_use_limit(con->raw_out, con->mainvr);
chunkqueue_set_limit(con->mainvr->vr_in, con->raw_in->limit);
chunkqueue_set_limit(con->mainvr->vr_out, con->raw_out->limit);
chunkqueue_set_limit(con->mainvr->in, con->raw_in->limit);
chunkqueue_set_limit(con->mainvr->out, con->raw_out->limit);
con->keep_alive_data.link = NULL;
con->keep_alive_data.timeout = 0;
con->keep_alive_data.max_idle = 0;
@ -368,6 +375,9 @@ void connection_reset(connection *con) {
vrequest_reset(con->mainvr);
http_request_parser_reset(&con->req_parser_ctx);
cqlimit_reset(con->raw_in->limit);
cqlimit_reset(con->raw_out->limit);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;

View File

@ -602,7 +602,7 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
fcon->response_headers_finished = TRUE;
vrequest_handle_response_headers(fcon->vr);
}
if (fcon->response_headers_finished) {
chunkqueue_steal_all(fcon->vr->out, fcon->stdout);
fcon->vr->out->is_closed = fcon->stdout->is_closed;
@ -700,6 +700,11 @@ static handler_t fastcgi_handle(vrequest *vr, gpointer param, gpointer *context)
}
g_ptr_array_index(vr->plugin_ctx, ctx->plugin->id) = fcon;
chunkqueue_set_limit(fcon->fcgi_in, vr->out->limit);
chunkqueue_set_limit(fcon->stdout, vr->out->limit);
chunkqueue_set_limit(fcon->fcgi_out, vr->in->limit);
vr->out->limit->io_watcher = &fcon->fd_watcher;
return fastcgi_statemachine(vr, fcon);
}
@ -714,8 +719,10 @@ static handler_t fastcgi_handle_request_body(vrequest *vr, plugin *p) {
static void fastcgi_close(vrequest *vr, plugin *p) {
fastcgi_connection *fcon = (fastcgi_connection*) g_ptr_array_index(vr->plugin_ctx, p->id);
g_ptr_array_index(vr->plugin_ctx, p->id) = NULL;
fastcgi_connection_free(fcon);
if (fcon) {
vr->out->limit->io_watcher = NULL;
fastcgi_connection_free(fcon);
}
}

View File

@ -960,6 +960,70 @@ static action* core_physical_is_dir(server *srv, plugin* p, value *val) {
return action_new_function(core_handle_physical_is_dir, NULL, core_conditional_free, cc);
}
/* chunkqueue memory limits */
static handler_t core_handle_limit_out(vrequest *vr, gpointer param, gpointer *context) {
gint limit = GPOINTER_TO_INT(param);
UNUSED(context);
cqlimit_set_limit(vr->out->limit, limit);
return HANDLER_GO_ON;
}
static action* core_limit_out(server *srv, plugin* p, value *val) {
gint64 limit;
UNUSED(p);
if (val->type != VALUE_NUMBER) {
ERROR(srv, "'core_limit_out' action expects an integer as parameter, %s given", value_type_string(val->type));
return NULL;
}
limit = val->data.number;
if (limit < 0) {
limit = 0; /* no limit */
}
if (limit > (1 << 30)) {
ERROR(srv, "limit %"G_GINT64_FORMAT" is too high (1GB is the maximum)", limit);
return NULL;
}
return action_new_function(core_handle_limit_out, NULL, NULL, GINT_TO_POINTER(limit));
}
static handler_t core_handle_limit_in(vrequest *vr, gpointer param, gpointer *context) {
gint limit = GPOINTER_TO_INT(param);
UNUSED(context);
cqlimit_set_limit(vr->out->limit, limit);
return HANDLER_GO_ON;
}
static action* core_limit_in(server *srv, plugin* p, value *val) {
gint64 limit;
UNUSED(p);
if (val->type != VALUE_NUMBER) {
ERROR(srv, "'core_limit_in' action expects an integer as parameter, %s given", value_type_string(val->type));
return NULL;
}
limit = val->data.number;
if (limit < 0) {
limit = 0; /* no limit */
}
if (limit > (1 << 30)) {
ERROR(srv, "limit %"G_GINT64_FORMAT" is too high (1GB is the maximum)", limit);
return NULL;
}
return action_new_function(core_handle_limit_in, NULL, NULL, GINT_TO_POINTER(limit));
}
static const plugin_option options[] = {
{ "debug.log_request_handling", VALUE_BOOLEAN, GINT_TO_POINTER(FALSE), NULL, NULL },
@ -1002,6 +1066,9 @@ static const plugin_action actions[] = {
{ "physical.is_file", core_physical_is_file },
{ "physical.is_dir", core_physical_is_dir },
{ "limit.out", core_limit_out },
{ "limit.in", core_limit_in },
{ NULL, NULL }
};