asynchronous, bidirectional streaming options

asynchronous, bidirectional streaming support for request and response
Merge branch 'bug-949-streaming-request-response' into gmaster

github: closes #66
personal/stbuehler/mod-csrf-old
Glenn Strauss 2016-06-19 23:37:36 -04:00
commit b14e1f16f0
20 changed files with 864 additions and 429 deletions

View File

@ -263,6 +263,8 @@ typedef struct {
unsigned short use_xattr;
unsigned short follow_symlink;
unsigned short range_requests;
unsigned short stream_request_body;
unsigned short stream_response_body;
/* debug */

View File

@ -461,7 +461,7 @@ static chunk *chunkqueue_get_append_tempfile(chunkqueue *cq) {
static void chunkqueue_remove_empty_chunks(chunkqueue *cq);
static int chunkqueue_append_to_tempfile(server *srv, chunkqueue *dest, const char *mem, size_t len) {
int chunkqueue_append_mem_to_tempfile(server *srv, chunkqueue *dest, const char *mem, size_t len) {
chunk *dst_c;
ssize_t written;
@ -599,7 +599,7 @@ int chunkqueue_steal_with_tempfiles(server *srv, chunkqueue *dest, chunkqueue *s
case MEM_CHUNK:
/* store "use" bytes from memory chunk in tempfile */
if (0 != chunkqueue_append_to_tempfile(srv, dest, c->mem->ptr + c->offset, use)) {
if (0 != chunkqueue_append_mem_to_tempfile(srv, dest, c->mem->ptr + c->offset, use)) {
return -1;
}

View File

@ -57,6 +57,9 @@ void chunkqueue_append_mem(chunkqueue *cq, const char *mem, size_t len); /* copi
void chunkqueue_append_buffer(chunkqueue *cq, buffer *mem); /* may reset "mem" */
void chunkqueue_prepend_buffer(chunkqueue *cq, buffer *mem); /* may reset "mem" */
struct server; /*(declaration)*/
int chunkqueue_append_mem_to_tempfile(struct server *srv, chunkqueue *cq, const char *mem, size_t len);
/* functions to handle buffers to read into: */
/* return a pointer to a buffer in *mem with size *len;
* it should be at least min_size big, and use alloc_size if

View File

@ -119,6 +119,8 @@ static int config_insert(server *srv) {
{ "server.http-parseopt-host-strict", NULL, T_CONFIG_BOOLEAN, T_CONFIG_SCOPE_SERVER }, /* 73 */
{ "server.http-parseopt-host-normalize",NULL,T_CONFIG_BOOLEAN, T_CONFIG_SCOPE_SERVER }, /* 74 */
{ "server.bsd-accept-filter", NULL, T_CONFIG_STRING, T_CONFIG_SCOPE_CONNECTION }, /* 75 */
{ "server.stream-request-body", NULL, T_CONFIG_SHORT, T_CONFIG_SCOPE_CONNECTION }, /* 76 */
{ "server.stream-response-body", NULL, T_CONFIG_SHORT, T_CONFIG_SCOPE_CONNECTION }, /* 77 */
{ "server.host",
"use server.bind instead",
@ -248,6 +250,8 @@ static int config_insert(server *srv) {
s->ssl_verifyclient_export_cert = 0;
s->ssl_disable_client_renegotiation = 1;
s->listen_backlog = (0 == i ? 1024 : srv->config_storage[0]->listen_backlog);
s->stream_request_body = 0;
s->stream_response_body = 0;
/* all T_CONFIG_SCOPE_CONNECTION options */
cv[2].destination = s->errorfile_prefix;
@ -310,12 +314,21 @@ static int config_insert(server *srv) {
|| defined(__OpenBSD__) || defined(__DragonflyBSD__)
cv[75].destination = s->bsd_accept_filter;
#endif
cv[76].destination = &(s->stream_request_body);
cv[77].destination = &(s->stream_response_body);
srv->config_storage[i] = s;
if (0 != (ret = config_insert_values_global(srv, config->value, cv, i == 0 ? T_CONFIG_SCOPE_SERVER : T_CONFIG_SCOPE_CONNECTION))) {
break;
}
if (s->stream_request_body & FDEVENT_STREAM_REQUEST_BUFMIN) {
s->stream_request_body |= FDEVENT_STREAM_REQUEST;
}
if (s->stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN) {
s->stream_response_body |= FDEVENT_STREAM_RESPONSE;
}
}
{
@ -453,6 +466,9 @@ int config_setup_connection(server *srv, connection *con) {
PATCH(range_requests);
PATCH(force_lowercase_filenames);
/*PATCH(listen_backlog);*//*(not necessary; used only at startup)*/
PATCH(stream_request_body);
PATCH(stream_response_body);
PATCH(ssl_enabled);
PATCH(ssl_pemfile);
@ -563,6 +579,10 @@ int config_patch_connection(server *srv, connection *con) {
buffer_copy_buffer(con->server_name, s->server_name);
} else if (buffer_is_equal_string(du->key, CONST_STR_LEN("server.tag"))) {
PATCH(server_tag);
} else if (buffer_is_equal_string(du->key, CONST_STR_LEN("server.stream-request-body"))) {
PATCH(stream_request_body);
} else if (buffer_is_equal_string(du->key, CONST_STR_LEN("server.stream-response-body"))) {
PATCH(stream_response_body);
} else if (buffer_is_equal_string(du->key, CONST_STR_LEN("connection.kbytes-per-second"))) {
PATCH(kbytes_per_second);
} else if (buffer_is_equal_string(du->key, CONST_STR_LEN("debug.log-request-handling"))) {

View File

@ -212,10 +212,8 @@ static int connection_handle_read_ssl(server *srv, connection *con) {
return -2;
} else {
joblist_append(srv, con);
return 0;
}
return 0;
#else
UNUSED(srv);
UNUSED(con);
@ -359,7 +357,10 @@ handler_t connection_handle_read_post_state(server *srv, connection *con) {
if (dst_cq->bytes_in == (off_t)con->request.content_length) {
/* Content is ready */
connection_set_state(srv, con, CON_STATE_HANDLE_REQUEST);
con->conf.stream_request_body &= ~FDEVENT_STREAM_REQUEST_POLLIN;
if (con->state == CON_STATE_READ_POST) {
connection_set_state(srv, con, CON_STATE_HANDLE_REQUEST);
}
return HANDLER_GO_ON;
} else if (is_closed) {
#if 0
@ -372,6 +373,9 @@ handler_t connection_handle_read_post_state(server *srv, connection *con) {
#endif
return HANDLER_ERROR;
} else {
return HANDLER_WAIT_FOR_EVENT;
con->conf.stream_request_body |= FDEVENT_STREAM_REQUEST_POLLIN;
return (con->conf.stream_request_body & FDEVENT_STREAM_REQUEST)
? HANDLER_GO_ON
: HANDLER_WAIT_FOR_EVENT;
}
}

View File

@ -169,6 +169,7 @@ static void connection_handle_errdoc_init(server *srv, connection *con) {
}
}
con->response.transfer_encoding = 0;
buffer_reset(con->physical.path);
array_reset(con->response.headers);
chunkqueue_reset(con->write_queue);
@ -291,7 +292,6 @@ static int connection_handle_write_prepare(server *srv, connection *con) {
http_chunk_append_buffer(srv, con, b);
buffer_free(b);
http_chunk_close(srv, con);
response_header_overwrite(srv, con, CONST_STR_LEN("Content-Type"), CONST_STR_LEN("text/html"));
}
@ -341,7 +341,21 @@ static int connection_handle_write_prepare(server *srv, connection *con) {
if (((con->parsed_response & HTTP_CONTENT_LENGTH) == 0) &&
((con->response.transfer_encoding & HTTP_TRANSFER_ENCODING_CHUNKED) == 0)) {
con->keep_alive = 0;
if (con->request.http_version == HTTP_VERSION_1_1) {
off_t qlen = chunkqueue_length(con->write_queue);
con->response.transfer_encoding = HTTP_TRANSFER_ENCODING_CHUNKED;
if (qlen) {
/* create initial Transfer-Encoding: chunked segment */
buffer *b = srv->tmp_chunk_len;
buffer_string_set_length(b, 0);
buffer_append_uint_hex(b, (uintmax_t)qlen);
buffer_append_string_len(b, CONST_STR_LEN("\r\n"));
chunkqueue_prepend_buffer(con->write_queue, b);
chunkqueue_append_mem(con->write_queue, CONST_STR_LEN("\r\n"));
}
} else {
con->keep_alive = 0;
}
}
/**
@ -361,12 +375,6 @@ static int connection_handle_write_prepare(server *srv, connection *con) {
}
}
if (con->request.content_length
&& (off_t)con->request.content_length > con->request_content_queue->bytes_in) {
/* request body is present and has not been read completely */
con->keep_alive = 0;
}
if (con->request.http_method == HTTP_METHOD_HEAD) {
/**
* a HEAD request has the same as a GET
@ -389,18 +397,15 @@ static int connection_handle_write(server *srv, connection *con) {
con->write_request_ts = srv->cur_ts;
if (con->file_finished) {
connection_set_state(srv, con, CON_STATE_RESPONSE_END);
joblist_append(srv, con);
}
break;
case -1: /* error on our side */
log_error_write(srv, __FILE__, __LINE__, "sd",
"connection closed: write failed on fd", con->fd);
connection_set_state(srv, con, CON_STATE_ERROR);
joblist_append(srv, con);
break;
case -2: /* remote close */
connection_set_state(srv, con, CON_STATE_ERROR);
joblist_append(srv, con);
break;
case 1:
con->write_request_ts = srv->cur_ts;
@ -1036,6 +1041,11 @@ int connection_state_machine(server *srv, connection *con) {
}
switch (r = http_response_prepare(srv, con)) {
case HANDLER_WAIT_FOR_EVENT:
if (!con->file_finished && (!con->file_started || 0 == con->conf.stream_response_body)) {
break; /* come back here */
}
/* response headers received from backend; fall through to start response */
case HANDLER_FINISHED:
if (con->error_handler_saved_status > 0) {
con->request.http_method = con->error_handler_saved_method;
@ -1126,9 +1136,6 @@ int connection_state_machine(server *srv, connection *con) {
break;
case HANDLER_COMEBACK:
done = -1;
/* fallthrough */
case HANDLER_WAIT_FOR_EVENT:
/* come back here */
break;
case HANDLER_ERROR:
/* something went wrong */
@ -1168,6 +1175,12 @@ int connection_state_machine(server *srv, connection *con) {
"state for fd", con->fd, connection_get_state(con->state));
}
if (con->request.content_length
&& (off_t)con->request.content_length > con->request_content_queue->bytes_in) {
/* request body is present and has not been read completely */
con->keep_alive = 0;
}
plugins_call_handle_request_done(srv, con);
srv->con_written++;
@ -1278,19 +1291,44 @@ int connection_state_machine(server *srv, connection *con) {
"state for fd", con->fd, connection_get_state(con->state));
}
/* only try to write if we have something in the queue */
if (!chunkqueue_is_empty(con->write_queue)) {
if (con->is_writable) {
if (-1 == connection_handle_write(srv, con)) {
log_error_write(srv, __FILE__, __LINE__, "ds",
con->fd,
"handle write failed.");
do {
/* only try to write if we have something in the queue */
if (!chunkqueue_is_empty(con->write_queue)) {
if (con->is_writable) {
if (-1 == connection_handle_write(srv, con)) {
log_error_write(srv, __FILE__, __LINE__, "ds",
con->fd,
"handle write failed.");
connection_set_state(srv, con, CON_STATE_ERROR);
break;
}
if (con->state != CON_STATE_WRITE) break;
}
} else if (con->file_finished) {
connection_set_state(srv, con, CON_STATE_RESPONSE_END);
break;
}
if (con->mode != DIRECT && !con->file_finished) {
switch(r = plugins_call_handle_subrequest(srv, con)) {
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_FINISHED:
case HANDLER_GO_ON:
break;
case HANDLER_WAIT_FOR_FD:
srv->want_fds++;
fdwaitqueue_append(srv, con);
break;
case HANDLER_COMEBACK:
default:
log_error_write(srv, __FILE__, __LINE__, "sdd", "unexpected subrequest handler ret-value: ", con->fd, r);
/* fall through */
case HANDLER_ERROR:
connection_set_state(srv, con, CON_STATE_ERROR);
break;
}
}
} else if (con->file_finished) {
connection_set_state(srv, con, CON_STATE_RESPONSE_END);
}
} while (con->state == CON_STATE_WRITE && (!chunkqueue_is_empty(con->write_queue) ? con->is_writable : con->file_finished));
break;
case CON_STATE_ERROR: /* transient */
@ -1416,11 +1454,11 @@ int connection_state_machine(server *srv, connection *con) {
connection_get_state(con->state));
}
r = 0;
switch(con->state) {
case CON_STATE_READ_POST:
case CON_STATE_READ:
case CON_STATE_CLOSE:
fdevent_event_set(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_IN);
r = FDEVENT_IN;
break;
case CON_STATE_WRITE:
/* request write-fdevent only if we really need it
@ -1430,15 +1468,30 @@ int connection_state_machine(server *srv, connection *con) {
if (!chunkqueue_is_empty(con->write_queue) &&
(con->is_writable == 0) &&
(con->traffic_limit_reached == 0)) {
fdevent_event_set(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_OUT);
} else {
fdevent_event_set(srv->ev, &(con->fde_ndx), con->fd, 0);
r |= FDEVENT_OUT;
}
/* fall through */
case CON_STATE_READ_POST:
if (con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_POLLIN) {
r |= FDEVENT_IN;
}
break;
default:
fdevent_event_set(srv->ev, &(con->fde_ndx), con->fd, 0);
break;
}
if (-1 != con->fd) {
const int events = fdevent_event_get_interest(srv->ev, con->fd);
if (r != events) {
/* update timestamps when enabling interest in events */
if ((r & FDEVENT_IN) && !(events & FDEVENT_IN)) {
con->read_idle_ts = srv->cur_ts;
}
if ((r & FDEVENT_OUT) && !(events & FDEVENT_OUT)) {
con->write_request_ts = srv->cur_ts;
}
fdevent_event_set(srv->ev, &con->fde_ndx, con->fd, r);
}
}
return 0;
}

View File

@ -171,6 +171,30 @@ void fdevent_event_set(fdevents *ev, int *fde_ndx, int fd, int events) {
ev->fdarray[fd]->events = events;
}
void fdevent_event_add(fdevents *ev, int *fde_ndx, int fd, int event) {
int events;
if (-1 == fd) return;
events = ev->fdarray[fd]->events;
if ((events & event) || 0 == event) return; /*(no change; nothing to do)*/
events |= event;
if (ev->event_set) *fde_ndx = ev->event_set(ev, *fde_ndx, fd, events);
ev->fdarray[fd]->events = events;
}
void fdevent_event_clr(fdevents *ev, int *fde_ndx, int fd, int event) {
int events;
if (-1 == fd) return;
events = ev->fdarray[fd]->events;
if (!(events & event)) return; /*(no change; nothing to do)*/
events &= ~event;
if (ev->event_set) *fde_ndx = ev->event_set(ev, *fde_ndx, fd, events);
ev->fdarray[fd]->events = events;
}
int fdevent_poll(fdevents *ev, int timeout_ms) {
if (ev->poll == NULL) SEGFAULT();
return ev->poll(ev, timeout_ms);

View File

@ -65,6 +65,13 @@ typedef handler_t (*fdevent_handler)(struct server *srv, void *ctx, int revents)
#define FDEVENT_HUP BV(4)
#define FDEVENT_NVAL BV(5)
#define FDEVENT_STREAM_REQUEST BV(0)
#define FDEVENT_STREAM_REQUEST_BUFMIN BV(1)
#define FDEVENT_STREAM_REQUEST_POLLIN BV(15)
#define FDEVENT_STREAM_RESPONSE BV(0)
#define FDEVENT_STREAM_RESPONSE_BUFMIN BV(1)
typedef enum { FD_EVENT_TYPE_UNSET = -1,
FD_EVENT_TYPE_CONNECTION,
FD_EVENT_TYPE_FCGI_CONNECTION,
@ -173,7 +180,11 @@ fdevents *fdevent_init(struct server *srv, size_t maxfds, fdevent_handler_t type
int fdevent_reset(fdevents *ev); /* "init" after fork() */
void fdevent_free(fdevents *ev);
#define fdevent_event_get_interest(ev, fd) \
(-1 != (fd) ? (ev)->fdarray[(fd)]->events : 0)
void fdevent_event_set(fdevents *ev, int *fde_ndx, int fd, int events); /* events can be FDEVENT_IN, FDEVENT_OUT or FDEVENT_IN | FDEVENT_OUT */
void fdevent_event_add(fdevents *ev, int *fde_ndx, int fd, int event); /* events can be FDEVENT_IN or FDEVENT_OUT */
void fdevent_event_clr(fdevents *ev, int *fde_ndx, int fd, int event); /* events can be FDEVENT_IN or FDEVENT_OUT */
void fdevent_event_del(fdevents *ev, int *fde_ndx, int fd);
int fdevent_event_get_revent(fdevents *ev, size_t ndx);
int fdevent_event_get_fd(fdevents *ev, size_t ndx);

View File

@ -726,3 +726,37 @@ void http_response_xsendfile (server *srv, connection *con, buffer *path, const
con->http_status = status;
}
}
void http_response_backend_error (server *srv, connection *con) {
UNUSED(srv);
if (con->file_started) {
/*(response might have been already started, kill the connection)*/
/*(mode == DIRECT to avoid later call to http_response_backend_done())*/
con->mode = DIRECT; /*(avoid sending final chunked block)*/
con->keep_alive = 0; /*(no keep-alive; final chunked block not sent)*/
con->file_finished = 1;
} /*(else error status set later by http_response_backend_done())*/
}
void http_response_backend_done (server *srv, connection *con) {
/* (not CON_STATE_ERROR and not CON_STATE_RESPONSE_END,
* i.e. not called from handle_connection_close or connection_reset
* hooks, except maybe from errdoc handler, which later resets state)*/
switch (con->state) {
case CON_STATE_HANDLE_REQUEST:
case CON_STATE_READ_POST:
if (!con->file_started) {
/* Send an error if we haven't sent any data yet */
con->http_status = 500;
con->mode = DIRECT;
break;
} /* else fall through */
case CON_STATE_WRITE:
if (!con->file_finished) {
http_chunk_close(srv, con);
con->file_finished = 1;
}
default:
break;
}
}

View File

@ -94,45 +94,83 @@ int http_chunk_append_file(server *srv, connection *con, buffer *fn) {
return 0;
}
void http_chunk_append_buffer(server *srv, connection *con, buffer *mem) {
chunkqueue *cq;
force_assert(NULL != con);
if (buffer_string_is_empty(mem)) return;
cq = con->write_queue;
static int http_chunk_append_to_tempfile(server *srv, connection *con, const char * mem, size_t len) {
chunkqueue * const cq = con->write_queue;
if (con->response.transfer_encoding & HTTP_TRANSFER_ENCODING_CHUNKED) {
http_chunk_append_len(srv, con, buffer_string_length(mem));
/*http_chunk_append_len(srv, con, len);*/
buffer *b = srv->tmp_chunk_len;
buffer_string_set_length(b, 0);
buffer_append_uint_hex(b, len);
buffer_append_string_len(b, CONST_STR_LEN("\r\n"));
if (0 != chunkqueue_append_mem_to_tempfile(srv, cq, CONST_BUF_LEN(b))) {
return -1;
}
}
chunkqueue_append_buffer(cq, mem);
if (0 != chunkqueue_append_mem_to_tempfile(srv, cq, mem, len)) {
return -1;
}
if (con->response.transfer_encoding & HTTP_TRANSFER_ENCODING_CHUNKED) {
chunkqueue_append_mem(cq, CONST_STR_LEN("\r\n"));
if (0 != chunkqueue_append_mem_to_tempfile(srv, cq, CONST_STR_LEN("\r\n"))) {
return -1;
}
}
return 0;
}
void http_chunk_append_mem(server *srv, connection *con, const char * mem, size_t len) {
chunkqueue *cq;
static int http_chunk_append_data(server *srv, connection *con, buffer *b, const char * mem, size_t len) {
force_assert(NULL != con);
force_assert(NULL != mem || 0 == len);
chunkqueue * const cq = con->write_queue;
chunk *c = cq->last;
if (0 == len) return 0;
if (NULL == mem || 0 == len) return;
/* current usage does not append_mem or append_buffer after appending
* file, so not checking if users of this interface have appended large
* (references to) files to chunkqueue, which would not be in memory */
cq = con->write_queue;
/*(allow slightly larger mem use if FDEVENT_STREAM_RESPONSE_BUFMIN
* to reduce creation of temp files when backend producer will be
* blocked until more data is sent to network to client)*/
if ((c && c->type == FILE_CHUNK && c->file.is_temp)
|| cq->bytes_in - cq->bytes_out + len
> 1024 * ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN) ? 128 : 64)) {
return http_chunk_append_to_tempfile(srv, con, b ? b->ptr : mem, len);
}
/* not appending to prior mem chunk just in case using openssl
* and need to resubmit same args as prior call to openssl (required?)*/
if (con->response.transfer_encoding & HTTP_TRANSFER_ENCODING_CHUNKED) {
http_chunk_append_len(srv, con, len);
}
chunkqueue_append_mem(cq, mem, len);
/*(chunkqueue_append_buffer() might steal buffer contents)*/
b ? chunkqueue_append_buffer(cq, b) : chunkqueue_append_mem(cq, mem, len);
if (con->response.transfer_encoding & HTTP_TRANSFER_ENCODING_CHUNKED) {
chunkqueue_append_mem(cq, CONST_STR_LEN("\r\n"));
}
return 0;
}
int http_chunk_append_buffer(server *srv, connection *con, buffer *mem) {
force_assert(NULL != con);
return http_chunk_append_data(srv, con, mem, NULL, buffer_string_length(mem));
}
int http_chunk_append_mem(server *srv, connection *con, const char * mem, size_t len) {
force_assert(NULL != con);
force_assert(NULL != mem || 0 == len);
return http_chunk_append_data(srv, con, NULL, mem, len);
}
void http_chunk_close(server *srv, connection *con) {

View File

@ -5,8 +5,8 @@
#include "server.h"
#include <sys/types.h>
void http_chunk_append_mem(server *srv, connection *con, const char * mem, size_t len); /* copies memory */
void http_chunk_append_buffer(server *srv, connection *con, buffer *mem); /* may reset "mem" */
int http_chunk_append_mem(server *srv, connection *con, const char * mem, size_t len); /* copies memory */
int http_chunk_append_buffer(server *srv, connection *con, buffer *mem); /* may reset "mem" */
int http_chunk_append_file(server *srv, connection *con, buffer *fn); /* copies "fn" */
int http_chunk_append_file_range(server *srv, connection *con, buffer *fn, off_t offset, off_t len); /* copies "fn" */
void http_chunk_close(server *srv, connection *con);

View File

@ -400,6 +400,7 @@ static int cgi_demux_response(server *srv, handler_ctx *hctx) {
if (-1 == (n = read(hctx->fd, hctx->response->ptr, hctx->response->size - 1))) {
if (errno == EAGAIN || errno == EINTR) {
/* would block, wait for signal */
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
return FDEVENT_HANDLED_NOT_FINISHED;
}
/* error */
@ -409,12 +410,6 @@ static int cgi_demux_response(server *srv, handler_ctx *hctx) {
if (n == 0) {
/* read finished */
con->file_finished = 1;
/* send final chunk */
http_chunk_close(srv, con);
return FDEVENT_HANDLED_FINISHED;
}
@ -495,12 +490,9 @@ static int cgi_demux_response(server *srv, handler_ctx *hctx) {
if (is_header_end) {
if (!is_header) {
/* no header, but a body */
if (con->request.http_version == HTTP_VERSION_1_1) {
con->response.transfer_encoding = HTTP_TRANSFER_ENCODING_CHUNKED;
if (0 != http_chunk_append_buffer(srv, con, hctx->response_header)) {
return FDEVENT_HANDLED_ERROR;
}
http_chunk_append_buffer(srv, con, hctx->response_header);
} else {
const char *bstart;
size_t blen;
@ -534,21 +526,39 @@ static int cgi_demux_response(server *srv, handler_ctx *hctx) {
}
}
/* enable chunked-transfer-encoding */
if (con->request.http_version == HTTP_VERSION_1_1 &&
!(con->parsed_response & HTTP_CONTENT_LENGTH)) {
con->response.transfer_encoding = HTTP_TRANSFER_ENCODING_CHUNKED;
}
if (blen > 0) {
http_chunk_append_mem(srv, con, bstart, blen);
if (0 != http_chunk_append_mem(srv, con, bstart, blen)) {
return FDEVENT_HANDLED_ERROR;
}
}
}
con->file_started = 1;
} else {
/*(reuse MAX_HTTP_REQUEST_HEADER as max size for response headers from backends)*/
if (header_len > MAX_HTTP_REQUEST_HEADER) {
log_error_write(srv, __FILE__, __LINE__, "sb", "response headers too large for", con->uri.path);
con->http_status = 502; /* Bad Gateway */
con->mode = DIRECT;
return FDEVENT_HANDLED_FINISHED;
}
}
} else {
http_chunk_append_buffer(srv, con, hctx->response);
if (0 != http_chunk_append_buffer(srv, con, hctx->response)) {
return FDEVENT_HANDLED_ERROR;
}
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)
&& chunkqueue_length(con->write_queue) > 65536 - 4096) {
if (!con->is_writable) {
/*(defer removal of FDEVENT_IN interest since
* connection_state_machine() might be able to send data
* immediately, unless !con->is_writable, where
* connection_state_machine() might not loop back to call
* mod_cgi_handle_subrequest())*/
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
}
break;
}
}
#if 0
@ -656,20 +666,9 @@ static void cgi_connection_close(server *srv, handler_ctx *hctx) {
}
#endif
/* finish response (if not already finished) */
if (con->mode == p->id
&& (con->state == CON_STATE_HANDLE_REQUEST || con->state == CON_STATE_READ_POST)) {
/* (not CON_STATE_ERROR and not CON_STATE_RESPONSE_END,
* i.e. not called from cgi_connection_close_callback()) */
/* Send an error if we haven't sent any data yet */
if (0 == con->file_started) {
con->http_status = 500;
con->mode = DIRECT;
} else if (0 == con->file_finished) {
http_chunk_close(srv, con);
con->file_finished = 1;
}
/* finish response (if not already con->file_started, con->file_finished) */
if (con->mode == p->id) {
http_response_backend_done(srv, con);
}
}
@ -702,8 +701,13 @@ static handler_t cgi_handle_fdevent_send (server *srv, void *ctx, int revents) {
if (revents & FDEVENT_HUP) {
/* skip sending remaining data to CGI */
chunkqueue *cq = con->request_content_queue;
chunkqueue_mark_written(cq, chunkqueue_length(cq));
if (con->request.content_length) {
chunkqueue *cq = con->request_content_queue;
chunkqueue_mark_written(cq, chunkqueue_length(cq));
if (cq->bytes_in != (off_t)con->request.content_length) {
con->keep_alive = 0;
}
}
cgi_connection_close_fdtocgi(srv, hctx); /*(closes only hctx->fdtocgi)*/
} else if (revents & FDEVENT_ERR) {
@ -719,13 +723,7 @@ static handler_t cgi_handle_fdevent_send (server *srv, void *ctx, int revents) {
}
static handler_t cgi_handle_fdevent(server *srv, void *ctx, int revents) {
handler_ctx *hctx = ctx;
connection *con = hctx->remote_conn;
joblist_append(srv, con);
if (revents & FDEVENT_IN) {
static int cgi_recv_response(server *srv, handler_ctx *hctx) {
switch (cgi_demux_response(srv, hctx)) {
case FDEVENT_HANDLED_NOT_FINISHED:
break;
@ -745,25 +743,47 @@ static handler_t cgi_handle_fdevent(server *srv, void *ctx, int revents) {
cgi_connection_close(srv, hctx);
return HANDLER_FINISHED;
}
}
if (revents & FDEVENT_OUT) {
/* nothing to do */
return HANDLER_GO_ON;
}
static handler_t cgi_handle_fdevent(server *srv, void *ctx, int revents) {
handler_ctx *hctx = ctx;
connection *con = hctx->remote_conn;
joblist_append(srv, con);
if (revents & FDEVENT_IN) {
handler_t rc = cgi_recv_response(srv, hctx);/*(might invalidate hctx)*/
if (rc != HANDLER_GO_ON) return rc; /*(unless HANDLER_GO_ON)*/
}
/* perhaps this issue is already handled */
if (revents & FDEVENT_HUP) {
/* check if we still have a unfinished header package which is a body in reality */
if (con->file_started == 0 && !buffer_string_is_empty(hctx->response_header)) {
if (con->file_started) {
/* drain any remaining data from kernel pipe buffers
* even if (con->conf.stream_response_body
* & FDEVENT_STREAM_RESPONSE_BUFMIN)
* since event loop will spin on fd FDEVENT_HUP event
* until unregistered. */
handler_t rc;
do {
rc = cgi_recv_response(srv,hctx);/*(might invalidate hctx)*/
} while (rc == HANDLER_GO_ON); /*(unless HANDLER_GO_ON)*/
return rc; /* HANDLER_FINISHED or HANDLER_ERROR */
} else if (!buffer_string_is_empty(hctx->response_header)) {
/* unfinished header package which is a body in reality */
con->file_started = 1;
http_chunk_append_buffer(srv, con, hctx->response_header);
}
if (0 != http_chunk_append_buffer(srv, con, hctx->response_header)) {
cgi_connection_close(srv, hctx);
return HANDLER_ERROR;
}
} else {
# if 0
log_error_write(srv, __FILE__, __LINE__, "sddd", "got HUP from cgi", con->fd, hctx->fd, revents);
log_error_write(srv, __FILE__, __LINE__, "sddd", "got HUP from cgi", con->fd, hctx->fd, revents);
# endif
/* rtsigs didn't liked the close */
}
cgi_connection_close(srv, hctx);
} else if (revents & FDEVENT_ERR) {
/* kill all connections to the cgi process */
@ -962,10 +982,10 @@ static int cgi_write_request(server *srv, handler_ctx *hctx, int fd) {
}
}
if (chunkqueue_is_empty(cq)) {
if (cq->bytes_out == (off_t)con->request.content_length) {
/* sent all request body input */
/* close connection to the cgi-script */
if (-1 == hctx->fdtocgi) { /*(entire request body sent in initial send to pipe buffer)*/
if (-1 == hctx->fdtocgi) { /*(received request body sent in initial send to pipe buffer)*/
if (close(fd)) {
log_error_write(srv, __FILE__, __LINE__, "sds", "cgi stdin close failed ", fd, strerror(errno));
}
@ -973,11 +993,25 @@ static int cgi_write_request(server *srv, handler_ctx *hctx, int fd) {
cgi_connection_close_fdtocgi(srv, hctx); /*(closes only hctx->fdtocgi)*/
}
} else {
/* more request body remains to be sent to CGI so register for fdevents */
off_t cqlen = cq->bytes_in - cq->bytes_out;
if (cq->bytes_in < (off_t)con->request.content_length && cqlen < 65536 - 16384) {
/*(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST)*/
if (!(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_POLLIN)) {
con->conf.stream_request_body |= FDEVENT_STREAM_REQUEST_POLLIN;
con->is_readable = 1; /* trigger optimistic read from client */
}
}
if (-1 == hctx->fdtocgi) { /*(not registered yet)*/
hctx->fdtocgi = fd;
hctx->fde_ndx_tocgi = -1;
fdevent_register(srv->ev, hctx->fdtocgi, cgi_handle_fdevent_send, hctx);
}
if (0 == cqlen) { /*(chunkqueue_is_empty(cq))*/
if ((fdevent_event_get_interest(srv->ev, hctx->fdtocgi) & FDEVENT_OUT)) {
fdevent_event_set(srv->ev, &(hctx->fde_ndx_tocgi), hctx->fdtocgi, 0);
}
} else {
/* more request body remains to be sent to CGI so register for fdevents */
fdevent_event_set(srv->ev, &(hctx->fde_ndx_tocgi), hctx->fdtocgi, FDEVENT_OUT);
}
}
@ -1484,13 +1518,38 @@ TRIGGER_FUNC(cgi_trigger) {
SUBREQUEST_FUNC(mod_cgi_handle_subrequest) {
plugin_data *p = p_d;
handler_ctx *hctx = con->plugin_ctx[p->id];
chunkqueue *cq = con->request_content_queue;
if (con->mode != p->id) return HANDLER_GO_ON;
if (NULL == hctx) return HANDLER_GO_ON;
if (con->state == CON_STATE_READ_POST) {
handler_t r = connection_handle_read_post_state(srv, con);
if (r != HANDLER_GO_ON) return r;
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)
&& con->file_started) {
if (chunkqueue_length(con->write_queue) > 65536 - 4096) {
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
} else if (!(fdevent_event_get_interest(srv->ev, hctx->fd) & FDEVENT_IN)) {
/* optimistic read from backend, which might re-enable FDEVENT_IN */
handler_t rc = cgi_recv_response(srv, hctx); /*(might invalidate hctx)*/
if (rc != HANDLER_GO_ON) return rc; /*(unless HANDLER_GO_ON)*/
}
}
if (cq->bytes_in != (off_t)con->request.content_length) {
/*(64k - 4k to attempt to avoid temporary files
* in conjunction with FDEVENT_STREAM_REQUEST_BUFMIN)*/
if (cq->bytes_in - cq->bytes_out > 65536 - 4096
&& (con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_BUFMIN)){
con->conf.stream_request_body &= ~FDEVENT_STREAM_REQUEST_POLLIN;
if (-1 != hctx->fd) return HANDLER_WAIT_FOR_EVENT;
} else {
handler_t r = connection_handle_read_post_state(srv, con);
if (!chunkqueue_is_empty(cq)) {
if (fdevent_event_get_interest(srv->ev, hctx->fdtocgi) & FDEVENT_OUT) {
return (r == HANDLER_GO_ON) ? HANDLER_WAIT_FOR_EVENT : r;
}
}
if (r != HANDLER_GO_ON) return r;
}
}
if (-1 == hctx->fd) {
@ -1502,14 +1561,18 @@ SUBREQUEST_FUNC(mod_cgi_handle_subrequest) {
return HANDLER_FINISHED;
}
}
#if 0
log_error_write(srv, __FILE__, __LINE__, "sdd", "subrequest, pid =", hctx, hctx->pid);
#endif
} else if (!chunkqueue_is_empty(con->request_content_queue)) {
if (0 != cgi_write_request(srv, hctx, hctx->fdtocgi)) {
cgi_connection_close(srv, hctx);
return HANDLER_ERROR;
}
}
/* if not done, wait for CGI to close stdout, so we read EOF on pipe */
return con->file_finished ? HANDLER_FINISHED : HANDLER_WAIT_FOR_EVENT;
return HANDLER_WAIT_FOR_EVENT;
}

View File

@ -338,20 +338,20 @@ typedef struct {
fcgi_connection_state_t state;
time_t state_timestamp;
int reconnects; /* number of reconnect attempts */
chunkqueue *rb; /* read queue */
chunkqueue *wb; /* write queue */
off_t wb_reqlen;
buffer *response_header;
size_t request_id;
int fd; /* fd to the fastcgi process */
int fde_ndx; /* index into the fd-event buffer */
pid_t pid;
int got_proc;
int reconnects; /* number of reconnect attempts */
int request_id;
int send_content_body;
plugin_config conf;
@ -497,6 +497,7 @@ static handler_ctx * handler_ctx_init(void) {
hctx->rb = chunkqueue_init();
hctx->wb = chunkqueue_init();
hctx->wb_reqlen = 0;
return hctx;
}
@ -1585,21 +1586,9 @@ static void fcgi_connection_close(server *srv, handler_ctx *hctx) {
handler_ctx_free(srv, hctx);
con->plugin_ctx[p->id] = NULL;
/* finish response (if not already finished) */
if (con->mode == p->id
&& (con->state == CON_STATE_HANDLE_REQUEST || con->state == CON_STATE_READ_POST)) {
/* (not CON_STATE_ERROR and not CON_STATE_RESPONSE_END,
* i.e. not called from fcgi_connection_reset()) */
/* Send an error if we haven't sent any data yet */
if (0 == con->file_started) {
con->http_status = 500;
con->mode = DIRECT;
}
else if (!con->file_finished) {
http_chunk_close(srv, con);
con->file_finished = 1;
}
/* finish response (if not already con->file_started, con->file_finished) */
if (con->mode == p->id) {
http_response_backend_done(srv, con);
}
}
@ -1724,7 +1713,7 @@ static int fcgi_env_add(buffer *env, const char *key, size_t key_len, const char
return 0;
}
static int fcgi_header(FCGI_Header * header, unsigned char type, size_t request_id, int contentLength, unsigned char paddingLength) {
static int fcgi_header(FCGI_Header * header, unsigned char type, int request_id, int contentLength, unsigned char paddingLength) {
force_assert(contentLength <= FCGI_MAX_LENGTH);
header->version = FCGI_VERSION_1;
@ -1874,7 +1863,6 @@ static connection_result_t fcgi_establish_connection(server *srv, handler_ctx *h
#define FCGI_ENV_ADD_CHECK(ret, con) \
if (ret == -1) { \
con->http_status = 400; \
con->file_finished = 1; \
return -1; \
};
static int fcgi_env_add_request_headers(server *srv, connection *con, plugin_data *p) {
@ -1907,7 +1895,42 @@ static int fcgi_env_add_request_headers(server *srv, connection *con, plugin_dat
return 0;
}
static int fcgi_create_env(server *srv, handler_ctx *hctx, size_t request_id) {
static void fcgi_stdin_append(server *srv, connection *con, handler_ctx *hctx, int request_id) {
FCGI_Header header;
chunkqueue *req_cq = con->request_content_queue;
plugin_data *p = hctx->plugin_data;
off_t offset, weWant;
const off_t req_cqlen = req_cq->bytes_in - req_cq->bytes_out;
/* something to send ? */
for (offset = 0; offset != req_cqlen; offset += weWant) {
weWant = req_cqlen - offset > FCGI_MAX_LENGTH ? FCGI_MAX_LENGTH : req_cqlen - offset;
/* we announce toWrite octets
* now take all request_content chunks available
* */
fcgi_header(&(header), FCGI_STDIN, request_id, weWant, 0);
chunkqueue_append_mem(hctx->wb, (const char *)&header, sizeof(header));
hctx->wb_reqlen += sizeof(header);
if (p->conf.debug > 10) {
log_error_write(srv, __FILE__, __LINE__, "soso", "tosend:", offset, "/", req_cqlen);
}
chunkqueue_steal(hctx->wb, req_cq, weWant);
/*(hctx->wb_reqlen already includes content_length)*/
}
if (hctx->wb->bytes_in == hctx->wb_reqlen) {
/* terminate STDIN */
fcgi_header(&(header), FCGI_STDIN, request_id, 0, 0);
chunkqueue_append_mem(hctx->wb, (const char *)&header, sizeof(header));
hctx->wb_reqlen += (int)sizeof(header);
}
}
static int fcgi_create_env(server *srv, handler_ctx *hctx, int request_id) {
FCGI_BeginRequestRecord beginRecord;
FCGI_Header header;
@ -2139,38 +2162,13 @@ static int fcgi_create_env(server *srv, handler_ctx *hctx, size_t request_id) {
fcgi_header(&(header), FCGI_PARAMS, request_id, 0, 0);
buffer_append_string_len(b, (const char *)&header, sizeof(header));
hctx->wb_reqlen = buffer_string_length(b);
chunkqueue_append_buffer(hctx->wb, b);
buffer_free(b);
}
if (con->request.content_length) {
chunkqueue *req_cq = con->request_content_queue;
off_t offset;
/* something to send ? */
for (offset = 0; offset != req_cq->bytes_in; ) {
off_t weWant = req_cq->bytes_in - offset > FCGI_MAX_LENGTH ? FCGI_MAX_LENGTH : req_cq->bytes_in - offset;
/* we announce toWrite octets
* now take all the request_content chunks that we need to fill this request
* */
fcgi_header(&(header), FCGI_STDIN, request_id, weWant, 0);
chunkqueue_append_mem(hctx->wb, (const char *)&header, sizeof(header));
if (p->conf.debug > 10) {
log_error_write(srv, __FILE__, __LINE__, "soso", "tosend:", offset, "/", req_cq->bytes_in);
}
chunkqueue_steal(hctx->wb, req_cq, weWant);
offset += weWant;
}
}
/* terminate STDIN */
fcgi_header(&(header), FCGI_STDIN, request_id, 0, 0);
chunkqueue_append_mem(hctx->wb, (const char *)&header, sizeof(header));
hctx->wb_reqlen += con->request.content_length;/* (eventual) (minimal) total request size, not necessarily including all fcgi_headers around content length yet */
fcgi_stdin_append(srv, con, hctx, request_id);
return 0;
}
@ -2385,9 +2383,6 @@ range_success: ;
if (have_sendfile2) {
data_string *dcls;
hctx->send_content_body = 0;
joblist_append(srv, con);
/* fix content-length */
if (NULL == (dcls = (data_string *)array_get_unused_element(con->response.headers, TYPE_STRING))) {
dcls = data_response_init();
@ -2399,6 +2394,7 @@ range_success: ;
con->parsed_response |= HTTP_CONTENT_LENGTH;
con->response.content_length = sendfile2_content_length;
return 200;
}
/* CGI/1.1 rev 03 - 7.2.1.2 */
@ -2412,10 +2408,10 @@ range_success: ;
typedef struct {
buffer *b;
size_t len;
unsigned int len;
int type;
int padding;
size_t request_id;
int request_id;
} fastcgi_response_packet;
static int fastcgi_get_packet(server *srv, handler_ctx *hctx, fastcgi_response_packet *packet) {
@ -2512,7 +2508,10 @@ static int fcgi_demux_response(server *srv, handler_ctx *hctx) {
* check how much we have to read
*/
if (ioctl(hctx->fd, FIONREAD, &toread)) {
if (errno == EAGAIN) return 0;
if (errno == EAGAIN) {
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
return 0;
}
log_error_write(srv, __FILE__, __LINE__, "sd",
"unexpected end-of-file (perhaps the fastcgi process died):",
fcgi_fd);
@ -2523,12 +2522,26 @@ static int fcgi_demux_response(server *srv, handler_ctx *hctx) {
char *mem;
size_t mem_len;
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)) {
off_t cqlen = chunkqueue_length(hctx->rb);
if (cqlen + toread > 65536 + (int)sizeof(FCGI_Header)) { /*(max size of FastCGI packet + 1)*/
if (cqlen < 65536 + (int)sizeof(FCGI_Header)) {
toread = 65536 + (int)sizeof(FCGI_Header) - cqlen;
} else { /* should not happen */
toread = toread < 1024 ? toread : 1024;
}
}
}
chunkqueue_get_memory(hctx->rb, &mem, &mem_len, 0, toread);
r = read(hctx->fd, mem, mem_len);
chunkqueue_use_memory(hctx->rb, r > 0 ? r : 0);
if (-1 == r) {
if (errno == EAGAIN) return 0;
if (errno == EAGAIN) {
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
return 0;
}
log_error_write(srv, __FILE__, __LINE__, "sds",
"unexpected end-of-file (perhaps the fastcgi process died):",
fcgi_fd, strerror(errno));
@ -2587,14 +2600,25 @@ static int fcgi_demux_response(server *srv, handler_ctx *hctx) {
buffer_string_set_length(hctx->response_header, hlen);
} else {
/* no luck, no header found */
/*(reuse MAX_HTTP_REQUEST_HEADER as max size for response headers from backends)*/
if (buffer_string_length(hctx->response_header) > MAX_HTTP_REQUEST_HEADER) {
log_error_write(srv, __FILE__, __LINE__, "sb", "response headers too large for", con->uri.path);
con->http_status = 502; /* Bad Gateway */
con->mode = DIRECT;
fin = 1;
}
break;
}
/* parse the response header */
if ((ret = fcgi_response_parse(srv, con, p, hctx->response_header))) {
con->http_status = ret;
hctx->send_content_body = 0;
if (200 != ret) { /*(200 returned for X-Sendfile2 handled)*/
con->http_status = ret;
con->mode = DIRECT;
}
con->file_started = 1;
hctx->send_content_body = 0;
fin = 1;
break;
}
@ -2618,16 +2642,26 @@ static int fcgi_demux_response(server *srv, handler_ctx *hctx) {
hctx->send_content_body = 0; /* ignore the content */
break;
}
/* enable chunked-transfer-encoding */
if (con->request.http_version == HTTP_VERSION_1_1 &&
!(con->parsed_response & HTTP_CONTENT_LENGTH)) {
con->response.transfer_encoding = HTTP_TRANSFER_ENCODING_CHUNKED;
}
}
if (hctx->send_content_body && !buffer_string_is_empty(packet.b)) {
http_chunk_append_buffer(srv, con, packet.b);
if (0 != http_chunk_append_buffer(srv, con, packet.b)) {
/* error writing to tempfile;
* truncate response or send 500 if nothing sent yet */
fin = 1;
break;
}
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)
&& chunkqueue_length(con->write_queue) > 65536 - 4096) {
if (!con->is_writable) {
/*(defer removal of FDEVENT_IN interest since
* connection_state_machine() might be able to send data
* immediately, unless !con->is_writable, where
* connection_state_machine() might not loop back to call
* mod_fastcgi_handle_subrequest())*/
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
}
}
}
break;
case FCGI_STDERR:
@ -2638,15 +2672,6 @@ static int fcgi_demux_response(server *srv, handler_ctx *hctx) {
break;
case FCGI_END_REQUEST:
con->file_finished = 1;
if (host->mode != FCGI_AUTHORIZER ||
!(con->http_status == 0 ||
con->http_status == 200)) {
/* send chunk-end if necessary */
http_chunk_close(srv, con);
}
fin = 1;
break;
default:
@ -3006,6 +3031,7 @@ static handler_t fcgi_write_request(server *srv, handler_ctx *hctx) {
if (-1 == fcgi_create_env(srv, hctx, hctx->request_id)) return HANDLER_ERROR;
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
fcgi_set_state(srv, hctx, FCGI_STATE_WRITE);
/* fall through */
case FCGI_STATE_WRITE:
@ -3036,11 +3062,23 @@ static handler_t fcgi_write_request(server *srv, handler_ctx *hctx) {
}
}
if (hctx->wb->bytes_out == hctx->wb->bytes_in) {
fdevent_event_set(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
if (hctx->wb->bytes_out == hctx->wb_reqlen) {
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_OUT);
fcgi_set_state(srv, hctx, FCGI_STATE_READ);
} else {
fdevent_event_set(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN|FDEVENT_OUT);
off_t wblen = hctx->wb->bytes_in - hctx->wb->bytes_out;
if (hctx->wb->bytes_in < hctx->wb_reqlen && wblen < 65536 - 16384) {
/*(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST)*/
if (!(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_POLLIN)) {
con->conf.stream_request_body |= FDEVENT_STREAM_REQUEST_POLLIN;
con->is_readable = 1; /* trigger optimistic read from client */
}
}
if (0 == wblen) {
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_OUT);
} else {
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_OUT);
}
}
return HANDLER_WAIT_FOR_EVENT;
@ -3149,6 +3187,9 @@ static handler_t fcgi_send_request(server *srv, handler_ctx *hctx) {
}
static handler_t fcgi_recv_response(server *srv, handler_ctx *hctx);
SUBREQUEST_FUNC(mod_fastcgi_handle_subrequest) {
plugin_data *p = p_d;
@ -3159,28 +3200,52 @@ SUBREQUEST_FUNC(mod_fastcgi_handle_subrequest) {
/* not my job */
if (con->mode != p->id) return HANDLER_GO_ON;
if (con->state == CON_STATE_READ_POST) {
handler_t r = connection_handle_read_post_state(srv, con);
if (r != HANDLER_GO_ON) return r;
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)
&& con->file_started) {
if (chunkqueue_length(con->write_queue) > 65536 - 4096) {
fdevent_event_clr(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
} else if (!(fdevent_event_get_interest(srv->ev, hctx->fd) & FDEVENT_IN)) {
/* optimistic read from backend, which might re-enable FDEVENT_IN */
handler_t rc = fcgi_recv_response(srv, hctx); /*(might invalidate hctx)*/
if (rc != HANDLER_GO_ON) return rc; /*(unless HANDLER_GO_ON)*/
}
}
return (hctx->state != FCGI_STATE_READ)
if (0 == hctx->wb->bytes_in
? con->state == CON_STATE_READ_POST
: hctx->wb->bytes_in < hctx->wb_reqlen) {
/*(64k - 4k to attempt to avoid temporary files
* in conjunction with FDEVENT_STREAM_REQUEST_BUFMIN)*/
if (hctx->wb->bytes_in - hctx->wb->bytes_out > 65536 - 4096
&& (con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_BUFMIN)){
con->conf.stream_request_body &= ~FDEVENT_STREAM_REQUEST_POLLIN;
if (0 != hctx->wb->bytes_in) return HANDLER_WAIT_FOR_EVENT;
} else {
handler_t r = connection_handle_read_post_state(srv, con);
chunkqueue *req_cq = con->request_content_queue;
if (0 != hctx->wb->bytes_in && !chunkqueue_is_empty(req_cq)) {
fcgi_stdin_append(srv, con, hctx, hctx->request_id);
if (fdevent_event_get_interest(srv->ev, hctx->fd) & FDEVENT_OUT) {
return (r == HANDLER_GO_ON) ? HANDLER_WAIT_FOR_EVENT : r;
}