diff --git a/include/lighttpd/condition.h b/include/lighttpd/condition.h index 6f9f2bb..953f027 100644 --- a/include/lighttpd/condition.h +++ b/include/lighttpd/condition.h @@ -149,7 +149,9 @@ typedef struct { } data; } liConditionValue; +/* uses wrk->tmp_str for temporary (and returned) strings */ LI_API liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue, liConditionValue *res, liConditionValueType prefer); +/* uses wrk->tmp_str for temporary (and returned) strings, no conflict with the tmp string from li_condition_get_value */ LI_API gchar const* li_condition_value_to_string(liVRequest *vr, liConditionValue *value); #endif diff --git a/include/lighttpd/memcached.h b/include/lighttpd/memcached.h new file mode 100644 index 0000000..37962d0 --- /dev/null +++ b/include/lighttpd/memcached.h @@ -0,0 +1,59 @@ +#ifndef _LIGHTTPD_MEMCACHED_H_ +#define _LIGHTTPD_MEMCACHED_H_ + +#include + +#include + +typedef struct liMemcachedCon liMemcachedCon; +typedef struct liMemcachedItem liMemcachedItem; +typedef struct liMemcachedRequest liMemcachedRequest; +typedef enum { + LI_MEMCACHED_OK, /* STORED, VALUE, DELETED */ + LI_MEMCACHED_NOT_STORED, + LI_MEMCACHED_EXISTS, + LI_MEMCACHED_NOT_FOUND, + LI_MEMCACHED_RESULT_ERROR /* some error occured */ +} liMemcachedResult; + +typedef void (*liMemcachedCB)(liMemcachedRequest *request, liMemcachedResult result, liMemcachedItem *item, GError **err); + +struct liMemcachedItem { + GString *key; + guint32 flags; + ev_tstamp ttl; + guint64 cas; + liBuffer *data; +}; + +struct liMemcachedRequest { + liMemcachedCB callback; + gpointer cb_data; +}; + +/* error handling */ +#define LI_MEMCACHED_ERROR li_memcached_error_quark() +LI_API GQuark li_memcached_error_quark(); + +typedef enum { + LI_MEMCACHED_CONNECTION, + LI_MEMCACHED_BAD_KEY, + LI_MEMCACHED_UNKNOWN = 0xff +} liMemcachedError; + +LI_API liMemcachedCon* li_memcached_con_new(struct ev_loop *loop, liSocketAddress addr); +LI_API void li_memcached_con_acquire(liMemcachedCon* con); +LI_API void li_memcached_con_release(liMemcachedCon* con); /* thread-safe */ + +/* these functions are not thread-safe, i.e. must be called in the same context as "loop" from li_memcached_con_new */ +LI_API liMemcachedRequest* li_memcached_get(liMemcachedCon *con, GString *key, liMemcachedCB callback, gpointer cb_data, GError **err); +LI_API liMemcachedRequest* li_memcached_set(liMemcachedCon *con, GString *key, guint32 flags, ev_tstamp ttl, liBuffer *data, liMemcachedCB callback, gpointer cb_data, GError **err); + +/* if length(key) <= 250 and all chars x: 0x20 < x < 0x7f the key + * remains untouched; otherwise it gets replaced with its sha1hex hash + * so in most cases the key stays readable, and we have a good fallback + */ +LI_API void li_memcached_mutate_key(GString *key); +LI_API gboolean li_memcached_is_key_valid(GString *key); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cd8ab1d..e96b890 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -172,6 +172,7 @@ SET(COMMON_SRC encoding.c idlist.c ip_parsers.c + memcached.c mempool.c module.c radix.c @@ -329,6 +330,7 @@ ADD_AND_INSTALL_LIBRARY(mod_expire "modules/mod_expire.c") ADD_AND_INSTALL_LIBRARY(mod_fastcgi "modules/mod_fastcgi.c") ADD_AND_INSTALL_LIBRARY(mod_flv "modules/mod_flv.c") ADD_AND_INSTALL_LIBRARY(mod_fortune "modules/mod_fortune.c") +ADD_AND_INSTALL_LIBRARY(mod_memcached "modules/mod_memcached.c") ADD_AND_INSTALL_LIBRARY(mod_progress "modules/mod_progress.c") ADD_AND_INSTALL_LIBRARY(mod_proxy "modules/mod_proxy.c") ADD_AND_INSTALL_LIBRARY(mod_redirect "modules/mod_redirect.c") diff --git a/src/common/memcached.c b/src/common/memcached.c new file mode 100644 index 0000000..acb1535 --- /dev/null +++ b/src/common/memcached.c @@ -0,0 +1,914 @@ + +#include + +#include + +/* IMPORTANT + * In order to keep _release thread-safe the ev_io watcher keeps a + * reference too while active; when the last reference is dropped + * we don't have to stop the watcher, and everything else is + * "thread-safe" if no one is using it anymore (refcount == 0) + * (see memcached_{start,stop}_io) + * This means we have to stop the watcher after all requests are done. + * + * Most "public" functions have to be called while they hold + * a reference somehow; the other way this code gets executed is + * through the ev_io callback, which is why we get an extra + * reference there, so our refcount doesn't drop to 0 while + * we are working. + */ + +GQuark li_memcached_error_quark() { + return g_quark_from_static_string("memcached-error-quark"); +} + +#define BUFFER_CHUNK_SIZE 4*1024 + +typedef struct int_request int_request; +typedef enum { + REQ_GET, REQ_SET +} req_type; + +struct liMemcachedCon { + struct ev_loop *loop; + liSocketAddress addr; + + int refcount; + + ev_io con_watcher; + int fd; + + GQueue req_queue; + int_request *cur_req; + + GQueue out; + liBuffer *buf; + + GString *tmpstr; + + GError *err; + + /* read buffers */ + liBuffer *line, *data, *remaining; + liMemcachedItem curitem; + + /* GET */ + gsize get_data_size; + gboolean get_have_header; +}; + +struct int_request { + liMemcachedRequest req; + req_type type; + + GString *key; + guint32 flags; + ev_tstamp ttl; + liBuffer *data; + + GList iter; +}; + +typedef struct { + gsize pos, len; + liBuffer *buf; +} send_item; + +static void send_queue_push_buffer(GQueue *queue, liBuffer *buf, gsize start, gsize len) { + send_item *i; + if (!buf || !len) return; + g_assert(start+len <= buf->used); + + li_buffer_acquire(buf); + i = g_slice_new0(send_item); + i->buf = buf; + i->pos = start; + i->len = len; + + g_queue_push_tail(queue, i); +} + +static void send_queue_push_gstring(GQueue *queue, GString *str, liBuffer **pbuf) { + liBuffer *buf = *pbuf; + gsize pos; + + if (NULL != buf && (1 == buf->refcount)) { + buf->used = 0; + } + if (NULL == buf || (buf->alloc_size - buf->used < str->len)) { + li_buffer_release(buf); + buf = li_buffer_new_slice(BUFFER_CHUNK_SIZE > str->len ? BUFFER_CHUNK_SIZE : str->len); + *pbuf = buf; + } + + pos = buf->used; + memcpy(buf->addr + pos, str->str, str->len); + buf->used += str->len; + send_queue_push_buffer(queue, buf, pos, str->len); +} + +static void send_queue_item_free(send_item *i) { + if (!i) return; + li_buffer_release(i->buf); + g_slice_free(send_item, i); +} + +static void send_queue_clean(GQueue *queue) { + send_item *i; + while (NULL != (i = g_queue_peek_head(queue))) { + if (i->len != 0) return; + li_buffer_release(i->buf); + g_slice_free(send_item, i); + } +} + +static void send_queue_reset(GQueue *queue) { + send_item *i; + while (NULL != (i = g_queue_peek_head(queue))) { + li_buffer_release(i->buf); + g_slice_free(send_item, i); + } +} + +static void memcached_start_io(liMemcachedCon *con) { + if (!((ev_watcher*) &con->con_watcher)->active) { + li_memcached_con_acquire(con); + li_ev_safe_unref_and_start(ev_io_start, con->loop, &con->con_watcher); + } +} + +static void memcached_stop_io(liMemcachedCon *con) { + if (((ev_watcher*) &con->con_watcher)->active) { + li_ev_safe_ref_and_stop(ev_io_stop, con->loop, &con->con_watcher); + li_memcached_con_release(con); + } +} + +static void send_request(liMemcachedCon *con, int_request *req) { + switch (req->type) { + case REQ_GET: + g_string_printf(con->tmpstr, "get %s\r\n", req->key->str); + send_queue_push_gstring(&con->out, con->tmpstr, &con->buf); + break; + case REQ_SET: + /* set \r\n */ + + g_string_printf(con->tmpstr, "set %s %"G_GUINT32_FORMAT" %"G_GUINT64_FORMAT" %"G_GSIZE_FORMAT"\r\n", req->key->str, req->flags, (guint64) req->ttl, req->data->used); + send_queue_push_gstring(&con->out, con->tmpstr, &con->buf); + send_queue_push_buffer(&con->out, req->data, 0, req->data->used); + g_string_assign(con->tmpstr, "\r\n"); + send_queue_push_gstring(&con->out, con->tmpstr, &con->buf); + break; + } +} + +static gboolean push_request(liMemcachedCon *con, int_request *req, GError **err) { + UNUSED(err); + + li_memcached_con_acquire(con); + + send_request(con, req); + + req->iter.data = req; + g_queue_push_tail_link(&con->req_queue, &req->iter); + + memcached_start_io(con); + li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE); + + return TRUE; +} + +static void free_request(liMemcachedCon *con, int_request *req) { + if (!req) return; + + li_memcached_con_release(con); + + if (NULL != req->iter.data) { + req->iter.data = NULL; + g_queue_unlink(&con->req_queue, &req->iter); + } + + switch (req->type) { + case REQ_GET: + break; + case REQ_SET: + li_buffer_release(req->data); + req->data = NULL; + break; + } + + g_string_free(req->key, TRUE); + req->key = NULL; +} + +static void cancel_all_requests(liMemcachedCon *con) { + int_request *req; + GError *err1 = NULL, *err = NULL; + if (con->err) { + err1 = g_error_copy(con->err); + } else { + g_set_error(&err1, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection reset"); + } + + while (NULL != (req = g_queue_peek_head(&con->req_queue))) { + if (NULL == err) { + err = g_error_copy(err1); + } + + if (req->req.callback) req->req.callback(&req->req, LI_MEMCACHED_ERROR, NULL, &err); + } + + if (NULL != err) g_clear_error(&err); + if (NULL != err1) g_clear_error(&err1); +} + +static void memcached_connect(liMemcachedCon *con) { + int s; + struct sockaddr addr; + socklen_t len; + + if (-1 != con->fd) return; + + s = con->con_watcher.fd; + if (-1 == s) { + do { + s = socket(con->addr.addr->plain.sa_family, SOCK_STREAM, 0); + } while (-1 == s && errno == EINTR); + if (-1 == s) { + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't open socket: %s", g_strerror(errno)); + return; + } + li_fd_init(s); + ev_io_set(&con->con_watcher, s, 0); + + if (-1 == connect(s, &con->addr.addr->plain, con->addr.len)) { + switch (errno) { + case EINPROGRESS: + case EALREADY: + case EINTR: + memcached_start_io(con); + li_ev_io_add_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE); + break; + default: + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't connect to '%s': %s", + li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str, + g_strerror(errno)); + close(s); + ev_io_set(&con->con_watcher, -1, 0); + break; + } + } else { + /* connect succeeded */ + con->fd = s; + g_clear_error(&con->err); + if (0 == con->out.length) { + memcached_start_io(con); + li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ); + } else { + memcached_start_io(con); + li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE); + } + } + + return; + } + + /* create new connection: + * see http://www.cyberconf.org/~cynbe/ref/nonblocking-connects.html + */ + + /* Check to see if we can determine our peer's address. */ + len = sizeof(addr); + if (getpeername(s, &addr, &len) == -1) { + /* connect failed; find out why */ + int err; + len = sizeof(err); +#ifdef SO_ERROR + getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &len); +#else + { + char ch; + errno = 0; + read(s, &ch, 1); + err = errno; + } +#endif + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't connect socket to '%s': %s", + li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str, + g_strerror(err)); + + close(s); + memcached_stop_io(con); + ev_io_set(&con->con_watcher, -1, 0); + } else { + /* connect succeeded */ + con->fd = s; + g_clear_error(&con->err); + if (0 == con->out.length) { + memcached_start_io(con); + li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ); + } else { + memcached_start_io(con); + li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE); + } + } +} + +static void reset_item(liMemcachedItem *item) { + if (item->key) { + g_string_free(item->key, TRUE); + item->key = NULL; + } + item->flags = 0; + item->ttl = 0; + item->cas = 0; + if (item->data) { + li_buffer_release(item->data); + item->data = NULL; + } +} + +static void close_con(liMemcachedCon *con) { + if (con->line) con->line->used = 0; + if (con->remaining) con->remaining->used = 0; + if (con->data) con->data->used = 0; + if (con->buf) con->buf->used = 0; + reset_item(&con->curitem); + send_queue_reset(&con->out); + + memcached_stop_io(con); + close(con->con_watcher.fd); + con->fd = -1; + ev_io_set(&con->con_watcher, -1, 0); + con->cur_req = NULL; + cancel_all_requests(con); + memcached_connect(con); +} + +static void add_remaining(liMemcachedCon *con, gchar *addr, gsize len) { + liBuffer *rem = con->remaining; + if (!rem) rem = con->remaining = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, len)); + if (rem->used + len > rem->alloc_size) { + rem = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, rem->used + len)); + memcpy(rem->addr, con->remaining->addr, (rem->used = con->remaining->used)); + li_buffer_release(con->remaining); + con->remaining = rem; + } + + memcpy(rem->addr + rem->used, addr, len); + rem->used += len; +} + +/** repeats read after EINTR */ +static ssize_t net_read(int fd, void *buf, ssize_t nbyte) { + ssize_t r; + while (-1 == (r = read(fd, buf, nbyte))) { + switch (errno) { + case EINTR: + /* Try again */ + break; + default: + /* report error */ + return r; + } + } + /* return bytes read */ + return r; +} + +static gboolean try_read_line(liMemcachedCon *con) { + liBuffer *line; + ssize_t r; + + if (!con->line) con->line = li_buffer_new_slice(BUFFER_CHUNK_SIZE); + if (!con->remaining) con->remaining = li_buffer_new_slice(BUFFER_CHUNK_SIZE); + + /* if we have remaining data use it for a new line */ + if (con->line->used == 0 && con->remaining->used > 0) { + liBuffer *tmp = con->remaining; con->remaining = con->line; con->line = tmp; + } + + g_assert(NULL == con->remaining || 0 == con->remaining->used); /* there shouldn't be any data in remaining while we fill con->line */ + + line = con->line; + + if (line->used > 0) { + /* search for \r\n */ + gchar *addr = line->addr; + gsize i, len = line->used; + for (i = 0; i < len; i++) { + if (addr[i] == '\r') { + i++; + if (i < len && addr[i] == '\n') { + add_remaining(con, addr + i+1, len - (i+1)); + line->used = i-1; + line->addr[i-1] = '\0'; + return TRUE; + } + } + } + } + + if (line->used > 1024) { + /* Protocol error: we don't parse line longer than 1024 */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: line too long"); + close_con(con); + return FALSE; + } + + /* need more data */ + r = net_read(con->fd, line->addr + line->used, line->alloc_size - line->used); + if (r == 0) { + /* EOF */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed by peer"); + close_con(con); + return FALSE; + } else if (r < 0) { + switch (errno) { + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + break; + default: + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: %s", g_strerror(errno)); + close_con(con); + break; + } + return FALSE; + } + + line->used += r; + + if (line->used > 0) { + /* search for \r\n */ + gchar *addr = line->addr; + gsize i, len = line->used; + for (i = 0; i < len; i++) { + if (addr[i] == '\r') { + i++; + if (i < len && addr[i] == '\n') { + add_remaining(con, addr + i+1, len - (i+1)); + line->used = i-1; + line->addr[i-1] = '\0'; + return TRUE; + } + } + } + } + + return FALSE; +} + +static gboolean try_read_data(liMemcachedCon *con, gsize datalen) { + liBuffer *data; + ssize_t r; + + datalen += 2; /* \r\n */ + + /* if we have remaining data use it for a new line */ + if ((!con->data || con->data->used == 0) && con->remaining && con->remaining->used > 0) { + liBuffer *tmp = con->remaining; con->remaining = con->data; con->data = tmp; + } + + if (!con->data) con->data = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, datalen)); + + if (con->data->alloc_size < datalen) { + data = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, datalen)); + memcpy(data->addr, con->data->addr, (data->used = con->data->used)); + li_buffer_release(con->data); + con->data = data; + } + + g_assert(NULL == con->remaining || 0 == con->remaining->used); /* there shouldn't be any data in remaining while we fill con->data */ + + data = con->data; + + if (data->used < datalen) { + /* read more data */ + r = net_read(con->fd, data->addr + data->used, data->alloc_size - data->used); + if (r == 0) { + /* EOF */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed by peer"); + close_con(con); + return FALSE; + } else if (r < 0) { + switch (errno) { + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + break; + default: + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: %s", g_strerror(errno)); + close_con(con); + break; + } + return FALSE; + } + + data->used += r; + } + + if (data->used >= datalen) { + if (data->addr[datalen-2] != '\r' || data->addr[datalen-1] != '\n') { + /* Protocol error: data block not terminated with \r\n */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: data block not terminated with \\r\\n"); + close_con(con); + return FALSE; + } + add_remaining(con, data->addr + datalen, data->used - datalen); + data->used = datalen - 2; + data->addr[datalen-2] = '\0'; + return TRUE; + } + + return FALSE; +} + + +static void handle_read(liMemcachedCon *con) { + int_request *cur; + + if (NULL == (cur = con->cur_req)) { + cur = con->cur_req = g_queue_peek_head(&con->req_queue); + + if (NULL == cur) { + /* unexpected read event, perhaps just eof */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: unexpected read event"); + close_con(con); + return; + } + + reset_item(&con->curitem); + if (con->data) con->data->used = 0; + if (con->line) con->line->used = 0; + + /* init read state */ + switch (cur->type) { + case REQ_GET: + con->get_data_size = 0; + con->get_have_header = FALSE; + break; + case REQ_SET: + break; + } + } + + switch (cur->type) { + case REQ_GET: + if (!con->get_have_header) { + char *pos, *next; + + /* wait for header line */ + if (!try_read_line(con)) return; + + if (3 == con->line->used && 0 == memcmp("END", con->line->addr, 3)) { + /* key not found */ + if (cur->req.callback) { + cur->req.callback(&cur->req, LI_MEMCACHED_NOT_FOUND, NULL, NULL); + } + con->cur_req = NULL; + free_request(con, cur); + return; + } + + /* con->line is 0 terminated */ + + if (0 != strncmp("VALUE ", con->line->addr, 6)) { + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: Unexpected response for GET: '%s'", con->line->addr); + close_con(con); + return; + } + + /* VALUE []\r\n */ + + /* */ + pos = con->line->addr + 6; + next = strchr(pos, ' '); + if (NULL == next) goto req_get_header_error; + + con->curitem.key = g_string_new_len(pos, next - pos); + + /* */ + pos = next + 1; + con->curitem.flags = strtoul(pos, &next, 10); + if (' ' != *next || pos == next) goto req_get_header_error; + + /* */ + pos = next + 1; + con->get_data_size = g_ascii_strtoll(pos, &next, 10); + if (pos == next) goto req_get_header_error; + + /* [] */ + if (' ' == *next) { + pos = next + 1; + con->curitem.cas = g_ascii_strtoll(pos, &next, 10); + if (pos == next) goto req_get_header_error; + } + + if ('\0' != *next) { + goto req_get_header_error; + } + + con->line->used = 0; + + goto req_get_header_done; + +req_get_header_error: + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: Couldn't parse VALUE respone: '%s'", con->line->addr); + close_con(con); + return; + +req_get_header_done: ; + } + if (NULL == con->data || con->data->used < con->get_data_size) { + /* wait for data */ + if (!try_read_data(con, con->get_data_size)) return; + } + /* wait for END\r\n */ + if (!try_read_line(con)) return; + + if (3 == con->line->used && 0 == memcmp("END", con->line->addr, 3)) { + /* Move data to item */ + con->curitem.data = con->data; + con->data = NULL; + if (cur->req.callback) { + cur->req.callback(&cur->req, LI_MEMCACHED_OK, &con->curitem, NULL); + } + reset_item(&con->curitem); + } else { + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: GET response not terminated with END (got '%s')", con->line->addr); + close_con(con); + return; + } + + con->cur_req = NULL; + free_request(con, cur); + return; + + case REQ_SET: + if (!try_read_line(con)) return; + + if (6 == con->line->used && 0 == memcmp("STORED", con->line->addr, 6)) { + if (cur->req.callback) { + cur->req.callback(&cur->req, LI_MEMCACHED_OK, NULL, NULL); + } + } else { + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: unepxected SET response: '%s'", con->line->addr); + close_con(con); + return; + } + + con->cur_req = NULL; + free_request(con, cur); + return; + } +} + +static void memcached_io_cb(struct ev_loop *loop, ev_io *w, int revents) { + liMemcachedCon *con = (liMemcachedCon*) w->data; + + if (1 == g_atomic_int_get(&con->refcount) && w->active) { + memcached_stop_io(con); + return; + } + + li_memcached_con_acquire(con); /* make sure con isn't freed in the middle of something */ + + if (-1 == con->fd) memcached_connect(con); + if (-1 == con->fd) goto out; + + if (revents | EV_WRITE) { + int i; + ssize_t written, len; + gchar *data; + gboolean out_queue_empty; + send_item *si; + + si = g_queue_peek_head(&con->out); + + for (i = 0; si && (i < 10); i++) { /* don't send more than 10 chunks */ + data = si->buf->addr + si->pos; + len = si->len; + written = write(w->fd, data, len); + if (written < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + goto write_eagain; + default: /* Fatal error, connection has to be closed */ + g_clear_error(&con->err); + g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't write socket '%s': %s", + li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str, + g_strerror(errno)); + close_con(con); + goto out; + } + } else { + si->pos += written; + si->len -= written; + if (0 == si->len) { + send_queue_item_free(si); + g_queue_pop_head(&con->out); + si = g_queue_peek_head(&con->out); + } + } + } + +write_eagain: + send_queue_clean(&con->out); + out_queue_empty = (0 == con->out.length); + + if (out_queue_empty) { + li_ev_io_rem_events(loop, w, EV_WRITE); + if (0 == (w->events & (EV_READ | EV_WRITE))) { + memcached_stop_io(con); + } + } + } + + if (revents | EV_READ) { + do { + handle_read(con); + } while (con->remaining && con->remaining->used > 0); + } + +out: + li_memcached_con_release(con); +} + + +liMemcachedCon* li_memcached_con_new(struct ev_loop *loop, liSocketAddress addr) { + liMemcachedCon* con = g_slice_new0(liMemcachedCon); + + con->refcount = 1; + con->loop = loop; + con->addr = li_sockaddr_dup(addr); + con->tmpstr = g_string_sized_new(511); + + con->fd = -1; + ev_io_init(&con->con_watcher, memcached_io_cb, -1, 0); + con->con_watcher.data = con; + + memcached_connect(con); + + return con; +} + +static void li_memcached_con_free(liMemcachedCon* con) { + if (!con) return; + + if (-1 != con->con_watcher.fd) { + close(con->con_watcher.fd); + /* as io has a reference on con, we don't need to stop it here */ + ev_io_set(&con->con_watcher, -1, 0); + con->fd = -1; + } + + send_queue_reset(&con->out); + cancel_all_requests(con); + + li_buffer_release(con->buf); + li_buffer_release(con->line); + li_buffer_release(con->remaining); + li_buffer_release(con->data); + reset_item(&con->curitem); + + li_sockaddr_clear(&con->addr); + g_string_free(con->tmpstr, TRUE); + + g_slice_free(liMemcachedCon, con); +} + +void li_memcached_con_release(liMemcachedCon* con) { + if (!con) return; + assert(g_atomic_int_get(&con->refcount) > 0); + if (g_atomic_int_dec_and_test(&con->refcount)) { + li_memcached_con_free(con); + } +} + +void li_memcached_con_acquire(liMemcachedCon* con) { + assert(g_atomic_int_get(&con->refcount) > 0); + g_atomic_int_inc(&con->refcount); +} + + +liMemcachedRequest* li_memcached_get(liMemcachedCon *con, GString *key, liMemcachedCB callback, gpointer cb_data, GError **err) { + int_request* req; + + if (!li_memcached_is_key_valid(key)) { + g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_BAD_KEY, "Invalid key: '%s'", key->str); + return NULL; + } + + if (-1 == con->fd) memcached_connect(con); + if (-1 == con->fd) { + if (NULL == con->err) { + g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Not connected"); + } else if (err) { + *err = g_error_copy(con->err); + } + return NULL; + } + + req = g_slice_new0(int_request); + req->req.callback = callback; + req->req.cb_data = cb_data; + + req->type = REQ_GET; + req->key = g_string_new_len(GSTR_LEN(key)); + + if (!push_request(con, req, err)) { + free_request(con, req); + return NULL; + } + + return &req->req; +} + +liMemcachedRequest* li_memcached_set(liMemcachedCon *con, GString *key, guint32 flags, ev_tstamp ttl, liBuffer *data, liMemcachedCB callback, gpointer cb_data, GError **err) { + int_request* req; + + if (!li_memcached_is_key_valid(key)) { + g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_BAD_KEY, "Invalid key: '%s'", key->str); + return NULL; + } + + if (-1 == con->fd) memcached_connect(con); + if (-1 == con->fd) { + if (NULL == con->err) { + g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Not connected"); + } else if (err) { + *err = g_error_copy(con->err); + } + return NULL; + } + + req = g_slice_new0(int_request); + req->req.callback = callback; + req->req.cb_data = cb_data; + + req->type = REQ_SET; + req->key = g_string_new_len(GSTR_LEN(key)); + req->flags = flags; + req->ttl = ttl; + li_buffer_acquire(data); + req->data = data; + + if (!push_request(con, req, err)) { + free_request(con, req); + return NULL; + } + + return &req->req; +} + +/* if length(key) <= 250 and all chars x: 0x20 < x < 0x7f the key + * remains untouched; otherwise it gets replaced with its sha1hex hash + * so in most cases the key stays readable, and we have a good fallback + */ + +void li_memcached_mutate_key(GString *key) { + GChecksum *hash; + + if (li_memcached_is_key_valid(key)) return; + + hash = g_checksum_new(G_CHECKSUM_SHA1); + + g_checksum_update(hash, (const guchar *) GSTR_LEN(key)); + g_string_assign(key, g_checksum_get_string(hash)); + + g_checksum_free(hash); +} + +gboolean li_memcached_is_key_valid(GString *key) { + guint i; + + if (key->len > 250 || 0 == key->len) return FALSE; + + for (i = 0; i < key->len; i++) { + if (key->str[i] <= 0x20 || key->str[i] >= 0x7f) return FALSE; + } + + return TRUE; +} diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 880ea05..0d1bc53 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -77,6 +77,11 @@ libmod_lua_la_LDFLAGS = $(common_ldflags) libmod_lua_la_LIBADD = $(common_libadd) endif +install_libs += libmod_memcached.la +libmod_memcached_la_SOURCES = mod_memcached.c +libmod_memcached_la_LDFLAGS = $(common_ldflags) +libmod_memcached_la_LIBADD = $(common_libadd) + if USE_OPENSSL install_libs += libmod_openssl.la libmod_openssl_la_SOURCES = mod_openssl.c diff --git a/src/modules/mod_memcached.c b/src/modules/mod_memcached.c new file mode 100644 index 0000000..fc4d8f0 --- /dev/null +++ b/src/modules/mod_memcached.c @@ -0,0 +1,1022 @@ +/* + * mod_memcached - cache content on memcached servers + * + * Description: + * cache content on memcached servers + * + * Setups: + * none + * Options: + * none + * Actions: + * (trailing parameters are optional) + * memcached.lookup , , + * memcached.store + * options: hash of + * - server: socket address as string (default: 127.0.0.1:11211) + * - flags: flags for storing (default 0) + * - ttl: ttl for storing (default 0 - forever) + * - maxsize: maximum size in bytes we want to store + * - headers: whether to store/lookup headers too (not supported yet) + * if disabled: get mime-type from request.uri.path for lookup + * - key: pattern for lookup/store key + * default: "%{req.path}" + * + * Example config: + * memcached.lookup [], ${ header.add "X-Memcached" => "Hit" }, ${ header.add "X-Memcached" => "Miss" }; + * + * memcached.lookup ["key": "%{req.scheme}://%{req.host}%{req.path}"]; + * + * Exports a lua api to per-worker luaStates too. + * + * Todo: + * - store/lookup headers too + * + * Author: + * Copyright (c) 2010 Stefan Bühler + */ + +#include +#include + +#include + +#ifdef HAVE_LUA_H +# include +# include +# include +#endif + +LI_API gboolean mod_memcached_init(liModules *mods, liModule *mod); +LI_API gboolean mod_memcached_free(liModules *mods, liModule *mod); + +typedef struct memcache_ctx memcache_ctx; +struct memcache_ctx { + liMemcachedCon **worker_client_ctx; + liSocketAddress addr; + liPattern *pattern; + guint flags; + ev_tstamp ttl; + gssize maxsize; + gboolean headers; + + liAction *act_found, *act_miss; + + liPlugin *p; + GList mconf_link; +}; + +typedef struct memcached_config memcached_config; +struct memcached_config { + GQueue prepare_ctx; +}; + +typedef struct { + liMemcachedRequest *req; + liBuffer *buffer; + liVRequest *vr; +} memcache_request; + +/* memcache option names */ +static const GString + mon_server = { CONST_STR_LEN("server"), 0 }, + mon_flags = { CONST_STR_LEN("flags"), 0 }, + mon_ttl = { CONST_STR_LEN("ttl"), 0 }, + mon_maxsize = { CONST_STR_LEN("maxsize"), 0 }, + mon_headers = { CONST_STR_LEN("headers"), 0 }, + mon_key = { CONST_STR_LEN("key"), 0 } +; + +static void mc_ctx_free(liServer *srv, gpointer param) { + memcache_ctx *ctx = param; + guint i; + + if (ctx->worker_client_ctx) { + for (i = 0; i < srv->worker_count; i++) { + li_memcached_con_release(ctx->worker_client_ctx[i]); + } + g_slice_free1(sizeof(liMemcachedCon*) * srv->worker_count, ctx->worker_client_ctx); + } + + li_sockaddr_clear(&ctx->addr); + + li_pattern_free(ctx->pattern); + + li_action_release(srv, ctx->act_found); + li_action_release(srv, ctx->act_miss); + + if (ctx->mconf_link.data) { /* still in LI_SERVER_INIT */ + memcached_config *mconf = ctx->p->data; + g_queue_unlink(&mconf->prepare_ctx, &ctx->mconf_link); + ctx->mconf_link.data = NULL; + } + + g_slice_free(memcache_ctx, ctx); +} + +static memcache_ctx* mc_ctx_parse(liServer *srv, liPlugin *p, liValue *config) { + memcache_ctx *ctx; + memcached_config *mconf = p->data; + GString def_server = li_const_gstring(CONST_STR_LEN("127.0.0.1:11211")); + + if (config && config->type != LI_VALUE_HASH) { + ERROR(srv, "%s", "memcache expects an optional hash of options"); + return NULL; + } + + ctx = g_slice_new0(memcache_ctx); + ctx->p = p; + + ctx->addr = li_sockaddr_from_string(&def_server, 11211); + + ctx->pattern = li_pattern_new("%{req.path}"); + + ctx->flags = 0; + ctx->ttl = 0; + ctx->maxsize = 64*1024; /* 64 kB */ + ctx->headers = FALSE; + + if (config) { + GHashTable *ht = config->data.hash; + GHashTableIter it; + gpointer pkey, pvalue; + + g_hash_table_iter_init(&it, ht); + while (g_hash_table_iter_next(&it, &pkey, &pvalue)) { + GString *key = pkey; + liValue *value = pvalue; + + if (g_string_equal(key, &mon_server)) { + if (value->type != LI_VALUE_STRING) { + ERROR(srv, "memcache option '%s' expects string as parameter", mon_server.str); + goto option_failed; + } + li_sockaddr_clear(&ctx->addr); + ctx->addr = li_sockaddr_from_string(value->data.string, 11211); + if (NULL == ctx->addr.addr) { + ERROR(srv, "invalid socket address: '%s'", value->data.string->str); + goto option_failed; + } + } else if (g_string_equal(key, &mon_key)) { + if (value->type != LI_VALUE_STRING) { + ERROR(srv, "memcache option '%s' expects string as parameter", mon_key.str); + goto option_failed; + } + li_pattern_free(ctx->pattern); + ctx->pattern = li_pattern_new(value->data.string->str); + if (NULL == ctx->pattern) { + ERROR(srv, "memcache: couldn't parse pattern for key '%s'", value->data.string->str); + goto option_failed; + } + } else if (g_string_equal(key, &mon_flags)) { + if (value->type != LI_VALUE_NUMBER || value->data.number <= 0) { + ERROR(srv, "memcache option '%s' expects positive integer as parameter", mon_flags.str); + goto option_failed; + } + ctx->flags = value->data.number; + } else if (g_string_equal(key, &mon_ttl)) { + if (value->type != LI_VALUE_NUMBER || value->data.number <= 0) { + ERROR(srv, "memcache option '%s' expects positive integer as parameter", mon_ttl.str); + goto option_failed; + } + ctx->ttl = value->data.number; + } else if (g_string_equal(key, &mon_maxsize)) { + if (value->type != LI_VALUE_NUMBER || value->data.number <= 0) { + ERROR(srv, "memcache option '%s' expects positive integer as parameter", mon_maxsize.str); + goto option_failed; + } + ctx->maxsize = value->data.number; + } else if (g_string_equal(key, &mon_headers)) { + if (value->type != LI_VALUE_BOOLEAN) { + ERROR(srv, "memcache option '%s' expects boolean as parameter", mon_headers.str); + goto option_failed; + } + ctx->headers = value->data.boolean; + if (ctx->headers) { + ERROR(srv, "%s", "memcache: lookup/storing headers not supported yet"); + goto option_failed; + } + } else { + ERROR(srv, "unknown option for memcache '%s'", key->str); + goto option_failed; + } + } + } + + if (LI_SERVER_INIT != g_atomic_int_get(&srv->state)) { + ctx->worker_client_ctx = g_slice_alloc0(sizeof(liMemcachedCon*) * srv->worker_count); + } else { + ctx->mconf_link.data = ctx; + g_queue_push_tail_link(&mconf->prepare_ctx, &ctx->mconf_link); + } + + return ctx; + +option_failed: + mc_ctx_free(srv, ctx); + return NULL; +} + +static GString* mc_ctx_build_key(memcache_ctx *ctx, liVRequest *vr) { + GMatchInfo *match_info = NULL; + GString *key = g_string_sized_new(255); + + g_string_truncate(key, 0); + + if (vr->action_stack.regex_stack->len) { + GArray *rs = vr->action_stack.regex_stack; + match_info = g_array_index(rs, liActionRegexStackElement, rs->len - 1).match_info; + } + + li_pattern_eval(vr, key, ctx->pattern, NULL, NULL, li_pattern_regex_cb, match_info); + + li_memcached_mutate_key(key); + + return key; +} + +static liMemcachedCon* mc_ctx_prepare(memcache_ctx *ctx, liWorker *wrk) { + liMemcachedCon *con = ctx->worker_client_ctx[wrk->ndx]; + + if (!con) { + con = li_memcached_con_new(wrk->loop, ctx->addr); + ctx->worker_client_ctx[wrk->ndx] = con; + } + + return con; +} + +static void memcache_callback(liMemcachedRequest *request, liMemcachedResult result, liMemcachedItem *item, GError **err) { + memcache_request *req = request->cb_data; + liVRequest *vr = req->vr; + + /* request done */ + req->req = NULL; + + if (!vr) { + g_slice_free(memcache_request, req); + return; + } + + switch (result) { + case LI_MEMCACHED_OK: /* STORED, VALUE, DELETED */ + /* steal buffer */ + req->buffer = item->data; + item->data = NULL; + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "memcached.lookup: key '%s' found, ttl = %f, flags = %u", item->key->str, (double) item->ttl, (guint) item->flags); + } + break; + case LI_MEMCACHED_NOT_FOUND: + /* ok, nothing to do - we just didn't find an entry */ + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "%s", "memcached.lookup: key not found"); + } + break; + case LI_MEMCACHED_NOT_STORED: + case LI_MEMCACHED_EXISTS: + VR_ERROR(vr, "memcached error: %s", "unexpected result"); + /* TODO (not possible for lookup) */ + break; + case LI_MEMCACHED_RESULT_ERROR: + if (err && *err) { + VR_ERROR(vr, "memcached error: %s", (*err)->message); + } else { + VR_ERROR(vr, "memcached error: %s", "Unknown error"); + } + break; + } + + li_vrequest_joblist_append(vr); +} + +static liHandlerResult mc_handle_lookup(liVRequest *vr, gpointer param, gpointer *context) { + memcache_ctx *ctx = param; + memcache_request *req = *context; + + if (req) { + static const GString default_mime_str = { CONST_STR_LEN("application/octet-stream"), 0 }; + + liBuffer *buf = req->buffer; + const GString *mime_str; + + if (NULL != req->req) return LI_HANDLER_WAIT_FOR_EVENT; /* not done yet */ + + g_slice_free(memcache_request, req); + *context = NULL; + + if (NULL == buf) { + /* miss */ + if (ctx->act_miss) li_action_enter(vr, ctx->act_miss); + return LI_HANDLER_GO_ON; + } + + if (!li_vrequest_handle_direct(vr)) { + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "%s", "memcached.lookup: request already handled"); + } + li_buffer_release(buf); + return LI_HANDLER_GO_ON; + } + + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "%s", "memcached.lookup: key found, handling request"); + } + + li_chunkqueue_append_buffer(vr->out, buf); + + vr->response.http_status = 200; + + mime_str = li_mimetype_get(vr, vr->request.uri.path); + if (!mime_str) mime_str = &default_mime_str; + li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), GSTR_LEN(mime_str)); + + /* hit */ + if (ctx->act_found) li_action_enter(vr, ctx->act_found); + return LI_HANDLER_GO_ON; + } else { + liMemcachedCon *con; + GString *key; + GError *err = NULL; + + if (li_vrequest_is_handled(vr)) { + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "%s", "memcached.lookup: request already handled"); + } + return LI_HANDLER_GO_ON; + } + + con = mc_ctx_prepare(ctx, vr->wrk); + key = mc_ctx_build_key(ctx, vr); + + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { + VR_DEBUG(vr, "memcached.lookup: looking up key '%s'", key->str); + } + + req = g_slice_new0(memcache_request); + req->req = li_memcached_get(con, key, memcache_callback, req, &err); + g_string_free(key, TRUE); + + if (NULL == req->req) { + if (NULL != err) { + VR_ERROR(vr, "memcached.lookup: get failed: %s", err->message); + g_clear_error(&err); + } else { + VR_ERROR(vr, "memcached.lookup: get failed: %s", "Unkown error"); + } + g_slice_free(memcache_request, req); + + /* miss */ + if (ctx->act_miss) li_action_enter(vr, ctx->act_miss); + + return LI_HANDLER_GO_ON; + } + req->vr = vr; + + *context = req; + + return LI_HANDLER_WAIT_FOR_EVENT; + } +} + +static liHandlerResult mc_handle_store(liVRequest *vr, gpointer param, gpointer *context) { + return LI_HANDLER_GO_ON; +} + +static liHandlerResult mc_handle_free(liVRequest *vr, gpointer param, gpointer context) { + memcache_request *req = context; + UNUSED(vr); + UNUSED(param); + + if (NULL == req->req) { + li_buffer_release(req->buffer); + g_slice_free(memcache_request, req); + } else { + req->vr = NULL; + } + + return LI_HANDLER_GO_ON; +} + +static liAction* mc_lookup_create(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) { + memcache_ctx *ctx; + liValue *config = val, *act_found = NULL, *act_miss = NULL; + UNUSED(wrk); + UNUSED(userdata); + + if (val && LI_VALUE_LIST == val->type) { + GArray *list = val->data.list; + config = NULL; + + if (list->len > 3) { + ERROR(srv, "%s", "memcached.lookup: too many arguments"); + return NULL; + } + + if (list->len >= 1) config = g_array_index(list, liValue*, 0); + if (list->len >= 2) act_found = g_array_index(list, liValue*, 1); + if (list->len >= 3) act_miss = g_array_index(list, liValue*, 2); + + if (config && config->type != LI_VALUE_HASH) { + ERROR(srv, "%s", "memcached.lookup: expected hash as first argument"); + return NULL; + } + + if (act_found && act_found->type != LI_VALUE_ACTION) { + ERROR(srv, "%s", "memcached.lookup: expected action as second argument"); + return NULL; + } + + if (act_miss && act_miss->type != LI_VALUE_ACTION) { + ERROR(srv, "%s", "memcached.lookup: expected action as third argument"); + return NULL; + } + } + + ctx = mc_ctx_parse(srv, p, config); + + if (!ctx) return NULL; + + if (act_found) ctx->act_found = li_value_extract_action(act_found); + if (act_miss) ctx->act_miss = li_value_extract_action(act_miss); + + return li_action_new_function(mc_handle_lookup, mc_handle_free, mc_ctx_free, ctx); +} + +static liAction* mc_store_create(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) { + memcache_ctx *ctx; + UNUSED(wrk); + UNUSED(userdata); + + ctx = mc_ctx_parse(srv, p, val); + + if (!ctx) return NULL; + + WARNING(srv, "%s", "memcached.store not supported yet"); + + return li_action_new_function(mc_handle_store, mc_handle_free, mc_ctx_free, ctx); +} + +static const liPluginOption options[] = { + { NULL, 0, 0, NULL } +}; + +static const liPluginAction actions[] = { + { "memcached.lookup", mc_lookup_create, NULL }, + { "memcached.store", mc_store_create, NULL }, + + { NULL, NULL, NULL } +}; + +static const liPluginSetup setups[] = { + { NULL, NULL, NULL } +}; + +#ifdef HAVE_LUA_H + +typedef struct { + liMemcachedRequest *req; + int result_ref; /* table if vr_ref == NULL, callback function otherwise */ + liVRequestRef *vr_ref; + lua_State *L; +} mc_lua_request; + +#define LUA_MEMCACHEDCON "liMemcachedCon*" +#define LUA_MEMCACHEDREQUEST "mc_lua_request*" + +static liMemcachedCon* li_lua_get_memcached_con(lua_State *L, int ndx); +static int lua_memcached_con_gc(lua_State *L); +static int li_lua_push_memcached_con(lua_State *L, liMemcachedCon *con); +static mc_lua_request* li_lua_get_memcached_req(lua_State *L, int ndx); +static int lua_memcached_req_gc(lua_State *L); +static int li_lua_push_memcached_req(lua_State *L, mc_lua_request *req); + +static void lua_memcache_callback(liMemcachedRequest *request, liMemcachedResult result, liMemcachedItem *item, GError **err) { + mc_lua_request *mreq = request->cb_data; + lua_State *L = mreq->L; + + if (mreq->req != request) return; + + request->cb_data = NULL; + request->callback = NULL; + mreq->req = NULL; + + if (mreq->vr_ref) { + lua_rawgeti(L, LUA_REGISTRYINDEX, mreq->result_ref); /* get table */ + } else { + lua_rawgeti(L, LUA_REGISTRYINDEX, mreq->result_ref); /* get function */ + lua_newtable(L); + } + + lua_pushnumber(L, result); + lua_setfield(L, -2, "code"); + + if (err && *err) { + lua_pushstring(L, (*err)->message); + lua_setfield(L, -2, "error"); + } else if (item) { + if (item->key) { + lua_pushlstring(L, GSTR_LEN(item->key)); + lua_setfield(L, -2, "key"); + } + lua_pushnumber(L, item->flags); + lua_setfield(L, -2, "flags"); + lua_pushnumber(L, item->ttl); + lua_setfield(L, -2, "ttl"); + { + GString *cas = g_string_sized_new(31); + g_string_printf(cas, "%"G_GUINT64_FORMAT, item->cas); + lua_pushlstring(L, GSTR_LEN(cas)); + lua_setfield(L, -2, "cas"); + g_string_free(cas, TRUE); + } + if (item->data) { + lua_pushlstring(L, item->data->addr, item->data->used); + lua_setfield(L, -2, "data"); + } + } + + if (mreq->vr_ref) { + lua_pop(L, 1); + li_vrequest_joblist_append_async(mreq->vr_ref); + } else { + liServer *srv; + int errfunc; + + lua_getfield(L, LUA_REGISTRYINDEX, "lighty.srv"); + srv = lua_touserdata(L, -1); + lua_pop(L, 1); + + errfunc = li_lua_push_traceback(L, 1); + if (lua_pcall(L, 1, 0, errfunc)) { + ERROR(srv, "lua_pcall(): %s", lua_tostring(L, -1)); + lua_pop(L, 1); + } + lua_remove(L, errfunc); + /* function and args were popped */ + } +} + +static int lua_mc_get(lua_State *L) { + liMemcachedCon *con; + GString key; + const char *str; + size_t len; + GError *err = NULL; + liVRequest *vr; + + mc_lua_request *mreq; + liMemcachedRequest *req; + + if (lua_gettop(L) != 3) { + lua_pushliteral(L, "lua_mc_get(con, key, cb | vr): incorrect number of arguments"); + lua_error(L); + } + + con = li_lua_get_memcached_con(L, 1); + vr = li_lua_get_vrequest(L, 3); + if (NULL == con || !lua_isstring(L, 2) || (NULL == vr && !lua_isfunction(L, 3))) { + lua_pushliteral(L, "lua_mc_get(con, key, cb | vr): wrong argument types"); + lua_error(L); + } + + str = lua_tolstring(L, 2, &len); + key = li_const_gstring(str, len); + + mreq = g_slice_new0(mc_lua_request); + + req = li_memcached_get(con, &key, lua_memcache_callback, mreq, &err); + + if (!req) { + g_slice_free(mc_lua_request, mreq); + + lua_pushnil(L); + if (NULL != err) { + lua_pushstring(L, err->message); + g_clear_error(&err); + } else { + lua_pushliteral(L, "Unknown li_memcached_get error"); + } + return 2; + } + + mreq->req = req; + mreq->L = L; + + if (NULL == vr) { + /* lua callback function */ + lua_pushvalue(L, 3); /* +1 */ + mreq->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); /* -1 */ + } else { + /* push result into table, wake vr if done */ + lua_newtable(L); /* +1 */ + mreq->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); /* -1 */ + mreq->vr_ref = li_vrequest_get_ref(vr); + } + + return li_lua_push_memcached_req(L, mreq); +} + +static int lua_mc_set(lua_State *L) { + liMemcachedCon *con; + GString key, value; + const char *str; + size_t len; + GError *err = NULL; + liVRequest *vr; + ev_tstamp ttl; + liBuffer *valuebuf; + + mc_lua_request *mreq; + liMemcachedRequest *req; + + if (lua_gettop(L) < 4) { + lua_pushliteral(L, "lua_mc_set(con, key, value, cb | vr, [ttl]): incorrect number of arguments"); + lua_error(L); + } + + con = li_lua_get_memcached_con(L, 1); + vr = li_lua_get_vrequest(L, 4); + if (NULL == con || !lua_isstring(L, 2) || (NULL == vr && !lua_isfunction(L, 4))) { + lua_pushliteral(L, "lua_mc_set(con, key, value, cb | vr): wrong argument types"); + lua_error(L); + } + + str = lua_tolstring(L, 2, &len); + key = li_const_gstring(str, len); + + str = lua_tolstring(L, 3, &len); + value = li_const_gstring(str, len); + + if (lua_gettop(L) == 5) { + ttl = lua_tonumber(L, 5); + } else { + ttl = 300; + } + + mreq = g_slice_new0(mc_lua_request); + valuebuf = li_buffer_new(value.len); + valuebuf->used = value.len; + memcpy(valuebuf->addr, value.str, value.len); + + req = li_memcached_set(con, &key, 0, ttl, valuebuf, lua_memcache_callback, mreq, &err); + + li_buffer_release(valuebuf); + + if (!req) { + g_slice_free(mc_lua_request, mreq); + + lua_pushnil(L); + if (NULL != err) { + lua_pushstring(L, err->message); + g_clear_error(&err); + } else { + lua_pushliteral(L, "Unknown li_memcached_set error"); + } + return 2; + } + + mreq->req = req; + mreq->L = L; + + if (NULL == vr) { + /* lua callback function */ + lua_pushvalue(L, 3); /* +1 */ + mreq->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); /* -1 */ + } else { + /* push result into table, wake vr if done */ + lua_newtable(L); /* +1 */ + mreq->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); /* -1 */ + mreq->vr_ref = li_vrequest_get_ref(vr); + } + + return li_lua_push_memcached_req(L, mreq); +} + +static int lua_mc_setq(lua_State *L) { + liMemcachedCon *con; + GString key, value; + const char *str; + size_t len; + GError *err = NULL; + ev_tstamp ttl; + liBuffer *valuebuf; + + liMemcachedRequest *req; + + if (lua_gettop(L) < 3) { + lua_pushliteral(L, "lua_mc_setq(con, key, value, [ttl]): incorrect number of arguments"); + lua_error(L); + } + + con = li_lua_get_memcached_con(L, 1); + if (NULL == con || !lua_isstring(L, 2)) { + lua_pushliteral(L, "lua_mc_setq(con, key, value): wrong argument types"); + lua_error(L); + } + + str = lua_tolstring(L, 2, &len); + key = li_const_gstring(str, len); + + str = lua_tolstring(L, 3, &len); + value = li_const_gstring(str, len); + + if (lua_gettop(L) == 5) { + ttl = lua_tonumber(L, 5); + } else { + ttl = 300; + } + + valuebuf = li_buffer_new(value.len); + valuebuf->used = value.len; + memcpy(valuebuf->addr, value.str, value.len); + + req = li_memcached_set(con, &key, 0, ttl, valuebuf, NULL, NULL, &err); + + li_buffer_release(valuebuf); + + if (!req) { + lua_pushnil(L); + if (NULL != err) { + lua_pushstring(L, err->message); + g_clear_error(&err); + } else { + lua_pushliteral(L, "Unknown li_memcached_set error"); + } + return 2; + } + + lua_pushboolean(L, 1); + return 1; +} + +static const luaL_Reg mc_con_mt[] = { + { "__gc", lua_memcached_con_gc }, + + { "get", lua_mc_get }, + { "set", lua_mc_set }, + { "setq", lua_mc_setq }, + + { NULL, NULL } +}; + +typedef int (*lua_mc_req_Attrib)(mc_lua_request *req, lua_State *L); + +static int lua_mc_req_attr_read_response(mc_lua_request *req, lua_State *L) { + if (NULL != req->vr_ref) { + lua_rawgeti(L, LUA_REGISTRYINDEX, req->result_ref); + } else { + lua_pushnil(L); + } + return 1; +} + +#define AR(m) { #m, lua_mc_req_attr_read_##m, NULL } +#define AW(m) { #m, NULL, lua_mc_req_attr_write_##m } +#define ARW(m) { #m, lua_mc_req_attr_read_##m, lua_mc_req_attr_write_##m } + +static const struct { + const char* key; + lua_mc_req_Attrib read_attr, write_attr; +} mc_req_attribs[] = { + AR(response), + + { NULL, NULL, NULL } +}; + +static int lua_mc_req_index(lua_State *L) { + mc_lua_request *req; + const char *key; + int i; + + if (lua_gettop(L) != 2) { + lua_pushstring(L, "incorrect number of arguments"); + lua_error(L); + } + + if (li_lua_metatable_index(L)) return 1; + + req = li_lua_get_memcached_req(L, 1); + if (!req) return 0; + + if (lua_isnumber(L, 2)) return 0; + if (!lua_isstring(L, 2)) return 0; + + key = lua_tostring(L, 2); + for (i = 0; mc_req_attribs[i].key ; i++) { + if (0 == strcmp(key, mc_req_attribs[i].key)) { + if (mc_req_attribs[i].read_attr) + return mc_req_attribs[i].read_attr(req, L); + break; + } + } + + lua_pushstring(L, "cannot read attribute "); + lua_pushstring(L, key); + lua_pushstring(L, " in mc_lua_request"); + lua_concat(L, 3); + lua_error(L); + + return 0; +} + +static const luaL_Reg mc_req_mt[] = { + { "__index", lua_mc_req_index }, + { "__gc", lua_memcached_req_gc }, + + { NULL, NULL } +}; + +static void init_mc_con_mt(lua_State *L) { + luaL_register(L, NULL, mc_con_mt); + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); +} + +static void init_mc_req_mt(lua_State *L) { + luaL_register(L, NULL, mc_req_mt); +} + +static liMemcachedCon* li_lua_get_memcached_con(lua_State *L, int ndx) { + if (!lua_isuserdata(L, ndx)) return NULL; + if (!lua_getmetatable(L, ndx)) return NULL; + luaL_getmetatable(L, LUA_MEMCACHEDCON); + if (lua_isnil(L, -1) || lua_isnil(L, -2) || !lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return NULL; + } + lua_pop(L, 2); + return *(liMemcachedCon**) lua_touserdata(L, ndx); +} + +static int lua_memcached_con_gc(lua_State *L) { + liMemcachedCon **pcon = (liMemcachedCon**) luaL_checkudata(L, 1, LUA_MEMCACHEDCON); + if (!pcon || !*pcon) return 0; + + li_memcached_con_release(*pcon); + return 0; +} + +static int li_lua_push_memcached_con(lua_State *L, liMemcachedCon *con) { + liMemcachedCon **pcon; + + pcon = (liMemcachedCon**) lua_newuserdata(L, sizeof(liMemcachedCon*)); + *pcon = con; + + if (luaL_newmetatable(L, LUA_MEMCACHEDCON)) { + init_mc_con_mt(L); + } + + lua_setmetatable(L, -2); + return 1; +} + +static mc_lua_request* li_lua_get_memcached_req(lua_State *L, int ndx) { + if (!lua_isuserdata(L, ndx)) return NULL; + if (!lua_getmetatable(L, ndx)) return NULL; + luaL_getmetatable(L, LUA_MEMCACHEDREQUEST); + if (lua_isnil(L, -1) || lua_isnil(L, -2) || !lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return NULL; + } + lua_pop(L, 2); + return *(mc_lua_request**) lua_touserdata(L, ndx); +} + +static int lua_memcached_req_gc(lua_State *L) { + mc_lua_request **preq = (mc_lua_request**) luaL_checkudata(L, 1, LUA_MEMCACHEDREQUEST); + mc_lua_request *req; + if (!preq || !*preq) return 0; + + req = *preq; + li_vrequest_ref_release(req->vr_ref); + + if (req->req) { + req->req->callback = NULL; + req->req->cb_data = NULL; + } + + luaL_unref(L, LUA_REGISTRYINDEX, req->result_ref); + + g_slice_free(mc_lua_request, req); + + return 0; +} + +static int li_lua_push_memcached_req(lua_State *L, mc_lua_request *req) { + mc_lua_request **preq; + + preq = (mc_lua_request**) lua_newuserdata(L, sizeof(mc_lua_request*)); + *preq = req; + + if (luaL_newmetatable(L, LUA_MEMCACHEDREQUEST)) { + init_mc_req_mt(L); + } + + lua_setmetatable(L, -2); + return 1; +} + +static int mc_lua_new(lua_State *L) { + liWorker *wrk; + liMemcachedCon *con; + liSocketAddress addr; + const char *buf; + size_t len = 0; + GString fakestr; + + wrk = (liWorker*) lua_touserdata(L, lua_upvalueindex(1)); + + if (lua_type(L, -1) != LUA_TSTRING) { + /* duplicate */ + lua_pushvalue(L, -1); + } + + buf = lua_tolstring(L, -1, &len); + if (!buf) { + lua_pushliteral(L, "[mod_memcached] mc_lua_new: Couldn't convert parameter to string"); + lua_error(L); + } + fakestr = li_const_gstring(buf, len); + addr = li_sockaddr_from_string(&fakestr, 0); + + if (NULL == addr.addr) { + lua_pushliteral(L, "[mod_memcached] mc_lua_new: couldn't parse parameter as address: "); + lua_pushvalue(L, -2); + lua_concat(L, 2); + lua_error(L); + } + + con = li_memcached_con_new(wrk->loop, addr); + return li_lua_push_memcached_con(L, con); +} + +static void mod_memcached_lua_init(lua_State *L, liServer *srv, liWorker *wrk, liPlugin *p) { + UNUSED(srv); + UNUSED(p); + + if (wrk) { + lua_newtable(L); /* { } */ + + lua_pushlightuserdata(L, wrk); + lua_pushcclosure(L, mc_lua_new, 1); + lua_setfield(L, -2, "new"); + + lua_setfield(L, LUA_GLOBALSINDEX, "memcached"); + } +} +#endif + +static void memcached_prepare(liServer *srv, liPlugin *p) { + memcached_config *mconf = p->data; + GList *conf_link; + memcache_ctx *ctx; + + while (NULL != (conf_link = g_queue_pop_head_link(&mconf->prepare_ctx))) { + ctx = conf_link->data; + ctx->worker_client_ctx = g_slice_alloc0(sizeof(liMemcachedCon*) * srv->worker_count); + conf_link->data = NULL; + } +} + +static void memcached_free(liServer *srv, liPlugin *p) { + memcached_config *mconf = p->data; + UNUSED(srv); + + g_slice_free(memcached_config, mconf); +} + +static void memcached_init(liServer *srv, liPlugin *p, gpointer userdata) { + memcached_config *mconf; + UNUSED(srv); UNUSED(userdata); + + mconf = g_slice_new0(memcached_config); + p->data = mconf; + + p->options = options; + p->actions = actions; + p->setups = setups; + + p->free = memcached_free; + + p->handle_prepare = memcached_prepare; + +#ifdef HAVE_LUA_H + p->handle_init_lua = mod_memcached_lua_init; +#endif +} + +gboolean mod_memcached_init(liModules *mods, liModule *mod) { + MODULE_VERSION_CHECK(mods); + + mod->config = li_plugin_register(mods->main, "mod_memcached", memcached_init, NULL); + + return mod->config != NULL; +} + +gboolean mod_memcached_free(liModules *mods, liModule *mod) { + if (mod->config) + li_plugin_free(mods->main, mod->config); + + return TRUE; +} diff --git a/src/modules/wscript b/src/modules/wscript index bd5d3fd..c60da50 100644 --- a/src/modules/wscript +++ b/src/modules/wscript @@ -50,6 +50,7 @@ def build(bld): lighty_mod(bld, 'mod_fortune', 'mod_fortune.c') if bld.env['USE_LUA'] == 1: lighty_mod(bld, 'mod_lua', 'mod_lua.c', ['lua']) + lighty_mod(bld, 'mod_memcached', 'mod_memcached.c') if env['USE_OPENSSL'] == 1: uselib = ['ssl','crypto'] lighty_mod(bld, 'mod_openssl', 'mod_openssl.c', uselib)