2
0
Fork 0

Improve filter handling (handle reverse out stream closing)

personal/stbuehler/wip
Stefan Bühler 2009-09-29 11:45:55 +02:00
parent 6702e1413b
commit 471a521f48
4 changed files with 67 additions and 28 deletions

View File

@ -42,12 +42,13 @@ struct liFilter {
liFilterHandlerCB handle_data;
liFilterFreeCB handle_free;
gpointer param;
/* do not modify these yourself: */
gboolean knows_out_is_closed, done;
};
struct liFilters {
GPtrArray *queue;
liChunkQueue *in, *out;
guint skip_ndx;
};
struct liVRequestRef {
@ -111,8 +112,8 @@ LI_API void li_vrequest_reset(liVRequest *vr);
LI_API liVRequestRef* li_vrequest_acquire_ref(liVRequest *vr);
LI_API liVRequest* li_vrequest_release_ref(liVRequestRef *vr_ref);
LI_API void li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
LI_API void li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
LI_API liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
LI_API liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
/* Signals an internal error; handles the error in the _next_ loop */
LI_API void li_vrequest_error(liVRequest *vr);

View File

@ -23,7 +23,6 @@ static void filters_clean(liVRequest *vr, liFilters *fs) {
static void filters_reset(liVRequest *vr, liFilters *fs) {
guint i;
fs->skip_ndx = 0;
for (i = 0; i < fs->queue->len; i++) {
liFilter *f = (liFilter*) g_ptr_array_index(fs->queue, i);
if (f->handle_free && f->param) f->handle_free(vr, f);
@ -35,6 +34,32 @@ static void filters_reset(liVRequest *vr, liFilters *fs) {
li_chunkqueue_reset(fs->out);
}
static gboolean filters_handle_out_close(liVRequest *vr, liFilters *fs) {
guint i;
if (0 == fs->queue->len) {
if (fs->out->is_closed) fs->in->is_closed = TRUE;
return TRUE;
}
for (i = fs->queue->len; i-- > 0; ) {
liFilter *f = (liFilter*) g_ptr_array_index(fs->queue, i);
if (f->out->is_closed && !f->knows_out_is_closed) {
f->knows_out_is_closed = TRUE;
switch (f->handle_data(vr, f)) {
case LI_HANDLER_GO_ON:
break;
case LI_HANDLER_COMEBACK:
li_vrequest_joblist_append(vr);
break;
case LI_HANDLER_WAIT_FOR_EVENT:
break; /* ignore - filter has to call li_vrequest_joblist_append(vr); */
case LI_HANDLER_ERROR:
return FALSE;
}
}
}
return TRUE;
}
static gboolean filters_run(liVRequest *vr, liFilters *fs) {
guint i;
if (0 == fs->queue->len) {
@ -55,26 +80,32 @@ static gboolean filters_run(liVRequest *vr, liFilters *fs) {
case LI_HANDLER_ERROR:
return FALSE;
}
}
if (fs->out->is_closed) {
liFilter *f = (liFilter*) g_ptr_array_index(fs->queue, fs->queue->len - 1);
f->in->is_closed = TRUE;
}
for (i = fs->queue->len; i-- > fs->skip_ndx; ) {
liFilter *f = (liFilter*) g_ptr_array_index(fs->queue, i);
if (f->in->is_closed) {
guint j = i;
while (j-- > fs->skip_ndx) {
liFilter *ff = (liFilter*) g_ptr_array_index(fs->queue, j);
ff->in->is_closed = TRUE;
f->knows_out_is_closed = f->out->is_closed;
if (f->in->is_closed && i > 0) {
guint j;
for (j = i; j-- > 0; ) {
liFilter *g = (liFilter*) g_ptr_array_index(fs->queue, j);
if (g->knows_out_is_closed) break;
g->knows_out_is_closed = TRUE;
switch (f->handle_data(vr, f)) {
case LI_HANDLER_GO_ON:
break;
case LI_HANDLER_COMEBACK:
li_vrequest_joblist_append(vr);
break;
case LI_HANDLER_WAIT_FOR_EVENT:
break; /* ignore - filter has to call li_vrequest_joblist_append(vr); */
case LI_HANDLER_ERROR:
return FALSE;
}
if (!g->in->is_closed) break;
}
fs->skip_ndx = i;
}
}
return TRUE;
}
static void filters_add(liFilters *fs, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
static liFilter* filters_add(liFilters *fs, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
liFilter *f = g_slice_new0(liFilter);
f->out = fs->out;
f->param = param;
@ -88,14 +119,15 @@ static void filters_add(liFilters *fs, liFilterHandlerCB handle_data, liFilterFr
li_chunkqueue_set_limit(f->in, fs->in->limit);
}
g_ptr_array_add(fs->queue, f);
return f;
}
void li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
filters_add(&vr->filters_in, handle_data, handle_free, param);
liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
return filters_add(&vr->filters_in, handle_data, handle_free, param);
}
void li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
filters_add(&vr->filters_out, handle_data, handle_free, param);
liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param) {
return filters_add(&vr->filters_out, handle_data, handle_free, param);
}
liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_response_headers, liVRequestHandlerCB handle_response_body, liVRequestHandlerCB handle_response_error, liVRequestHandlerCB handle_request_headers) {
@ -354,6 +386,9 @@ static liHandlerResult vrequest_do_handle_actions(liVRequest *vr) {
static gboolean vrequest_do_handle_read(liVRequest *vr) {
if (vr->backend && vr->backend->handle_request_body) {
if (!filters_handle_out_close(vr, &vr->filters_in)) {
li_vrequest_error(vr);
}
if (!filters_run(vr, &vr->filters_in)) {
li_vrequest_error(vr);
}
@ -379,6 +414,9 @@ static gboolean vrequest_do_handle_read(liVRequest *vr) {
}
static gboolean vrequest_do_handle_write(liVRequest *vr) {
if (!filters_handle_out_close(vr, &vr->filters_out)) {
li_vrequest_error(vr);
}
if (!filters_run(vr, &vr->filters_out)) {
li_vrequest_error(vr);
}

View File

@ -137,9 +137,7 @@ static liHandlerResult cache_etag_filter_hit(liVRequest *vr, liFilter *f) {
if (!cfile) return LI_HANDLER_GO_ON;
f->in->is_closed = TRUE;
li_chunkqueue_append_file_fd(f->out, NULL, 0, cfile->hit_length, cfile->hit_fd);
if (!f->out->is_closed) li_chunkqueue_append_file_fd(f->out, NULL, 0, cfile->hit_length, cfile->hit_fd);
cfile->hit_fd = -1;
cache_etag_file_free(cfile);
f->param = NULL;
@ -229,6 +227,7 @@ static liHandlerResult cache_etag_handle(liVRequest *vr, gpointer param, gpointe
liHttpHeader *etag;
struct stat st;
GString *tmp_str = vr->wrk->tmp_str;
liFilter *f;
if (!cfile) {
if (vr->request.http_method != LI_HTTP_METHOD_GET) return LI_HANDLER_GO_ON;
@ -277,7 +276,8 @@ static liHandlerResult cache_etag_handle(liVRequest *vr, gpointer param, gpointe
g_string_truncate(tmp_str, 0);
li_string_append_int(tmp_str, st.st_size);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(tmp_str));
li_vrequest_add_filter_out(vr, cache_etag_filter_hit, cache_etag_filter_free, cfile);
f = li_vrequest_add_filter_out(vr, cache_etag_filter_hit, cache_etag_filter_free, cfile);
f->in->is_closed = TRUE;
*context = NULL;
return LI_HANDLER_GO_ON;
}

View File

@ -189,7 +189,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
if (debug) {
VR_DEBUG(vr, "deflate: %s", "connection closed by remote");
VR_DEBUG(vr, "deflate out stream closed: in: %i, out : %i", (int) z->total_in, (int) z->total_out);
}
return LI_HANDLER_GO_ON;
}
@ -397,7 +397,7 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
if (debug) {
VR_DEBUG(vr, "deflate: %s", "connection closed by remote");
VR_DEBUG(vr, "deflate out stream closed: in: %i, out : %i", (int) bz->total_in_lo32, (int) bz->total_out_lo32);
}
return LI_HANDLER_GO_ON;
}