Add chunkqueue-limit framework

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent efab0ca75d
commit 39ecfa89e5
  1. 26
      include/lighttpd/chunk.h
  2. 3
      include/lighttpd/typedefs.h
  3. 117
      src/chunk.c

@ -40,11 +40,26 @@ struct chunk {
} file;
};
typedef void (*cqlimit_notify)(vrequest *vr, gpointer context, gboolean locked);
struct cqlimit {
gint refcount;
vrequest *vr;
goffset limit, current;
gboolean locked;
ev_io *io_watcher;
cqlimit_notify notify; /* callback to reactivate input */
gpointer context;
};
struct chunkqueue {
/* public */
gboolean is_closed;
/* read only */
goffset bytes_in, bytes_out, length;
goffset bytes_in, bytes_out, length, mem_usage;
cqlimit *limit; /* limit is the sum of all { c->mem->len | c->type == MEM_CHUNK } */
/* private */
GQueue *queue;
};
@ -88,6 +103,15 @@ LI_API handler_t chunkiter_read_mmap(struct vrequest *vr, chunkiter iter, off_t
INLINE goffset chunk_length(chunk *c);
/******************
* cqlimit *
******************/
LI_API cqlimit* cqlimit_new(vrequest *vr);
LI_API void cqlimit_acquire(cqlimit *cql);
LI_API void cqlimit_release(cqlimit *cql);
/******************
* chunkqueue *
******************/

@ -31,6 +31,9 @@ typedef struct chunkfile chunkfile;
struct chunk;
typedef struct chunk chunk;
struct cqlimit;
typedef struct cqlimit cqlimit;
struct chunkqueue;
typedef struct chunkqueue chunkqueue;

@ -306,6 +306,72 @@ static void chunk_free(chunk *c) {
g_slice_free(chunk, c);
}
/******************
* cqlimit *
******************/
cqlimit* cqlimit_new(vrequest *vr) {
cqlimit *cql = g_slice_new0(cqlimit);
cql->refcount = 1;
cql->vr = vr;
cql->limit = -1;
return cql;
}
void cqlimit_acquire(cqlimit *cql) {
assert(g_atomic_int_get(&cql->refcount) > 0);
g_atomic_int_inc(&cql->refcount);
}
void cqlimit_release(cqlimit *cql) {
if (!cql) return;
assert(g_atomic_int_get(&cql->refcount) > 0);
if (g_atomic_int_dec_and_test(&cql->refcount)) {
g_slice_free(cqlimit, cql);
}
}
static void cqlimit_lock(cqlimit *cql) {
cql->locked = TRUE;
if (cql->io_watcher) {
ev_io_rem_events(cql->vr->con->wrk->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
cql->notify(cql->vr, cql->context, cql->locked);
}
}
static void cqlimit_unlock(cqlimit *cql) {
cql->locked = FALSE;
if (cql->io_watcher) {
ev_io_add_events(cql->vr->con->wrk->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
cql->notify(cql->vr, cql->context, cql->locked);
}
}
static void cqlimit_update(chunkqueue *cq, goffset d) {
cqlimit *cql;
if (!cq) return;
cq->mem_usage += d;
assert(cq->mem_usage >= 0);
cql = cq->limit;
if (!cql) return;
cql->current += d;
assert(cql->current >= 0);
if (cql->locked) {
if (cql->current < cql->limit) {
cqlimit_unlock(cql);
}
} else {
if (cql->limit > 0 && cql->current >= cql->limit) {
cqlimit_lock(cql);
}
}
}
/******************
* chunkqueue *
******************/
@ -316,23 +382,34 @@ chunkqueue* chunkqueue_new() {
return cq;
}
static void __chunk_free(gpointer c, gpointer UNUSED_PARAM(userdata)) {
chunk_free((chunk*) c);
static void __chunk_free(gpointer _c, gpointer userdata) {
chunk *c = (chunk *)_c;
chunkqueue *cq = (chunkqueue*) userdata;
if (c->type == MEM_CHUNK) cqlimit_update(cq, - c->mem->len);
chunk_free(c);
}
void chunkqueue_reset(chunkqueue *cq) {
if (!cq) return;
cq->is_closed = FALSE;
cq->bytes_in = cq->bytes_out = cq->length = 0;
g_queue_foreach(cq->queue, __chunk_free, NULL);
g_queue_foreach(cq->queue, __chunk_free, cq);
cqlimit_release(cq->limit);
cq->limit = NULL;
assert(cq->mem_usage == 0);
cq->mem_usage = 0;
g_queue_clear(cq->queue);
}
void chunkqueue_free(chunkqueue *cq) {
if (!cq) return;
g_queue_foreach(cq->queue, __chunk_free, NULL);
g_queue_foreach(cq->queue, __chunk_free, cq);
g_queue_free(cq->queue);
cq->queue = NULL;
cqlimit_release(cq->limit);
cq->limit = NULL;
assert(cq->mem_usage == 0);
cq->mem_usage = 0;
g_slice_free(chunkqueue, cq);
}
@ -348,6 +425,7 @@ void chunkqueue_append_string(chunkqueue *cq, GString *str) {
g_queue_push_tail(cq->queue, c);
cq->length += str->len;
cq->bytes_in += str->len;
cqlimit_update(cq, str->len);
}
/* memory gets copied */
@ -360,6 +438,7 @@ void chunkqueue_append_mem(chunkqueue *cq, const void *mem, gssize len) {
g_queue_push_tail(cq->queue, c);
cq->length += c->mem->len;
cq->bytes_in += c->mem->len;
cqlimit_update(cq, c->mem->len);
}
static void __chunkqueue_append_file(chunkqueue *cq, GString *filename, off_t start, off_t length, int fd, gboolean is_temp) {
@ -402,12 +481,13 @@ void chunkqueue_append_tempfile_fd(chunkqueue *cq, GString *filename, off_t star
goffset chunkqueue_steal_len(chunkqueue *out, chunkqueue *in, goffset length) {
chunk *c, *cnew;
GList* l;
goffset bytes = 0;
goffset bytes = 0, meminbytes = 0, memoutbytes = 0;
goffset we_have;
while ( (NULL != (c = chunkqueue_first_chunk(in))) && length > 0 ) {
we_have = chunk_length(c);
if (!we_have) { /* remove empty chunks */
if (c->type == MEM_CHUNK) meminbytes -= c->mem->len;
chunk_free(c);
g_queue_pop_head(in->queue);
continue;
@ -416,6 +496,10 @@ goffset chunkqueue_steal_len(chunkqueue *out, chunkqueue *in, goffset length) {
l = g_queue_pop_head_link(in->queue);
g_queue_push_tail_link(out->queue, l);
bytes += we_have;
if (c->type == MEM_CHUNK) {
meminbytes -= c->mem->len;
memoutbytes += c->mem->len;
}
length -= we_have;
} else { /* copy first part of a chunk */
cnew = chunk_new();
@ -429,6 +513,7 @@ goffset chunkqueue_steal_len(chunkqueue *out, chunkqueue *in, goffset length) {
case MEM_CHUNK:
cnew->type = MEM_CHUNK;
cnew->mem = g_string_new_len(c->mem->str + c->offset, length);
memoutbytes += length;
break;
case FILE_CHUNK:
cnew->type = FILE_CHUNK;
@ -449,6 +534,8 @@ goffset chunkqueue_steal_len(chunkqueue *out, chunkqueue *in, goffset length) {
in->length -= bytes;
out->bytes_in += bytes;
out->length += bytes;
cqlimit_update(out, memoutbytes);
cqlimit_update(in, meminbytes);
return bytes;
}
@ -458,6 +545,15 @@ goffset chunkqueue_steal_all(chunkqueue *out, chunkqueue *in) {
/* if in->queue is empty, do nothing */
if (!in->length) return 0;
if (in->limit != out->limit) {
cqlimit_update(in, -in->mem_usage);
cqlimit_update(out, in->mem_usage);
} else {
out->mem_usage += in->mem_usage;
in->mem_usage = 0;
}
/* if out->queue is empty, just swap in->queue/out->queue */
if (g_queue_is_empty(out->queue)) {
GQueue *tmp = in->queue; in->queue = out->queue; out->queue = tmp;
@ -482,16 +578,22 @@ goffset chunkqueue_steal_all(chunkqueue *out, chunkqueue *in) {
/* steal the first chunk from in and append it to out, return number of bytes stolen */
goffset chunkqueue_steal_chunk(chunkqueue *out, chunkqueue *in) {
chunk *c;
goffset length;
GList *l = g_queue_pop_head_link(in->queue);
if (!l) return 0;
g_queue_push_tail_link(out->queue, l);
length = chunk_length((chunk*) l->data);
c = (chunk*) l->data;
length = chunk_length(c);
in->bytes_out += length;
in->length -= length;
out->bytes_in += length;
out->length += length;
if (in->limit != out->limit && c->type == MEM_CHUNK) {
cqlimit_update(out, c->mem->len);
cqlimit_update(in, - c->mem->len);
}
return length;
}
@ -504,6 +606,7 @@ goffset chunkqueue_skip(chunkqueue *cq, goffset length) {
while ( (NULL != (c = chunkqueue_first_chunk(cq))) && (0 == (we_have = chunk_length(c)) || length > 0) ) {
if (we_have <= length) {
/* skip (delete) complete chunk */
if (c->type == MEM_CHUNK) cqlimit_update(cq, - c->mem->len);
chunk_free(c);
g_queue_pop_head(cq->queue);
bytes += we_have;
@ -523,7 +626,7 @@ goffset chunkqueue_skip(chunkqueue *cq, goffset length) {
goffset chunkqueue_skip_all(chunkqueue *cq) {
goffset bytes = cq->length;
g_queue_foreach(cq->queue, __chunk_free, NULL);
g_queue_foreach(cq->queue, __chunk_free, cq);
g_queue_clear(cq->queue);
cq->bytes_out += bytes;

Loading…
Cancel
Save