2
0
Fork 0

fix filters (deflate, memcached, cache_disk_etag)

personal/stbuehler/wip
Stefan Bühler 2013-05-24 13:17:22 +02:00
parent 49796611be
commit 5c2cc7143d
5 changed files with 145 additions and 97 deletions

View File

@ -16,6 +16,7 @@ struct liFilter {
/* if the handler wasn't able to handle all "in" data it must call li_stream_again(&f->stream) to trigger a new call to handle_data
* vr, in and out can be NULL if the associated vrequest/stream was destroyed
* in handle_data out is never NULL
*/
liFilterHandlerCB handle_data;
liFilterFreeCB handle_free;

View File

@ -29,7 +29,7 @@ static void filter_handle_data(liFilter *filter) {
goffset curoutlen = filter->stream.out->length;
gboolean curout_closed = filter->stream.out->is_closed;
filter->in = (NULL != filter->stream.source) ? filter->stream.source->out : NULL;
assert(NULL != filter->out);
switch (filter->handle_data(filter->vr, filter)) {
case LI_HANDLER_GO_ON:
@ -44,6 +44,7 @@ static void filter_handle_data(liFilter *filter) {
case LI_HANDLER_WAIT_FOR_EVENT:
break;
case LI_HANDLER_ERROR:
filter->in = NULL;
if (NULL != filter->vr) li_vrequest_error(filter->vr);
li_stream_reset(&filter->stream);
break;
@ -72,6 +73,8 @@ static void filter_stream_cb(liStream *stream, liStreamEvent event) {
}
break;
case LI_STREAM_CONNECTED_SOURCE:
filter->in = (NULL != filter->stream.source) ? filter->stream.source->out : NULL;
/* fall through */
case LI_STREAM_CONNECTED_DEST:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
@ -80,6 +83,7 @@ static void filter_stream_cb(liStream *stream, liStreamEvent event) {
}
break;
case LI_STREAM_DISCONNECTED_SOURCE:
filter->in = NULL;
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
} else {
@ -90,10 +94,12 @@ static void filter_stream_cb(liStream *stream, liStreamEvent event) {
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
} else {
li_stream_disconnect(stream);
stream->out->is_closed = TRUE;
li_chunkqueue_skip_all(stream->out);
}
break;
case LI_STREAM_DESTROY:
filter->out = NULL;
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
}

View File

@ -127,22 +127,19 @@ static void cache_etag_file_finish(liVRequest *vr, cache_etag_file *cfile) {
static void cache_etag_filter_free(liVRequest *vr, liFilter *f) {
cache_etag_file *cfile = (cache_etag_file*) f->param;
UNUSED(vr);
f->param = NULL;
cache_etag_file_free(cfile);
}
static liHandlerResult cache_etag_filter_hit(liVRequest *vr, liFilter *f) {
cache_etag_file *cfile = (cache_etag_file*) f->param;
UNUSED(vr);
if (!cfile) return LI_HANDLER_GO_ON;
if (!f->out->is_closed) li_chunkqueue_append_file_fd(f->out, NULL, 0, cfile->hit_length, cfile->hit_fd);
cfile->hit_fd = -1;
cache_etag_file_free(cfile);
f->param = NULL;
f->out->is_closed = TRUE;
if (NULL != f->in) {
li_chunkqueue_skip_all(f->in);
li_stream_disconnect(&f->stream);
return LI_HANDLER_GO_ON;
}
return LI_HANDLER_GO_ON;
}
@ -154,56 +151,67 @@ static liHandlerResult cache_etag_filter_miss(liVRequest *vr, liFilter *f) {
off_t buflen;
liChunkIter citer = li_chunkqueue_iter(f->in);
GError *err = NULL;
UNUSED(vr);
if (0 == f->in->length) return LI_HANDLER_GO_ON;
if (!cfile) { /* somehow we lost the file */
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = TRUE;
if (NULL == f->in) {
cache_etag_filter_free(vr, f);
/* didn't handle f->in->is_closed? abort forwarding */
if (!f->out->is_closed) li_stream_reset(&f->stream);
return LI_HANDLER_GO_ON;
}
if (LI_HANDLER_GO_ON != li_chunkiter_read(citer, 0, 64*1024, &buf, &buflen, &err)) {
if (NULL != err) {
VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
g_error_free(err);
if (NULL == cfile) goto forward;
if (f->in->length > 0) {
if (LI_HANDLER_GO_ON != li_chunkiter_read(citer, 0, 64*1024, &buf, &buflen, &err)) {
if (NULL != err) {
if (NULL != vr) VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
g_error_free(err);
} else {
if (NULL != vr) VR_ERROR(vr, "%s", "Couldn't read data from chunkqueue");
}
cache_etag_filter_free(vr, f);
goto forward;
}
res = write(cfile->fd, buf, buflen);
if (res < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
return LI_HANDLER_COMEBACK;
default:
if (NULL != vr) VR_ERROR(vr, "Couldn't write to temporary cache file '%s': %s",
cfile->tmpfilename->str, g_strerror(errno));
cache_etag_filter_free(vr, f);
goto forward;
}
} else {
VR_ERROR(vr, "%s", "Couldn't read data from chunkqueue");
if (!f->out->is_closed) {
li_chunkqueue_steal_len(f->out, f->in, res);
} else {
li_chunkqueue_skip(f->in, res);
}
}
cache_etag_file_free(cfile);
}
if (0 == f->in->length && f->in->is_closed) {
f->out->is_closed = TRUE;
cache_etag_file_finish(vr, cfile);
f->param = NULL;
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
res = write(cfile->fd, buf, buflen);
if (res < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
break; /* come back later */
default:
VR_ERROR(vr, "Couldn't write to temporary cache file '%s': %s",
cfile->tmpfilename->str, g_strerror(errno));
cache_etag_file_free(cfile);
f->param = NULL;
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
} else {
li_chunkqueue_steal_len(f->out, f->in, res);
if (f->in->length == 0 && f->in->is_closed) {
cache_etag_file_finish(vr, cfile);
f->param = NULL;
f->out->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
}
return LI_HANDLER_GO_ON;
return f->in->length ? LI_HANDLER_COMEBACK : LI_HANDLER_GO_ON;
forward:
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
li_stream_disconnect(&f->stream);
} else {
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = f->in->is_closed;
}
return LI_HANDLER_GO_ON;
}
static GString* createFileName(liVRequest *vr, GString *path, liHttpHeader *etagheader) {
@ -233,7 +241,6 @@ static liHandlerResult cache_etag_handle(liVRequest *vr, gpointer param, gpointe
liHttpHeader *etag;
struct stat st;
GString *tmp_str = vr->wrk->tmp_str;
liFilter *f;
liHandlerResult res;
int err, fd;
@ -265,6 +272,7 @@ static liHandlerResult cache_etag_handle(liVRequest *vr, gpointer param, gpointe
return res;
if (res == LI_HANDLER_GO_ON) {
liFilter *f;
if (!S_ISREG(st.st_mode)) {
VR_ERROR(vr, "Unexpected file type for cache file '%s' (mode %o)", cfile->filename->str, (unsigned int) st.st_mode);
close(fd);
@ -281,8 +289,14 @@ static liHandlerResult cache_etag_handle(liVRequest *vr, gpointer param, gpointe
g_string_truncate(tmp_str, 0);
li_string_append_int(tmp_str, st.st_size);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(tmp_str));
f = li_vrequest_add_filter_out(vr, cache_etag_filter_hit, cache_etag_filter_free, NULL, cfile);
f->in->is_closed = TRUE;
f = li_vrequest_add_filter_out(vr, cache_etag_filter_hit, NULL, NULL, NULL);
if (NULL != f) {
li_chunkqueue_append_file_fd(f->out, NULL, 0, cfile->hit_length, cfile->hit_fd);
f->out->is_closed = TRUE;
cfile->hit_fd = -1;
}
cache_etag_file_free(cfile);
*context = NULL;
return LI_HANDLER_GO_ON;
}

View File

@ -179,12 +179,16 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
deflate_context_zlib *ctx = (deflate_context_zlib*) f->param;
const off_t blocksize = ctx->conf.blocksize;
const off_t max_compress = 4 * blocksize;
gboolean debug = _OPTION(vr, ctx->conf.p, 0).boolean;
gboolean debug = (NULL != vr) && _OPTION(vr, ctx->conf.p, 0).boolean;
z_stream *z = &ctx->z;
off_t l = 0;
liHandlerResult res;
int rc;
UNUSED(vr);
if (NULL == f->in) {
f->out->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
if (f->in->is_closed && 0 == f->in->length && f->out->is_closed) {
/* nothing to do anymore */
@ -193,7 +197,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
li_stream_disconnect(&f->stream);
if (debug) {
VR_DEBUG(vr, "deflate out stream closed: in: %i, out : %i", (int) z->total_in, (int) z->total_out);
}
@ -231,7 +235,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
if (LI_HANDLER_GO_ON != (res = li_chunkiter_read(ci, 0, blocksize, &data, &len, &err))) {
if (NULL != err) {
VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
if (NULL != vr) VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
g_error_free(err);
}
return res;
@ -247,7 +251,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
do {
if (Z_OK != deflate(z, Z_NO_FLUSH)) {
f->out->is_closed = TRUE;
VR_ERROR(vr, "deflate error: %s", z->msg);
if (NULL != vr) VR_ERROR(vr, "deflate error: %s", z->msg);
return LI_HANDLER_ERROR;
}
@ -267,7 +271,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
rc = deflate(z, Z_FINISH);
if (rc != Z_OK && rc != Z_STREAM_END) {
f->out->is_closed = TRUE;
VR_ERROR(vr, "deflate error: %s", z->msg);
if (NULL != vr) VR_ERROR(vr, "deflate error: %s", z->msg);
return LI_HANDLER_ERROR;
}
@ -306,8 +310,7 @@ static liHandlerResult deflate_filter_zlib(liVRequest *vr, liFilter *f) {
if (l > 0 && 0 == f->in->length && !f->in->is_closed) { /* flush z_stream */
rc = deflate(z, Z_SYNC_FLUSH);
if (rc != Z_OK && rc != Z_STREAM_END) {
f->out->is_closed = TRUE;
VR_ERROR(vr, "deflate error: %s", z->msg);
if (NULL != vr) VR_ERROR(vr, "deflate error: %s", z->msg);
return LI_HANDLER_ERROR;
}
}
@ -393,12 +396,17 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
deflate_context_bzip2 *ctx = (deflate_context_bzip2*) f->param;
const off_t blocksize = ctx->conf.blocksize;
const off_t max_compress = 4 * blocksize;
gboolean debug = _OPTION(vr, ctx->conf.p, 0).boolean;
gboolean debug = (NULL != vr) && _OPTION(vr, ctx->conf.p, 0).boolean;
bz_stream *bz = &ctx->bz;
off_t l = 0;
liHandlerResult res;
int rc;
UNUSED(vr);
if (NULL == f->in) {
/* didn't handle f->in->is_closed? abort forwarding */
if (!f->out->is_closed) li_stream_reset(&f->stream);
return LI_HANDLER_GO_ON;
}
if (f->in->is_closed && 0 == f->in->length && f->out->is_closed) {
/* nothing to do anymore */
@ -407,7 +415,7 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
li_stream_disconnect(&f->stream);
if (debug) {
VR_DEBUG(vr, "deflate out stream closed: in: %i, out : %i", (int) bz->total_in_lo32, (int) bz->total_out_lo32);
}
@ -426,7 +434,7 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
if (LI_HANDLER_GO_ON != (res = li_chunkiter_read(ci, 0, blocksize, &data, &len, &err))) {
if (NULL != err) {
VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
if (NULL != vr) VR_ERROR(vr, "Couldn't read data from chunkqueue: %s", err->message);
g_error_free(err);
}
return res;
@ -439,7 +447,7 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
rc = BZ2_bzCompress(bz, BZ_RUN);
if (rc != BZ_RUN_OK) {
f->out->is_closed = TRUE;
VR_ERROR(vr, "BZ2_bzCompress error: rc = %i", rc);
if (NULL != vr) VR_ERROR(vr, "BZ2_bzCompress error: rc = %i", rc);
return LI_HANDLER_ERROR;
}
@ -459,7 +467,7 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
rc = BZ2_bzCompress(bz, BZ_FINISH);
if (rc != BZ_RUN_OK && rc != BZ_STREAM_END && rc != BZ_FINISH_OK) {
f->out->is_closed = TRUE;
VR_ERROR(vr, "BZ2_bzCompress error: rc = %i", rc);
if (NULL != vr) VR_ERROR(vr, "BZ2_bzCompress error: rc = %i", rc);
return LI_HANDLER_ERROR;
}
@ -475,7 +483,6 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
VR_DEBUG(vr, "deflate finished: in: %i, out : %i", (int) bz->total_in_lo32, (int) bz->total_out_lo32);
}
f->out->is_closed = TRUE;
}
@ -492,8 +499,10 @@ static liHandlerResult deflate_filter_bzip2(liVRequest *vr, liFilter *f) {
static liHandlerResult deflate_filter_null(liVRequest *vr, liFilter *f) {
UNUSED(vr);
li_chunkqueue_skip_all(f->in);
f->out->is_closed = f->in->is_closed = TRUE;
if (NULL != f->in) {
li_chunkqueue_skip_all(f->in);
li_stream_disconnect(&f->stream);
}
return LI_HANDLER_GO_ON;
}
@ -665,7 +674,7 @@ static liHandlerResult deflate_handle(liVRequest *vr, gpointer param, gpointer *
if (is_head_request) {
/* kill content so response.c doesn't send wrong content-length */
liFilter *f = li_vrequest_add_filter_out(vr, deflate_filter_null, NULL, NULL, NULL);
f->out->is_closed = f->in->is_closed = TRUE;
f->out->is_closed = TRUE;
}
li_http_header_insert(vr->response.headers, CONST_STR_LEN("Content-Encoding"), encoding_names[i], strlen(encoding_names[i]));

View File

@ -54,6 +54,7 @@ LI_API gboolean mod_memcached_free(liModules *mods, liModule *mod);
typedef struct memcached_ctx memcached_ctx;
struct memcached_ctx {
int refcount;
liServer *srv;
liMemcachedCon **worker_client_ctx;
liSocketAddress addr;
@ -100,11 +101,15 @@ static void mc_ctx_acquire(memcached_ctx* ctx) {
g_atomic_int_inc(&ctx->refcount);
}
static void mc_ctx_release(liServer *srv, gpointer param) {
/* not every context has srv ready, extract from context instead */
static void mc_ctx_release(liServer *_srv, gpointer param) {
memcached_ctx *ctx = param;
liServer *srv;
guint i;
UNUSED(_srv);
if (NULL == ctx) return;
srv = ctx->srv;
assert(g_atomic_int_get(&ctx->refcount) > 0);
if (!g_atomic_int_dec_and_test(&ctx->refcount)) return;
@ -129,6 +134,7 @@ static void mc_ctx_release(liServer *srv, gpointer param) {
ctx->mconf_link.data = NULL;
}
ctx->srv = NULL;
g_slice_free(memcached_ctx, ctx);
}
@ -143,6 +149,7 @@ static memcached_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config)
}
ctx = g_slice_new0(memcached_ctx);
ctx->srv = srv;
ctx->refcount = 1;
ctx->p = p;
@ -232,7 +239,7 @@ static memcached_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config)
return ctx;
option_failed:
mc_ctx_release(srv, ctx);
mc_ctx_release(NULL, ctx);
return NULL;
}
@ -415,8 +422,13 @@ static void memcache_store_filter_free(liVRequest *vr, liFilter *f) {
memcache_filter *mf = (memcache_filter*) f->param;
UNUSED(vr);
mc_ctx_release(vr->wrk->srv, mf->ctx);
if (NULL == f->param) return;
f->param = NULL;
mc_ctx_release(NULL, mf->ctx);
li_buffer_release(mf->buf);
mf->buf = NULL;
g_slice_free(memcache_filter, mf);
}
@ -424,25 +436,24 @@ static void memcache_store_filter_free(liVRequest *vr, liFilter *f) {
static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
memcache_filter *mf = (memcache_filter*) f->param;
if (NULL == f->in) {
memcache_store_filter_free(vr, f);
/* didn't handle f->in->is_closed? abort forwarding */
if (!f->out->is_closed) li_stream_reset(&f->stream);
return LI_HANDLER_GO_ON;
}
if (NULL == mf) goto forward;
if (f->in->is_closed && 0 == f->in->length && f->out->is_closed) {
/* nothing to do anymore */
return LI_HANDLER_GO_ON;
}
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
/* if already in "forward" mode */
if (NULL == mf->buf) goto forward;
/* check if size still fits into buffer */
if ((gssize) (f->in->length + mf->buf->used) > (gssize) mf->ctx->maxsize) {
/* response too big, switch to "forward" mode */
li_buffer_release(mf->buf);
mf->buf = NULL;
memcache_store_filter_free(vr, f);
goto forward;
}
@ -453,8 +464,6 @@ static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
liHandlerResult res;
GError *err = NULL;
if (0 == f->in->length) break;
ci = li_chunkqueue_iter(f->in);
if (LI_HANDLER_GO_ON != (res = li_chunkiter_read(ci, 0, 16*1024, &data, &len, &err))) {
@ -467,15 +476,18 @@ static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
if ((gssize) (len + mf->buf->used) > (gssize) mf->ctx->maxsize) {
/* response too big, switch to "forward" mode */
li_buffer_release(mf->buf);
mf->buf = NULL;
memcache_store_filter_free(vr, f);
goto forward;
}
memcpy(mf->buf->addr + mf->buf->used, data, len);
mf->buf->used += len;
li_chunkqueue_steal_len(f->out, f->in, len);
if (!f->out->is_closed) {
li_chunkqueue_steal_len(f->out, f->in, len);
} else {
li_chunkqueue_skip(f->in, len);
}
}
if (f->in->is_closed) {
@ -486,26 +498,27 @@ static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
liMemcachedRequest *req;
memcached_ctx *ctx = mf->ctx;
assert(0 == f->in->length);
f->out->is_closed = TRUE;
con = mc_ctx_prepare(ctx, vr->wrk);
mc_ctx_build_key(vr->wrk->tmp_str, ctx, vr);
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
if (NULL != vr && CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "memcached.store: storing response for key '%s'", vr->wrk->tmp_str->str);
}
req = li_memcached_set(con, vr->wrk->tmp_str, ctx->flags, ctx->ttl, mf->buf, NULL, NULL, &err);
li_buffer_release(mf->buf);
mf->buf = NULL;
memcache_store_filter_free(vr, f);
if (NULL == req) {
if (NULL != err) {
if (LI_MEMCACHED_DISABLED != err->code) {
if (NULL != vr && LI_MEMCACHED_DISABLED != err->code) {
VR_ERROR(vr, "memcached.store: set failed: %s", err->message);
}
g_clear_error(&err);
} else {
} else if (NULL != vr) {
VR_ERROR(vr, "memcached.store: set failed: %s", "Unkown error");
}
}
@ -514,8 +527,13 @@ static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
return LI_HANDLER_GO_ON;
forward:
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = f->in->is_closed;
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
li_stream_disconnect(&f->stream);
} else {
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = f->in->is_closed;
}
return LI_HANDLER_GO_ON;
}