summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Bühler <stbuehler@web.de>2010-09-09 14:26:43 +0200
committerStefan Bühler <stbuehler@web.de>2010-09-09 14:26:43 +0200
commit7a88d7a2887dcfea8371e7e84bb905a2527f4133 (patch)
tree3c3b23528af46391a45209b6b2f187dd76ebd3f0
parent87bb51239cb3ecd64407c4e06fb14e35da21db8c (diff)
downloadlibmanda-7a88d7a2887dcfea8371e7e84bb905a2527f4133.tar.gz
libmanda-7a88d7a2887dcfea8371e7e84bb905a2527f4133.zip
Add idlist, code basic message handling
-rw-r--r--CMakeLists.txt2
-rw-r--r--idlist.c114
-rw-r--r--idlist.h39
-rw-r--r--libmanda.c725
-rw-r--r--libmanda.h97
5 files changed, 965 insertions, 12 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index abe55ca..22204fa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,7 +35,7 @@ pkg_check_modules (GLIB2 REQUIRED glib-2.0)
SET(GLIB_INCLUDES ${GLIB2_INCLUDE_DIRS} ${GLIB2_INCLUDE_DIRS}/glib-2.0/ ${GLIB2_INCLUDE_DIRS}/glib-2.0/include/)
INCLUDE_DIRECTORIES(${GLIB_INCLUDES})
-SET(MAIN_SOURCE libmanda.c)
+SET(MAIN_SOURCE libmanda.c idlist.c)
SET(PACKAGE_NAME ${CMAKE_PROJECT_NAME})
SET(PACKAGE_VERSION ${PACKAGE_VERSION})
diff --git a/idlist.c b/idlist.c
new file mode 100644
index 0000000..56e3e68
--- /dev/null
+++ b/idlist.c
@@ -0,0 +1,114 @@
+
+#include "idlist.h"
+
+#define UL_BITS (sizeof(gulong) * 8)
+
+/* There are often no explicit bit shifts used in this code. This is on purpose, the
+ * code looks much cleaner without them, the correct constant for *, / and % is easier to calculate
+ * as constant (UL_BITS) and the compiler should know how to optimize the operations; as UL_BITS is hopefully
+ * of the form 2^n this should result in bit shifts in the executable code.
+ */
+
+manda_IDList* manda_idlist_new(gint max_ids) {
+ manda_IDList *l = g_slice_new0(manda_IDList);
+ g_assert(max_ids > 0);
+ l->bitvector = g_array_new(FALSE, TRUE, sizeof(gulong));
+ l->max_ids = max_ids;
+ l->next_free_id = -1;
+ l->used_ids = 0;
+ return l;
+}
+
+void manda_idlist_free(manda_IDList *l) {
+ if (!l) return;
+ g_array_free(l->bitvector, TRUE);
+ g_slice_free(manda_IDList, l);
+}
+
+static void mark_bit(GArray *a, gint id) {
+ guint ndx = id / UL_BITS, bndx = id % UL_BITS;
+ gulong bmask = 1 << bndx;
+ g_assert(id >= 0 && ndx < a->len);
+
+ g_assert(0 == (g_array_index(a, gulong, ndx) & (bmask))); /* bit musn't be set */
+ g_array_index(a, gulong, ndx) |= (bmask);
+}
+
+static void clear_bit(GArray *a, gint id) {
+ guint ndx = id / UL_BITS, bndx = id % UL_BITS;
+ gulong bmask = 1 << bndx;
+ g_assert(id >= 0 && ndx < a->len);
+
+ g_assert(0 != (g_array_index(a, gulong, ndx) & (bmask))); /* bit must be set */
+ g_array_index(a, gulong, ndx) &= ~(bmask);
+}
+
+static void idlist_reserve(GArray *a, guint id) {
+ guint ndx = id / UL_BITS;
+ if (ndx >= a->len) g_array_set_size(a, ndx+1);
+}
+
+gint manda_idlist_get(manda_IDList *l) {
+ guint fndx, ndx;
+ gint newid, bndx;
+ gulong u = -1;
+ GArray *a = l->bitvector;
+ if (l->used_ids >= l->max_ids) return -1;
+
+ if (l->next_free_id < 0) { /* all ids in use */
+ newid = l->used_ids++;
+ idlist_reserve(a, newid);
+ mark_bit(a, newid);
+ return newid;
+ }
+
+ /* search for an array entry which doesn't have all bits set (i.e. != (gulong) -1)
+ * start with the entry of next_free_id, all below are in use anyway
+ */
+ fndx = l->next_free_id / UL_BITS;
+ for (ndx = fndx; ndx < a->len && ((gulong) -1 == (u = g_array_index(a, gulong, ndx))); ndx++) ;
+
+ if (ndx == a->len) { /* again: all ids are in use */
+ l->next_free_id = -1;
+ newid = l->used_ids++;
+ idlist_reserve(a, newid);
+ mark_bit(a, newid);
+ return newid;
+ }
+
+ /* array entry != -1, search for free bit */
+ if (fndx == ndx) bndx = (l->next_free_id / UL_BITS) - 1;
+ else bndx = -1;
+ bndx = g_bit_nth_lsf(~u, bndx);
+
+ /* no free bit found; should never happen as u != -1 and next_free_id should be correct, i.e. all bits <= the bit start index should be set */
+ g_assert(bndx != -1);
+
+ newid = ndx * UL_BITS + bndx;
+ if (newid == (gint) l->used_ids) {
+ l->next_free_id = -1;
+ } else {
+ l->next_free_id = newid+1;
+ }
+
+ l->used_ids++;
+ mark_bit(a, newid);
+
+ return newid;
+}
+
+gboolean manda_idlist_is_used(manda_IDList *l, gint id) {
+ GArray *a = l->bitvector;
+ guint ndx = id / UL_BITS, bndx = id % UL_BITS;
+ gulong bmask = 1 << bndx;
+ if (id < 0 || ndx >= a->len) return FALSE;
+
+ return (0 != (g_array_index(a, gulong, ndx) & (bmask)));
+}
+
+void manda_idlist_put(manda_IDList *l, gint id) {
+ clear_bit(l->bitvector, id);
+
+ l->used_ids--;
+ if ((l->next_free_id < 0 && (guint) id < l->used_ids) || id < l->next_free_id) l->next_free_id = id;
+}
diff --git a/idlist.h b/idlist.h
new file mode 100644
index 0000000..879b894
--- /dev/null
+++ b/idlist.h
@@ -0,0 +1,39 @@
+#ifndef _LIBMANDA_IDLIST_H_
+#define _LIBMANDA_IDLIST_H_
+
+#include "libmanda.h"
+
+struct manda_IDList {
+ /* used ids are marked with a "1" in the bitvector (represented as array of gulong) */
+ GArray *bitvector;
+
+ /* all ids are in the range [0, max_ids[, i.e. 0 <= id < max_ids
+ * although the type is guint, it has to fit in a gint too, as we
+ * use gint for the ids in the interface, so we can use -1 as a special value.
+ */
+ guint max_ids;
+
+ /* if all ids in [0, used_ids-1] are used, next_free_id is -1
+ * if not, then all available ids are >= next_free_id,
+ * so we can start at next_free_id for searching the next free id
+ */
+ gint next_free_id;
+ guint used_ids;
+};
+
+/* create new idlist; the parameter max_ids is "signed" on purpose */
+manda_IDList* manda_idlist_new(gint max_ids);
+
+/* free idlist */
+void manda_idlist_free(manda_IDList *l);
+
+/* request new id; return -1 if no id is available, valid ids are always > 0 */
+gint manda_idlist_get(manda_IDList *l);
+
+/* check whether an id is in use and can be "_put" */
+gboolean manda_idlist_is_used(manda_IDList *l, gint id);
+
+/* release id. never release an id more than once! */
+void manda_idlist_put(manda_IDList *l, gint id);
+
+#endif
diff --git a/libmanda.c b/libmanda.c
index cf5436a..8dd8c9f 100644
--- a/libmanda.c
+++ b/libmanda.c
@@ -1,5 +1,6 @@
#include "libmanda.h"
+#include "idlist.h"
#include <arpa/inet.h>
#include <errno.h>
@@ -8,3 +9,727 @@
#include <unistd.h>
#include <string.h>
+#include <assert.h>
+
+#define TIMEOUT_STEP (3)
+
+#define ENTER(x) do { ++((x)->refcount); } while(0)
+#define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0)
+
+#if __GNUC__
+# define INLINE static inline
+#else
+# define INLINE static
+#endif
+
+typedef struct messageheader messageheader;
+typedef struct request request;
+typedef struct manda_connection manda_connection;
+
+typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
+typedef void (*con_close_cb)(manda_connection *con);
+
+struct messageheader {
+ guint16 command, size, reqid, respid;
+};
+
+struct request {
+ guint16 command; /* zero means slot is free */
+ double timeout, timeout_step;
+
+ guint16 timeout_prev_id, timeout_next_id;
+
+ gpointer data;
+};
+
+struct manda_connection {
+ gpointer data; /* application data */
+ gpointer priv_data; /* internal data */
+ const manda_async_ctrl *ctrl;
+
+ con_message_cb message_cb;
+ con_close_cb close_cb;
+
+ int fd;
+
+ manda_fd_watcher fd_watcher;
+ manda_timeout req_timeout;
+
+ manda_IDList *request_ids;
+ GArray *requests;
+ guint16 timeout_first_id, timeout_last_id;
+
+ guint cur_header_pos; /* how many bytes of the header we have */
+ guint8 cur_header_buf[8];
+ messageheader cur_header;
+ guint cur_payload_pos;
+ GByteArray *cur_payload;
+
+ guint cur_send_pos;
+ GQueue send_queue;
+
+ gint refcount;
+};
+
+
+INLINE guint8 _read_net_uint8(const guint8* buf) {
+ return *buf;
+}
+INLINE guint16 _read_net_uint16(const guint8* buf) {
+ guint16 i;
+ memcpy(&i, buf, sizeof(i));
+ return ntohs(i);
+}
+INLINE guint32 _read_net_uint32(const guint8* buf) {
+ guint32 i;
+ memcpy(&i, buf, sizeof(i));
+ return ntohl(i);
+}
+
+INLINE void _write_net_uint8(guint8 *buf, guint8 val) {
+ *buf = val;
+}
+INLINE void _write_net_uint16(guint8 *buf, guint16 val) {
+ val = htons(val);
+ memcpy(buf, &val, sizeof(val));
+}
+INLINE void _write_net_uint32(guint8 *buf, guint32 val) {
+ val = htons(val);
+ memcpy(buf, &val, sizeof(val));
+}
+
+INLINE gboolean read_net_uint8(GByteArray *buf, guint *pos, guint8 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint8(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_uint16(GByteArray *buf, guint *pos, guint16 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint16(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
+ if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
+ /* end of buffer */
+ *dest = 0;
+ *pos = buf->len;
+ return FALSE;
+ }
+ *dest = _read_net_uint32(buf->data + *pos);
+ *pos += sizeof(*dest);
+ return TRUE;
+}
+INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
+ guint16 slen;
+ g_string_truncate(dest, 0);
+ if (!read_net_uint16(buf, pos, &slen)) return FALSE;
+ if (buf->len < slen || *pos >= buf->len - slen) {
+ *pos = buf->len;
+ return FALSE;
+ }
+ g_string_set_size(dest, slen);
+ memcpy(dest->str, buf->data + *pos, slen);
+ *pos += slen;
+ return TRUE;
+}
+
+INLINE void write_net_uint8(GByteArray *buf, guint8 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint8(buf->data + curlen, val);
+}
+INLINE void write_net_uint16(GByteArray *buf, guint16 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint16(buf->data + curlen, val);
+}
+INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(val));
+ _write_net_uint32(buf->data + curlen, val);
+}
+INLINE void write_net_string(GByteArray *buf, GString *val) {
+ guint curlen = buf->len;
+ g_byte_array_set_size(buf, curlen + sizeof(guint16) + val->len);
+ _write_net_uint16(buf->data + curlen, val->len);
+ memcpy(buf->data + curlen + sizeof(guint16), val->str, val->len);
+}
+
+
+
+static void con_req_unlink(manda_connection *con, request *req) {
+ request *prev = NULL, *next = NULL;
+
+ /* unlink */
+ if (req->timeout_next_id != 0) {
+ next = &g_array_index(con->requests, request, req->timeout_next_id);
+ next->timeout_prev_id = req->timeout_prev_id;
+ } else {
+ con->timeout_last_id = req->timeout_prev_id;
+ }
+ if (req->timeout_prev_id != 0) {
+ prev = &g_array_index(con->requests, request, req->timeout_prev_id);
+ prev->timeout_next_id = req->timeout_next_id;
+ } else {
+ con->timeout_first_id = req->timeout_next_id;
+ }
+ req->timeout_next_id = req->timeout_prev_id = 0;
+}
+
+static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
+ request *prev;
+
+ if (con->timeout_last_id != 0) {
+ prev = &g_array_index(con->requests, request, con->timeout_last_id);
+ prev->timeout_next_id = reqid;
+ } else {
+ con->timeout_first_id = reqid;
+ }
+
+ req->timeout_prev_id = con->timeout_last_id;
+ con->timeout_last_id = reqid;
+}
+
+static void con_close(manda_connection *con) {
+ GByteArray *buf;
+
+ if (NULL != con->fd_watcher.priv) {
+ con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
+ con->fd_watcher.priv = NULL;
+ }
+ if (NULL != con->req_timeout.priv) {
+ con->ctrl->destroy_timeout(con->data, &con->req_timeout);
+ con->req_timeout.priv = NULL;
+ }
+
+ if (-1 != con->fd) {
+ while (-1 == close(con->fd) && errno == EINTR) ;
+ con->fd = -1;
+ }
+
+ while (NULL != (buf = g_queue_pop_head(&con->send_queue))) {
+ g_byte_array_free(buf, TRUE);
+ }
+ con->cur_send_pos = 0;
+
+ if (NULL != con->close_cb) {
+ con->close_cb(con);
+ }
+
+ /* "timeout" all requests */
+ for ( ; con->timeout_first_id > 0; ) {
+ request *req;
+ guint16 req_command;
+ gpointer req_data;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ req_command = req->command;
+ req_data = req->data;
+
+ con_req_unlink(con, req);
+ /* reset request */
+ req->command = 0;
+ req->timeout = 0;
+ req->data = NULL;
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, req_command, req_data, 0, 0, NULL);
+ }
+ }
+
+ if (NULL != con->request_ids) {
+ manda_idlist_free(con->request_ids);
+ con->request_ids = NULL;
+ }
+
+ if (NULL != con->requests) {
+ g_array_free(con->requests, TRUE);
+ con->requests = NULL;
+ }
+
+ if (NULL != con->cur_payload) {
+ g_byte_array_free(con->cur_payload, TRUE);
+ con->cur_payload = NULL;
+ }
+}
+
+static void _con_free(manda_connection *con) {
+ con_close(con);
+
+ g_slice_free(manda_connection, con);
+}
+
+static void con_free(manda_connection *con) {
+ LEAVE(con, _con_free);
+}
+
+static void con_handle_response(manda_connection *con) {
+ GByteArray *payload = con->cur_payload;
+ guint16 orig_command = 0, mesg_command = con->cur_header.command, resp_to = con->cur_header.respid, req_id = con->cur_header.reqid;
+ gpointer orig_data = NULL;
+
+ con->cur_header_pos = 0;
+ con->cur_payload_pos = 0;
+ con->cur_payload = NULL;
+
+ if (0 != resp_to) {
+ request *req;
+
+ if (!manda_idlist_is_used(con->request_ids, resp_to)) {
+ /* protocol error */
+ con_close(con);
+ goto clean;
+ }
+
+ manda_idlist_put(con->request_ids, resp_to);
+
+ /* try to find request data; if we can't find it the timeout already triggered */
+ if (resp_to >= con->requests->len) goto clean;
+ req = &g_array_index(con->requests, request, resp_to);
+ if (0 == req->command) goto clean; /* "empty" request */
+
+ orig_command = req->command;
+ orig_data = req->data;
+
+ /* reset request */
+ con_req_unlink(con, req);
+ req->command = 0;
+ req->timeout = req->timeout_step = 0;
+ req->data = NULL;
+ }
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, orig_command, orig_data, mesg_command, req_id, payload);
+ }
+
+clean:
+ if (NULL != payload) g_byte_array_free(payload, TRUE);
+}
+
+static void con_fd_watcher_update(manda_connection *con) {
+ if (con->fd != -1) {
+ int events = (con->send_queue.length > 0) ? MANDA_FD_READ | MANDA_FD_WRITE : MANDA_FD_READ;
+ if (events != con->fd_watcher.events) {
+ con->fd_watcher.events = events;
+ con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
+ }
+ }
+}
+
+static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
+ manda_connection *con = watcher->priv;
+ guint i;
+
+ ENTER(con);
+
+ /* handle read */
+ for ( i = 0 ; (con->fd != -1) && (i < 100) ; i++ ) {
+ if (con->cur_header_pos < 8) {
+ ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos);
+
+ if (r < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET: /* "eof" */
+ con_close(con);
+ goto out;
+ default:
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else if (r == 0) { /* eof */
+ con_close(con);
+ goto out;
+ } else {
+ con->cur_header_pos += r;
+ }
+ }
+
+ if (con->cur_header_pos < 8) break;
+
+ /* parse header */
+ con->cur_header.command = _read_net_uint16(con->cur_header_buf + 0);
+ con->cur_header.size = _read_net_uint16(con->cur_header_buf + 2);
+ con->cur_header.reqid = _read_net_uint16(con->cur_header_buf + 4);
+ con->cur_header.respid = _read_net_uint16(con->cur_header_buf + 6);
+
+ if (con->cur_header.size < 8) {
+ /* error */
+ con_close(con);
+ } else if (con->cur_header.size > 8) {
+ ssize_t r;
+
+ if (!con->cur_payload) {
+ con->cur_payload = g_byte_array_sized_new(con->cur_header.size - 8);
+ g_byte_array_set_size(con->cur_payload, con->cur_header.size - 8);
+ con->cur_payload_pos = 0;
+ }
+
+ r = read(con->fd, &con->cur_payload->data[con->cur_payload_pos], con->cur_payload->len - con->cur_payload_pos);
+
+ if (r < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET: /* "eof" */
+ con_close(con);
+ goto out;
+ default:
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else if (r == 0) { /* eof */
+ con_close(con);
+ goto out;
+ } else {
+ con->cur_payload_pos += r;
+ }
+
+ if (con->cur_payload_pos < con->cur_payload->len) break;
+
+ con_handle_response(con);
+ } else {
+ con_handle_response(con);
+ }
+ }
+
+ for ( i = 0 ; (con->fd != -1) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
+ GByteArray *buf = g_queue_peek_head(&con->send_queue);
+ ssize_t written;
+
+ written = write(con->fd, &buf->data[con->cur_send_pos], buf->len - con->cur_send_pos);
+ if (written < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ break;
+ case ECONNRESET:
+ case EPIPE:
+ con_close(con);
+ goto out;
+ default: /* Fatal error/remote close, connection has to be closed */
+ con_close(con);
+ goto out;
+ }
+ break;
+ } else {
+ con->cur_send_pos += written;
+ }
+
+ if (con->cur_send_pos == buf->len) {
+ con->cur_send_pos = 0;
+ g_queue_pop_head(&con->send_queue);
+ g_byte_array_free(buf, TRUE);
+ } else {
+ break;
+ }
+ }
+
+ con_fd_watcher_update(con);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+static void con_timeout_cb(manda_timeout *timeout) {
+ manda_connection *con = timeout->priv;
+
+ double now = con->ctrl->get_time(con->data);
+
+ for ( ; con->timeout_first_id != 0; ) {
+ request *req;
+ guint16 req_command;
+ gpointer req_data;
+ guint16 reqid = con->timeout_first_id;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ if (req->timeout_step > now) break;
+
+ if (req->timeout > now) {
+ req->timeout_step = now + TIMEOUT_STEP;
+ if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
+
+ /* requeue */
+ con_req_unlink(con, req);
+ con_req_push(con, req, reqid);
+
+ continue;
+ }
+
+ req_command = req->command;
+ req_data = req->data;
+
+ /* reset request */
+ con_req_unlink(con, req);
+ req->command = 0;
+ req->timeout = req->timeout_step = 0;
+ req->data = NULL;
+
+ if (NULL != con->message_cb) {
+ con->message_cb(con, req_command, req_data, 0, 0, NULL);
+ }
+ }
+
+ if (con->timeout_first_id != 0) {
+ request *req;
+
+ req = &g_array_index(con->requests, request, con->timeout_first_id);
+
+ con->req_timeout.timeout = req->timeout_step;
+ con->ctrl->start_timeout(con->data, &con->req_timeout);
+ }
+}
+
+static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpointer priv_data, con_message_cb message_cb, con_close_cb close_cb, int fd) {
+ manda_connection *con = g_slice_new0(manda_connection);
+ gint first_id;
+
+ con->data = srv;
+ con->ctrl = ctrl;
+ con->priv_data = priv_data;
+ con->message_cb = message_cb;
+ con->close_cb = close_cb;
+ con->fd = fd;
+
+ con->fd_watcher.priv = con;
+ con->fd_watcher.callback = con_fd_watcher_cb;
+ con->fd_watcher.events = MANDA_FD_READ;
+ con->fd_watcher.fd = fd;
+
+ con->req_timeout.priv = con;
+ con->req_timeout.callback = con_timeout_cb;
+ con->req_timeout.timeout = 0;
+
+ con->request_ids = manda_idlist_new(65535);
+ /* id 0 is reserved so request it here */
+ first_id = manda_idlist_get(con->request_ids);
+ assert(0 == first_id);
+
+ con->requests = g_array_new(FALSE, TRUE, sizeof(request));
+
+ con->ctrl->new_fd_watcher(con->data, &con->fd_watcher);
+ con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
+ con->ctrl->new_timeout(con->data, &con->req_timeout);
+
+ return con;
+}
+
+static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id, guint16 resp_id) {
+ _write_net_uint16(payload->data + 0, command);
+ _write_net_uint16(payload->data + 2, payload->len);
+ _write_net_uint16(payload->data + 4, req_id);
+ _write_net_uint16(payload->data + 6, resp_id);
+}
+
+/* payload needs to be prefixed with 8 dummy bytes for the header */
+static void con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
+ double now;
+ gint reqid;
+ request *req;
+
+ ENTER(con);
+
+ if (-1 == con->fd) {
+ /* connection closed */
+ goto error;
+ }
+
+ if (payload->len > 65535 || payload->len < 8 || 0 == command || wait_timeout <= 0) {
+ /* payload too big / invalid parameters */
+ goto error;
+ }
+
+ reqid = manda_idlist_get(con->request_ids);
+
+ if (-1 == reqid) goto error; /* no free request id available */
+
+ assert(reqid > 0 && reqid < 65536);
+
+ if ((guint) reqid > con->requests->len) {
+ g_array_set_size(con->requests, reqid+1);
+ }
+ req = &g_array_index(con->requests, request, reqid);
+
+ now = con->ctrl->get_time(con->data);
+ req->timeout = now + wait_timeout;;
+ req->timeout_step = now + TIMEOUT_STEP;
+ if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
+
+ if (con->timeout_first_id == 0) {
+ con->req_timeout.timeout = req->timeout_step;
+ con->ctrl->start_timeout(con->data, &con->req_timeout);
+ }
+
+ con_fix_header(payload, command, reqid, resp_id);
+ con_req_push(con, req, reqid);
+
+ g_queue_push_tail(&con->send_queue, payload);
+ con_fd_watcher_update(con);
+
+ goto out;
+
+error:
+ if (NULL != con->message_cb) {
+ con->message_cb(con, command, data, 0, 0, NULL);
+ }
+
+ g_byte_array_free(payload, TRUE);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+/* payload needs to be prefixed with 8 dummy bytes for the header */
+static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
+ ENTER(con);
+
+ if (-1 == con->fd) {
+ /* connection closed */
+ goto error;
+ }
+
+ if (payload->len > 65535 || payload->len < 8 || 0 == command) {
+ /* payload too big / invalid parameters */
+ goto error;
+ }
+
+ con_fix_header(payload, command, 0, resp_id);
+
+ g_queue_push_tail(&con->send_queue, payload);
+ con_fd_watcher_update(con);
+
+ goto out;
+
+error:
+ g_byte_array_free(payload, TRUE);
+
+out:
+ LEAVE(con, _con_free);
+}
+
+
+
+typedef struct server_socket server_socket;
+struct server_socket {
+ int fd;
+ gpointer data;
+};
+
+static void manda_server_connection_free(manda_server_connection *con);
+static void manda_server_backend_release(manda_server_connection *con, guint32 id);
+
+manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
+ manda_server *s = g_slice_new(manda_server);
+ s->refcount = 1;
+ s->data = srv;
+ s->connections = g_ptr_array_new();
+ s->ctrl = ctrl;
+ s->callbacks = callbacks;
+ s->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket));
+
+ return s;
+}
+
+void manda_server_acquire(manda_server *s) {
+ ++s->refcount;
+}
+
+void manda_server_close(manda_server *s) {
+ guint i;
+
+ for (i = 0; i < s->sockets->len; i++) {
+ server_socket *sock = &g_array_index(s->sockets, server_socket, i);
+ close(sock->fd);
+ }
+ g_array_set_size(s->sockets, 0);
+
+ for (i = s->connections->len; i-- > 0; ) {
+ manda_server_connection *con = g_ptr_array_index(s->connections, i);
+ con->delete_later = TRUE;
+ s->callbacks->closed_connection(s->data, con);
+ if (con->refcount == 0) manda_server_connection_free(con);
+ }
+ g_ptr_array_set_size(s->connections, 0);
+}
+
+void manda_server_release(manda_server *s) {
+ if (NULL == s) return;
+ if (0 < --s->refcount) return;
+
+ manda_server_close(s);
+
+ g_array_free(s->sockets, TRUE);
+ g_ptr_array_free(s->connections, TRUE);
+
+ g_slice_free(manda_server, s);
+}
+
+void manda_server_add_socket(manda_server *s, int fd, gpointer data);
+
+static void manda_server_connection_free(manda_server_connection *con) {
+ /* TODO */
+ g_slice_free(manda_server_connection, con);
+}
+
+void manda_server_con_close(manda_server_connection *con);
+
+static void manda_server_backend_release(manda_server_connection *con, guint32 id) {
+ manda_server_backend_use *use;
+ manda_server_backend *b;
+
+ if (id >= con->backends->len) return;
+
+ use = g_ptr_array_index(con->backends, id);
+ if (NULL == use) return;
+
+ b = use->backend;
+ if (use->ndx != b->usage->len - 1) {
+ manda_server_backend_use *u = g_ptr_array_index(b->usage, b->usage->len - 1);
+ g_ptr_array_index(b->usage, use->ndx) = u;
+ u->ndx = use->ndx;
+ }
+ b->sum_last_load -= use->last_load;
+ g_ptr_array_set_size(b->usage, b->usage->len-1);
+
+ con->refcount++;
+
+ con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
+ g_slice_free(manda_server_backend_use, use);
+}
+
+manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
+void manda_server_backend_free(manda_server_backend *backend);
+
+void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
+void manda_server_drop_backend(manda_server_backend *backend);
diff --git a/libmanda.h b/libmanda.h
index 69b8431..a24a8be 100644
--- a/libmanda.h
+++ b/libmanda.h
@@ -7,6 +7,9 @@
#include <sys/socket.h>
+/* idlist */
+typedef struct manda_IDList manda_IDList;
+
typedef enum {
MANDA_FD_READ = 1,
MANDA_FD_WRITE = 2
@@ -18,8 +21,12 @@ typedef struct manda_timeout manda_timeout;
typedef struct manda_server_callbacks manda_server_callbacks;
typedef struct manda_server_connection manda_server_connection;
+typedef struct manda_server_backend_use manda_server_backend_use;
+typedef struct manda_server_backend manda_server_backend;
typedef struct manda_server manda_server;
+typedef struct manda_client_backend_callbacks manda_client_backend_callbacks;
+typedef struct manda_client_backend manda_client_backend;
typedef struct manda_client manda_client;
typedef void (*manda_fd_watcher_cb)(manda_fd_watcher *watcher);
@@ -34,6 +41,8 @@ typedef void (*manda_new_timeout)(gpointer srv, manda_timeout *timeout);
typedef void (*manda_start_timeout)(gpointer srv, manda_timeout *timeout);
typedef void (*manda_destroy_timeout)(gpointer srv, manda_timeout *timeout);
+typedef double (*manda_get_time)(gpointer srv);
+
struct manda_async_ctrl {
manda_new_fd_watcher new_fd_watcher;
manda_update_fd_watcher update_fd_watcher;
@@ -42,23 +51,25 @@ struct manda_async_ctrl {
manda_new_timeout new_timeout;
manda_start_timeout start_timeout; /* one shot */
manda_destroy_timeout destroy_timeout;
+
+ manda_get_time get_time;
};
struct manda_fd_watcher {
gpointer data; /* application data */
manda_fd_watcher_cb callback;
+ int events; /* bitmask of manda_async_events; "update_fd_watcher" needs to check this */
+ int fd; /* filedescriptor; doesn't get changed after "new_fd_watcher" */
/* private from here */
gpointer priv;
-
- int events; /* bitmask of manda_async_events; "update_fd_watcher" needs to check this */
- int fd; /* filedescriptor; doesn't get changed after "new_fd_watcher" */
};
struct manda_timeout {
gpointer data; /* application data */
manda_timeout_cb callback;
+ double timeout; /* absolute timestamp; check in start_timeout */
/* private from here */
@@ -70,43 +81,105 @@ struct manda_timeout {
typedef void (*manda_server_new_connection)(gpointer srv, manda_server_connection *con);
typedef void (*manda_server_closed_connection)(gpointer srv, manda_server_connection *con);
-typedef void (*manda_server_acquire_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
-typedef void (*manda_server_update_backend)(gpointer srv, manda_server_connection *con, guint32 load, guint32 backends);
-typedef void (*manda_server_release_backend)(gpointer srv, manda_server_connection *con, guint32 id);
+typedef void (*manda_server_bind_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
+typedef void (*manda_server_update_backend)(gpointer srv, manda_server_backend *backend, guint ndx);
+typedef void (*manda_server_release_backend)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use);
struct manda_server_callbacks {
- manda_server_new_connection server_new_connection;
- manda_server_closed_connection server_closed_connection;
+ manda_server_new_connection new_connection;
+ manda_server_closed_connection closed_connection;
- manda_server_acquire_backend server_acquire_backend;
- manda_server_update_backend server_update_backend;
- manda_server_release_backend server_release_backend;
+ manda_server_bind_backend bind_backend;
+ manda_server_update_backend update_backend;
+ manda_server_release_backend release_backend;
};
struct manda_server_connection {
gpointer data; /* application data */
+ manda_server *srv;
/* private from here */
+ gint refcount;
+ gboolean delete_later;
+
+ GPtrArray *backends; /* manda_server_backend_use */
+ manda_IDList *idlist;
+};
+
+struct manda_server_backend_use {
+ manda_server_connection *con;
+ guint32 backend_id;
+
+ guint32 last_load, last_backends;
+
+ /* private from here */
+ manda_server_backend *backend;
+ guint ndx;
+};
+
+struct manda_server_backend {
+ gpointer data; /* application data */
+ GPtrArray *usage; /* array of manda_server_backend_use */
+ guint32 sum_last_load;
+ GString *addr;
+
+ /* private from here */
+ gint refcount;
gboolean delete_later;
};
struct manda_server {
gint refcount;
gpointer data; /* application data */
+ GPtrArray *connections;
+ /* private from here */
const manda_async_ctrl *ctrl;
const manda_server_callbacks *callbacks;
+
+ GArray *sockets;
};
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks);
void manda_server_acquire(manda_server *s);
void manda_server_release(manda_server *s);
+/* close everything */
+void manda_server_close(manda_server *s);
+
void manda_server_add_socket(manda_server *s, int fd, gpointer data);
+void manda_server_con_close(manda_server_connection *con);
+
+manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
+void manda_server_backend_free(manda_server_backend *backend);
+
+void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
+void manda_server_drop_backend(manda_server_backend *backend);
+
/* Client API */
+typedef void (*manda_client_return_backend)(gpointer srv, manda_client_backend *backend);
+/* backend will be free()d after the callback, don't keep the pointer; reason is NULL if the connection was closed */
+typedef void (*manda_client_lost_backend)(gpointer srv, manda_client_backend *backend, GString *reason);
+
+struct manda_client_backend_callbacks {
+ manda_client_return_backend return_backend;
+ manda_client_lost_backend lost_backend;
+};
+
+struct manda_client_backend {
+ gpointer data; /* application data */
+ manda_client *client;
+ GString *addr;
+
+ /* private from here */
+
+ const manda_client_backend_callbacks *callbacks;
+ gboolean do_release;
+};
+
struct manda_client { /* private */
gint refcount;
gpointer data; /* application data */
@@ -121,5 +194,7 @@ manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struc
void manda_client_acquire(manda_client *c);
void manda_client_release(manda_client *c);
+manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, const manda_client_backend_callbacks *callbacks);
+void manda_client_release_backend(manda_client *c, manda_client_backend *backend);
#endif