Browse Source

Add more angel connection handling code (and helper functions)

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
75505f73e1
  1. 2
      include/lighttpd/angel_base.h
  2. 70
      include/lighttpd/angel_connection.h
  3. 1
      include/lighttpd/angel_data.h
  4. 2
      include/lighttpd/angel_server.h
  5. 1
      include/lighttpd/base.h
  6. 3
      include/lighttpd/idlist.h
  7. 9
      include/lighttpd/utils.h
  8. 6
      src/CMakeLists.txt
  9. 583
      src/angel_connection.c
  10. 39
      src/angel_data.c
  11. 9
      src/idlist.c
  12. 64
      src/utils.c

2
include/lighttpd/angel_base.h

@ -25,4 +25,6 @@ typedef struct instance instance;
#include <lighttpd/angel_plugin.h>
#include <lighttpd/angel_server.h>
#include <lighttpd/utils.h>
#endif

70
include/lighttpd/angel_connection.h

@ -1,19 +1,56 @@
#ifndef _LIGHTTPD_ANGEL_CONNECTION_H_
#define _LIGHTTPD_ANGEL_CONNECTION_H_
#include <lighttpd/idlist.h>
#define ANGEL_CALL_MAX_STR_LEN (64*1024) /* must fit into a gint32 */
struct angel_connection;
typedef struct angel_connection angel_connection;
struct angel_call;
typedef struct angel_call angel_call;
typedef void (*AngelCallback)(angel_call *acall, gboolean timeout, GString *error, GString *data, GArray *fds);
typedef void (*AngelCallback)(gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds);
typedef void (*AngelReceiveCall)(angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *data);
typedef void (*AngelReceiveResult)(angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *error, GString *data, GArray *fds);
/* gets called after read/write errors */
typedef void (*AngelCloseCallback)(angel_connection *acon, GError *err);
struct angel_connection {
gpointer data;
GStaticMutex mutex; /* angel itself has no threads */
struct ev_loop *loop;
int fd;
idlist *call_id_list;
GPtrArray *call_table;
ev_io fd_watcher;
ev_async out_notify_watcher;
GQueue *out;
angel_buffer in;
AngelReceiveCall recv_call;
AngelReceiveResult recv_result;
AngelCloseCallback close_cb;
/* parse input */
struct {
gboolean have_header;
gint32 type, id;
gint32 mod_len, action_len, error_len, data_len, missing_fds;
guint body_size;
GString *mod, *action, *error, *data;
GArray *fds;
} parse;
};
/* with multi-threading you should protect the structure
@ -24,27 +61,39 @@ struct angel_call {
AngelCallback callback;
/* internal data */
gint32 id; /* id is -1 if there is no call pending (the callback may still be running) */
guint timeout;
angel_connection *acon;
ev_timer timeout_watcher;
ev_io fd_watcher;
};
/* error handling */
#define ANGEL_CALL_ERROR angel_call_error_quark()
LI_API GQuark angel_call_error_quark();
#define ANGEL_CONNECTION_ERROR angel_connection_error_quark()
LI_API GQuark angel_connection_error_quark();
typedef enum {
ANGEL_CALL_ALREADY_RUNNING /* the angel_call struct is already in use for a call */
ANGEL_CALL_ALREADY_RUNNING, /* the angel_call struct is already in use for a call */
ANGEL_CALL_OUT_OF_CALL_IDS, /* too many calls already pending */
ANGEL_CALL_INVALID /* invalid params */
} AngelCallError;
typedef enum {
ANGEL_CONNECTION_CLOSED, /* error on socket */
ANGEL_CONNECTION_INVALID_DATA /* invalid data from stream */
} AngelConnectionError;
/* create connection */
LI_API angel_connection* angel_connection_create(int fd);
LI_API angel_connection* angel_connection_create(
struct ev_loop *loop, int fd, gpointer data,
AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb);
LI_API angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout);
/* returns TRUE if a call was cancelled; make sure you don't call free while you're calling send_call */
LI_API gboolean angel_call_free(angel_call *call);
/* calls */
/* the GString* parameters get stolen by the angel call (moved to chunkqueue) */
LI_API void angel_call_init(angel_call *call);
LI_API gboolean angel_send_simple_call(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
@ -54,18 +103,19 @@ LI_API gboolean angel_send_simple_call(
LI_API gboolean angel_send_call(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
angel_call *call, guint timeout,
angel_call *call,
GString *data,
GError **err);
LI_API gboolean angel_send_result(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
angel_call *call, guint timeout,
gint32 id,
GString *error, GString *data, GArray *fds,
GError **err);
LI_API gboolean angel_cancel_call(angel_connection *acon, angel_call *call);
/* free temporary needed memroy; call this once in while after some activity */
LI_API void angel_cleanup_tables(angel_connection *acon);
/* Usage */
#if 0

1
include/lighttpd/angel_data.h

@ -54,5 +54,6 @@ LI_API gboolean angel_data_read_int32(angel_buffer *buf, gint32 *val, GError **e
LI_API gboolean angel_data_read_int64(angel_buffer *buf, gint64 *val, GError **err);
LI_API gboolean angel_data_read_char (angel_buffer *buf, gchar *val, GError **err);
LI_API gboolean angel_data_read_str (angel_buffer *buf, GString **val, GError **err);
LI_API gboolean angel_data_read_mem (angel_buffer *buf, GString **val, gsize len, GError **err);
#endif

2
include/lighttpd/angel_server.h

@ -11,6 +11,8 @@
struct instance {
pid_t pid;
angel_connection *con;
};
struct server {

1
include/lighttpd/base.h

@ -61,6 +61,7 @@
#include <lighttpd/network.h>
#include <lighttpd/encoding.h>
#include <lighttpd/etag.h>
#include <lighttpd/lighttpd-glue.h>
#include <lighttpd/utils.h>
#include <lighttpd/lighttpd-glue.h>

3
include/lighttpd/idlist.h

@ -32,6 +32,9 @@ LI_API void idlist_free(idlist *l);
/* request new id; return -1 if no id is available, valid ids are always > 0 */
LI_API gint idlist_get(idlist *l);
/* check whether an id is in use and can be "_put" */
LI_API gboolean idlist_is_used(idlist *l, gint id);
/* release id. never release an id more than once! */
LI_API void idlist_put(idlist *l, gint id);

9
include/lighttpd/utils.h

@ -15,6 +15,15 @@ LI_API void fatal(const gchar* msg);
/* set O_NONBLOCK and FD_CLOEXEC */
LI_API void fd_init(int fd);
LI_API void fd_no_block(int fd);
LI_API void fd_block(int fd);
#ifndef _WIN32
/* return -2 for EAGAIN, -1 for some other error, 0 for success */
LI_API int send_fd(int s, int fd); /* write fd to unix socket s */
LI_API int receive_fd(int s, int *fd); /* read fd from unix socket s */
#endif
LI_API void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events);
LI_API void ev_io_rem_events(struct ev_loop *loop, ev_io *watcher, int events);
LI_API void ev_io_set_events(struct ev_loop *loop, ev_io *watcher, int events);

6
src/CMakeLists.txt

@ -263,8 +263,9 @@ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR} ${CM
SET(COMMON_SRC
angel.c
angel_fake.c
angel_connection.c
angel_data.c
angel_fake.c
actions.c
base.c
chunk.c
@ -280,6 +281,7 @@ SET(COMMON_SRC
http_headers.c
http_request_parser.c
http_response_parser.c
idlist.c
ip_parsers.c
lighttpd-glue.c
log.c
@ -360,6 +362,7 @@ ADD_TARGET_PROPERTIES(lighttpd COMPILE_FLAGS ${COMMON_CFLAGS})
ADD_EXECUTABLE(lighttpd-angel
angel_config_parser.c
angel_connection.c
angel_data.c
angel_log.c
angel_main.c
@ -367,6 +370,7 @@ ADD_EXECUTABLE(lighttpd-angel
angel_plugin_core.c
angel_server.c
angel_value.c
idlist.c
ip_parsers.c
module.c
utils.c

583
src/angel_connection.c

@ -0,0 +1,583 @@
#include <lighttpd/utils.h>
#include <lighttpd/angel_data.h>
#include <lighttpd/angel_connection.h>
#define ANGEL_MAGIC ((gint32) 0x8a930a9f)
typedef enum {
ANGEL_CALL_SEND_SIMPLE = 1,
ANGEL_CALL_SEND_CALL = 2,
ANGEL_CALL_SEND_RESULT = 3
} angel_call_send_t;
typedef struct {
enum { ANGEL_CONNECTION_ITEM_GSTRING, ANGEL_CONNECTION_ITEM_FDS } type;
union {
struct {
GString *buf;
guint pos;
} string;
struct {
GArray *fds;
guint pos;
} fds;
} value;
} angel_connection_send_item_t;
static void send_queue_push_string(GQueue *queue, GString *buf) {
angel_connection_send_item_t *i;
if (!buf || !buf->len) return;
i = g_slice_new0(angel_connection_send_item_t);
i->type = ANGEL_CONNECTION_ITEM_GSTRING;
i->value.string.buf = buf;
i->value.string.pos = 0;
g_queue_push_tail(queue, i);
}
static void send_queue_push_fds(GQueue *queue, GArray *fds) {
angel_connection_send_item_t *i;
if (!fds || !fds->len) return;
i = g_slice_new0(angel_connection_send_item_t);
i->type = ANGEL_CONNECTION_ITEM_FDS;
i->value.fds.fds = fds;
i->value.fds.pos = 0;
g_queue_push_tail(queue, i);
}
static void send_queue_item_free(angel_connection_send_item_t *i) {
if (!i) return;
switch (i->type) {
case ANGEL_CONNECTION_ITEM_GSTRING:
g_string_free(i->value.string.buf, TRUE);
break;
case ANGEL_CONNECTION_ITEM_FDS:
g_array_free(i->value.fds.fds, TRUE);
break;
}
g_slice_free(angel_connection_send_item_t, i);
}
static void send_queue_clean(GQueue *queue) {
angel_connection_send_item_t *i;
while (NULL != (i = g_queue_peek_head(queue))) {
switch (i->type) {
case ANGEL_CONNECTION_ITEM_GSTRING:
if (i->value.string.pos < i->value.string.buf->len) return;
g_string_free(i->value.string.buf, TRUE);
break;
case ANGEL_CONNECTION_ITEM_FDS:
if (i->value.fds.pos < i->value.fds.fds->len) return;
g_array_free(i->value.fds.fds, TRUE);
break;
}
g_queue_peek_head(queue);
g_slice_free(angel_connection_send_item_t, i);
}
}
GQuark angel_call_error_quark() {
return g_quark_from_static_string("angel-call-error-quark");
}
GQuark angel_connection_error_quark() {
return g_quark_from_static_string("angel-connection-error-quark");
}
static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **err) {
gsize old_len;
ssize_t want, r;
if (acon->in.pos > 0) {
g_string_erase(acon->in.data, 0, acon->in.pos);
acon->in.pos = 0;
}
if (acon->in.data->len >= need) return TRUE;
want = need - acon->in.data->len; /* always > 0 */
old_len = acon->in.data->len;
g_string_set_size(acon->in.data, need);
for ( ; want > 0; ) {
r = read(acon->fd, acon->in.data + old_len, want);
if (r < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
g_string_set_size(acon->in.data, old_len);
return TRUE;
default:
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED,
"read error: %s", g_strerror(errno));
g_string_set_size(acon->in.data, old_len);
return FALSE;
}
} else if (r == 0) { /* eof */
errno = ECONNRESET;
g_string_set_size(acon->in.data, old_len);
return FALSE;
} else {
want -= r;
old_len += r;
}
}
g_string_set_size(acon->in.data, old_len);
return TRUE;
}
static gboolean angel_dispatch(angel_connection *acon, GError **err) {
gint32 id = acon->parse.id, type = acon->parse.type;
angel_call *call = NULL;
if (type != ANGEL_CALL_SEND_SIMPLE) {
g_static_mutex_lock(&acon->mutex);
if (!idlist_is_used(acon->call_id_list, id)) {
g_static_mutex_unlock(&acon->mutex);
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: %i", (gint) id);
return FALSE;
}
idlist_put(acon->call_id_list, id);
if (type == ANGEL_CALL_SEND_RESULT && (guint) id < acon->call_table->len) {
call = (angel_call*) g_ptr_array_index(acon->call_table, id);
g_ptr_array_index(acon->call_table, id) = NULL;
if (call) {
ev_timer_stop(acon->loop, &call->timeout_watcher);
if (!call->callback) {
g_slice_free(angel_call, call);
call = NULL;
}
}
}
g_static_mutex_unlock(&acon->mutex);
} else if (-1 != id) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: %i, should be -1 for simple call", (gint) id);
return FALSE;
}
switch (type) {
case ANGEL_CALL_SEND_SIMPLE:
if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Wrong data in call");
return FALSE;
}
acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.data);
break;
case ANGEL_CALL_SEND_RESULT:
if (call) {
acon->recv_result(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.error, acon->parse.data, acon->parse.fds);
}
break;
default:
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid type: %i", (gint) type);
return FALSE;
}
return TRUE;
}
static gboolean angel_connection_read(angel_connection *acon, GError **err) {
for ( ;; ) {
if (!acon->parse.have_header) {
gint32 magic;
if (!angel_fill_buffer(acon, 8*4, err)) return FALSE;
if (acon->in.data->len - acon->in.pos < 8*4) return TRUE; /* need more data */
if (!angel_data_read_int32(&acon->in, &magic, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.type, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.id, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.mod_len, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.action_len, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.error_len, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.data_len, err)) return FALSE;
if (!angel_data_read_int32(&acon->in, &acon->parse.missing_fds, err)) return FALSE;
if (ANGEL_MAGIC != magic) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid magic: 0x%x (should be 0x%x)", (gint) magic, (gint) ANGEL_MAGIC);
return FALSE;
}
acon->parse.body_size = acon->parse.mod_len + acon->parse.action_len + acon->parse.error_len + acon->parse.data_len;
acon->parse.have_header = TRUE;
}
if (!angel_fill_buffer(acon, acon->parse.body_size, err)) return FALSE;
if (acon->in.data->len - acon->in.pos < acon->parse.body_size) return TRUE; /* need more data */
while (acon->parse.missing_fds > 0) {
int fd = -1;
switch (receive_fd(acon->fd, &fd)) {
case 0:
g_array_append_val(acon->parse.fds, fd);
acon->parse.missing_fds--;
break;
case -1:
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED,
"receive fd error: %s", g_strerror(errno));
return FALSE;
case -2:
return TRUE; /* need more data */
}
}
if (!angel_data_read_mem(&acon->in, &acon->parse.mod, acon->parse.mod_len, err)) return FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.action, acon->parse.action_len, err)) return FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.error, acon->parse.error_len, err)) return FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.data, acon->parse.data_len, err)) return FALSE;
if (!angel_dispatch(acon, err)) return FALSE;
g_string_truncate(acon->parse.error, 0);
g_string_truncate(acon->parse.data, 0);
g_array_set_size(acon->parse.fds, 0);
}
}
static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
angel_connection *acon = (angel_connection*) w->data;
if (revents | EV_WRITE) {
GString *out_str;
int i;
ssize_t written, len;
gchar *data;
gboolean out_queue_empty;
angel_connection_send_item_t *send_item;
g_static_mutex_lock(&acon->mutex);
send_item = g_queue_peek_head(acon->out);
g_static_mutex_unlock(&acon->mutex);
for (i = 0; send_item && (i < 10); i++) { /* don't send more than 10 chunks */
switch (send_item->type) {
case ANGEL_CONNECTION_ITEM_GSTRING:
out_str = send_item->value.string.buf;
written = send_item->value.string.pos;
data = out_str->str + written;
len = out_str->len - written;
while (len > 0) {
written = write(w->fd, data, len);
if (written < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
goto write_eagain;
default: /* Fatal error, connection has to be closed */
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
acon->close_cb(acon, NULL); /* TODO: set err */
return;
}
} else {
data += written;
len -= written;
send_item->value.string.pos += written;
}
}
break;
case ANGEL_CONNECTION_ITEM_FDS:
while (send_item->value.fds.pos < send_item->value.fds.fds->len) {
switch (send_fd(w->fd, g_array_index(send_item->value.fds.fds, int, send_item->value.fds.pos))) {
case 0: continue;
case -1: /* Fatal error, connection has to be closed */
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
acon->close_cb(acon, NULL); /* TODO: set err */
return;
case -2: goto write_eagain;
}
send_item->value.fds.pos++;
}
break;
}
send_queue_item_free(send_item);
g_static_mutex_lock(&acon->mutex);
g_queue_pop_head(acon->out);
send_item = g_queue_peek_head(acon->out);
g_static_mutex_unlock(&acon->mutex);
}
write_eagain:
g_static_mutex_lock(&acon->mutex);
send_queue_clean(acon->out);
out_queue_empty = (0 == acon->out->length);
g_static_mutex_unlock(&acon->mutex);
if (out_queue_empty) ev_io_rem_events(loop, w, EV_WRITE);
}
if (revents | EV_READ) {
GError *err = NULL;
if (!angel_connection_read(acon, &err)) {
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
acon->close_cb(acon, err);
}
}
}
static void angel_connection_out_notify_cb(struct ev_loop *loop, ev_async *w, int revents) {
angel_connection *acon = (angel_connection*) w->data;
UNUSED(revents);
ev_io_add_events(loop, &acon->fd_watcher, EV_WRITE);
}
/* create connection */
angel_connection* angel_connection_create(struct ev_loop *loop, int fd, gpointer data,
AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb) {
angel_connection *acon = g_slice_new0(angel_connection);
acon->data = data;
g_static_mutex_init(&acon->mutex);
acon->loop = loop;
acon->fd = fd;
acon->call_id_list = idlist_new(65535);
ev_io_init(&acon->fd_watcher, angel_connection_io_cb, fd, EV_READ);
acon->fd_watcher.data = acon;
ev_async_init(&acon->out_notify_watcher, angel_connection_out_notify_cb);
acon->out_notify_watcher.data = acon;
acon->out = g_queue_new();
acon->in.data = g_string_sized_new(0);
acon->in.pos = 0;
acon->recv_call = recv_call;
acon->recv_result = recv_result;
acon->close_cb = close_cb;
return acon;
}
static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
angel_call* call = (angel_call*) w->data;
angel_connection *acon = call->acon;
AngelCallback cb = NULL;
gpointer ctx;
UNUSED(loop); UNUSED(revents);
g_static_mutex_lock(&acon->mutex);
g_ptr_array_index(acon->call_table, call->id) = NULL;
if (NULL == (cb = call->callback)) {
g_slice_free(angel_call, call);
}
ctx = call->context;
g_static_mutex_unlock(&acon->mutex);
if (cb) cb(ctx, TRUE, NULL, NULL, NULL);
}
angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout) {
angel_call* call = g_slice_new0(angel_call);
g_assert(NULL != callback);
call->callback = callback;
ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, 0, timeout);
call->timeout_watcher.data = call;
call->id = -1;
return call;
}
/* returns TRUE if a call was cancelled */
gboolean angel_call_free(angel_call *call) {
gboolean r = FALSE;
if (call->acon) {
angel_connection *acon = call->acon;
g_static_mutex_lock(&acon->mutex);
if (-1 != call->id) {
r = TRUE;
call->callback = NULL;
} else {
g_slice_free(angel_call, call);
}
g_static_mutex_unlock(&acon->mutex);
} else {
g_slice_free(angel_call, call);
}
return r;
}
static gboolean prepare_call_header(GString **pbuf,
gint32 type, gint32 id,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gsize error_len, gsize data_len, gsize fd_count, GError **err) {
GString *buf;
buf = g_string_sized_new(8*4 + mod_len + action_len);
*pbuf = buf;
if (!angel_data_write_int32(buf, ANGEL_MAGIC, err)) return FALSE;
if (!angel_data_write_int32(buf, type, err)) return FALSE;
if (!angel_data_write_int32(buf, id, err)) return FALSE;
if (!angel_data_write_int32(buf, mod_len, err)) return FALSE;
if (!angel_data_write_int32(buf, action_len, err)) return FALSE;
if (!angel_data_write_int32(buf, error_len, err)) return FALSE;
if (!angel_data_write_int32(buf, data_len, err)) return FALSE;
if (!angel_data_write_int32(buf, fd_count, err)) return FALSE;
g_string_append_len(buf, mod, mod_len);
g_string_append_len(buf, action, action_len);
return TRUE;
}
gboolean angel_send_simple_call(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
GString *data,
GError **err) {
GString *buf = NULL;
gboolean queue_was_empty;
if (err && *err) goto error;
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error;
g_static_mutex_lock(&acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
g_static_mutex_unlock(&acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
return TRUE;
error:
if (data) g_string_free(data, TRUE);
if (buf) g_string_free(buf, TRUE);
return FALSE;
}
gboolean angel_send_call(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
angel_call *call,
GString *data,
GError **err) {
GString *buf = NULL;
gboolean queue_was_empty;
if (err && *err) goto error;
g_static_mutex_lock(&acon->mutex);
if (-1 != call->id) {
g_static_mutex_unlock(&acon->mutex);
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_ALREADY_RUNNING, "call already running");
goto error_before_new_id;
}
if (-1 == (call->id = idlist_get(acon->call_id_list))) {
g_static_mutex_unlock(&acon->mutex);
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_OUT_OF_CALL_IDS, "out of call ids");
goto error;
}
if ((guint) call->id >= acon->call_table->len) {
g_ptr_array_set_size(acon->call_table, call->id + 1);
}
g_ptr_array_index(acon->call_table, call->id) = call;
g_static_mutex_unlock(&acon->mutex);
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error;
g_static_mutex_lock(&acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
g_static_mutex_unlock(&acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
return TRUE;
error:
if (-1 != call->id) {
idlist_put(acon->call_id_list, call->id);
call->id = -1;
}
error_before_new_id:
if (data) g_string_free(data, TRUE);
if (buf) g_string_free(buf, TRUE);
return FALSE;
}
gboolean angel_send_result(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *error, GString *data, GArray *fds,
GError **err) {
GString *buf = NULL;
gboolean queue_was_empty;
if (err && *err) goto error;
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, mod, mod_len, action, action_len, error->len, data->len, fds->len, err)) goto error;
g_static_mutex_lock(&acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, error);
send_queue_push_string(acon->out, data);
send_queue_push_fds(acon->out, fds);
g_static_mutex_unlock(&acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
return TRUE;
error:
if (data) g_string_free(data, TRUE);
if (buf) g_string_free(buf, TRUE);
return FALSE;
return FALSE;
}
/* free temporary needed memroy; call this once in while after some activity */
void angel_cleanup_tables(angel_connection *acon) {
UNUSED(acon);
/* TODO
guint max_used_id = idlist_cleanup(acon->call_id_list);
g_ptr_array_set_size(acon->call_id_list, max_used_id);
*/
}

39
src/angel_data.c

@ -1,5 +1,5 @@
#include <lighttpd/base.h>
#include <lighttpd/settings.h>
#include <lighttpd/angel_data.h>
/* error handling */
@ -92,35 +92,44 @@ gboolean angel_data_read_char (angel_buffer *buf, gchar *val, GError **err) {
return TRUE;
}
gboolean angel_data_read_mem (angel_buffer *buf, GString **val, gsize len, GError **err) {
GString *s;
g_return_val_if_fail(err == NULL || *err == NULL, FALSE);
if (buf->data->len - buf->pos < len) {
return error_eof(err, "string-data");
}
s = *val;
if (!s) {
*val = s = g_string_sized_new(len);
} else {
g_string_truncate(s, 0);
}
g_string_append_len(s, buf->data->str + buf->pos, len);
buf->pos += len;
return TRUE;
}
gboolean angel_data_read_str (angel_buffer *buf, GString **val, GError **err) {
gint32 ilen;
gsize len;
GString *s;
g_return_val_if_fail(err == NULL || *err == NULL, FALSE);
if (buf->data->len - buf->pos < sizeof(gint32)) {
return error_eof(err, "string-length");
}
memcpy(&ilen, buf->data->str + buf->pos, sizeof(len));
memcpy(&ilen, buf->data->str + buf->pos, sizeof(ilen));
buf->pos += sizeof(gint32);
if (ilen < 0 || ilen > ANGEL_DATA_MAX_STR_LEN) {
buf->pos -= sizeof(gint32);
g_set_error(err,
ANGEL_DATA_ERROR,
ANGEL_DATA_ERROR_INVALID_STRING_LENGTH,
"String length in buffer invalid: %i", (gint) ilen);
return FALSE;
}
len = (gsize) ilen;
if (buf->data->len - buf->pos < len) {
return error_eof(err, "string-data");
}
s = *val;
if (!s) {
*val = s = g_string_sized_new(len);
} else {
g_string_truncate(s, 0);
if (!angel_data_read_mem(buf, val, (gsize) ilen, err)) {
buf->pos -= sizeof(gint32);
return FALSE;
}
g_string_append_len(s, buf->data->str + buf->pos, len);
buf->pos += len;
return TRUE;
}

9
src/idlist.c

@ -97,6 +97,15 @@ gint idlist_get(idlist *l) {
return newid;
}
gboolean idlist_is_used(idlist *l, gint id) {
GArray *a = l->bitvector;
guint ndx = id / UL_BITS, bndx = id % UL_BITS;
gulong bmask = 1 << bndx;
if (id < 0 || ndx > a->len) return FALSE;
return (0 != (g_array_index(a, gulong, ndx) & (bmask)));
}
void idlist_put(idlist *l, gint id) {
clear_bit(l->bitvector, id);

64
src/utils.c

@ -6,26 +6,76 @@
#include <stdio.h>
#include <fcntl.h>
#include <stropts.h>
void fatal(const gchar* msg) {
fprintf(stderr, "%s\n", msg);
abort();
}
void fd_init(int fd) {
#ifdef _WIN32
void fd_no_block(int fd) {
#ifdef O_NONBLOCK
fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
#elif defined _WIN32
int i = 1;
ioctlsocket(fd, FIONBIO, &i);
#else
#error No way found to set non-blocking mode for fds.
#endif
#ifdef FD_CLOEXEC
/* close fd on exec (cgi) */
fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif
}
void fd_block(int fd) {
#ifdef O_NONBLOCK
fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
fcntl(fd, F_SETFL, O_RDWR);
#elif defined _WIN32
int i = 0;
ioctlsocket(fd, FIONBIO, &i);
#else
#error No way found to set blocking mode for fds.
#endif
}
void fd_init(int fd) {
#ifdef FD_CLOEXEC
/* close fd on exec (cgi) */
fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif
fd_no_block(fd);
}
#ifndef _WIN32
int send_fd(int s, int fd) { /* write fd to unix socket s */
for ( ;; ) {
if (-1 == ioctl(s, I_SENDFD, fd)) {
switch (errno) {
case EINTR: break;
case EAGAIN: return -2;
default: return -1;
}
} else {
return 0;
}
}
}
int receive_fd(int s, int *fd) { /* read fd from unix socket s */
struct strrecvfd res;
memset(&res, 0, sizeof(res));
for ( ;; ) {
if (-1 == ioctl(s, I_RECVFD, &res)) {
switch (errno) {
case EINTR: break;
case EAGAIN: return -2;
default: return -1;
}
} else {
*fd = res.fd;
}
}
}
#endif
void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) {
if ((watcher->events & events) == events) return;
ev_io_stop(loop, watcher);

Loading…
Cancel
Save