summaryrefslogtreecommitdiff
path: root/libmanda.c
diff options
context:
space:
mode:
authorStefan Bühler <stbuehler@web.de>2010-09-09 14:26:43 +0200
committerStefan Bühler <stbuehler@web.de>2010-09-09 14:26:43 +0200
commit7a88d7a2887dcfea8371e7e84bb905a2527f4133 (patch)
tree3c3b23528af46391a45209b6b2f187dd76ebd3f0 /libmanda.c
parent87bb51239cb3ecd64407c4e06fb14e35da21db8c (diff)
downloadlibmanda-7a88d7a2887dcfea8371e7e84bb905a2527f4133.tar.gz
libmanda-7a88d7a2887dcfea8371e7e84bb905a2527f4133.zip
Add idlist, code basic message handling
Diffstat (limited to 'libmanda.c')
-rw-r--r--libmanda.c725
1 files changed, 725 insertions, 0 deletions
diff --git a/libmanda.c b/libmanda.c
index cf5436a..8dd8c9f 100644
--- a/libmanda.c
+++ b/libmanda.c
@@ -1,5 +1,6 @@
#include "libmanda.h"
+#include "idlist.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -8,3 +9,727 @@
#include <unistd.h>
#include <string.h>
+#include <assert.h>
+
+#define TIMEOUT_STEP (3)
+
+#define ENTER(x) do { ++((x)->refcount); } while(0)
+#define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0)
+
+#if __GNUC__
+# define INLINE static inline
+#else
+# define INLINE static
+#endif
+
+typedef struct messageheader messageheader;
+typedef struct request request;
+typedef struct manda_connection manda_connection;
+
+typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
+typedef void (*con_close_cb)(manda_connection *con);
+
+struct messageheader {
+ guint16 command, size, reqid, respid;
+};
+
+struct request {
+ guint16 command; /* zero means slot is free */
+ double timeout, timeout_step;
+
+ guint16 timeout_prev_id, timeout_next_id;
+
+ gpointer data;
+};
+
+struct manda_connection {
+ gpointer data; /* application data */
+ gpointer priv_data; /* internal data */
+ const manda_async_ctrl *ctrl;
+
+ con_message_cb message_cb;
+ con_close_cb close_cb;
+
+ int fd;
+
+ manda_fd_watcher fd_watcher;
+ manda_timeout req_timeout;
+
+ manda_IDList *request_ids;
+ GArray *requests;
+ guint16 timeout_first_id, timeout_last_id;
+
+ guint cur_header_pos; /* how many bytes of the header we have */
+ guint8 cur_header_buf[8];
+ messageheader cur_header;
+ guint cur_payload_pos;
+ GByteArray *cur_payload;
+
+ guint cur_send_pos;
+ GQueue send_queue;
+
+ gint refcount;
+};
+
+
+INLINE guint8 _read_net_uint8(const guint8* buf) {
+ return *buf;
+}
+INLINE guint16 _read_net_uint16(const guint8* buf) {
+ guint16 i;
+ memcpy(&i, buf, sizeof(i));
+ return ntohs(i);
+}
+INLINE guint32 _read_net_uint32(const guint8* buf) {
+ guint32 i;
+ memcpy(&i, buf, sizeof(i));
+ return ntohl(i);
+}
+
+INLINE void _write_net_uint8(guint8 *buf, guint8 val) {
+ *buf = val;
+}
+INLINE void _write_net_uint16(guint8 *buf, guint16 val) {
+ val = htons(val);
+ memcpy(buf, &val, sizeof(val));
+}
+INLINE void _write_net_uint32(guint8 *buf, guint32 val) {
+ val = htons(val);
+ memcpy(buf, &val, sizeof(val));
+}
+
+INLINE gboolean read_net_uint8(GByteArray *buf, guint *pos, guint8 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint8(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_uint16(GByteArray *buf, guint *pos, guint16 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint16(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint32(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
+ guint16 slen;
+ g_string_truncate(dest, 0);
+ if (!read_net_uint16(buf, pos, &slen)) return FALSE;
+ if (buf->len < slen || *pos >= buf->len - slen) {
+ *pos = buf->len;
+ return FALSE;
+ }
+ g_string_set_size(dest, slen);
+ memcpy(dest->str, buf->data + *pos, slen);
+ *pos += slen;
+ return TRUE;
+}
+
+INLINE void write_net_uint8(GByteArray *buf, guint8 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint8(buf->data + curlen, val);
+}
+INLINE void write_net_uint16(GByteArray *buf, guint16 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint16(buf->data + curlen, val);
+}
+INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint32(buf->data + curlen, val);
+}
+INLINE void write_net_string(GByteArray *buf, GString *val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(guint16) + val->len);
+ _write_net_uint16(buf->data + curlen, val->len);
+ memcpy(buf->data + curlen + sizeof(guint16), val->str, val->len);
+}
+
+
+
+static void con_req_unlink(manda_connection *con, request *req) {
+ request *prev = NULL, *next = NULL;
+
+ /* unlink */
+ if (req->timeout_next_id != 0) {
+ next = &g_array_index(con->requests, request, req->timeout_next_id);
+ next->timeout_prev_id = req->timeout_prev_id;
+ } else {
+ con->timeout_last_id = req->timeout_prev_id;
+ }
+ if (req->timeout_prev_id != 0) {
+ prev = &g_array_index(con->requests, request, req->timeout_prev_id);
+ prev->timeout_next_id = req->timeout_next_id;
+ } else {
+ con->timeout_first_id = req->timeout_next_id;
+ }
+ req->timeout_next_id = req->timeout_prev_id = 0;
+}
+
+static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
+ request *prev;
+
+ if (con->timeout_last_id != 0) {
+ prev = &g_array_index(con->requests, request, con->timeout_last_id);
+ prev->timeout_next_id = reqid;
+ } else {
+ con->timeout_first_id = reqid;
+ }
+
+ req->timeout_prev_id = con->timeout_last_id;
+ con->timeout_last_id = reqid;
+}
+
+static void con_close(manda_connection *con) {
+ GByteArray *buf;
+
+ if (NULL != con->fd_watcher.priv) {
+ con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
+ con->fd_watcher.priv = NULL;
+ }
+ if (NULL != con->req_timeout.priv) {
+ con->ctrl->destroy_timeout(con->data, &con->req_timeout);
+ con->req_timeout.priv = NULL;
+ }
+
+ if (-1 != con->fd) {
+ while (-1 == close(con->fd) && errno == EINTR) ;
+ con->fd = -1;
+ }
+
+ while (NULL != (buf = g_queue_pop_head(&con->send_queue))) {
+ g_byte_array_free(buf, TRUE);
+ }
+ con->cur_send_pos = 0;
+
+ if (NULL != con->close_cb) {
+ con->close_cb(con);
+ }
+
+ /* "timeout" all requests */
+ for ( ; con->timeout_first_id > 0; ) {
+ request *req;
+ guint16 req_command;
+ gpointer req_data;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ req_command = req->command;
+ req_data = req->data;
+
+ con_req_unlink(con, req);
+ /* reset request */
+ req->command = 0;
+ req->timeout = 0;
+ req->data = NULL;
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, req_command, req_data, 0, 0, NULL);
+ }
+ }
+
+ if (NULL != con->request_ids) {
+ manda_idlist_free(con->request_ids);
+ con->request_ids = NULL;
+ }
+
+ if (NULL != con->requests) {
+ g_array_free(con->requests, TRUE);
+ con->requests = NULL;
+ }
+
+ if (NULL != con->cur_payload) {
+ g_byte_array_free(con->cur_payload, TRUE);
+ con->cur_payload = NULL;
+ }
+}
+
+static void _con_free(manda_connection *con) {
+ con_close(con);
+
+ g_slice_free(manda_connection, con);
+}
+
+static void con_free(manda_connection *con) {
+ LEAVE(con, _con_free);
+}
+
+static void con_handle_response(manda_connection *con) {
+ GByteArray *payload = con->cur_payload;
+ guint16 orig_command = 0, mesg_command = con->cur_header.command, resp_to = con->cur_header.respid, req_id = con->cur_header.reqid;
+ gpointer orig_data = NULL;
+
+ con->cur_header_pos = 0;
+ con->cur_payload_pos = 0;
+ con->cur_payload = NULL;
+
+ if (0 != resp_to) {
+ request *req;
+
+ if (!manda_idlist_is_used(con->request_ids, resp_to)) {
+ /* protocol error */
+ con_close(con);
+ goto clean;
+ }
+
+ manda_idlist_put(con->request_ids, resp_to);
+
+ /* try to find request data; if we can't find it the timeout already triggered */
+ if (resp_to >= con->requests->len) goto clean;
+ req = &g_array_index(con->requests, request, resp_to);
+ if (0 == req->command) goto clean; /* "empty" request */
+
+ orig_command = req->command;
+ orig_data = req->data;
+
+ /* reset request */
+ con_req_unlink(con, req);
+ req->command = 0;
+ req->timeout = req->timeout_step = 0;
+ req->data = NULL;
+ }
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, orig_command, orig_data, mesg_command, req_id, payload);
+ }
+
+clean:
+ if (NULL != payload) g_byte_array_free(payload, TRUE);
+}
+
+static void con_fd_watcher_update(manda_connection *con) {
+ if (con->fd != -1) {
+ int events = (con->send_queue.length > 0) ? MANDA_FD_READ | MANDA_FD_WRITE : MANDA_FD_READ;
+ if (events != con->fd_watcher.events) {
+ con->fd_watcher.events = events;
+ con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
+ }
+ }
+}
+
+static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
+ manda_connection *con = watcher->priv;
+ guint i;
+
+ ENTER(con);
+
+ /* handle read */
+ for ( i = 0 ; (con->fd != -1) && (i < 100) ; i++ ) {
+ if (con->cur_header_pos < 8) {
+ ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos);
+
+ if (r < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET: /* "eof" */
+ con_close(con);
+ goto out;
+ default:
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else if (r == 0) { /* eof */
+ con_close(con);
+ goto out;
+ } else {
+ con->cur_header_pos += r;
+ }
+ }
+
+ if (con->cur_header_pos < 8) break;
+
+ /* parse header */
+ con->cur_header.command = _read_net_uint16(con->cur_header_buf + 0);
+ con->cur_header.size = _read_net_uint16(con->cur_header_buf + 2);
+ con->cur_header.reqid = _read_net_uint16(con->cur_header_buf + 4);
+ con->cur_header.respid = _read_net_uint16(con->cur_header_buf + 6);
+
+ if (con->cur_header.size < 8) {
+ /* error */
+ con_close(con);
+ } else if (con->cur_header.size > 8) {
+ ssize_t r;
+
+ if (!con->cur_payload) {
+ con->cur_payload = g_byte_array_sized_new(con->cur_header.size - 8);
+ g_byte_array_set_size(con->cur_payload, con->cur_header.size - 8);
+ con->cur_payload_pos = 0;
+ }
+
+ r = read(con->fd, &con->cur_payload->data[con->cur_payload_pos], con->cur_payload->len - con->cur_payload_pos);
+
+ if (r < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET: /* "eof" */
+ con_close(con);
+ goto out;
+ default:
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else if (r == 0) { /* eof */
+ con_close(con);
+ goto out;
+ } else {
+ con->cur_payload_pos += r;
+ }
+
+ if (con->cur_payload_pos < con->cur_payload->len) break;
+
+ con_handle_response(con);
+ } else {
+ con_handle_response(con);
+ }
+ }
+
+ for ( i = 0 ; (con->fd != -1) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
+ GByteArray *buf = g_queue_peek_head(&con->send_queue);
+ ssize_t written;
+
+ written = write(con->fd, &buf->data[con->cur_send_pos], buf->len - con->cur_send_pos);
+ if (written < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET:
+ case EPIPE:
+ con_close(con);
+ goto out;
+ default: /* Fatal error/remote close, connection has to be closed */
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else {
+ con->cur_send_pos += written;
+ }
+
+ if (con->cur_send_pos == buf->len) {
+ con->cur_send_pos = 0;
+ g_queue_pop_head(&con->send_queue);
+ g_byte_array_free(buf, TRUE);
+ } else {
+ break;
+ }
+ }
+
+ con_fd_watcher_update(con);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+static void con_timeout_cb(manda_timeout *timeout) {
+ manda_connection *con = timeout->priv;
+
+ double now = con->ctrl->get_time(con->data);
+
+ for ( ; con->timeout_first_id != 0; ) {
+ request *req;
+ guint16 req_command;
+ gpointer req_data;
+ guint16 reqid = con->timeout_first_id;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ if (req->timeout_step > now) break;
+
+ if (req->timeout > now) {
+ req->timeout_step = now + TIMEOUT_STEP;
+ if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
+
+ /* requeue */
+ con_req_unlink(con, req);
+ con_req_push(con, req, reqid);
+
+ continue;
+ }
+
+ req_command = req->command;
+ req_data = req->data;
+
+ /* reset request */
+ con_req_unlink(con, req);
+ req->command = 0;
+ req->timeout = req->timeout_step = 0;
+ req->data = NULL;
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, req_command, req_data, 0, 0, NULL);
+ }
+ }
+
+ if (con->timeout_first_id != 0) {
+ request *req;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ con->req_timeout.timeout = req->timeout_step;
+ con->ctrl->start_timeout(con->data, &con->req_timeout);
+ }
+}
+
+static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpointer priv_data, con_message_cb message_cb, con_close_cb close_cb, int fd) {
+ manda_connection *con = g_slice_new0(manda_connection);
+ gint first_id;
+
+ con->data = srv;
+ con->ctrl = ctrl;
+ con->priv_data = priv_data;
+ con->message_cb = message_cb;
+ con->close_cb = close_cb;
+ con->fd = fd;
+
+ con->fd_watcher.priv = con;
+ con->fd_watcher.callback = con_fd_watcher_cb;
+ con->fd_watcher.events = MANDA_FD_READ;
+ con->fd_watcher.fd = fd;
+
+ con->req_timeout.priv = con;
+ con->req_timeout.callback = con_timeout_cb;
+ con->req_timeout.timeout = 0;
+
+ con->request_ids = manda_idlist_new(65535);
+ /* id 0 is reserved so request it here */
+ first_id = manda_idlist_get(con->request_ids);
+ assert(0 == first_id);
+
+ con->requests = g_array_new(FALSE, TRUE, sizeof(request));
+
+ con->ctrl->new_fd_watcher(con->data, &con->fd_watcher);
+ con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
+ con->ctrl->new_timeout(con->data, &con->req_timeout);
+
+ return con;
+}
+
+static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id, guint16 resp_id) {
+ _write_net_uint16(payload->data + 0, command);
+ _write_net_uint16(payload->data + 2, payload->len);
+ _write_net_uint16(payload->data + 4, req_id);
+ _write_net_uint16(payload->data + 6, resp_id);
+}
+
+/* payload needs to be prefixed with 8 dummy bytes for the header */
+static void con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
+ double now;
+ gint reqid;
+ request *req;
+
+ ENTER(con);
+
+ if (-1 == con->fd) {
+ /* connection closed */
+ goto error;
+ }
+
+ if (payload->len > 65535 || payload->len < 8 || 0 == command || wait_timeout <= 0) {
+ /* payload too big / invalid parameters */
+ goto error;
+ }
+
+ reqid = manda_idlist_get(con->request_ids);
+
+ if (-1 == reqid) goto error; /* no free request id available */
+
+ assert(reqid > 0 && reqid < 65536);
+
+ if ((guint) reqid > con->requests->len) {
+ g_array_set_size(con->requests, reqid+1);
+ }
+ req = &g_array_index(con->requests, request, reqid);
+
+ now = con->ctrl->get_time(con->data);
+ req->timeout = now + wait_timeout;;
+ req->timeout_step = now + TIMEOUT_STEP;
+ if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
+
+ if (con->timeout_first_id == 0) {
+ con->req_timeout.timeout = req->timeout_step;
+ con->ctrl->start_timeout(con->data, &con->req_timeout);
+ }
+
+ con_fix_header(payload, command, reqid, resp_id);
+ con_req_push(con, req, reqid);
+
+ g_queue_push_tail(&con->send_queue, payload);
+ con_fd_watcher_update(con);
+
+ goto out;
+
+error:
+ if (NULL != con->message_cb) {
+ con->message_cb(con, command, data, 0, 0, NULL);
+ }
+
+ g_byte_array_free(payload, TRUE);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+/* payload needs to be prefixed with 8 dummy bytes for the header */
+static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
+ ENTER(con);
+
+ if (-1 == con->fd) {
+ /* connection closed */
+ goto error;
+ }
+
+ if (payload->len > 65535 || payload->len < 8 || 0 == command) {
+ /* payload too big / invalid parameters */
+ goto error;
+ }
+
+ con_fix_header(payload, command, 0, resp_id);
+
+ g_queue_push_tail(&con->send_queue, payload);
+ con_fd_watcher_update(con);
+
+ goto out;
+
+error:
+ g_byte_array_free(payload, TRUE);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+
+
+typedef struct server_socket server_socket;
+struct server_socket {
+ int fd;
+ gpointer data;
+};
+
+static void manda_server_connection_free(manda_server_connection *con);
+static void manda_server_backend_release(manda_server_connection *con, guint32 id);
+
+manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
+ manda_server *s = g_slice_new(manda_server);
+ s->refcount = 1;
+ s->data = srv;
+ s->connections = g_ptr_array_new();
+ s->ctrl = ctrl;
+ s->callbacks = callbacks;
+ s->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket));
+
+ return s;
+}
+
+void manda_server_acquire(manda_server *s) {
+ ++s->refcount;
+}
+
+void manda_server_close(manda_server *s) {
+ guint i;
+
+ for (i = 0; i < s->sockets->len; i++) {
+ server_socket *sock = &g_array_index(s->sockets, server_socket, i);
+ close(sock->fd);
+ }
+ g_array_set_size(s->sockets, 0);
+
+ for (i = s->connections->len; i-- > 0; ) {
+ manda_server_connection *con = g_ptr_array_index(s->connections, i);
+ con->delete_later = TRUE;
+ s->callbacks->closed_connection(s->data, con);
+ if (con->refcount == 0) manda_server_connection_free(con);
+ }
+ g_ptr_array_set_size(s->connections, 0);
+}
+
+void manda_server_release(manda_server *s) {
+ if (NULL == s) return;
+ if (0 < --s->refcount) return;
+
+ manda_server_close(s);
+
+ g_array_free(s->sockets, TRUE);
+ g_ptr_array_free(s->connections, TRUE);
+
+ g_slice_free(manda_server, s);
+}
+
+void manda_server_add_socket(manda_server *s, int fd, gpointer data);
+
+static void manda_server_connection_free(manda_server_connection *con) {
+ /* TODO */
+ g_slice_free(manda_server_connection, con);
+}
+
+void manda_server_con_close(manda_server_connection *con);
+
+static void manda_server_backend_release(manda_server_connection *con, guint32 id) {
+ manda_server_backend_use *use;
+ manda_server_backend *b;
+
+ if (id >= con->backends->len) return;
+
+ use = g_ptr_array_index(con->backends, id);
+ if (NULL == use) return;
+
+ b = use->backend;
+ if (use->ndx != b->usage->len - 1) {
+ manda_server_backend_use *u = g_ptr_array_index(b->usage, b->usage->len - 1);
+ g_ptr_array_index(b->usage, use->ndx) = u;
+ u->ndx = use->ndx;
+ }
+ b->sum_last_load -= use->last_load;
+ g_ptr_array_set_size(b->usage, b->usage->len-1);
+
+ con->refcount++;
+
+ con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
+ g_slice_free(manda_server_backend_use, use);
+}
+
+manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
+void manda_server_backend_free(manda_server_backend *backend);
+
+void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
+void manda_server_drop_backend(manda_server_backend *backend);