[core] implement streams on chunkqueues
This commit is contained in:
parent
a12d550557
commit
74ce55c860
|
@ -18,6 +18,7 @@
|
|||
#include <lighttpd/chunk_parser.h>
|
||||
|
||||
#include <lighttpd/waitqueue.h>
|
||||
#include <lighttpd/stream.h>
|
||||
#include <lighttpd/radix.h>
|
||||
|
||||
#include <lighttpd/base_lua.h>
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
#ifndef _LIGHTTPD_STREAM_H_
|
||||
#define _LIGHTTPD_STREAM_H_
|
||||
|
||||
#ifndef _LIGHTTPD_BASE_H_
|
||||
#error Please include <lighttpd/base.h> instead of this file
|
||||
#endif
|
||||
|
||||
#include <lighttpd/jobqueue.h>
|
||||
|
||||
typedef void (*liStreamCB)(liStream *stream, liStreamEvent event);
|
||||
|
||||
struct liStream {
|
||||
gint refcount;
|
||||
|
||||
liStream *source, *dest;
|
||||
|
||||
liChunkQueue *out;
|
||||
|
||||
liJob new_data_job;
|
||||
liJobQueue *jobqueue;
|
||||
|
||||
liStreamCB cb;
|
||||
};
|
||||
|
||||
LI_API void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb);
|
||||
LI_API void li_stream_acquire(liStream* stream);
|
||||
LI_API void li_stream_release(liStream* stream);
|
||||
|
||||
LI_API void li_stream_connect(liStream *source, liStream *dest);
|
||||
LI_API void li_stream_disconnect(liStream *stream); /* disconnects stream->source and stream */
|
||||
LI_API void li_stream_disconnect_dest(liStream *stream); /* disconnects stream->dest and stream. only for errors/conection resets */
|
||||
LI_API void li_stream_reset(liStream *stream); /* disconnect both sides */
|
||||
|
||||
LI_API void li_stream_notify(liStream *stream); /* new data in stream->cq, notify stream->dest */
|
||||
LI_API void li_stream_notify_later(liStream *stream);
|
||||
LI_API void li_stream_again(liStream *stream); /* more data to be generated in stream with event NEW_DATA or more data to be read from stream->source->cq */
|
||||
LI_API void li_stream_again_later(liStream *stream);
|
||||
|
||||
/* detach from jobqueue, stops all event handling. you have to detach all connected streams to move streams between threads */
|
||||
LI_API void li_stream_detach(liStream *stream);
|
||||
LI_API void li_stream_attach(liStream *stream, liJobQueue *jobqueue); /* attach to another jobqueue - possibly after switching threads */
|
||||
|
||||
/* walks from first using ->dest until it reaches NULL or (it reached last and NULL != i->limit) or limit == i->cq->limit and
|
||||
* sets i->cq->limit to limit, triggering LI_STREAM_NEW_CQLIMIT.
|
||||
* limit must not be NULL!
|
||||
*/
|
||||
LI_API void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit);
|
||||
|
||||
|
||||
LI_API liStream* li_stream_plug_new(liJobQueue *jobqueue); /* simple forwarder; can also be used for providing data from memory */
|
||||
LI_API liStream* li_stream_null_new(liJobQueue *jobqueue); /* eats everything, disconnects source on eof, out is always closed */
|
||||
|
||||
|
||||
|
||||
typedef void (*liIOStreamCB)(liIOStream *stream, liIOStreamEvent event);
|
||||
|
||||
/* TODO: support throttle */
|
||||
struct liIOStream {
|
||||
liStream stream_in, stream_out;
|
||||
liCQLimit *stream_in_limit;
|
||||
|
||||
/* initialize these before connecting stream_out if you need them */
|
||||
liWaitQueue *write_timeout_queue;
|
||||
liWaitQueueElem write_timeout_elem;
|
||||
|
||||
liWorker *wrk;
|
||||
ev_io io_watcher;
|
||||
|
||||
/* whether we want to read/write */
|
||||
gboolean in_closed, out_closed;
|
||||
gboolean can_read, can_write; /* set to FALSE if you got EAGAIN */
|
||||
|
||||
liIOStreamCB cb;
|
||||
|
||||
gpointer data; /* data for the callback */
|
||||
};
|
||||
|
||||
LI_API liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data);
|
||||
LI_API void li_iostream_acquire(liIOStream* iostream);
|
||||
LI_API void li_iostream_release(liIOStream* iostream);
|
||||
|
||||
LI_API int li_iostream_reset(liIOStream *iostream); /* returns fd, disconnects everything, stop callbacks, releases one reference */
|
||||
|
||||
/* similar to stream_detach/_attach */
|
||||
LI_API void li_iostream_detach(liIOStream *iostream);
|
||||
LI_API void li_iostream_attach(liIOStream *iostream, liWorker *wrk);
|
||||
|
||||
|
||||
LI_API void stream_simple_socket_close(liIOStream *stream, gboolean aborted);
|
||||
LI_API void stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event);
|
||||
LI_API void stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data);
|
||||
|
||||
#endif
|
|
@ -0,0 +1,8 @@
|
|||
#ifndef _LIGHTTPD_STREAM_HTTP_RESPONSE_H_
|
||||
#define _LIGHTTPD_STREAM_HTTP_RESPONSE_H_
|
||||
|
||||
#include <lighttpd/base.h>
|
||||
|
||||
LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph);
|
||||
|
||||
#endif
|
|
@ -214,6 +214,31 @@ typedef struct liServer liServer;
|
|||
|
||||
typedef struct liServerSocket liServerSocket;
|
||||
|
||||
/* stream.h */
|
||||
|
||||
typedef struct liStream liStream;
|
||||
typedef struct liIOStream liIOStream;
|
||||
|
||||
typedef enum {
|
||||
LI_STREAM_NEW_DATA, /* either new/more data in stream->source->cq, or more data to be generated */
|
||||
LI_STREAM_NEW_CQLIMIT,
|
||||
LI_STREAM_CONNECTED_DEST,
|
||||
LI_STREAM_CONNECTED_SOURCE,
|
||||
LI_STREAM_DISCONNECTED_DEST,
|
||||
LI_STREAM_DISCONNECTED_SOURCE,
|
||||
LI_STREAM_DESTROY
|
||||
} liStreamEvent;
|
||||
|
||||
typedef enum {
|
||||
LI_IOSTREAM_READ, /* should try reading */
|
||||
LI_IOSTREAM_WRITE, /* should try writing */
|
||||
LI_IOSTREAM_CONNECTED_DEST, /* stream_in connected dest */
|
||||
LI_IOSTREAM_CONNECTED_SOURCE, /* stream_out connected source */
|
||||
LI_IOSTREAM_DISCONNECTED_DEST, /* stream_in disconnected dest */
|
||||
LI_IOSTREAM_DISCONNECTED_SOURCE, /* stream_out disconnected source */
|
||||
LI_IOSTREAM_DESTROY /* stream_in and stream_out are down to refcount = 0 */
|
||||
} liIOStreamEvent;
|
||||
|
||||
/* throttle.h */
|
||||
|
||||
typedef struct liThrottlePool liThrottlePool;
|
||||
|
|
|
@ -231,6 +231,9 @@ SET(LIGHTTPD_SHARED_SRC
|
|||
response.c
|
||||
server.c
|
||||
stat_cache.c
|
||||
stream.c
|
||||
stream_http_response.c
|
||||
stream_simple_socket.c
|
||||
throttle.c
|
||||
url_parser.c
|
||||
value.c
|
||||
|
|
|
@ -36,6 +36,9 @@ lighttpd_shared_src= \
|
|||
response.c \
|
||||
server.c \
|
||||
stat_cache.c \
|
||||
stream.c \
|
||||
stream_http_response.c \
|
||||
stream_simple_socket.c \
|
||||
throttle.c \
|
||||
url_parser.c \
|
||||
value.c \
|
||||
|
|
|
@ -24,8 +24,14 @@
|
|||
action status {
|
||||
getStringTo(fpc, ctx->h_value);
|
||||
ctx->response->http_status = atoi(ctx->h_value->str);
|
||||
if (ctx->response->http_status >= 100 && ctx->response->http_status < 200) {
|
||||
switch (ctx->response->http_status) {
|
||||
case 100: /* Continue */
|
||||
case 102: /* Processing */
|
||||
ctx->drop_header = TRUE;
|
||||
break;
|
||||
/* don't ignore 101 Switching Protocols */
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,501 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
|
||||
/* callback can assume that the stream is not destroyed while the callback is running */
|
||||
static void li_stream_safe_cb(liStream *stream, liStreamEvent event) {
|
||||
if (NULL != stream->cb) {
|
||||
li_stream_acquire(stream);
|
||||
stream->cb(stream, event);
|
||||
li_stream_release(stream);
|
||||
}
|
||||
}
|
||||
|
||||
static void stream_new_data_job_cb(liJob *job) {
|
||||
liStream *stream = LI_CONTAINER_OF(job, liStream, new_data_job);
|
||||
li_stream_safe_cb(stream, LI_STREAM_NEW_DATA);
|
||||
}
|
||||
|
||||
void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb) {
|
||||
stream->refcount = 1;
|
||||
stream->source = stream->dest = NULL;
|
||||
stream->out = li_chunkqueue_new();
|
||||
li_job_init(&stream->new_data_job, stream_new_data_job_cb);
|
||||
stream->jobqueue = jobqueue;
|
||||
stream->cb = cb;
|
||||
}
|
||||
|
||||
void li_stream_acquire(liStream* stream) {
|
||||
assert(g_atomic_int_get(&stream->refcount) > 0);
|
||||
g_atomic_int_inc(&stream->refcount);
|
||||
}
|
||||
|
||||
void li_stream_release(liStream* stream) {
|
||||
assert(g_atomic_int_get(&stream->refcount) > 0);
|
||||
if (g_atomic_int_dec_and_test(&stream->refcount)) {
|
||||
li_job_clear(&stream->new_data_job);
|
||||
li_chunkqueue_free(stream->out);
|
||||
stream->out = NULL;
|
||||
stream->jobqueue = NULL;
|
||||
if (NULL != stream->cb) stream->cb(stream, LI_STREAM_DESTROY); /* "unsafe" cb, we can't keep a ref this time */
|
||||
}
|
||||
}
|
||||
|
||||
void li_stream_connect(liStream *source, liStream *dest) {
|
||||
/* streams must be "valid" */
|
||||
assert(source->refcount > 0 && dest->refcount > 0);
|
||||
|
||||
if (NULL != source->dest || NULL != dest->source) {
|
||||
g_error("Can't connect already connected streams");
|
||||
}
|
||||
|
||||
/* keep them alive for this function and for callbacks (-> callbacks are "safe") */
|
||||
g_atomic_int_inc(&source->refcount);
|
||||
g_atomic_int_inc(&dest->refcount);
|
||||
|
||||
/* references for the links */
|
||||
g_atomic_int_inc(&source->refcount);
|
||||
g_atomic_int_inc(&dest->refcount);
|
||||
source->dest = dest;
|
||||
dest->source = source;
|
||||
|
||||
if (NULL != source->cb) source->cb(source, LI_STREAM_CONNECTED_DEST);
|
||||
/* only notify dest if source didn't disconnect */
|
||||
if (source->dest == dest && NULL != dest->cb) dest->cb(dest, LI_STREAM_CONNECTED_SOURCE);
|
||||
|
||||
/* still connected: sync liCQLimit */
|
||||
if (source->dest == dest) {
|
||||
liCQLimit *sl = source->out->limit, *dl = dest->out->limit;
|
||||
if (sl != NULL && dl == NULL) {
|
||||
li_stream_set_cqlimit(dest, NULL, sl);
|
||||
}
|
||||
else if (sl == NULL && dl != NULL) {
|
||||
li_stream_set_cqlimit(NULL, source, dl);
|
||||
}
|
||||
}
|
||||
|
||||
/* still connected and source has data: notify dest */
|
||||
if (source->dest == dest && (source->out->length > 0 || source->out->is_closed)) {
|
||||
li_stream_again_later(dest);
|
||||
}
|
||||
|
||||
/* release our "function" refs */
|
||||
li_stream_release(source);
|
||||
li_stream_release(dest);
|
||||
}
|
||||
|
||||
static void _disconnect(liStream *source, liStream *dest) {
|
||||
/* streams must be "valid" */
|
||||
assert(g_atomic_int_get(&source->refcount) > 0 && g_atomic_int_get(&dest->refcount) > 0);
|
||||
assert(source->dest == dest && dest->source == source);
|
||||
|
||||
source->dest = NULL;
|
||||
dest->source = NULL;
|
||||
/* we still have the references from the links -> callbacks are "safe" */
|
||||
if (NULL != source->cb) source->cb(source, LI_STREAM_DISCONNECTED_DEST);
|
||||
if (NULL != dest->cb) dest->cb(dest, LI_STREAM_DISCONNECTED_SOURCE);
|
||||
|
||||
/* release references from the link */
|
||||
li_stream_release(source);
|
||||
li_stream_release(dest);
|
||||
}
|
||||
|
||||
void li_stream_disconnect(liStream *stream) {
|
||||
if (NULL == stream || NULL == stream->source) return;
|
||||
_disconnect(stream->source, stream);
|
||||
}
|
||||
|
||||
void li_stream_disconnect_dest(liStream *stream) {
|
||||
if (NULL == stream || NULL == stream->dest) return;
|
||||
_disconnect(stream, stream->dest);
|
||||
}
|
||||
|
||||
void li_stream_reset(liStream *stream) {
|
||||
if (NULL == stream || 0 == stream->refcount) return;
|
||||
|
||||
li_stream_acquire(stream);
|
||||
if (NULL != stream->source) _disconnect(stream->source, stream);
|
||||
if (NULL != stream->dest) _disconnect(stream, stream->dest);
|
||||
li_stream_release(stream);
|
||||
}
|
||||
|
||||
void li_stream_notify(liStream *stream) {
|
||||
if (NULL != stream->dest) li_stream_again(stream->dest);
|
||||
}
|
||||
|
||||
void li_stream_notify_later(liStream *stream) {
|
||||
if (NULL != stream->dest) li_stream_again_later(stream->dest);
|
||||
}
|
||||
|
||||
void li_stream_again(liStream *stream) {
|
||||
if (NULL != stream->jobqueue) {
|
||||
li_job_now(stream->jobqueue, &stream->new_data_job);
|
||||
}
|
||||
}
|
||||
|
||||
void li_stream_again_later(liStream *stream) {
|
||||
if (NULL != stream->jobqueue) {
|
||||
li_job_later(stream->jobqueue, &stream->new_data_job);
|
||||
}
|
||||
}
|
||||
|
||||
void li_stream_detach(liStream *stream) {
|
||||
stream->jobqueue = NULL;
|
||||
li_job_stop(&stream->new_data_job);
|
||||
|
||||
li_chunkqueue_set_limit(stream->out, NULL);
|
||||
}
|
||||
|
||||
void li_stream_attach(liStream *stream, liJobQueue *jobqueue) {
|
||||
stream->jobqueue = jobqueue;
|
||||
li_stream_again_later(stream);
|
||||
}
|
||||
|
||||
void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit) {
|
||||
assert(NULL != limit);
|
||||
|
||||
li_cqlimit_acquire(limit);
|
||||
if (NULL == first) {
|
||||
while (NULL != last && NULL == last->out->limit) {
|
||||
if (limit == last->out->limit) break;
|
||||
li_chunkqueue_set_limit(last->out, limit);
|
||||
if (NULL != last->cb) {
|
||||
li_stream_acquire(last);
|
||||
last->cb(last, LI_STREAM_NEW_CQLIMIT);
|
||||
if (NULL != last->source) {
|
||||
last = last->source;
|
||||
li_stream_release(last->dest);
|
||||
} else {
|
||||
li_stream_release(last);
|
||||
last = NULL;
|
||||
}
|
||||
} else {
|
||||
last = last->source;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
gboolean reached_last = (NULL == last);
|
||||
while (NULL != first && !(reached_last && NULL != first->out->limit)) {
|
||||
if (limit == first->out->limit) break;
|
||||
if (first == last) reached_last = TRUE;
|
||||
li_chunkqueue_set_limit(first->out, limit);
|
||||
if (NULL != first->cb) {
|
||||
li_stream_acquire(first);
|
||||
first->cb(first, LI_STREAM_NEW_CQLIMIT);
|
||||
if (NULL != first->dest) {
|
||||
first = first->dest;
|
||||
li_stream_release(first->source);
|
||||
} else {
|
||||
li_stream_release(first);
|
||||
first = NULL;
|
||||
}
|
||||
} else {
|
||||
first = first->dest;
|
||||
}
|
||||
}
|
||||
}
|
||||
li_cqlimit_release(limit);
|
||||
}
|
||||
|
||||
|
||||
static void stream_plug_cb(liStream *stream, liStreamEvent event) {
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
if (!stream->out->is_closed && NULL != stream->source) {
|
||||
li_chunkqueue_steal_all(stream->out, stream->source->out);
|
||||
stream->out->is_closed = stream->out->is_closed || stream->source->out->is_closed;
|
||||
li_stream_notify_later(stream);
|
||||
}
|
||||
if (stream->out->is_closed) {
|
||||
li_stream_disconnect(stream);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_DEST:
|
||||
li_stream_disconnect(stream);
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
g_slice_free(liStream, stream);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
liStream* li_stream_plug_new(liJobQueue *jobqueue) {
|
||||
liStream *stream = g_slice_new0(liStream);
|
||||
li_stream_init(stream, jobqueue, stream_plug_cb);
|
||||
return stream;
|
||||
}
|
||||
|
||||
static void stream_null_cb(liStream *stream, liStreamEvent event) {
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
if (NULL == stream->source) return;
|
||||
li_chunkqueue_skip_all(stream->source->out);
|
||||
if (stream->source->out->is_closed) li_stream_disconnect(stream);
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
g_slice_free(liStream, stream);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
liStream* li_stream_null_new(liJobQueue *jobqueue) {
|
||||
liStream *stream = g_slice_new0(liStream);
|
||||
li_stream_init(stream, jobqueue, stream_null_cb);
|
||||
stream->out->is_closed = TRUE;
|
||||
return stream;
|
||||
}
|
||||
|
||||
|
||||
static void iostream_destroy(liIOStream *iostream) {
|
||||
if (0 < iostream->stream_out.refcount || 0 < iostream->stream_in.refcount) return;
|
||||
iostream->stream_out.refcount = iostream->stream_in.refcount = 1;
|
||||
|
||||
if (NULL != iostream->stream_in_limit) {
|
||||
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
|
||||
iostream->stream_in_limit->io_watcher = NULL;
|
||||
}
|
||||
li_cqlimit_release(iostream->stream_in_limit);
|
||||
iostream->stream_in_limit = NULL;
|
||||
}
|
||||
|
||||
if (NULL != iostream->write_timeout_queue) {
|
||||
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
|
||||
iostream->write_timeout_queue = NULL;
|
||||
}
|
||||
|
||||
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
|
||||
|
||||
iostream->cb(iostream, LI_IOSTREAM_DESTROY);
|
||||
|
||||
assert(1 == iostream->stream_out.refcount);
|
||||
assert(1 == iostream->stream_in.refcount);
|
||||
|
||||
g_slice_free(liIOStream, iostream);
|
||||
}
|
||||
|
||||
static void iostream_in_cb(liStream *stream, liStreamEvent event) {
|
||||
liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_in);
|
||||
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
if (0 == li_chunkqueue_limit_available(stream->out)) {
|
||||
/* locked */
|
||||
return;
|
||||
}
|
||||
if (iostream->can_read) {
|
||||
goffset curoutlen = stream->out->length;
|
||||
gboolean curout_closed = stream->out->is_closed;
|
||||
|
||||
iostream->cb(iostream, LI_IOSTREAM_READ);
|
||||
|
||||
if (curoutlen != stream->out->length || curout_closed != stream->out->is_closed) {
|
||||
li_stream_notify_later(stream);
|
||||
}
|
||||
|
||||
if (-1 == iostream->io_watcher.fd) return;
|
||||
|
||||
if (iostream->can_read) {
|
||||
li_stream_again_later(stream);
|
||||
}
|
||||
}
|
||||
if (!iostream->can_read && !iostream->in_closed) {
|
||||
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ);
|
||||
}
|
||||
if (!iostream->can_write && !iostream->out_closed) {
|
||||
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_NEW_CQLIMIT:
|
||||
if (NULL != iostream->stream_in_limit) {
|
||||
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
|
||||
iostream->stream_in_limit->io_watcher = NULL;
|
||||
}
|
||||
li_cqlimit_release(iostream->stream_in_limit);
|
||||
}
|
||||
if (stream->out->limit) {
|
||||
stream->out->limit->io_watcher = &iostream->io_watcher;
|
||||
li_cqlimit_acquire(stream->out->limit);
|
||||
}
|
||||
iostream->stream_in_limit = stream->out->limit;
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_SOURCE:
|
||||
/* there is no incoming data */
|
||||
li_stream_disconnect(stream);
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_DEST:
|
||||
iostream->cb(iostream, LI_IOSTREAM_CONNECTED_DEST);
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_DEST:
|
||||
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_DEST);
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
iostream->can_read = FALSE;
|
||||
iostream_destroy(iostream);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void iostream_out_cb(liStream *stream, liStreamEvent event) {
|
||||
liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_out);
|
||||
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
if (iostream->can_write) {
|
||||
iostream->cb(iostream, LI_IOSTREAM_WRITE);
|
||||
if (NULL != iostream->write_timeout_queue) {
|
||||
if (stream->out->length > 0) {
|
||||
if (!iostream->write_timeout_elem.queued || (iostream->write_timeout_elem.ts + 1.0) < ev_now(iostream->wrk->loop)) {
|
||||
li_waitqueue_push(iostream->write_timeout_queue, &iostream->write_timeout_elem);
|
||||
}
|
||||
} else {
|
||||
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
|
||||
}
|
||||
}
|
||||
|
||||
if (-1 == iostream->io_watcher.fd) return;
|
||||
|
||||
if (iostream->can_write) {
|
||||
if (stream->out->length > 0 || stream->out->is_closed) {
|
||||
li_stream_again_later(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!iostream->can_read && !iostream->in_closed) {
|
||||
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ);
|
||||
}
|
||||
if (!iostream->can_write && !iostream->out_closed) {
|
||||
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_DEST:
|
||||
/* there is no outgoing data */
|
||||
li_stream_disconnect_dest(stream);
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_SOURCE:
|
||||
iostream->cb(iostream, LI_IOSTREAM_CONNECTED_SOURCE);
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_SOURCE:
|
||||
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_SOURCE);
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
iostream->can_write = FALSE;
|
||||
iostream_destroy(iostream);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void iostream_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
|
||||
liIOStream *iostream = (liIOStream*) w->data;
|
||||
gboolean do_write = FALSE;
|
||||
UNUSED(loop);
|
||||
|
||||
li_ev_io_rem_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE | EV_READ);
|
||||
|
||||
if (0 != (revents & EV_WRITE) && !iostream->can_write && iostream->stream_out.refcount > 0) {
|
||||
iostream->can_write = TRUE;
|
||||
do_write = TRUE;
|
||||
li_stream_acquire(&iostream->stream_out); /* keep out stream alive during li_stream_again(&iostream->stream_in) */
|
||||
}
|
||||
|
||||
if (0 != (revents & EV_READ) && !iostream->can_read && iostream->stream_in.refcount > 0) {
|
||||
iostream->can_read = TRUE;
|
||||
li_stream_again_later(&iostream->stream_in);
|
||||
}
|
||||
|
||||
if (do_write) {
|
||||
li_stream_again_later(&iostream->stream_out);
|
||||
li_stream_release(&iostream->stream_out);
|
||||
}
|
||||
}
|
||||
|
||||
liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data) {
|
||||
liIOStream *iostream = g_slice_new0(liIOStream);
|
||||
|
||||
li_stream_init(&iostream->stream_in, &wrk->jobqueue, iostream_in_cb);
|
||||
li_stream_init(&iostream->stream_out, &wrk->jobqueue, iostream_out_cb);
|
||||
iostream->stream_in_limit = NULL;
|
||||
|
||||
iostream->write_timeout_queue = NULL;
|
||||
|
||||
iostream->wrk = wrk;
|
||||
ev_io_init(&iostream->io_watcher, iostream_io_cb, fd, EV_READ);
|
||||
iostream->io_watcher.data = iostream;
|
||||
|
||||
iostream->in_closed = iostream->out_closed = iostream->can_read = FALSE;
|
||||
iostream->can_write = TRUE;
|
||||
|
||||
iostream->cb = cb;
|
||||
iostream->data = data;
|
||||
|
||||
ev_io_start(iostream->wrk->loop, &iostream->io_watcher);
|
||||
|
||||
return iostream;
|
||||
}
|
||||
|
||||
void li_iostream_acquire(liIOStream* iostream) {
|
||||
li_stream_acquire(&iostream->stream_in);
|
||||
li_stream_acquire(&iostream->stream_out);
|
||||
}
|
||||
|
||||
void li_iostream_release(liIOStream* iostream) {
|
||||
li_stream_release(&iostream->stream_in);
|
||||
li_stream_release(&iostream->stream_out);
|
||||
}
|
||||
|
||||
int li_iostream_reset(liIOStream *iostream) {
|
||||
int fd;
|
||||
if (NULL == iostream) return -1;
|
||||
|
||||
fd = iostream->io_watcher.fd;
|
||||
if (NULL != iostream->wrk->loop) {
|
||||
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
|
||||
}
|
||||
ev_io_set(&iostream->io_watcher, -1, 0);
|
||||
|
||||
if (NULL != iostream->write_timeout_queue) {
|
||||
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
|
||||
iostream->write_timeout_queue = NULL;
|
||||
}
|
||||
|
||||
li_stream_disconnect(&iostream->stream_out);
|
||||
li_stream_disconnect_dest(&iostream->stream_in);
|
||||
|
||||
li_iostream_release(iostream);
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
void li_iostream_detach(liIOStream *iostream) {
|
||||
if (NULL != iostream->wrk) {
|
||||
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
|
||||
iostream->wrk = NULL;
|
||||
}
|
||||
|
||||
if (NULL != iostream->stream_in_limit) {
|
||||
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
|
||||
iostream->stream_in_limit->io_watcher = NULL;
|
||||
}
|
||||
li_cqlimit_release(iostream->stream_in_limit);
|
||||
iostream->stream_in_limit = NULL;
|
||||
}
|
||||
|
||||
li_stream_detach(&iostream->stream_in);
|
||||
li_stream_detach(&iostream->stream_out);
|
||||
}
|
||||
|
||||
void li_iostream_attach(liIOStream *iostream, liWorker *wrk) {
|
||||
assert(NULL == iostream->wrk);
|
||||
|
||||
li_stream_attach(&iostream->stream_in, &wrk->jobqueue);
|
||||
li_stream_attach(&iostream->stream_out, &wrk->jobqueue);
|
||||
|
||||
iostream->wrk = wrk;
|
||||
ev_io_start(iostream->wrk->loop, &iostream->io_watcher);
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
#include <lighttpd/stream_http_response.h>
|
||||
|
||||
typedef struct liStreamHttpResponse liStreamHttpResponse;
|
||||
|
||||
struct liStreamHttpResponse {
|
||||
liHttpResponseCtx parse_response_ctx;
|
||||
|
||||
liStream stream;
|
||||
liVRequest *vr;
|
||||
gboolean response_headers_finished;
|
||||
};
|
||||
|
||||
static void stream_http_respone_data(liStreamHttpResponse* shr) {
|
||||
if (NULL == shr->stream.source) return;
|
||||
|
||||
if (!shr->response_headers_finished) {
|
||||
switch (li_http_response_parse(shr->vr, &shr->parse_response_ctx)) {
|
||||
case LI_HANDLER_GO_ON:
|
||||
shr->response_headers_finished = TRUE;
|
||||
li_vrequest_indirect_headers_ready(shr->vr);
|
||||
break;
|
||||
case LI_HANDLER_ERROR:
|
||||
VR_ERROR(shr->vr, "%s", "Parsing response header failed");
|
||||
li_vrequest_error(shr->vr);
|
||||
return;
|
||||
case LI_HANDLER_WAIT_FOR_EVENT:
|
||||
if (shr->stream.source == NULL || shr->stream.source->out->is_closed) {
|
||||
VR_ERROR(shr->vr, "%s", "Parsing response header failed (eos)");
|
||||
li_vrequest_error(shr->vr);
|
||||
}
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
li_chunkqueue_steal_all(shr->stream.out, shr->stream.source->out);
|
||||
if (shr->stream.source->out->is_closed) {
|
||||
shr->stream.out->is_closed = TRUE;
|
||||
li_stream_disconnect(&shr->stream);
|
||||
}
|
||||
li_stream_notify(&shr->stream);
|
||||
}
|
||||
|
||||
|
||||
static void stream_http_respone_cb(liStream *stream, liStreamEvent event) {
|
||||
liStreamHttpResponse* shr = LI_CONTAINER_OF(stream, liStreamHttpResponse, stream);
|
||||
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
stream_http_respone_data(shr);
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_DEST:
|
||||
li_stream_disconnect(stream);
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_SOURCE:
|
||||
if (NULL != stream->dest && !stream->out->is_closed) {
|
||||
/* "abort" */
|
||||
li_stream_disconnect_dest(stream);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
li_http_response_parser_clear(&shr->parse_response_ctx);
|
||||
g_slice_free(liStreamHttpResponse, shr);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph) {
|
||||
liStreamHttpResponse *shr = g_slice_new0(liStreamHttpResponse);
|
||||
shr->response_headers_finished = FALSE;
|
||||
shr->vr = vr;
|
||||
li_stream_init(&shr->stream, &vr->wrk->jobqueue, stream_http_respone_cb);
|
||||
li_http_response_parser_init(&shr->parse_response_ctx, &vr->response, http_in->out,
|
||||
accept_cgi, accept_nph);
|
||||
li_stream_connect(http_in, &shr->stream);
|
||||
return &shr->stream;
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
|
||||
void stream_simple_socket_close(liIOStream *stream, gboolean aborted) {
|
||||
int fd = stream->io_watcher.fd;
|
||||
|
||||
ev_io_stop(stream->wrk->loop, &stream->io_watcher);
|
||||
|
||||
if (-1 == fd) return;
|
||||
|
||||
stream->out_closed = stream->in_closed = TRUE;
|
||||
stream->can_read = stream->can_write = FALSE;
|
||||
if (NULL != stream->stream_in.out) {
|
||||
stream->stream_in.out->is_closed = TRUE;
|
||||
}
|
||||
|
||||
if (aborted || stream->in_closed) {
|
||||
li_iostream_acquire(stream);
|
||||
fd = li_iostream_reset(stream);
|
||||
if (-1 != fd) {
|
||||
shutdown(fd, SHUT_RDWR);
|
||||
close(fd);
|
||||
}
|
||||
} else {
|
||||
stream->io_watcher.fd = -1;
|
||||
|
||||
shutdown(fd, SHUT_WR);
|
||||
li_stream_disconnect(&stream->stream_out);
|
||||
ERROR(stream->wrk->srv, "Adding fd %i to closing sockets", fd);
|
||||
li_worker_add_closing_socket(stream->wrk, fd);
|
||||
}
|
||||
}
|
||||
|
||||
static void stream_simple_socket_read(liIOStream *stream, gpointer *data) {
|
||||
liNetworkStatus res;
|
||||
GError *err = NULL;
|
||||
liWorker *wrk = stream->wrk;
|
||||
|
||||
liChunkQueue *raw_in = stream->stream_in.out;
|
||||
|
||||
if (NULL == *data && NULL != wrk->network_read_buf) {
|
||||
/* reuse worker buf if needed */
|
||||
*data = wrk->network_read_buf;
|
||||
wrk->network_read_buf = NULL;
|
||||
}
|
||||
|
||||
{
|
||||
liBuffer *raw_in_buffer = *data;
|
||||
res = li_network_read(stream->io_watcher.fd, raw_in, &raw_in_buffer, &err);
|
||||
*data = raw_in_buffer;
|
||||
}
|
||||
|
||||
if (NULL == wrk->network_read_buf && NULL != *data
|
||||
&& 1 == g_atomic_int_get(&((liBuffer*)*data)->refcount)) {
|
||||
/* move buffer back to worker if we didn't use it */
|
||||
wrk->network_read_buf = *data;
|
||||
*data = NULL;
|
||||
}
|
||||
|
||||
switch (res) {
|
||||
case LI_NETWORK_STATUS_SUCCESS:
|
||||
break;
|
||||
case LI_NETWORK_STATUS_FATAL_ERROR:
|
||||
ERROR(wrk->srv, "network read fatal error: %s", NULL != err ? err->message : "(unknown)");
|
||||
g_error_free(err);
|
||||
stream_simple_socket_close(stream, TRUE);
|
||||
break;
|
||||
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
|
||||
li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_READ);
|
||||
stream->stream_in.out->is_closed = TRUE;
|
||||
stream->in_closed = TRUE;
|
||||
stream->can_read = FALSE;
|
||||
break;
|
||||
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
|
||||
stream->can_read = FALSE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void stream_simple_socket_write(liIOStream *stream) {
|
||||
liNetworkStatus res;
|
||||
liChunkQueue *raw_out = stream->stream_out.out;
|
||||
liChunkQueue *from = stream->stream_out.source->out;
|
||||
|
||||
li_chunkqueue_steal_all(raw_out, from);
|
||||
|
||||
if (raw_out->length > 0) {
|
||||
static const goffset WRITE_MAX = 256*1024; /* 256kB */
|
||||
goffset write_max;
|
||||
GError *err = NULL;
|
||||
|
||||
write_max = WRITE_MAX;
|
||||
|
||||
res = li_network_write(stream->io_watcher.fd, raw_out, write_max, &err);
|
||||
|
||||
switch (res) {
|
||||
case LI_NETWORK_STATUS_SUCCESS:
|
||||
break;
|
||||
case LI_NETWORK_STATUS_FATAL_ERROR:
|
||||
ERROR(stream->wrk->srv, "network write fatal error: %s", NULL != err ? err->message : "(unknown)");
|
||||
g_error_free(err);
|
||||
stream_simple_socket_close(stream, TRUE);
|
||||
break;
|
||||
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
|
||||
stream_simple_socket_close(stream, TRUE);
|
||||
break;
|
||||
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
|
||||
stream->can_write = FALSE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == raw_out->length && from->is_closed) {
|
||||
int fd = stream->io_watcher.fd;
|
||||
li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_WRITE);
|
||||
if (-1 != fd) shutdown(fd, SHUT_WR);
|
||||
stream->out_closed = TRUE;
|
||||
stream->can_write = FALSE;
|
||||
li_stream_disconnect(&stream->stream_out);
|
||||
}
|
||||
}
|
||||
|
||||
void stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event) {
|
||||
stream_simple_socket_io_cb_with_context(stream, event, &stream->data);
|
||||
}
|
||||
|
||||
void stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data) {
|
||||
// TODO remove debug
|
||||
ERROR(stream->wrk->srv, "stream_simple_socket_io_cb: %p, %i", (void*)stream, event);
|
||||
switch (event) {
|
||||
case LI_IOSTREAM_READ:
|
||||
stream_simple_socket_read(stream, data);
|
||||
break;
|
||||
case LI_IOSTREAM_WRITE:
|
||||
stream_simple_socket_write(stream);
|
||||
break;
|
||||
case LI_IOSTREAM_DESTROY:
|
||||
if (NULL != *data) {
|
||||
li_buffer_release(*data);
|
||||
*data = NULL;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
|
@ -52,6 +52,9 @@ def build(bld):
|
|||
response.c
|
||||
server.c
|
||||
stat_cache.c
|
||||
stream.c
|
||||
stream_http_response.c
|
||||
stream_simple_socket.c
|
||||
throttle.c
|
||||
url_parser.rl
|
||||
value.c
|
||||
|
|
Loading…
Reference in New Issue