948 lines
23 KiB
C
948 lines
23 KiB
C
|
|
#include <lighttpd/memcached.h>
|
|
|
|
#include <lighttpd/utils.h>
|
|
|
|
/* IMPORTANT
|
|
* In order to keep _release thread-safe the ev_io watcher keeps a
|
|
* reference too while active; when the last reference is dropped
|
|
* we don't have to stop the watcher, and everything else is
|
|
* "thread-safe" if no one is using it anymore (refcount == 0)
|
|
* (see memcached_{start,stop}_io)
|
|
* This means we have to stop the watcher after all requests are done.
|
|
*
|
|
* Most "public" functions have to be called while they hold
|
|
* a reference somehow; the other way this code gets executed is
|
|
* through the ev_io callback, which is why we get an extra
|
|
* reference there, so our refcount doesn't drop to 0 while
|
|
* we are working.
|
|
*
|
|
* TODO: retry connect() once (per second?) if we have a request
|
|
* before we drop all requests
|
|
*/
|
|
|
|
GQuark li_memcached_error_quark() {
|
|
return g_quark_from_static_string("memcached-error-quark");
|
|
}
|
|
|
|
#define BUFFER_CHUNK_SIZE 4*1024
|
|
|
|
typedef struct int_request int_request;
|
|
typedef enum {
|
|
REQ_GET, REQ_SET
|
|
} req_type;
|
|
|
|
struct liMemcachedCon {
|
|
struct ev_loop *loop;
|
|
liSocketAddress addr;
|
|
|
|
int refcount;
|
|
|
|
ev_io con_watcher;
|
|
int fd;
|
|
ev_tstamp last_con_start;
|
|
|
|
GQueue req_queue;
|
|
int_request *cur_req;
|
|
|
|
GQueue out;
|
|
liBuffer *buf;
|
|
|
|
GString *tmpstr;
|
|
|
|
GError *err;
|
|
|
|
/* read buffers */
|
|
liBuffer *line, *data, *remaining;
|
|
liMemcachedItem curitem;
|
|
|
|
/* GET */
|
|
gsize get_data_size;
|
|
gboolean get_have_header;
|
|
};
|
|
|
|
struct int_request {
|
|
liMemcachedRequest req;
|
|
req_type type;
|
|
|
|
GString *key;
|
|
guint32 flags;
|
|
ev_tstamp ttl;
|
|
liBuffer *data;
|
|
|
|
GList iter;
|
|
};
|
|
|
|
typedef struct {
|
|
gsize pos, len;
|
|
liBuffer *buf;
|
|
} send_item;
|
|
|
|
static void send_queue_push_buffer(GQueue *queue, liBuffer *buf, gsize start, gsize len) {
|
|
send_item *i;
|
|
if (!buf || !len) return;
|
|
g_assert(start+len <= buf->used);
|
|
|
|
li_buffer_acquire(buf);
|
|
i = g_slice_new0(send_item);
|
|
i->buf = buf;
|
|
i->pos = start;
|
|
i->len = len;
|
|
|
|
g_queue_push_tail(queue, i);
|
|
}
|
|
|
|
static void send_queue_push_gstring(GQueue *queue, GString *str, liBuffer **pbuf) {
|
|
liBuffer *buf = *pbuf;
|
|
gsize pos;
|
|
|
|
if (NULL != buf && (1 == buf->refcount)) {
|
|
buf->used = 0;
|
|
}
|
|
if (NULL == buf || (buf->alloc_size - buf->used < str->len)) {
|
|
li_buffer_release(buf);
|
|
buf = li_buffer_new_slice(BUFFER_CHUNK_SIZE > str->len ? BUFFER_CHUNK_SIZE : str->len);
|
|
*pbuf = buf;
|
|
}
|
|
|
|
pos = buf->used;
|
|
memcpy(buf->addr + pos, str->str, str->len);
|
|
buf->used += str->len;
|
|
send_queue_push_buffer(queue, buf, pos, str->len);
|
|
}
|
|
|
|
static void send_queue_item_free(send_item *i) {
|
|
if (!i) return;
|
|
li_buffer_release(i->buf);
|
|
g_slice_free(send_item, i);
|
|
}
|
|
|
|
static void send_queue_clean(GQueue *queue) {
|
|
send_item *i;
|
|
while (NULL != (i = g_queue_peek_head(queue))) {
|
|
if (i->len != 0) return;
|
|
g_queue_pop_head(queue);
|
|
li_buffer_release(i->buf);
|
|
g_slice_free(send_item, i);
|
|
}
|
|
}
|
|
|
|
static void send_queue_reset(GQueue *queue) {
|
|
send_item *i;
|
|
while (NULL != (i = g_queue_pop_head(queue))) {
|
|
li_buffer_release(i->buf);
|
|
g_slice_free(send_item, i);
|
|
}
|
|
}
|
|
|
|
static void memcached_start_io(liMemcachedCon *con) {
|
|
if (!((ev_watcher*) &con->con_watcher)->active) {
|
|
li_memcached_con_acquire(con);
|
|
li_ev_safe_unref_and_start(ev_io_start, con->loop, &con->con_watcher);
|
|
}
|
|
}
|
|
|
|
static void memcached_stop_io(liMemcachedCon *con) {
|
|
if (((ev_watcher*) &con->con_watcher)->active) {
|
|
li_ev_safe_ref_and_stop(ev_io_stop, con->loop, &con->con_watcher);
|
|
li_memcached_con_release(con);
|
|
}
|
|
}
|
|
|
|
static void send_request(liMemcachedCon *con, int_request *req) {
|
|
switch (req->type) {
|
|
case REQ_GET:
|
|
g_string_printf(con->tmpstr, "get %s\r\n", req->key->str);
|
|
send_queue_push_gstring(&con->out, con->tmpstr, &con->buf);
|
|
break;
|
|
case REQ_SET:
|
|
/* set <key> <flags> <exptime> <bytes>\r\n */
|
|
|
|
g_string_printf(con->tmpstr, "set %s %"G_GUINT32_FORMAT" %"G_GUINT64_FORMAT" %"G_GSIZE_FORMAT"\r\n", req->key->str, req->flags, (guint64) req->ttl, req->data->used);
|
|
send_queue_push_gstring(&con->out, con->tmpstr, &con->buf);
|
|
send_queue_push_buffer(&con->out, req->data, 0, req->data->used);
|
|
g_string_assign(con->tmpstr, "\r\n");
|
|
send_queue_push_gstring(&con->out, con->tmpstr, &con->buf);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static gboolean push_request(liMemcachedCon *con, int_request *req, GError **err) {
|
|
UNUSED(err);
|
|
|
|
li_memcached_con_acquire(con);
|
|
|
|
send_request(con, req);
|
|
|
|
req->iter.data = req;
|
|
g_queue_push_tail_link(&con->req_queue, &req->iter);
|
|
|
|
memcached_start_io(con);
|
|
li_ev_io_set_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static void free_request(liMemcachedCon *con, int_request *req) {
|
|
if (!req) return;
|
|
|
|
li_memcached_con_release(con);
|
|
|
|
if (NULL != req->iter.data) {
|
|
req->iter.data = NULL;
|
|
g_queue_unlink(&con->req_queue, &req->iter);
|
|
}
|
|
|
|
switch (req->type) {
|
|
case REQ_GET:
|
|
break;
|
|
case REQ_SET:
|
|
li_buffer_release(req->data);
|
|
req->data = NULL;
|
|
break;
|
|
}
|
|
|
|
g_string_free(req->key, TRUE);
|
|
req->key = NULL;
|
|
|
|
g_slice_free(int_request, req);
|
|
}
|
|
|
|
static void cancel_all_requests(liMemcachedCon *con) {
|
|
int_request *req;
|
|
GError *err1 = NULL, *err = NULL;
|
|
gboolean first = TRUE;
|
|
|
|
if (con->err) {
|
|
err1 = g_error_copy(con->err);
|
|
} else {
|
|
g_set_error(&err1, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection reset");
|
|
}
|
|
|
|
while (NULL != (req = g_queue_peek_head(&con->req_queue))) {
|
|
if (NULL == err) {
|
|
err = g_error_copy(err1);
|
|
}
|
|
|
|
if (req->req.callback) req->req.callback(&req->req, LI_MEMCACHED_RESULT_ERROR, NULL, &err);
|
|
|
|
free_request(con, req);
|
|
|
|
if (first) {
|
|
if (err) err->code = LI_MEMCACHED_DISABLED; /* "silent" fail */
|
|
if (err1) err1->code = LI_MEMCACHED_DISABLED; /* "silent" fail */
|
|
if (con->err) con->err->code = LI_MEMCACHED_DISABLED; /* "silent" fail */
|
|
|
|
first = FALSE;
|
|
}
|
|
}
|
|
|
|
if (NULL != err) g_clear_error(&err);
|
|
if (NULL != err1) g_clear_error(&err1);
|
|
}
|
|
|
|
static void memcached_update_io(liMemcachedCon *con) {
|
|
int events = 0;
|
|
|
|
if (-1 == con->fd) return; /* not connected or in connect stage */
|
|
|
|
if (0 < con->req_queue.length) events = events | EV_READ;
|
|
if (0 < con->out.length) events = events | EV_WRITE;
|
|
|
|
if (0 == events) {
|
|
memcached_stop_io(con);
|
|
} else {
|
|
memcached_start_io(con);
|
|
li_ev_io_set_events(con->loop, &con->con_watcher, events);
|
|
}
|
|
}
|
|
|
|
static void memcached_connect(liMemcachedCon *con) {
|
|
int s;
|
|
struct sockaddr addr;
|
|
socklen_t len;
|
|
|
|
if (-1 != con->fd) return;
|
|
|
|
s = con->con_watcher.fd;
|
|
if (-1 == s) {
|
|
/* reconnect limit */
|
|
if (ev_now(con->loop) < con->last_con_start + 1) {
|
|
if (con->err) {
|
|
con->err->code = LI_MEMCACHED_DISABLED;
|
|
} else {
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_DISABLED, "Disabled right now");
|
|
}
|
|
return;
|
|
}
|
|
con->last_con_start = ev_now(con->loop);
|
|
|
|
do {
|
|
s = socket(con->addr.addr->plain.sa_family, SOCK_STREAM, 0);
|
|
} while (-1 == s && errno == EINTR);
|
|
if (-1 == s) {
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't open socket: %s", g_strerror(errno));
|
|
return;
|
|
}
|
|
li_fd_init(s);
|
|
ev_io_set(&con->con_watcher, s, 0);
|
|
|
|
if (-1 == connect(s, &con->addr.addr->plain, con->addr.len)) {
|
|
switch (errno) {
|
|
case EINPROGRESS:
|
|
case EALREADY:
|
|
case EINTR:
|
|
memcached_start_io(con);
|
|
li_ev_io_add_events(con->loop, &con->con_watcher, EV_READ | EV_WRITE);
|
|
break;
|
|
default:
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't connect to '%s': %s",
|
|
li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str,
|
|
g_strerror(errno));
|
|
close(s);
|
|
ev_io_set(&con->con_watcher, -1, 0);
|
|
break;
|
|
}
|
|
} else {
|
|
/* connect succeeded */
|
|
con->fd = s;
|
|
g_clear_error(&con->err);
|
|
memcached_update_io(con);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
/* create new connection:
|
|
* see http://www.cyberconf.org/~cynbe/ref/nonblocking-connects.html
|
|
*/
|
|
|
|
/* Check to see if we can determine our peer's address. */
|
|
len = sizeof(addr);
|
|
if (getpeername(s, &addr, &len) == -1) {
|
|
/* connect failed; find out why */
|
|
int err;
|
|
len = sizeof(err);
|
|
#ifdef SO_ERROR
|
|
getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &len);
|
|
#else
|
|
{
|
|
char ch;
|
|
errno = 0;
|
|
read(s, &ch, 1);
|
|
err = errno;
|
|
}
|
|
#endif
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't connect socket to '%s': %s",
|
|
li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str,
|
|
g_strerror(err));
|
|
|
|
close(s);
|
|
memcached_stop_io(con);
|
|
ev_io_set(&con->con_watcher, -1, 0);
|
|
} else {
|
|
/* connect succeeded */
|
|
con->fd = s;
|
|
g_clear_error(&con->err);
|
|
memcached_update_io(con);
|
|
}
|
|
}
|
|
|
|
static void reset_item(liMemcachedItem *item) {
|
|
if (item->key) {
|
|
g_string_free(item->key, TRUE);
|
|
item->key = NULL;
|
|
}
|
|
item->flags = 0;
|
|
item->ttl = 0;
|
|
item->cas = 0;
|
|
if (item->data) {
|
|
li_buffer_release(item->data);
|
|
item->data = NULL;
|
|
}
|
|
}
|
|
|
|
static void close_con(liMemcachedCon *con) {
|
|
if (con->line) con->line->used = 0;
|
|
if (con->remaining) con->remaining->used = 0;
|
|
if (con->data) con->data->used = 0;
|
|
if (con->buf) con->buf->used = 0;
|
|
reset_item(&con->curitem);
|
|
send_queue_reset(&con->out);
|
|
|
|
memcached_stop_io(con);
|
|
close(con->con_watcher.fd);
|
|
con->fd = -1;
|
|
ev_io_set(&con->con_watcher, -1, 0);
|
|
con->cur_req = NULL;
|
|
cancel_all_requests(con);
|
|
memcached_connect(con);
|
|
}
|
|
|
|
static void add_remaining(liMemcachedCon *con, gchar *addr, gsize len) {
|
|
liBuffer *rem = con->remaining;
|
|
if (!rem) rem = con->remaining = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, len));
|
|
if (rem->used + len > rem->alloc_size) {
|
|
rem = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, rem->used + len));
|
|
memcpy(rem->addr, con->remaining->addr, (rem->used = con->remaining->used));
|
|
li_buffer_release(con->remaining);
|
|
con->remaining = rem;
|
|
}
|
|
|
|
memcpy(rem->addr + rem->used, addr, len);
|
|
rem->used += len;
|
|
}
|
|
|
|
/** repeats read after EINTR */
|
|
static ssize_t net_read(int fd, void *buf, ssize_t nbyte) {
|
|
ssize_t r;
|
|
while (-1 == (r = read(fd, buf, nbyte))) {
|
|
switch (errno) {
|
|
case EINTR:
|
|
/* Try again */
|
|
break;
|
|
default:
|
|
/* report error */
|
|
return r;
|
|
}
|
|
}
|
|
/* return bytes read */
|
|
return r;
|
|
}
|
|
|
|
static gboolean try_read_line(liMemcachedCon *con) {
|
|
liBuffer *line;
|
|
ssize_t r;
|
|
|
|
if (!con->line) con->line = li_buffer_new_slice(BUFFER_CHUNK_SIZE);
|
|
if (!con->remaining) con->remaining = li_buffer_new_slice(BUFFER_CHUNK_SIZE);
|
|
|
|
/* if we have remaining data use it for a new line */
|
|
if (con->line->used == 0 && con->remaining->used > 0) {
|
|
liBuffer *tmp = con->remaining; con->remaining = con->line; con->line = tmp;
|
|
}
|
|
|
|
g_assert(NULL == con->remaining || 0 == con->remaining->used); /* there shouldn't be any data in remaining while we fill con->line */
|
|
|
|
line = con->line;
|
|
|
|
if (line->used > 0) {
|
|
/* search for \r\n */
|
|
gchar *addr = line->addr;
|
|
gsize i, len = line->used;
|
|
for (i = 0; i < len; i++) {
|
|
if (addr[i] == '\r') {
|
|
i++;
|
|
if (i < len && addr[i] == '\n') {
|
|
add_remaining(con, addr + i+1, len - (i+1));
|
|
line->used = i-1;
|
|
line->addr[i-1] = '\0';
|
|
return TRUE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (line->used > 1024) {
|
|
/* Protocol error: we don't parse line longer than 1024 */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: line too long");
|
|
close_con(con);
|
|
return FALSE;
|
|
}
|
|
|
|
/* need more data */
|
|
r = net_read(con->fd, line->addr + line->used, line->alloc_size - line->used);
|
|
if (r == 0) {
|
|
/* EOF */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed by peer");
|
|
close_con(con);
|
|
return FALSE;
|
|
} else if (r < 0) {
|
|
switch (errno) {
|
|
case EAGAIN:
|
|
#if EWOULDBLOCK != EAGAIN
|
|
case EWOULDBLOCK:
|
|
#endif
|
|
break;
|
|
default:
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: %s", g_strerror(errno));
|
|
close_con(con);
|
|
break;
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
line->used += r;
|
|
|
|
if (line->used > 0) {
|
|
/* search for \r\n */
|
|
gchar *addr = line->addr;
|
|
gsize i, len = line->used;
|
|
for (i = 0; i < len; i++) {
|
|
if (addr[i] == '\r') {
|
|
i++;
|
|
if (i < len && addr[i] == '\n') {
|
|
add_remaining(con, addr + i+1, len - (i+1));
|
|
line->used = i-1;
|
|
line->addr[i-1] = '\0';
|
|
return TRUE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static gboolean try_read_data(liMemcachedCon *con, gsize datalen) {
|
|
liBuffer *data;
|
|
ssize_t r;
|
|
|
|
datalen += 2; /* \r\n */
|
|
|
|
/* if we have remaining data use it for a new line */
|
|
if ((!con->data || con->data->used == 0) && con->remaining && con->remaining->used > 0) {
|
|
liBuffer *tmp = con->remaining; con->remaining = con->data; con->data = tmp;
|
|
}
|
|
|
|
if (!con->data) con->data = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, datalen));
|
|
|
|
if (con->data->alloc_size < datalen) {
|
|
data = li_buffer_new_slice(MAX(BUFFER_CHUNK_SIZE, datalen));
|
|
memcpy(data->addr, con->data->addr, (data->used = con->data->used));
|
|
li_buffer_release(con->data);
|
|
con->data = data;
|
|
}
|
|
|
|
g_assert(NULL == con->remaining || 0 == con->remaining->used); /* there shouldn't be any data in remaining while we fill con->data */
|
|
|
|
data = con->data;
|
|
|
|
if (data->used < datalen) {
|
|
/* read more data */
|
|
r = net_read(con->fd, data->addr + data->used, data->alloc_size - data->used);
|
|
if (r == 0) {
|
|
/* EOF */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed by peer");
|
|
close_con(con);
|
|
return FALSE;
|
|
} else if (r < 0) {
|
|
switch (errno) {
|
|
case EAGAIN:
|
|
#if EWOULDBLOCK != EAGAIN
|
|
case EWOULDBLOCK:
|
|
#endif
|
|
break;
|
|
default:
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: %s", g_strerror(errno));
|
|
close_con(con);
|
|
break;
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
data->used += r;
|
|
}
|
|
|
|
if (data->used >= datalen) {
|
|
if (data->addr[datalen-2] != '\r' || data->addr[datalen-1] != '\n') {
|
|
/* Protocol error: data block not terminated with \r\n */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: data block not terminated with \\r\\n");
|
|
close_con(con);
|
|
return FALSE;
|
|
}
|
|
add_remaining(con, data->addr + datalen, data->used - datalen);
|
|
data->used = datalen - 2;
|
|
data->addr[datalen-2] = '\0';
|
|
return TRUE;
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
static void handle_read(liMemcachedCon *con) {
|
|
int_request *cur;
|
|
|
|
if (NULL == (cur = con->cur_req)) {
|
|
cur = con->cur_req = g_queue_peek_head(&con->req_queue);
|
|
|
|
if (NULL == cur) {
|
|
/* unexpected read event, perhaps just eof */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Connection closed: unexpected read event");
|
|
close_con(con);
|
|
return;
|
|
}
|
|
|
|
reset_item(&con->curitem);
|
|
if (con->data) con->data->used = 0;
|
|
if (con->line) con->line->used = 0;
|
|
|
|
/* init read state */
|
|
switch (cur->type) {
|
|
case REQ_GET:
|
|
con->get_data_size = 0;
|
|
con->get_have_header = FALSE;
|
|
break;
|
|
case REQ_SET:
|
|
break;
|
|
}
|
|
}
|
|
|
|
switch (cur->type) {
|
|
case REQ_GET:
|
|
if (!con->get_have_header) {
|
|
char *pos, *next;
|
|
|
|
/* wait for header line */
|
|
if (!try_read_line(con)) return;
|
|
|
|
con->get_have_header = TRUE;
|
|
|
|
if (3 == con->line->used && 0 == memcmp("END", con->line->addr, 3)) {
|
|
/* key not found */
|
|
if (cur->req.callback) {
|
|
cur->req.callback(&cur->req, LI_MEMCACHED_NOT_FOUND, NULL, NULL);
|
|
}
|
|
con->cur_req = NULL;
|
|
free_request(con, cur);
|
|
return;
|
|
}
|
|
|
|
/* con->line is 0 terminated */
|
|
|
|
if (0 != strncmp("VALUE ", con->line->addr, 6)) {
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: Unexpected response for GET: '%s'", con->line->addr);
|
|
close_con(con);
|
|
return;
|
|
}
|
|
|
|
/* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */
|
|
|
|
/* <key> */
|
|
pos = con->line->addr + 6;
|
|
next = strchr(pos, ' ');
|
|
if (NULL == next) goto req_get_header_error;
|
|
|
|
con->curitem.key = g_string_new_len(pos, next - pos);
|
|
|
|
/* <flags> */
|
|
pos = next + 1;
|
|
con->curitem.flags = strtoul(pos, &next, 10);
|
|
if (' ' != *next || pos == next) goto req_get_header_error;
|
|
|
|
/* <bytes> */
|
|
pos = next + 1;
|
|
con->get_data_size = g_ascii_strtoll(pos, &next, 10);
|
|
if (pos == next) goto req_get_header_error;
|
|
|
|
/* [<cas unique>] */
|
|
if (' ' == *next) {
|
|
pos = next + 1;
|
|
con->curitem.cas = g_ascii_strtoll(pos, &next, 10);
|
|
if (pos == next) goto req_get_header_error;
|
|
}
|
|
|
|
if ('\0' != *next) {
|
|
goto req_get_header_error;
|
|
}
|
|
|
|
con->line->used = 0;
|
|
|
|
goto req_get_header_done;
|
|
|
|
req_get_header_error:
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: Couldn't parse VALUE respone: '%s'", con->line->addr);
|
|
close_con(con);
|
|
return;
|
|
|
|
req_get_header_done: ;
|
|
}
|
|
if (NULL == con->data || con->data->used < con->get_data_size) {
|
|
/* wait for data */
|
|
if (!try_read_data(con, con->get_data_size)) return;
|
|
}
|
|
/* wait for END\r\n */
|
|
if (!try_read_line(con)) return;
|
|
|
|
if (3 == con->line->used && 0 == memcmp("END", con->line->addr, 3)) {
|
|
/* Move data to item */
|
|
con->curitem.data = con->data;
|
|
con->data = NULL;
|
|
if (cur->req.callback) {
|
|
cur->req.callback(&cur->req, LI_MEMCACHED_OK, &con->curitem, NULL);
|
|
}
|
|
reset_item(&con->curitem);
|
|
} else {
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: GET response not terminated with END (got '%s')", con->line->addr);
|
|
close_con(con);
|
|
return;
|
|
}
|
|
|
|
con->cur_req = NULL;
|
|
free_request(con, cur);
|
|
return;
|
|
|
|
case REQ_SET:
|
|
if (!try_read_line(con)) return;
|
|
|
|
if (6 == con->line->used && 0 == memcmp("STORED", con->line->addr, 6)) {
|
|
if (cur->req.callback) {
|
|
cur->req.callback(&cur->req, LI_MEMCACHED_OK, NULL, NULL);
|
|
}
|
|
} else {
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Protocol error: unepxected SET response: '%s'", con->line->addr);
|
|
close_con(con);
|
|
return;
|
|
}
|
|
|
|
con->cur_req = NULL;
|
|
free_request(con, cur);
|
|
return;
|
|
}
|
|
}
|
|
|
|
static void memcached_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
|
|
liMemcachedCon *con = (liMemcachedCon*) w->data;
|
|
UNUSED(loop);
|
|
|
|
if (1 == g_atomic_int_get(&con->refcount) && w->active) {
|
|
memcached_stop_io(con);
|
|
return;
|
|
}
|
|
|
|
if (-1 == con->fd) {
|
|
memcached_connect(con);
|
|
return;
|
|
}
|
|
|
|
li_memcached_con_acquire(con); /* make sure con isn't freed in the middle of something */
|
|
|
|
if (revents | EV_WRITE) {
|
|
int i;
|
|
ssize_t written, len;
|
|
gchar *data;
|
|
send_item *si;
|
|
|
|
si = g_queue_peek_head(&con->out);
|
|
|
|
for (i = 0; si && (i < 10); i++) { /* don't send more than 10 chunks */
|
|
data = si->buf->addr + si->pos;
|
|
len = si->len;
|
|
written = write(w->fd, data, len);
|
|
if (written < 0) {
|
|
switch (errno) {
|
|
case EINTR:
|
|
continue;
|
|
case EAGAIN:
|
|
#if EWOULDBLOCK != EAGAIN
|
|
case EWOULDBLOCK:
|
|
#endif
|
|
goto write_eagain;
|
|
default: /* Fatal error, connection has to be closed */
|
|
g_clear_error(&con->err);
|
|
g_set_error(&con->err, LI_MEMCACHED_ERROR, LI_MEMCACHED_CONNECTION, "Couldn't write socket '%s': %s",
|
|
li_sockaddr_to_string(con->addr, con->tmpstr, TRUE)->str,
|
|
g_strerror(errno));
|
|
close_con(con);
|
|
goto out;
|
|
}
|
|
} else {
|
|
si->pos += written;
|
|
si->len -= written;
|
|
if (0 == si->len) {
|
|
send_queue_item_free(si);
|
|
g_queue_pop_head(&con->out);
|
|
si = g_queue_peek_head(&con->out);
|
|
}
|
|
}
|
|
}
|
|
|
|
write_eagain:
|
|
send_queue_clean(&con->out);
|
|
}
|
|
|
|
if (revents | EV_READ) {
|
|
do {
|
|
handle_read(con);
|
|
} while (con->remaining && con->remaining->used > 0);
|
|
}
|
|
|
|
out:
|
|
memcached_update_io(con);
|
|
li_memcached_con_release(con);
|
|
}
|
|
|
|
|
|
liMemcachedCon* li_memcached_con_new(struct ev_loop *loop, liSocketAddress addr) {
|
|
liMemcachedCon* con = g_slice_new0(liMemcachedCon);
|
|
|
|
con->refcount = 1;
|
|
con->loop = loop;
|
|
con->addr = li_sockaddr_dup(addr);
|
|
con->tmpstr = g_string_sized_new(511);
|
|
|
|
con->fd = -1;
|
|
ev_io_init(&con->con_watcher, memcached_io_cb, -1, 0);
|
|
con->con_watcher.data = con;
|
|
|
|
memcached_connect(con);
|
|
|
|
return con;
|
|
}
|
|
|
|
static void li_memcached_con_free(liMemcachedCon* con) {
|
|
if (!con) return;
|
|
|
|
if (-1 != con->con_watcher.fd) {
|
|
close(con->con_watcher.fd);
|
|
/* as io has a reference on con, we don't need to stop it here */
|
|
ev_io_set(&con->con_watcher, -1, 0);
|
|
con->fd = -1;
|
|
}
|
|
|
|
send_queue_reset(&con->out);
|
|
cancel_all_requests(con);
|
|
|
|
li_buffer_release(con->buf);
|
|
li_buffer_release(con->line);
|
|
li_buffer_release(con->remaining);
|
|
li_buffer_release(con->data);
|
|
reset_item(&con->curitem);
|
|
|
|
li_sockaddr_clear(&con->addr);
|
|
g_string_free(con->tmpstr, TRUE);
|
|
|
|
g_clear_error(&con->err);
|
|
|
|
g_slice_free(liMemcachedCon, con);
|
|
}
|
|
|
|
void li_memcached_con_release(liMemcachedCon* con) {
|
|
if (!con) return;
|
|
assert(g_atomic_int_get(&con->refcount) > 0);
|
|
if (g_atomic_int_dec_and_test(&con->refcount)) {
|
|
li_memcached_con_free(con);
|
|
}
|
|
}
|
|
|
|
void li_memcached_con_acquire(liMemcachedCon* con) {
|
|
assert(g_atomic_int_get(&con->refcount) > 0);
|
|
g_atomic_int_inc(&con->refcount);
|
|
}
|
|
|
|
|
|
liMemcachedRequest* li_memcached_get(liMemcachedCon *con, GString *key, liMemcachedCB callback, gpointer cb_data, GError **err) {
|
|
int_request* req;
|
|
|
|
if (!li_memcached_is_key_valid(key)) {
|
|
g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_BAD_KEY, "Invalid key: '%s'", key->str);
|
|
return NULL;
|
|
}
|
|
|
|
if (-1 == con->fd) memcached_connect(con);
|
|
if (-1 == con->fd) {
|
|
if (NULL == con->err) {
|
|
g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_DISABLED, "Not connected");
|
|
} else if (err) {
|
|
*err = g_error_copy(con->err);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
req = g_slice_new0(int_request);
|
|
req->req.callback = callback;
|
|
req->req.cb_data = cb_data;
|
|
|
|
req->type = REQ_GET;
|
|
req->key = g_string_new_len(GSTR_LEN(key));
|
|
|
|
if (!push_request(con, req, err)) {
|
|
free_request(con, req);
|
|
return NULL;
|
|
}
|
|
|
|
return &req->req;
|
|
}
|
|
|
|
liMemcachedRequest* li_memcached_set(liMemcachedCon *con, GString *key, guint32 flags, ev_tstamp ttl, liBuffer *data, liMemcachedCB callback, gpointer cb_data, GError **err) {
|
|
int_request* req;
|
|
|
|
if (!li_memcached_is_key_valid(key)) {
|
|
g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_BAD_KEY, "Invalid key: '%s'", key->str);
|
|
return NULL;
|
|
}
|
|
|
|
if (-1 == con->fd) memcached_connect(con);
|
|
if (-1 == con->fd) {
|
|
if (NULL == con->err) {
|
|
g_set_error(err, LI_MEMCACHED_ERROR, LI_MEMCACHED_DISABLED, "Not connected");
|
|
} else if (err) {
|
|
*err = g_error_copy(con->err);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
req = g_slice_new0(int_request);
|
|
req->req.callback = callback;
|
|
req->req.cb_data = cb_data;
|
|
|
|
req->type = REQ_SET;
|
|
req->key = g_string_new_len(GSTR_LEN(key));
|
|
req->flags = flags;
|
|
req->ttl = ttl;
|
|
li_buffer_acquire(data);
|
|
req->data = data;
|
|
|
|
if (!push_request(con, req, err)) {
|
|
free_request(con, req);
|
|
return NULL;
|
|
}
|
|
|
|
return &req->req;
|
|
}
|
|
|
|
/* if length(key) <= 250 and all chars x: 0x20 < x < 0x7f the key
|
|
* remains untouched; otherwise it gets replaced with its sha1hex hash
|
|
* so in most cases the key stays readable, and we have a good fallback
|
|
*/
|
|
|
|
void li_memcached_mutate_key(GString *key) {
|
|
GChecksum *hash;
|
|
|
|
if (li_memcached_is_key_valid(key)) return;
|
|
|
|
hash = g_checksum_new(G_CHECKSUM_SHA1);
|
|
|
|
g_checksum_update(hash, (const guchar *) GSTR_LEN(key));
|
|
g_string_assign(key, g_checksum_get_string(hash));
|
|
|
|
g_checksum_free(hash);
|
|
}
|
|
|
|
gboolean li_memcached_is_key_valid(GString *key) {
|
|
guint i;
|
|
|
|
if (key->len > 250 || 0 == key->len) return FALSE;
|
|
|
|
for (i = 0; i < key->len; i++) {
|
|
if (key->str[i] <= 0x20 || key->str[i] >= 0x7f) return FALSE;
|
|
}
|
|
|
|
return TRUE;
|
|
}
|