Browse Source

Add buffer-on-disk for uploads automatically

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
402c45dc45
  1. 1
      include/lighttpd/base.h
  2. 4
      include/lighttpd/filter_buffer_on_disk.h
  3. 5
      include/lighttpd/virtualrequest.h
  4. 18
      src/main/connection.c
  5. 2
      src/main/filter_buffer_on_disk.c
  6. 55
      src/main/virtualrequest.c

1
include/lighttpd/base.h

@ -51,6 +51,7 @@
#include <lighttpd/request.h>
#include <lighttpd/response.h>
#include <lighttpd/environment.h>
#include <lighttpd/filter_buffer_on_disk.h>
#include <lighttpd/virtualrequest.h>
#include <lighttpd/log.h>
#include <lighttpd/stat_cache.h>

4
include/lighttpd/filter_buffer_on_disk.h

@ -1,7 +1,9 @@
#ifndef _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_
#define _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_
#include <lighttpd/base.h>
#ifndef _LIGHTTPD_BASE_H_
#error Please include <lighttpd/base.h> instead of this file
#endif
/* initialize with zero */
typedef struct {

5
include/lighttpd/virtualrequest.h

@ -80,11 +80,12 @@ struct liVRequest {
/* environment entries will be passed to the backends */
liEnvironment env;
/* -> vr_in -> filters_in -> in -> handle -> out -> filters_out -> vr_out -> */
/* -> vr_in -> filters_in -> in_memory ->(buffer_on_disk) -> in -> handle -> out -> filters_out -> vr_out -> */
gboolean cq_memory_limit_hit; /* stop feeding chunkqueues with memory chunks */
liFilters filters_in, filters_out;
liChunkQueue *vr_in, *vr_out;
liChunkQueue *vr_in, *vr_out, *in_memory;
liChunkQueue *in, *out;
liFilterBufferOnDiskState in_buffer_state;
liActionStack action_stack;
gboolean actions_wait_for_response;

18
src/main/connection.c

@ -471,13 +471,8 @@ liConnection* li_connection_new(liWorker *wrk) {
con->in = con->mainvr->vr_in;
con->out = con->mainvr->vr_out;
li_chunkqueue_use_limit(con->raw_in, con->mainvr);
li_chunkqueue_use_limit(con->raw_out, con->mainvr);
li_cqlimit_set_limit(con->raw_out->limit, 512*1024);
li_chunkqueue_set_limit(con->mainvr->vr_in, con->raw_in->limit);
li_chunkqueue_set_limit(con->mainvr->vr_out, con->raw_out->limit);
li_chunkqueue_set_limit(con->mainvr->in, con->raw_in->limit);
li_chunkqueue_set_limit(con->mainvr->out, con->raw_out->limit);
li_chunkqueue_set_limit(con->raw_in, con->mainvr->vr_in->limit);
li_chunkqueue_set_limit(con->raw_out, con->mainvr->vr_out->limit);
con->keep_alive_data.link = NULL;
con->keep_alive_data.timeout = 0;
@ -518,6 +513,9 @@ void li_connection_reset(liConnection *con) {
}
ev_io_set(&con->sock_watcher, -1, 0);
li_chunkqueue_reset(con->raw_in);
li_chunkqueue_reset(con->raw_out);
li_vrequest_reset(con->mainvr, FALSE);
li_http_request_parser_reset(&con->req_parser_ctx);
@ -527,12 +525,6 @@ void li_connection_reset(liConnection *con) {
li_sockaddr_clear(&con->local_addr);
con->keep_alive = TRUE;
li_chunkqueue_reset(con->raw_in);
li_chunkqueue_reset(con->raw_out);
li_cqlimit_reset(con->raw_in->limit);
li_cqlimit_reset(con->raw_out->limit);
li_cqlimit_set_limit(con->raw_out->limit, 512*1024);
if (con->keep_alive_data.link) {
g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link);
con->keep_alive_data.link = NULL;

2
src/main/filter_buffer_on_disk.c

@ -1,5 +1,5 @@
#include <lighttpd/filter_buffer_on_disk.h>
#include <lighttpd/base.h>
typedef liFilterBufferOnDiskState bod_state;

55
src/main/virtualrequest.c

@ -158,10 +158,22 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon
filters_init(&vr->filters_in);
filters_init(&vr->filters_out);
vr->vr_in = vr->filters_in.in;
vr->in = vr->filters_in.out;
vr->in_memory = vr->filters_in.out;
vr->in = li_chunkqueue_new();
vr->out = vr->filters_out.in;
vr->vr_out = vr->filters_out.out;
li_chunkqueue_use_limit(vr->in, vr);
li_chunkqueue_set_limit(vr->vr_in, vr->in->limit);
li_chunkqueue_set_limit(vr->in_memory, vr->in->limit);
li_chunkqueue_use_limit(vr->out, vr);
li_chunkqueue_set_limit(vr->vr_out, vr->out->limit);
li_cqlimit_set_limit(vr->in->limit, 512*1024);
li_cqlimit_set_limit(vr->out->limit, 512*1024);
vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */
vr->in_buffer_state.split_on_file_chunks = FALSE;
vr->stat_cache_entries = g_ptr_array_sized_new(2);
vr->job_queue_link.data = vr;
@ -183,6 +195,8 @@ void li_vrequest_free(liVRequest* vr) {
filters_clean(vr, &vr->filters_in);
filters_clean(vr, &vr->filters_out);
li_chunkqueue_free(vr->in);
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link);
@ -227,6 +241,22 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) {
filters_reset(vr, &vr->filters_in);
filters_reset(vr, &vr->filters_out);
li_chunkqueue_reset(vr->in);
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */
vr->in_buffer_state.split_on_file_chunks = FALSE;
/* restore chunkqueue limits */
li_cqlimit_reset(vr->in->limit);
li_cqlimit_reset(vr->out->limit);
li_chunkqueue_use_limit(vr->in, vr);
li_chunkqueue_set_limit(vr->vr_in, vr->in->limit);
li_chunkqueue_set_limit(vr->in_memory, vr->in->limit);
li_chunkqueue_use_limit(vr->out, vr);
li_chunkqueue_set_limit(vr->vr_out, vr->out->limit);
li_cqlimit_set_limit(vr->in->limit, 512*1024);
li_cqlimit_set_limit(vr->out->limit, 512*1024);
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->wrk->job_queue, &vr->job_queue_link);
@ -390,6 +420,9 @@ static liHandlerResult vrequest_do_handle_actions(liVRequest *vr) {
static gboolean vrequest_do_handle_read(liVRequest *vr) {
if (vr->backend && vr->backend->handle_request_body) {
goffset lim_avail;
if (vr->in->is_closed) vr->in_memory->is_closed = TRUE;
if (!filters_handle_out_close(vr, &vr->filters_in)) {
li_vrequest_error(vr);
}
@ -397,7 +430,25 @@ static gboolean vrequest_do_handle_read(liVRequest *vr) {
li_vrequest_error(vr);
}
if (vr->vr_in->is_closed) vr->in->is_closed = TRUE;
if (vr->in_buffer_state.tempfile || vr->request.content_length < 0 || vr->request.content_length > 64*1024 ||
((lim_avail = li_chunkqueue_limit_available(vr->in)) <= 32*1024 && lim_avail >= 0)) {
switch (li_filter_buffer_on_disk(vr, vr->in, vr->in_memory, &vr->in_buffer_state)) {
case LI_HANDLER_GO_ON:
break;
case LI_HANDLER_COMEBACK:
li_vrequest_joblist_append(vr); /* come back later */
return FALSE;
case LI_HANDLER_WAIT_FOR_EVENT:
return FALSE;
case LI_HANDLER_ERROR:
li_vrequest_error(vr);
break;
}
} else {
li_chunkqueue_steal_all(vr->in, vr->in_memory);
if (vr->in_memory->is_closed) vr->in->is_closed = TRUE;
}
switch (vr->backend->handle_request_body(vr, vr->backend)) {
case LI_HANDLER_GO_ON:
break;

Loading…
Cancel
Save