2
0
Fork 0

[mod_memcached]: Implement memcached.store

personal/stbuehler/wip
Stefan Bühler 2010-07-17 22:41:21 +02:00
parent c0128fea48
commit ca315bee89
2 changed files with 157 additions and 25 deletions

View File

@ -16,6 +16,9 @@
* through the ev_io callback, which is why we get an extra
* reference there, so our refcount doesn't drop to 0 while
* we are working.
*
* TODO: retry connect() once (per second?) if we have a request
* before we drop all requests
*/
GQuark li_memcached_error_quark() {
@ -37,6 +40,7 @@ struct liMemcachedCon {
ev_io con_watcher;
int fd;
ev_tstamp last_con_start;
GQueue req_queue;
int_request *cur_req;
@ -231,6 +235,10 @@ static void memcached_connect(liMemcachedCon *con) {
s = con->con_watcher.fd;
if (-1 == s) {
/* reconnect limit */
if (ev_now(con->loop) < con->last_con_start + 1) return;
con->last_con_start = ev_now(con->loop);
do {
s = socket(con->addr.addr->plain.sa_family, SOCK_STREAM, 0);
} while (-1 == s && errno == EINTR);
@ -688,10 +696,12 @@ static void memcached_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
return;
}
li_memcached_con_acquire(con); /* make sure con isn't freed in the middle of something */
if (-1 == con->fd) {
memcached_connect(con);
return;
}
if (-1 == con->fd) memcached_connect(con);
if (-1 == con->fd) goto out;
li_memcached_con_acquire(con); /* make sure con isn't freed in the middle of something */
if (revents | EV_WRITE) {
int i;

View File

@ -50,8 +50,10 @@
LI_API gboolean mod_memcached_init(liModules *mods, liModule *mod);
LI_API gboolean mod_memcached_free(liModules *mods, liModule *mod);
typedef struct memcache_ctx memcache_ctx;
struct memcache_ctx {
typedef struct memcached_ctx memcached_ctx;
struct memcached_ctx {
int refcount; /* TODO */
liMemcachedCon **worker_client_ctx;
liSocketAddress addr;
liPattern *pattern;
@ -77,6 +79,11 @@ typedef struct {
liVRequest *vr;
} memcache_request;
typedef struct {
memcached_ctx *ctx;
liBuffer *buf;
} memcache_filter;
/* memcache option names */
static const GString
mon_server = { CONST_STR_LEN("server"), 0 },
@ -88,7 +95,7 @@ static const GString
;
static void mc_ctx_free(liServer *srv, gpointer param) {
memcache_ctx *ctx = param;
memcached_ctx *ctx = param;
guint i;
if (ctx->worker_client_ctx) {
@ -111,11 +118,11 @@ static void mc_ctx_free(liServer *srv, gpointer param) {
ctx->mconf_link.data = NULL;
}
g_slice_free(memcache_ctx, ctx);
g_slice_free(memcached_ctx, ctx);
}
static memcache_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config) {
memcache_ctx *ctx;
static memcached_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config) {
memcached_ctx *ctx;
memcached_config *mconf = p->data;
GString def_server = li_const_gstring(CONST_STR_LEN("127.0.0.1:11211"));
@ -124,7 +131,7 @@ static memcache_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config) {
return NULL;
}
ctx = g_slice_new0(memcache_ctx);
ctx = g_slice_new0(memcached_ctx);
ctx->p = p;
ctx->addr = li_sockaddr_from_string(&def_server, 11211);
@ -217,7 +224,7 @@ option_failed:
return NULL;
}
static GString* mc_ctx_build_key(memcache_ctx *ctx, liVRequest *vr) {
static GString* mc_ctx_build_key(memcached_ctx *ctx, liVRequest *vr) {
GMatchInfo *match_info = NULL;
GString *key = g_string_sized_new(255);
@ -235,7 +242,7 @@ static GString* mc_ctx_build_key(memcache_ctx *ctx, liVRequest *vr) {
return key;
}
static liMemcachedCon* mc_ctx_prepare(memcache_ctx *ctx, liWorker *wrk) {
static liMemcachedCon* mc_ctx_prepare(memcached_ctx *ctx, liWorker *wrk) {
liMemcachedCon *con = ctx->worker_client_ctx[wrk->ndx];
if (!con) {
@ -291,7 +298,7 @@ static void memcache_callback(liMemcachedRequest *request, liMemcachedResult res
}
static liHandlerResult mc_handle_lookup(liVRequest *vr, gpointer param, gpointer *context) {
memcache_ctx *ctx = param;
memcached_ctx *ctx = param;
memcache_request *req = *context;
if (req) {
@ -379,11 +386,7 @@ static liHandlerResult mc_handle_lookup(liVRequest *vr, gpointer param, gpointer
}
}
static liHandlerResult mc_handle_store(liVRequest *vr, gpointer param, gpointer *context) {
return LI_HANDLER_GO_ON;
}
static liHandlerResult mc_handle_free(liVRequest *vr, gpointer param, gpointer context) {
static liHandlerResult mc_lookup_handle_free(liVRequest *vr, gpointer param, gpointer context) {
memcache_request *req = context;
UNUSED(vr);
UNUSED(param);
@ -398,8 +401,129 @@ static liHandlerResult mc_handle_free(liVRequest *vr, gpointer param, gpointer c
return LI_HANDLER_GO_ON;
}
static void memcache_store_filter_free(liVRequest *vr, liFilter *f) {
memcache_filter *mf = (memcache_filter*) f->param;
UNUSED(vr);
/* release mf->ctx */
li_buffer_release(mf->buf);
g_slice_free(memcache_filter, mf);
}
static liHandlerResult memcache_store_filter(liVRequest *vr, liFilter *f) {
memcache_filter *mf = (memcache_filter*) f->param;
if (f->in->is_closed && 0 == f->in->length && f->out->is_closed) {
/* nothing to do anymore */
return LI_HANDLER_GO_ON;
}
if (f->out->is_closed) {
li_chunkqueue_skip_all(f->in);
f->in->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
/* if already in "forward" mode */
if (NULL == mf->buf) goto forward;
/* check if size still fits into buffer */
if ((gssize) (f->in->length + mf->buf->used) > (gssize) mf->ctx->maxsize) {
/* response too big, switch to "forward" mode */
li_buffer_release(mf->buf);
mf->buf = NULL;
goto forward;
}
while (0 < f->in->length) {
char *data;
off_t len;
liChunkIter ci;
liHandlerResult res;
if (0 == f->in->length) break;
ci = li_chunkqueue_iter(f->in);
if (LI_HANDLER_GO_ON != (res = li_chunkiter_read(vr, ci, 0, 16*1024, &data, &len)))
return res;
if ((gssize) (len + mf->buf->used) > (gssize) mf->ctx->maxsize) {
/* response too big, switch to "forward" mode */
li_buffer_release(mf->buf);
mf->buf = NULL;
goto forward;
}
memcpy(mf->buf->addr + mf->buf->used, data, len);
mf->buf->used += len;
li_chunkqueue_steal_len(f->out, f->in, len);
}
if (f->in->is_closed) {
/* finally: store response in memcached */
liMemcachedCon *con;
GString *key;
GError *err = NULL;
liMemcachedRequest *req;
memcached_ctx *ctx = mf->ctx;
f->out->is_closed = TRUE;
con = mc_ctx_prepare(ctx, vr->wrk);
key = mc_ctx_build_key(ctx, vr);
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "memcached.lookup: storing response for key '%s'", key->str);
}
req = li_memcached_set(con, key, ctx->flags, ctx->ttl, mf->buf, NULL, NULL, &err);
g_string_free(key, TRUE);
li_buffer_release(mf->buf);
mf->buf = NULL;
if (NULL == req) {
if (NULL != err) {
VR_ERROR(vr, "memcached.store: set failed: %s", err->message);
g_clear_error(&err);
} else {
VR_ERROR(vr, "memcached.store: set failed: %s", "Unkown error");
}
}
}
return LI_HANDLER_GO_ON;
forward:
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = f->in->is_closed;
return LI_HANDLER_GO_ON;
}
static liHandlerResult mc_handle_store(liVRequest *vr, gpointer param, gpointer *context) {
memcached_ctx *ctx = param;
memcache_filter *mf;
UNUSED(context);
VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr);
if (vr->response.http_status != 200) return LI_HANDLER_GO_ON;
mf = g_slice_new0(memcache_filter);
mf->ctx = ctx;
/* TODO acquire ctx */
mf->buf = li_buffer_new(ctx->maxsize);
li_vrequest_add_filter_out(vr, memcache_store_filter, memcache_store_filter_free, mf);
return LI_HANDLER_GO_ON;
}
static liAction* mc_lookup_create(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
memcache_ctx *ctx;
memcached_ctx *ctx;
liValue *config = val, *act_found = NULL, *act_miss = NULL;
UNUSED(wrk);
UNUSED(userdata);
@ -440,11 +564,11 @@ static liAction* mc_lookup_create(liServer *srv, liWorker *wrk, liPlugin* p, liV
if (act_found) ctx->act_found = li_value_extract_action(act_found);
if (act_miss) ctx->act_miss = li_value_extract_action(act_miss);
return li_action_new_function(mc_handle_lookup, mc_handle_free, mc_ctx_free, ctx);
return li_action_new_function(mc_handle_lookup, mc_lookup_handle_free, mc_ctx_free, ctx);
}
static liAction* mc_store_create(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
memcache_ctx *ctx;
memcached_ctx *ctx;
UNUSED(wrk);
UNUSED(userdata);
@ -452,9 +576,7 @@ static liAction* mc_store_create(liServer *srv, liWorker *wrk, liPlugin* p, liVa
if (!ctx) return NULL;
WARNING(srv, "%s", "memcached.store not supported yet");
return li_action_new_function(mc_handle_store, mc_handle_free, mc_ctx_free, ctx);
return li_action_new_function(mc_handle_store, NULL, mc_ctx_free, ctx);
}
static const liPluginOption options[] = {
@ -970,7 +1092,7 @@ static void mod_memcached_lua_init(lua_State *L, liServer *srv, liWorker *wrk, l
static void memcached_prepare(liServer *srv, liPlugin *p) {
memcached_config *mconf = p->data;
GList *conf_link;
memcache_ctx *ctx;
memcached_ctx *ctx;
while (NULL != (conf_link = g_queue_pop_head_link(&mconf->prepare_ctx))) {
ctx = conf_link->data;