summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Bühler <stbuehler@web.de>2010-09-10 16:09:17 +0200
committerStefan Bühler <stbuehler@web.de>2010-09-10 16:09:17 +0200
commit3ab3fc6e6228cf4a79c382cfb11a7a67ecd68c12 (patch)
tree4f63dee1027fd8dafa9b540bba3cdff4b71aa5a1
parent7a88d7a2887dcfea8371e7e84bb905a2527f4133 (diff)
downloadlibmanda-3ab3fc6e6228cf4a79c382cfb11a7a67ecd68c12.tar.gz
libmanda-3ab3fc6e6228cf4a79c382cfb11a7a67ecd68c12.zip
Implement server part of libmanda
-rw-r--r--libmanda-protocol.h8
-rw-r--r--libmanda.c467
-rw-r--r--libmanda.h38
3 files changed, 451 insertions, 62 deletions
diff --git a/libmanda-protocol.h b/libmanda-protocol.h
index 260aa87..cc8656e 100644
--- a/libmanda-protocol.h
+++ b/libmanda-protocol.h
@@ -97,4 +97,12 @@
*
*/
+typedef enum {
+ MANDA_CMD_BIND_BACKEND = 0x0001,
+ MANDA_CMD_RELEASE_BACKEND = 0x0002,
+ MANDA_CMD_UPDATE_BACKEND = 0x0003,
+
+ MANDA_CMD_UNKNOWN_COMMAND = 0xffff
+} manda_commands;
+
#endif
diff --git a/libmanda.c b/libmanda.c
index 8dd8c9f..d900864 100644
--- a/libmanda.c
+++ b/libmanda.c
@@ -1,5 +1,6 @@
#include "libmanda.h"
+#include "libmanda-protocol.h"
#include "idlist.h"
#include <arpa/inet.h>
@@ -16,6 +17,11 @@
#define ENTER(x) do { ++((x)->refcount); } while(0)
#define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0)
+#define UNUSED(x) ((void)(x))
+
+#define CONST_STR_LEN(x) (x), (sizeof(x) - 1)
+#define GSTR_LEN(x) ((x) ? (x)->str : ""), ((x) ? (x)->len : 0)
+
#if __GNUC__
# define INLINE static inline
#else
@@ -24,7 +30,6 @@
typedef struct messageheader messageheader;
typedef struct request request;
-typedef struct manda_connection manda_connection;
typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
typedef void (*con_close_cb)(manda_connection *con);
@@ -51,6 +56,7 @@ struct manda_connection {
con_close_cb close_cb;
int fd;
+ gboolean closed;
manda_fd_watcher fd_watcher;
manda_timeout req_timeout;
@@ -71,6 +77,28 @@ struct manda_connection {
gint refcount;
};
+static void fd_no_block(int fd) {
+#ifdef O_NONBLOCK
+ fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
+#elif defined _WIN32
+ int i = 1;
+ ioctlsocket(fd, FIONBIO, &i);
+#else
+#error No way found to set non-blocking mode for fds.
+#endif
+}
+
+static void fd_init(int fd) {
+#ifdef FD_CLOEXEC
+ /* close fd on exec (cgi) */
+ fcntl(fd, F_SETFD, FD_CLOEXEC);
+#endif
+ fd_no_block(fd);
+}
+
+/* -------------------------- */
+/* Message building / parsing */
+/* -------------------------- */
INLINE guint8 _read_net_uint8(const guint8* buf) {
return *buf;
@@ -133,6 +161,7 @@ INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
}
INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
guint16 slen;
+
g_string_truncate(dest, 0);
if (!read_net_uint16(buf, pos, &slen)) return FALSE;
if (buf->len < slen || *pos >= buf->len - slen) {
@@ -160,14 +189,19 @@ INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
g_byte_array_set_size(buf, curlen + sizeof(val));
_write_net_uint32(buf->data + curlen, val);
}
-INLINE void write_net_string(GByteArray *buf, GString *val) {
+INLINE void write_net_string(GByteArray *buf, const gchar *str, guint len) {
guint curlen = buf->len;
- g_byte_array_set_size(buf, curlen + sizeof(guint16) + val->len);
- _write_net_uint16(buf->data + curlen, val->len);
- memcpy(buf->data + curlen + sizeof(guint16), val->str, val->len);
+ g_byte_array_set_size(buf, curlen + sizeof(guint16) + len);
+ _write_net_uint16(buf->data + curlen, len);
+ memcpy(buf->data + curlen + sizeof(guint16), str, len);
}
+/* ---------------- */
+/* Basic Connection */
+/* ---------------- */
+
+static void _con_free(manda_connection *con);
static void con_req_unlink(manda_connection *con, request *req) {
request *prev = NULL, *next = NULL;
@@ -205,6 +239,12 @@ static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
static void con_close(manda_connection *con) {
GByteArray *buf;
+ if (con->closed) return;
+
+ con->closed = TRUE;
+
+ ENTER(con);
+
if (NULL != con->fd_watcher.priv) {
con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
con->fd_watcher.priv = NULL;
@@ -264,9 +304,13 @@ static void con_close(manda_connection *con) {
g_byte_array_free(con->cur_payload, TRUE);
con->cur_payload = NULL;
}
+
+ LEAVE(con, _con_free);
}
static void _con_free(manda_connection *con) {
+ ENTER(con); /* don't want to enter _con_free again */
+
con_close(con);
g_slice_free(manda_connection, con);
@@ -336,7 +380,7 @@ static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
ENTER(con);
/* handle read */
- for ( i = 0 ; (con->fd != -1) && (i < 100) ; i++ ) {
+ for ( i = 0 ; (!con->closed) && (i < 100) ; i++ ) {
if (con->cur_header_pos < 8) {
ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos);
@@ -419,7 +463,7 @@ static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
}
}
- for ( i = 0 ; (con->fd != -1) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
+ for ( i = 0 ; (!con->closed) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
GByteArray *buf = g_queue_peek_head(&con->send_queue);
ssize_t written;
@@ -531,7 +575,7 @@ static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpo
con->req_timeout.callback = con_timeout_cb;
con->req_timeout.timeout = 0;
- con->request_ids = manda_idlist_new(65535);
+ con->request_ids = manda_idlist_new(65536);
/* id 0 is reserved so request it here */
first_id = manda_idlist_get(con->request_ids);
assert(0 == first_id);
@@ -560,7 +604,7 @@ static void con_send_request(manda_connection *con, GByteArray *payload, guint16
ENTER(con);
- if (-1 == con->fd) {
+ if (con->closed) {
/* connection closed */
goto error;
}
@@ -611,10 +655,12 @@ out:
}
/* payload needs to be prefixed with 8 dummy bytes for the header */
-static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
+static gboolean con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
+ gboolean res = TRUE;
+
ENTER(con);
- if (-1 == con->fd) {
+ if (con->closed) {
/* connection closed */
goto error;
}
@@ -633,21 +679,176 @@ static void con_send_notify(manda_connection *con, GByteArray *payload, guint16
error:
g_byte_array_free(payload, TRUE);
+ res = FALSE;
out:
LEAVE(con, _con_free);
+
+ return res;
}
+/* message construction helper */
+
+static GByteArray* new_payload() {
+ GByteArray* buf = g_byte_array_new();
+ g_byte_array_set_size(buf, 8);
+ return buf;
+}
+static gboolean send_unkown_command(manda_connection *con, guint16 cmd, guint16 resp_id) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_uint16(payload, cmd);
+ return con_send_notify(con, payload, MANDA_CMD_UNKNOWN_COMMAND, resp_id);
+}
+
+static gboolean send_server_bind_backend(manda_connection *con, guint16 resp_id, guint32 backend_id, const gchar *str, guint len) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_uint32(payload, backend_id);
+ write_net_string(payload, str, len);
+ return con_send_notify(con, payload, MANDA_CMD_BIND_BACKEND, resp_id);
+}
+
+static gboolean send_server_bind_backend_failed(manda_connection *con, guint16 resp_id, const gchar *str, guint len) {
+ return send_server_bind_backend(con, resp_id, 0, str, len);
+}
+
+static gboolean send_server_release_backend(manda_connection *con, guint32 backend_id, const gchar *str, guint len) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_uint32(payload, backend_id);
+ write_net_string(payload, str, len);
+ return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0);
+}
+
+/* server connection */
typedef struct server_socket server_socket;
struct server_socket {
+ manda_server *srv;
+
int fd;
gpointer data;
+
+ manda_fd_watcher fd_watcher;
};
-static void manda_server_connection_free(manda_server_connection *con);
-static void manda_server_backend_release(manda_server_connection *con, guint32 id);
+static void scon_con_closed_cb(manda_connection *con);
+static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
+static void scon_release(manda_server_connection *con);
+
+static void sbackend_free(manda_server_backend *backend);
+
+static void manda_server_release_backend(manda_server_connection *con, guint32 id);
+
+static void server_listening_cb(manda_fd_watcher *fd_watcher) {
+ server_socket *sock = fd_watcher->priv;
+ manda_server *s = sock->srv;
+ int fd;
+ int i;
+ manda_server_connection *con;
+
+ fd = accept(sock->fd, NULL, NULL);
+
+ if (-1 == fd) return;
+
+ fd_init(fd);
+
+ con = g_slice_new0(manda_server_connection);
+ con->srv = s;
+ con->refcount = 1;
+ con->backends = g_ptr_array_new();
+ con->idlist = manda_idlist_new(65536);
+ i = manda_idlist_get(con->idlist); /* 0 is an invalid backend id, reserve it */
+ assert(i == 0);
+ con->con = con_new(s->data, s->ctrl, con, scon_con_message_cb, scon_con_closed_cb, fd);
+}
+
+static void scon_free(manda_server_connection *con) {
+ ENTER(con); /* don't want to enter scon_free again */
+
+ manda_server_con_close(con);
+
+ g_slice_free(manda_server_connection, con);
+}
+static void scon_release(manda_server_connection *con) {
+ LEAVE(con, scon_free);
+}
+
+void manda_server_con_close(manda_server_connection *con) {
+ guint i;
+ manda_connection *bcon = con->con;
+
+ if (NULL == bcon) return;
+
+ ENTER(con);
+
+ g_ptr_array_remove_fast(con->srv->connections, con);
+ scon_release(con);
+
+ con->con = NULL;
+ bcon->close_cb = NULL;
+ con_close(bcon);
+ con_free(bcon);
+
+ if (NULL != con->srv->callbacks->closed_connection) {
+ con->srv->callbacks->closed_connection(con->srv->data, con);
+ }
+
+ for (i = 0; i < con->backends->len; i++) {
+ manda_server_backend_use *use = g_ptr_array_index(con->backends, i);
+ if (NULL == use) continue;
+ manda_server_release_backend(con, i);
+ }
+
+ g_ptr_array_free(con->backends, TRUE);
+ con->backends = NULL;
+ manda_idlist_free(con->idlist);
+ con->idlist = NULL;
+
+ LEAVE(con, scon_free);
+}
+
+static void scon_con_closed_cb(manda_connection *con) {
+ manda_server_connection *scon = con->priv_data;
+
+ manda_server_con_close(scon);
+}
+
+void manda_server_add_socket(manda_server *s, int fd, gpointer data) {
+ server_socket *sock = g_slice_new0(server_socket);
+
+ sock->srv = s;
+ sock->fd = fd;
+ sock->data = data;
+ sock->fd_watcher.callback = server_listening_cb;
+ sock->fd_watcher.events = MANDA_FD_READ;
+ sock->fd_watcher.fd = fd;
+ sock->fd_watcher.priv = sock;
+
+ s->ctrl->new_fd_watcher(s->data, &sock->fd_watcher);
+ s->ctrl->update_fd_watcher(s->data, &sock->fd_watcher);
+
+ g_ptr_array_add(s->sockets, sock);
+}
+
+static void manda_server_free_socket(server_socket *sock) {
+ manda_server *s = sock->srv;
+
+ s->ctrl->destroy_fd_watcher(s->data, &sock->fd_watcher);
+
+ while (-1 == close(sock->fd) && errno == EINTR) ;
+ sock->fd = -1;
+
+ g_slice_free(server_socket, sock);
+}
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
manda_server *s = g_slice_new(manda_server);
@@ -656,62 +857,61 @@ manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const
s->connections = g_ptr_array_new();
s->ctrl = ctrl;
s->callbacks = callbacks;
- s->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket));
+ s->sockets = g_ptr_array_new();
return s;
}
-void manda_server_acquire(manda_server *s) {
- ++s->refcount;
-}
-
void manda_server_close(manda_server *s) {
guint i;
for (i = 0; i < s->sockets->len; i++) {
- server_socket *sock = &g_array_index(s->sockets, server_socket, i);
- close(sock->fd);
+ server_socket *sock = g_ptr_array_index(s->sockets, i);
+ manda_server_free_socket(sock);
}
- g_array_set_size(s->sockets, 0);
+ g_ptr_array_set_size(s->sockets, 0);
- for (i = s->connections->len; i-- > 0; ) {
- manda_server_connection *con = g_ptr_array_index(s->connections, i);
- con->delete_later = TRUE;
- s->callbacks->closed_connection(s->data, con);
- if (con->refcount == 0) manda_server_connection_free(con);
+ while (s->connections->len > 0) {
+ manda_server_connection *con = g_ptr_array_index(s->connections, 0);
+ manda_server_con_close(con);
}
- g_ptr_array_set_size(s->connections, 0);
}
-void manda_server_release(manda_server *s) {
- if (NULL == s) return;
- if (0 < --s->refcount) return;
-
+static void manda_server_free(manda_server *s) {
manda_server_close(s);
- g_array_free(s->sockets, TRUE);
+ g_ptr_array_free(s->sockets, TRUE);
g_ptr_array_free(s->connections, TRUE);
g_slice_free(manda_server, s);
}
-void manda_server_add_socket(manda_server *s, int fd, gpointer data);
-
-static void manda_server_connection_free(manda_server_connection *con) {
- /* TODO */
- g_slice_free(manda_server_connection, con);
+void manda_server_acquire(manda_server *s) {
+ ENTER(s);
}
-void manda_server_con_close(manda_server_connection *con);
+void manda_server_release(manda_server *s) {
+ if (NULL == s) return;
+
+ LEAVE(s, manda_server_free);
+}
-static void manda_server_backend_release(manda_server_connection *con, guint32 id) {
+static void manda_server_release_backend(manda_server_connection *con, guint32 id) {
manda_server_backend_use *use;
manda_server_backend *b;
+ if (NULL == con->backends) return;
+
if (id >= con->backends->len) return;
+ if (manda_idlist_is_used(con->idlist, id)) {
+ /* notify client */
+ send_server_release_backend(con->con, id, CONST_STR_LEN("lost backend"));
+ }
+
use = g_ptr_array_index(con->backends, id);
if (NULL == use) return;
+ g_ptr_array_index(con->backends, id) = NULL;
b = use->backend;
if (use->ndx != b->usage->len - 1) {
@@ -722,14 +922,191 @@ static void manda_server_backend_release(manda_server_connection *con, guint32 i
b->sum_last_load -= use->last_load;
g_ptr_array_set_size(b->usage, b->usage->len-1);
- con->refcount++;
+ LEAVE(b, sbackend_free);
- con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
+ if (NULL != con->srv && NULL != con->srv->callbacks->release_backend) {
+ con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
+ }
g_slice_free(manda_server_backend_use, use);
}
-manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
-void manda_server_backend_free(manda_server_backend *backend);
+manda_server_backend* manda_server_backend_new(gpointer data, GString *addr) {
+ manda_server_backend *backend = g_slice_new0(manda_server_backend);
+
+ backend->data = data;
+ backend->usage = g_ptr_array_new();
+ backend->sum_last_load = 0;
+ backend->addr = addr;
+ backend->refcount = 1;
+
+ return backend;
+}
+
+static void sbackend_free(manda_server_backend *backend) {
+ ENTER(backend); /* don't want to enter sbackend_free again */
+
+ manda_server_drop_backend(backend);
+
+ g_ptr_array_free(backend->usage, TRUE);
+ backend->usage = NULL;
+ g_string_free(backend->addr, TRUE);
+ backend->addr = NULL;
+
+ g_slice_free(manda_server_backend, backend);
+}
+
+void manda_server_backend_release(manda_server_backend *backend) {
+ if (NULL == backend) return;
+
+ LEAVE(backend, sbackend_free);
+}
+void manda_server_backend_acquire(manda_server_backend *backend) {
+ ENTER(backend);
+}
+
+gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend) {
+ manda_server_backend_use *use;
+ gint i;
+
+ if (NULL == con->con || con->con->closed) return FALSE;
+
+ i = manda_idlist_get(con->idlist);
+ if (-1 == i) {
+ send_server_bind_backend_failed(con->con, reqid, CONST_STR_LEN("no free backend id available"));
+ return FALSE;
+ }
+ assert(i > 0);
+
+ use = g_slice_new0(manda_server_backend_use);
+ use->con = con;
+ use->backend_id = i;
+ use->last_load = use->last_workers = 0;
+ use->backend = backend;
+ use->ndx = backend->usage->len;
+ g_ptr_array_add(backend->usage, use);
+ if (use->backend_id >= con->backends->len) g_ptr_array_set_size(con->backends, use->backend_id + 1);
+ g_ptr_array_index(con->backends, use->backend_id) = use;
+
+ ENTER(backend);
+
+ return send_server_bind_backend(con->con, reqid, use->backend_id, GSTR_LEN(backend->addr));
+}
+
+void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg) {
+ send_server_bind_backend_failed(con->con, reqid, GSTR_LEN(errmsg));
+}
+
+void manda_server_drop_backend(manda_server_backend *backend) {
+ ENTER(backend);
+
+ while (backend->usage->len > 0) {
+ manda_server_backend_use *u = g_ptr_array_index(backend->usage, 0);
+ manda_server_release_backend(u->con, u->backend_id);
+ }
+
+ LEAVE(backend, sbackend_free);
+}
+
+static void sbackend_update(manda_server_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
+ manda_server_backend_use *use;
+ manda_server_backend *b;
+
+ if (NULL == con->backends) return;
-void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
-void manda_server_drop_backend(manda_server_backend *backend);
+ if (backend_id >= con->backends->len) return;
+
+ use = g_ptr_array_index(con->backends, backend_id);
+ if (NULL == use) return;
+
+ b = use->backend;
+
+ b->sum_last_load += load - use->last_load;
+ use->last_load = load;
+ use->last_workers = workers;
+
+ if (NULL != con->srv->callbacks->update_backend) {
+ con->srv->callbacks->update_backend(con->srv->data, b, use->ndx);
+ }
+}
+
+
+static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *payload) {
+ manda_server_connection *scon = con->priv_data;
+ manda_server *srv;
+ guint pos = 0;
+
+ UNUSED(orig_command); UNUSED(orig_data);
+
+ if (con->closed || NULL == scon || NULL == scon->srv) return;
+
+ srv = scon->srv;
+
+ ENTER(con);
+ ENTER(scon);
+ ENTER(srv);
+
+ switch (mesg_command) {
+ case MANDA_CMD_BIND_BACKEND: {
+ GString *name = NULL;
+
+ if (NULL == payload) goto error;
+ name = g_string_sized_new(payload->len - 2);
+ if (!read_net_string(payload, &pos, name)) { g_string_free(name, TRUE); goto error; }
+ if (pos != payload->len) { g_string_free(name, TRUE); goto error; }
+
+ srv->callbacks->bind_backend(srv->data, scon, name, mesg_req_id);
+
+ g_string_free(name, TRUE);
+ }
+ break;
+ case MANDA_CMD_RELEASE_BACKEND: {
+ guint32 backend_id;
+
+ if (NULL == payload) goto error;
+ if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
+ if (pos != payload->len) goto error;
+
+ if (backend_id == 0) goto error; /* invalid id */
+
+ if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;
+ manda_idlist_put(scon->idlist, backend_id);
+
+ manda_server_release_backend(scon, backend_id);
+ }
+ break;
+ case MANDA_CMD_UPDATE_BACKEND: {
+ guint32 backend_id, load, workers;
+
+ if (NULL == payload) goto error;
+ if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
+ if (!read_net_uint32(payload, &pos, &load)) goto error;
+ if (!read_net_uint32(payload, &pos, &workers)) goto error;
+ if (pos != payload->len) goto error;
+
+ if (backend_id == 0) goto error; /* invalid id */
+
+ if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;
+
+ sbackend_update(scon, backend_id, load, workers);
+ }
+ break;
+
+ case MANDA_CMD_UNKNOWN_COMMAND:
+ /* we require all our commands to be handled: */
+ goto error;
+
+ default:
+ send_unkown_command(con, mesg_command, mesg_req_id);
+ break;
+ }
+
+ goto out;
+
+error:
+ manda_server_con_close(scon);
+
+out:
+ LEAVE(srv, manda_server_free);
+ LEAVE(scon, scon_free);
+ LEAVE(con, _con_free);
+}
diff --git a/libmanda.h b/libmanda.h
index a24a8be..f321a42 100644
--- a/libmanda.h
+++ b/libmanda.h
@@ -29,6 +29,8 @@ typedef struct manda_client_backend_callbacks manda_client_backend_callbacks;
typedef struct manda_client_backend manda_client_backend;
typedef struct manda_client manda_client;
+typedef struct manda_connection manda_connection;
+
typedef void (*manda_fd_watcher_cb)(manda_fd_watcher *watcher);
typedef void (*manda_new_fd_watcher)(gpointer srv, manda_fd_watcher *watcher);
@@ -78,20 +80,20 @@ struct manda_timeout {
/* Server API */
-typedef void (*manda_server_new_connection)(gpointer srv, manda_server_connection *con);
-typedef void (*manda_server_closed_connection)(gpointer srv, manda_server_connection *con);
+typedef void (*manda_server_new_connection_cb)(gpointer srv, manda_server_connection *con);
+typedef void (*manda_server_closed_connection_cb)(gpointer srv, manda_server_connection *con);
-typedef void (*manda_server_bind_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
-typedef void (*manda_server_update_backend)(gpointer srv, manda_server_backend *backend, guint ndx);
-typedef void (*manda_server_release_backend)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use);
+typedef void (*manda_server_bind_backend_cb)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
+typedef void (*manda_server_update_backend_cb)(gpointer srv, manda_server_backend *backend, guint ndx);
+typedef void (*manda_server_release_backend_cb)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use);
struct manda_server_callbacks {
- manda_server_new_connection new_connection;
- manda_server_closed_connection closed_connection;
+ manda_server_new_connection_cb new_connection;
+ manda_server_closed_connection_cb closed_connection;
- manda_server_bind_backend bind_backend;
- manda_server_update_backend update_backend;
- manda_server_release_backend release_backend;
+ manda_server_bind_backend_cb bind_backend;
+ manda_server_update_backend_cb update_backend;
+ manda_server_release_backend_cb release_backend;
};
struct manda_server_connection {
@@ -100,7 +102,8 @@ struct manda_server_connection {
/* private from here */
gint refcount;
- gboolean delete_later;
+
+ manda_connection *con;
GPtrArray *backends; /* manda_server_backend_use */
manda_IDList *idlist;
@@ -110,7 +113,7 @@ struct manda_server_backend_use {
manda_server_connection *con;
guint32 backend_id;
- guint32 last_load, last_backends;
+ guint32 last_load, last_workers;
/* private from here */
manda_server_backend *backend;
@@ -126,7 +129,6 @@ struct manda_server_backend {
/* private from here */
gint refcount;
- gboolean delete_later;
};
struct manda_server {
@@ -138,7 +140,7 @@ struct manda_server {
const manda_async_ctrl *ctrl;
const manda_server_callbacks *callbacks;
- GArray *sockets;
+ GPtrArray *sockets;
};
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks);
@@ -153,10 +155,12 @@ void manda_server_add_socket(manda_server *s, int fd, gpointer data);
void manda_server_con_close(manda_server_connection *con);
manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
-void manda_server_backend_free(manda_server_backend *backend);
+void manda_server_backend_acquire(manda_server_backend *backend);
+void manda_server_backend_release(manda_server_backend *backend);
-void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
-void manda_server_drop_backend(manda_server_backend *backend);
+gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
+void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg);
+void manda_server_drop_backend(manda_server_backend *backend); /* tell all users that the backend is gone */
/* Client API */