diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index 687331b..72e12db 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -51,6 +51,7 @@ #include #include #include +#include #include #include #include diff --git a/include/lighttpd/filter_buffer_on_disk.h b/include/lighttpd/filter_buffer_on_disk.h index 742b5a1..e61e945 100644 --- a/include/lighttpd/filter_buffer_on_disk.h +++ b/include/lighttpd/filter_buffer_on_disk.h @@ -1,7 +1,9 @@ #ifndef _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_ #define _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_ -#include +#ifndef _LIGHTTPD_BASE_H_ +#error Please include instead of this file +#endif /* initialize with zero */ typedef struct { diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index e8d0876..bd5c240 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -80,11 +80,12 @@ struct liVRequest { /* environment entries will be passed to the backends */ liEnvironment env; - /* -> vr_in -> filters_in -> in -> handle -> out -> filters_out -> vr_out -> */ + /* -> vr_in -> filters_in -> in_memory ->(buffer_on_disk) -> in -> handle -> out -> filters_out -> vr_out -> */ gboolean cq_memory_limit_hit; /* stop feeding chunkqueues with memory chunks */ liFilters filters_in, filters_out; - liChunkQueue *vr_in, *vr_out; + liChunkQueue *vr_in, *vr_out, *in_memory; liChunkQueue *in, *out; + liFilterBufferOnDiskState in_buffer_state; liActionStack action_stack; gboolean actions_wait_for_response; diff --git a/src/main/connection.c b/src/main/connection.c index 42c8313..fc80723 100644 --- a/src/main/connection.c +++ b/src/main/connection.c @@ -471,13 +471,8 @@ liConnection* li_connection_new(liWorker *wrk) { con->in = con->mainvr->vr_in; con->out = con->mainvr->vr_out; - li_chunkqueue_use_limit(con->raw_in, con->mainvr); - li_chunkqueue_use_limit(con->raw_out, con->mainvr); - li_cqlimit_set_limit(con->raw_out->limit, 512*1024); - li_chunkqueue_set_limit(con->mainvr->vr_in, con->raw_in->limit); - li_chunkqueue_set_limit(con->mainvr->vr_out, con->raw_out->limit); - li_chunkqueue_set_limit(con->mainvr->in, con->raw_in->limit); - li_chunkqueue_set_limit(con->mainvr->out, con->raw_out->limit); + li_chunkqueue_set_limit(con->raw_in, con->mainvr->vr_in->limit); + li_chunkqueue_set_limit(con->raw_out, con->mainvr->vr_out->limit); con->keep_alive_data.link = NULL; con->keep_alive_data.timeout = 0; @@ -518,6 +513,9 @@ void li_connection_reset(liConnection *con) { } ev_io_set(&con->sock_watcher, -1, 0); + li_chunkqueue_reset(con->raw_in); + li_chunkqueue_reset(con->raw_out); + li_vrequest_reset(con->mainvr, FALSE); li_http_request_parser_reset(&con->req_parser_ctx); @@ -527,12 +525,6 @@ void li_connection_reset(liConnection *con) { li_sockaddr_clear(&con->local_addr); con->keep_alive = TRUE; - li_chunkqueue_reset(con->raw_in); - li_chunkqueue_reset(con->raw_out); - li_cqlimit_reset(con->raw_in->limit); - li_cqlimit_reset(con->raw_out->limit); - li_cqlimit_set_limit(con->raw_out->limit, 512*1024); - if (con->keep_alive_data.link) { g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link); con->keep_alive_data.link = NULL; diff --git a/src/main/filter_buffer_on_disk.c b/src/main/filter_buffer_on_disk.c index 9da7ea0..046b386 100644 --- a/src/main/filter_buffer_on_disk.c +++ b/src/main/filter_buffer_on_disk.c @@ -1,5 +1,5 @@ -#include +#include typedef liFilterBufferOnDiskState bod_state; diff --git a/src/main/virtualrequest.c b/src/main/virtualrequest.c index 6b893ae..4666a92 100644 --- a/src/main/virtualrequest.c +++ b/src/main/virtualrequest.c @@ -158,10 +158,22 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon filters_init(&vr->filters_in); filters_init(&vr->filters_out); vr->vr_in = vr->filters_in.in; - vr->in = vr->filters_in.out; + vr->in_memory = vr->filters_in.out; + vr->in = li_chunkqueue_new(); vr->out = vr->filters_out.in; vr->vr_out = vr->filters_out.out; + li_chunkqueue_use_limit(vr->in, vr); + li_chunkqueue_set_limit(vr->vr_in, vr->in->limit); + li_chunkqueue_set_limit(vr->in_memory, vr->in->limit); + li_chunkqueue_use_limit(vr->out, vr); + li_chunkqueue_set_limit(vr->vr_out, vr->out->limit); + li_cqlimit_set_limit(vr->in->limit, 512*1024); + li_cqlimit_set_limit(vr->out->limit, 512*1024); + + vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */ + vr->in_buffer_state.split_on_file_chunks = FALSE; + vr->stat_cache_entries = g_ptr_array_sized_new(2); vr->job_queue_link.data = vr; @@ -183,6 +195,8 @@ void li_vrequest_free(liVRequest* vr) { filters_clean(vr, &vr->filters_in); filters_clean(vr, &vr->filters_out); + li_chunkqueue_free(vr->in); + li_filter_buffer_on_disk_reset(&vr->in_buffer_state); if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */ g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link); @@ -227,6 +241,22 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) { filters_reset(vr, &vr->filters_in); filters_reset(vr, &vr->filters_out); + li_chunkqueue_reset(vr->in); + li_filter_buffer_on_disk_reset(&vr->in_buffer_state); + vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */ + vr->in_buffer_state.split_on_file_chunks = FALSE; + + /* restore chunkqueue limits */ + li_cqlimit_reset(vr->in->limit); + li_cqlimit_reset(vr->out->limit); + + li_chunkqueue_use_limit(vr->in, vr); + li_chunkqueue_set_limit(vr->vr_in, vr->in->limit); + li_chunkqueue_set_limit(vr->in_memory, vr->in->limit); + li_chunkqueue_use_limit(vr->out, vr); + li_chunkqueue_set_limit(vr->vr_out, vr->out->limit); + li_cqlimit_set_limit(vr->in->limit, 512*1024); + li_cqlimit_set_limit(vr->out->limit, 512*1024); if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */ g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link); @@ -390,6 +420,9 @@ static liHandlerResult vrequest_do_handle_actions(liVRequest *vr) { static gboolean vrequest_do_handle_read(liVRequest *vr) { if (vr->backend && vr->backend->handle_request_body) { + goffset lim_avail; + + if (vr->in->is_closed) vr->in_memory->is_closed = TRUE; if (!filters_handle_out_close(vr, &vr->filters_in)) { li_vrequest_error(vr); } @@ -397,7 +430,25 @@ static gboolean vrequest_do_handle_read(liVRequest *vr) { li_vrequest_error(vr); } - if (vr->vr_in->is_closed) vr->in->is_closed = TRUE; + if (vr->in_buffer_state.tempfile || vr->request.content_length < 0 || vr->request.content_length > 64*1024 || + ((lim_avail = li_chunkqueue_limit_available(vr->in)) <= 32*1024 && lim_avail >= 0)) { + switch (li_filter_buffer_on_disk(vr, vr->in, vr->in_memory, &vr->in_buffer_state)) { + case LI_HANDLER_GO_ON: + break; + case LI_HANDLER_COMEBACK: + li_vrequest_joblist_append(vr); /* come back later */ + return FALSE; + case LI_HANDLER_WAIT_FOR_EVENT: + return FALSE; + case LI_HANDLER_ERROR: + li_vrequest_error(vr); + break; + } + } else { + li_chunkqueue_steal_all(vr->in, vr->in_memory); + if (vr->in_memory->is_closed) vr->in->is_closed = TRUE; + } + switch (vr->backend->handle_request_body(vr, vr->backend)) { case LI_HANDLER_GO_ON: break;