summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libmanda.c417
-rw-r--r--libmanda.h20
2 files changed, 429 insertions, 8 deletions
diff --git a/libmanda.c b/libmanda.c
index d900864..23c9c56 100644
--- a/libmanda.c
+++ b/libmanda.c
@@ -597,10 +597,11 @@ static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id,
}
/* payload needs to be prefixed with 8 dummy bytes for the header */
-static void con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
+static gboolean con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
double now;
gint reqid;
request *req;
+ gboolean res = TRUE;
ENTER(con);
@@ -628,6 +629,9 @@ static void con_send_request(manda_connection *con, GByteArray *payload, guint16
now = con->ctrl->get_time(con->data);
req->timeout = now + wait_timeout;;
req->timeout_step = now + TIMEOUT_STEP;
+ req->data = data;
+ req->command = command;
+
if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
if (con->timeout_first_id == 0) {
@@ -644,14 +648,14 @@ static void con_send_request(manda_connection *con, GByteArray *payload, guint16
goto out;
error:
- if (NULL != con->message_cb) {
- con->message_cb(con, command, data, 0, 0, NULL);
- }
+ res = FALSE;
g_byte_array_free(payload, TRUE);
out:
LEAVE(con, _con_free);
+
+ return res;
}
/* payload needs to be prefixed with 8 dummy bytes for the header */
@@ -728,6 +732,35 @@ static gboolean send_server_release_backend(manda_connection *con, guint32 backe
return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0);
}
+static gboolean send_client_bind_backend(manda_connection *con, const gchar *name, guint len, gpointer data, double wait_timeout) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_string(payload, name, len);
+ return con_send_request(con, payload, MANDA_CMD_BIND_BACKEND, 0, data, wait_timeout);
+}
+
+static gboolean send_client_release_backend(manda_connection *con, guint32 backend_id) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_uint32(payload, backend_id);
+ return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0);
+}
+
+static gboolean send_client_update_backend(manda_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
+ GByteArray *payload;
+ if (NULL == con) return FALSE;
+
+ payload = new_payload();
+ write_net_uint32(payload, backend_id);
+ write_net_uint32(payload, load);
+ write_net_uint32(payload, workers);
+ return con_send_notify(con, payload, MANDA_CMD_UPDATE_BACKEND, 0);
+}
+
/* server connection */
typedef struct server_socket server_socket;
@@ -1110,3 +1143,379 @@ out:
LEAVE(scon, scon_free);
LEAVE(con, _con_free);
}
+
+/* client connection */
+
+static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
+static void client_close_cb(manda_connection *con);
+
+static void mclient_free(manda_client *c) {
+ ENTER(c); /* don't want to enter mclient_free again */
+
+ manda_client_close(c);
+
+ g_free(c->addr);
+ c->addr = NULL;
+
+ if (NULL != c->backends) {
+ g_ptr_array_free(c->backends, TRUE);
+ c->backends = NULL;
+ }
+
+ g_slice_free(manda_client, c);
+}
+
+/* TODO: start timer to reconnect if connect failed */
+static void client_connect_cb(manda_fd_watcher *watcher) {
+ manda_client *c = watcher->priv;
+ int s = watcher->fd;
+
+ struct sockaddr addr;
+ socklen_t len;
+
+ ENTER(c);
+
+ c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
+ c->sock_watcher.fd = -1;
+ c->sock_fd = -1;
+
+ /* create new connection:
+ * see http://www.cyberconf.org/~cynbe/ref/nonblocking-connects.html
+ */
+
+ /* Check to see if we can determine our peer's address. */
+ len = sizeof(addr);
+ if (getpeername(s, &addr, &len) == -1) {
+ /* connect failed; find out why */
+ int err;
+ len = sizeof(err);
+#ifdef SO_ERROR
+ getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &len);
+#else
+ {
+ char ch;
+ errno = 0;
+ read(s, &ch, 1);
+ err = errno;
+ }
+#endif
+ close(s);
+
+ UNUSED(err);
+
+ goto out;
+ } else {
+ /* connect succeeded */
+ c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, s);
+ }
+
+out:
+ LEAVE(c, mclient_free);
+}
+
+static void client_connect(manda_client *c) {
+ int s;
+ double now;
+
+ ENTER(c);
+
+ if (NULL != c->con || c->closed) goto out;
+ if (-1 != c->sock_fd || NULL == c->addr) goto out;
+
+ now = c->ctrl->get_time(c->data);
+ if (c->last_connect_ts + 1.0 > now) goto out; /* only try connect once per second */
+
+ c->last_connect_ts = now;
+
+ do {
+ s = socket(c->addr->sa_family, SOCK_STREAM, 0);
+ } while (-1 == s && errno == EINTR);
+
+ if (-1 == s) goto out;
+
+ fd_init(s);
+ c->sock_fd = s;
+ c->sock_watcher.fd = s;
+ c->sock_watcher.events = 0;
+ c->ctrl->new_fd_watcher(c->data, &c->sock_watcher);
+
+ if (-1 == connect(s, c->addr, c->addrlen)) {
+ switch (errno) {
+ case EINPROGRESS:
+ case EALREADY:
+ case EINTR:
+ c->sock_watcher.events = MANDA_FD_READ | MANDA_FD_WRITE;
+ c->ctrl->update_fd_watcher(c->data, &c->sock_watcher);
+ goto out;
+ case EAGAIN: /* server overloaded */
+ default:
+ c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
+ close(s);
+ c->sock_watcher.fd = -1;
+ c->sock_fd = -1;
+ goto out;
+ }
+ }
+
+ c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
+ c->sock_watcher.fd = -1;
+ c->sock_fd = -1;
+
+ c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, s);
+
+out:
+ LEAVE(c, mclient_free);
+
+ return;
+}
+
+manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struct sockaddr *addr, socklen_t addrlen) {
+ manda_client *c = g_slice_new0(manda_client);
+
+ c->refcount = 1;
+ c->closed = FALSE;
+ c->data = srv;
+ c->ctrl = ctrl;
+
+ c->addrlen = addrlen;
+ c->addr = g_memdup(addr, addrlen);
+
+ c->sock_fd = -1;
+ c->sock_watcher.fd = -1;
+ c->sock_watcher.priv = c;
+ c->sock_watcher.events = 0;
+ c->sock_watcher.callback = client_connect_cb;
+
+ client_connect(c);
+
+ return c;
+}
+
+static void client_close(manda_client *c) {
+ if (-1 != c->sock_fd) {
+ c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
+ close(c->sock_fd);
+ c->sock_fd = -1;
+ c->sock_watcher.fd = -1;
+ }
+
+ if (NULL != c->con) {
+ c->con->close_cb = NULL;
+ con_close(c->con);
+ LEAVE(c->con, _con_free);
+ c->con = NULL;
+ }
+
+ if (NULL != c->backends) {
+ guint i;
+
+ for (i = 0; i < c->backends->len; i++) {
+ static const GString con_closed_msg = { CONST_STR_LEN("Connection closed"), 0 };
+ manda_client_backend *backend = g_ptr_array_index(c->backends, i);
+
+ if (NULL != backend) {
+ g_ptr_array_index(c->backends, i) = NULL;
+
+ backend->id = 0;
+ if (NULL != backend->callbacks) {
+ backend->callbacks->lost_backend(c->data, backend, &con_closed_msg);
+ }
+
+ g_string_free(backend->addr, TRUE);
+ g_slice_free(manda_client_backend, backend);
+ }
+ }
+
+ g_ptr_array_set_size(c->backends, 0);
+ }
+
+ if (!c->closed) {
+ /* TODO: timer? */
+ client_connect(c);
+ }
+}
+
+void manda_client_close(manda_client *c) {
+ ENTER(c);
+
+ c->closed = TRUE;
+ client_close(c);
+
+ LEAVE(c, mclient_free);
+}
+
+void manda_client_acquire(manda_client *c) {
+ ENTER(c);
+}
+
+void manda_client_release(manda_client *c) {
+ if (NULL == c) return;
+
+ LEAVE(c, mclient_free);
+}
+
+manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, gpointer data, const manda_client_backend_callbacks *callbacks) {
+ manda_client_backend *backend;
+
+ client_connect(c);
+
+ if (NULL == c->con) return NULL;
+
+ backend = g_slice_new(manda_client_backend);
+ backend->data = data;
+ backend->callbacks = callbacks;
+ backend->client = c;
+ backend->addr = NULL;
+ backend->id = 0;
+
+ if (!send_client_bind_backend(c->con, GSTR_LEN(name), backend, 5.0)) {
+ g_slice_free(manda_client_backend, backend);
+ return NULL;
+ }
+
+ return backend;
+}
+
+void manda_client_release_backend(manda_client_backend *backend) {
+ manda_client *c = backend->client;
+
+ if (backend->id != 0) {
+ /* "active" working backend */
+ send_client_release_backend(c->con, backend->id);
+ g_ptr_array_index(c->backends, backend->id) = NULL;
+ backend->id = 0;
+
+ g_string_free(backend->addr, TRUE);
+ g_slice_free(manda_client_backend, backend);
+ } else {
+ /* either "dead" backend or "waiting" backend */
+ /* in both cases free it somewhere else */
+
+ backend->callbacks = NULL;
+ }
+}
+
+void manda_client_update_backend(manda_client_backend *backend, guint32 load, guint32 workers) {
+ manda_client *c = backend->client;
+
+ if (backend->id != 0) {
+ send_client_update_backend(c->con, backend->id, load, workers);
+ }
+}
+
+static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 command, guint16 req_id, GByteArray *payload) {
+ manda_client *c = con->priv_data;
+ guint pos = 0;
+
+ UNUSED(req_id);
+
+ ENTER(c);
+ ENTER(con);
+
+ switch (command) {
+ case 0: /* timeout/request failed */
+ switch (orig_command) {
+ case MANDA_CMD_BIND_BACKEND: {
+ manda_client_backend *backend = orig_data;
+
+ backend->callbacks->lost_backend(c->data, backend, NULL);
+
+ g_string_free(backend->addr, TRUE);
+ g_slice_free(manda_client_backend, backend);
+ }
+ break;
+ default:
+ goto error;
+ }
+ break;
+
+ case MANDA_CMD_BIND_BACKEND:
+ switch (orig_command) {
+ case MANDA_CMD_BIND_BACKEND: {
+ manda_client_backend *backend = orig_data;
+ guint32 backend_id;
+ GString *addr = NULL;
+
+ if (NULL == payload) goto bind_backend_failed;
+ if (!read_net_uint32(payload, &pos, &backend_id)) goto bind_backend_failed;
+ addr = g_string_sized_new(payload->len - 2 - pos);
+ if (!read_net_string(payload, &pos, addr)) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; }
+ if (pos != payload->len) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; }
+
+ if (0 == backend_id) goto bind_backend_failed;
+
+ backend->addr = addr;
+ backend->id = backend_id;
+
+ if (backend_id >= c->backends->len) g_ptr_array_set_size(c->backends, backend_id+1);
+ g_ptr_array_index(c->backends, backend_id) = backend;
+
+ if (NULL != backend->callbacks) {
+ backend->callbacks->return_backend(c->data, backend);
+ } else {
+ manda_client_release_backend(backend);
+ }
+
+ goto out;
+
+bind_backend_failed:
+ backend->callbacks->lost_backend(c->data, backend, addr);
+
+ g_string_free(backend->addr, TRUE);
+ g_slice_free(manda_client_backend, backend);
+
+ if (NULL == addr) goto error;
+ g_string_free(addr, TRUE);
+ }
+ break;
+ }
+ break;
+
+ case MANDA_CMD_RELEASE_BACKEND: {
+ manda_client_backend *backend = NULL;
+ guint32 backend_id;
+ GString *msg = NULL;
+
+ if (NULL == payload) goto error;
+ if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
+ msg = g_string_sized_new(payload->len - 2 - pos);
+ if (!read_net_string(payload, &pos, msg)) { g_string_free(msg, TRUE); goto error; }
+ if (pos != payload->len) { g_string_free(msg, TRUE); goto error; }
+
+ if (0 == backend_id) { g_string_free(msg, TRUE); goto error; }
+
+ if (backend_id >= c->backends->len) { g_string_free(msg, TRUE); goto out; }
+ backend = g_ptr_array_index(c->backends, backend_id);
+ if (NULL == backend) { g_string_free(msg, TRUE); goto out; }
+
+ g_ptr_array_index(c->backends, backend_id) = NULL;
+ backend->id = 0;
+
+ if (NULL != backend->callbacks) {
+ backend->callbacks->lost_backend(c->data, backend, msg);
+ }
+
+ g_string_free(msg, TRUE);
+ g_string_free(backend->addr, TRUE);
+ g_slice_free(manda_client_backend, backend);
+ }
+ break;
+
+ case MANDA_CMD_UNKNOWN_COMMAND:
+ goto error;
+ }
+
+ goto out;
+error:
+ client_close(c);
+
+out:
+ LEAVE(con, _con_free);
+ LEAVE(c, mclient_free);
+}
+
+static void client_close_cb(manda_connection *con) {
+ manda_client *c = con->priv_data;
+
+ client_close(c);
+}
diff --git a/libmanda.h b/libmanda.h
index f321a42..70140a7 100644
--- a/libmanda.h
+++ b/libmanda.h
@@ -166,7 +166,7 @@ void manda_server_drop_backend(manda_server_backend *backend); /* tell all users
typedef void (*manda_client_return_backend)(gpointer srv, manda_client_backend *backend);
/* backend will be free()d after the callback, don't keep the pointer; reason is NULL if the connection was closed */
-typedef void (*manda_client_lost_backend)(gpointer srv, manda_client_backend *backend, GString *reason);
+typedef void (*manda_client_lost_backend)(gpointer srv, manda_client_backend *backend, const GString *reason);
struct manda_client_backend_callbacks {
manda_client_return_backend return_backend;
@@ -181,24 +181,36 @@ struct manda_client_backend {
/* private from here */
const manda_client_backend_callbacks *callbacks;
- gboolean do_release;
+ guint32 id;
};
struct manda_client { /* private */
gint refcount;
gpointer data; /* application data */
+ gboolean closed;
const manda_async_ctrl *ctrl;
struct sockaddr *addr;
socklen_t addrlen;
+
+ manda_connection *con;
+
+ GPtrArray *backends; /* manda_client_backend* */
+
+ /* establish connection */
+ double last_connect_ts;
+ int sock_fd;
+ manda_fd_watcher sock_watcher;
};
manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struct sockaddr *addr, socklen_t addrlen);
+void manda_client_close(manda_client *c);
void manda_client_acquire(manda_client *c);
void manda_client_release(manda_client *c);
-manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, const manda_client_backend_callbacks *callbacks);
-void manda_client_release_backend(manda_client *c, manda_client_backend *backend);
+manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, gpointer data, const manda_client_backend_callbacks *callbacks);
+void manda_client_release_backend(manda_client_backend *backend);
+void manda_client_update_backend(manda_client_backend *backend, guint32 load, guint32 workers);
#endif