Browse Source

Basic connection state machine

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent
commit
2fbc7b1408
  1. 7
      src/actions.c
  2. 8
      src/base.h
  3. 9
      src/chunk.c
  4. 4
      src/condition.c
  5. 2
      src/config_lua.c
  6. 264
      src/connection.c
  7. 66
      src/connection.h
  8. 41
      src/http_headers.c
  9. 8
      src/http_headers.h
  10. 2
      src/http_request_parser.rl
  11. 4
      src/lighttpd.c
  12. 4
      src/network.c
  13. 75
      src/plugin_core.c
  14. 35
      src/request.c
  15. 2
      src/request.h
  16. 53
      src/response.c
  17. 21
      src/response.h
  18. 12
      src/server.c
  19. 1
      src/server.h
  20. 1
      src/settings.h
  21. 21
      src/utils.c
  22. 3
      src/utils.h
  23. 2
      src/wscript

7
src/actions.c

@ -12,6 +12,7 @@ struct action_stack_element {
};
void action_release(server *srv, action *a) {
if (!a) return;
guint i;
assert(a->refcount > 0);
if (!(--a->refcount)) {
@ -162,10 +163,10 @@ action_result action_execute(server *srv, connection *con) {
res = a->value.function.func(srv, con, a->value.function.param);
switch (res) {
case ACTION_GO_ON:
break;
case ACTION_FINISHED:
break;
case ACTION_ERROR:
action_stack_clear(srv, as);
action_stack_reset(srv, as);
return res;
case ACTION_WAIT_FOR_EVENT:
return ACTION_WAIT_FOR_EVENT;
@ -182,5 +183,5 @@ action_result action_execute(server *srv, connection *con) {
}
ase->pos++;
}
return ACTION_GO_ON;
return ACTION_FINISHED;
}

8
src/base.h

@ -5,6 +5,13 @@
#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
#define GSTR_LEN(x) x ? x->str : "", x ? x->len : 0
typedef enum {
HTTP_TRANSFER_ENCODING_IDENTITY,
HTTP_TRANSFER_ENCODING_CHUNKED
} transfer_encoding_t;
struct server;
typedef struct server server;
@ -17,6 +24,7 @@ typedef struct connection connection;
#include "options.h"
#include "plugin.h"
#include "request.h"
#include "response.h"
#include "log.h"
#include "connection.h"

9
src/chunk.c

@ -17,12 +17,13 @@ static chunkfile *chunkfile_new(GString *name, int fd, gboolean is_temp) {
}
static void chunkfile_acquire(chunkfile *cf) {
// assert(cf->refcount > 0)
assert(cf->refcount > 0);
cf->refcount++;
}
static void chunkfile_release(chunkfile *cf) {
// assert(cf->refcount > 0)
if (!cf) return;
assert(cf->refcount > 0);
if (!(--cf->refcount)) {
if (-1 != cf->fd) close(cf->fd);
cf->fd = -1;
@ -214,8 +215,8 @@ static void chunk_free(chunk *c) {
c->mem = NULL;
if (c->file.file) chunkfile_release(c->file.file);
c->file.file = NULL;
if (c->file.mmap.data) munmap(c->file.mmap.data, c->file.mmap.length);
c->file.mmap.data = NULL;
if (c->file.mmap.data != MAP_FAILED) munmap(c->file.mmap.data, c->file.mmap.length);
c->file.mmap.data = MAP_FAILED;
g_slice_free(chunk, c);
}

4
src/condition.c

@ -63,6 +63,7 @@ void condition_lvalue_acquire(condition_lvalue *lvalue) {
}
void condition_lvalue_release(condition_lvalue *lvalue) {
if (!lvalue) return;
assert(lvalue->refcount > 0);
if (!(--lvalue->refcount)) {
if (lvalue->key) g_string_free(lvalue->key, TRUE);
@ -194,7 +195,8 @@ void condition_acquire(condition *c) {
void condition_release(server *srv, condition* c) {
UNUSED(srv);
/* assert(c->recount > 0); */
if (!c) return;
assert(c->refcount > 0);
if (!(--c->refcount)) {
condition_free(c);
}

2
src/config_lua.c

@ -211,7 +211,7 @@ gboolean config_lua_load(server *srv, const gchar *filename) {
lua_pop(L, 1); /* pop the ret-value */
lua_getfield(L, LUA_GLOBALSINDEX, "action");
lua_getfield(L, LUA_GLOBALSINDEX, "actions");
srv->mainaction = lua_get_action(L, -1);
lua_pop(L, 1);

264
src/connection.c

@ -1,16 +1,52 @@
#include "connection.h"
#include "network.h"
#include "log.h"
#include "utils.h"
void con_put(server *srv, connection *con); /* server.c */
void internal_error(server *srv, connection *con) {
if (con->response_headers_sent) {
CON_ERROR(srv, con, "%s", "Couldn't send '500 Internal Error': headers already sent");
connection_set_state(srv, con, CON_STATE_ERROR);
} else {
http_headers_reset(con->response.headers);
con->response.http_status = 500;
con->content_handler = NULL;
connection_set_state(srv, con, CON_STATE_WRITE_RESPONSE);
}
}
static void parse_request_body(server *srv, connection *con) {
if (con->state == CON_STATE_HANDLE_RESPONSE && !con->in->is_closed) {
/* TODO: parse chunked encoded request body */
if (con->in->bytes_in < con->request.content_length) {
chunkqueue_steal_len(con->in, con->raw_in, con->request.content_length - con->in->bytes_in);
if (con->in->bytes_in == con->request.content_length) con->in->is_closed = TRUE;
} else if (con->request.content_length == -1) {
UNUSED(srv);
if ( con->state >= CON_STATE_READ_REQUEST_CONTENT
&& con->state <= CON_STATE_WRITE_RESPONSE
&& !con->in->is_closed) {
if (con->request.content_length == -1) {
/* TODO: parse chunked encoded request body, filters */
chunkqueue_steal_all(con->in, con->raw_in);
} else {
if (con->in->bytes_in < con->request.content_length) {
chunkqueue_steal_len(con->in, con->raw_in, con->request.content_length - con->in->bytes_in);
}
if (con->in->bytes_in == con->request.content_length) con->in->is_closed = TRUE;
}
}
}
static void forward_response_body(server *srv, connection *con) {
UNUSED(srv);
if (con->state == CON_STATE_WRITE_RESPONSE && !con->raw_out->is_closed) {
if (con->out->length > 0) {
/* TODO: chunked encoding, filters */
chunkqueue_steal_all(con->raw_out, con->out);
}
if (con->in->is_closed && 0 == con->raw_out->length)
con->raw_out->is_closed = TRUE;
if (con->raw_out->length > 0) {
ev_io_add_events(srv->loop, &con->sock.watcher, EV_WRITE);
} else {
ev_io_rem_events(srv->loop, &con->sock.watcher, EV_WRITE);
}
}
}
@ -19,109 +55,295 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
connection_socket *con_sock = (connection_socket*) w->data;
server *srv = con_sock->srv;
connection *con = con_sock->con;
gboolean dojoblist = FALSE;
UNUSED(loop);
if (revents && EV_READ) {
if (revents & EV_READ) {
if (con->in->is_closed) {
/* don't read the next request before current one is done */
ev_io_set(w, w->fd, w->events && ~EV_READ);
} else {
switch(network_read(srv, con, w->fd, con->raw_in)) {
switch (network_read(srv, con, w->fd, con->raw_in)) {
case NETWORK_STATUS_SUCCESS:
parse_request_body(srv, con);
joblist_append(srv, con);
dojoblist = TRUE;
break;
case NETWORK_STATUS_FATAL_ERROR:
connection_set_state(srv, con, CON_STATE_ERROR);
joblist_append(srv, con);
dojoblist = TRUE;
break;
case NETWORK_STATUS_CONNECTION_CLOSE:
connection_set_state(srv, con, CON_STATE_CLOSE);
joblist_append(srv, con);
dojoblist = TRUE;
break;
case NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
/* TODO ? */
ev_io_set(w, w->fd, w->events && ~EV_READ);
CON_TRACE(srv, con, "%s", "stop read");
ev_io_rem_events(loop, w, EV_READ);
break;
case NETWORK_STATUS_WAIT_FOR_FD:
/* TODO */
CON_TRACE(srv, con, "%s", "stop read");
ev_io_rem_events(loop, w, EV_READ);
break;
}
}
}
if (revents && EV_WRITE) {
if (revents & EV_WRITE) {
if (con->raw_out->length > 0) {
network_write(srv, con, w->fd, con->raw_out);
joblist_append(srv, con);
CON_TRACE(srv, con, "cq->len: raw_out=%i, out=%i", (int) con->raw_out->length, (int) con->out->length);
dojoblist = TRUE;
}
if (con->raw_out->length == 0) {
ev_io_set(w, w->fd, w->events && ~EV_WRITE);
CON_TRACE(srv, con, "%s", "stop write");
ev_io_rem_events(loop, w, EV_WRITE);
dojoblist = TRUE;
}
}
if (dojoblist)
joblist_append(srv, con);
}
connection* connection_new(server *srv) {
connection *con = g_slice_new0(connection);
UNUSED(srv);
con->state = CON_STATE_REQUEST_START;
con->response_headers_sent = FALSE;
con->raw_in = chunkqueue_new();
con->raw_out = chunkqueue_new();
con->in = chunkqueue_new();
con->out = chunkqueue_new();
con->sock.srv = srv; con->sock.con = con; con->sock.watcher.data = con;
ev_io_init(&con->sock.watcher, connection_cb, -1, 0);
con->sock.srv = srv; con->sock.con = con; con->sock.watcher.data = &con->sock;
con->remote_addr_str = g_string_sized_new(0);
con->local_addr_str = g_string_sized_new(0);
con->keep_alive = TRUE;
action_stack_init(&con->action_stack);
request_init(&con->request, con->raw_in);
response_init(&con->response);
return con;
}
void connection_reset(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
con->response_headers_sent = FALSE;
chunkqueue_reset(con->raw_in);
chunkqueue_reset(con->raw_out);
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
ev_io_stop(srv->loop, &con->sock.watcher);
close(con->sock.watcher.fd);
if (con->sock.watcher.fd != -1) {
shutdown(con->sock.watcher.fd, SHUT_RDWR);
close(con->sock.watcher.fd);
}
ev_io_set(&con->sock.watcher, -1, 0);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;
action_stack_reset(srv, &con->action_stack);
request_reset(&con->request);
response_reset(&con->response);
}
void connection_reset_keep_alive(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
con->response_headers_sent = FALSE;
// chunkqueue_reset(con->raw_in);
// chunkqueue_reset(con->raw_out);
con->raw_out->is_closed = FALSE;
chunkqueue_reset(con->in);
chunkqueue_reset(con->out);
ev_io_set_events(srv->loop, &con->sock.watcher, EV_READ);
g_string_truncate(con->remote_addr_str, 0);
g_string_truncate(con->local_addr_str, 0);
con->keep_alive = TRUE;
action_stack_reset(srv, &con->action_stack);
request_reset(&con->request);
response_reset(&con->response);
}
void connection_free(server *srv, connection *con) {
con->state = CON_STATE_REQUEST_START;
con->response_headers_sent = FALSE;
chunkqueue_free(con->raw_in);
chunkqueue_free(con->raw_out);
chunkqueue_free(con->in);
chunkqueue_free(con->out);
ev_io_stop(srv->loop, &con->sock.watcher);
close(con->sock.watcher.fd);
if (con->sock.watcher.fd != -1) {
shutdown(con->sock.watcher.fd, SHUT_RDWR);
close(con->sock.watcher.fd);
}
ev_io_set(&con->sock.watcher, -1, 0);
g_string_free(con->remote_addr_str, TRUE);
g_string_free(con->local_addr_str, TRUE);
con->keep_alive = TRUE;
action_stack_clear(srv, &con->action_stack);
request_clear(&con->request);
response_clear(&con->response);
g_slice_free(connection, con);
}
void connection_set_state(server *srv, connection *con, connection_state_t state) {
if (state < con->state) {
CON_ERROR(srv, con, "Cannot move into requested state: %i => %i, move to error state", con->state, state);
state = CON_STATE_ERROR;
}
con->state = state;
}
void connection_state_machine(server *srv, connection *con) {
gboolean done = FALSE;
do {
switch (con->state) {
case CON_STATE_REQUEST_START:
/* TODO: reset some values after keep alive - or do it in CON_STATE_REQUEST_END */
connection_set_state(srv, con, CON_STATE_READ_REQUEST_HEADER);
action_enter(con, srv->mainaction);
break;
case CON_STATE_READ_REQUEST_HEADER:
TRACE(srv, "%s", "reading request header");
switch(http_request_parse(srv, con, &con->request.parser_ctx)) {
case HANDLER_FINISHED:
case HANDLER_GO_ON:
connection_set_state(srv, con, CON_STATE_VALIDATE_REQUEST_HEADER);
break;
case HANDLER_WAIT_FOR_FD:
/* TODO */
done = TRUE;
break;
case HANDLER_WAIT_FOR_EVENT:
done = TRUE;
break;
case HANDLER_ERROR:
case HANDLER_COMEBACK: /* unexpected */
/* unparsable header */
connection_set_state(srv, con, CON_STATE_ERROR);
break;
}
break;
case CON_STATE_VALIDATE_REQUEST_HEADER:
TRACE(srv, "%s", "validating request header");
connection_set_state(srv, con, CON_STATE_HANDLE_REQUEST_HEADER);
request_validate_header(srv, con);
break;
case CON_STATE_HANDLE_REQUEST_HEADER:
TRACE(srv, "%s", "handle request header");
switch (action_execute(srv, con)) {
case ACTION_WAIT_FOR_EVENT:
done = TRUE;
break;
case ACTION_GO_ON:
case ACTION_FINISHED:
if (con->state == CON_STATE_HANDLE_REQUEST_HEADER) {
internal_error(srv, con);
}
connection_set_state(srv, con, CON_STATE_WRITE_RESPONSE);
break;
case ACTION_ERROR:
/* action return error */
/* TODO: return 500 instead ? */
connection_set_state(srv, con, CON_STATE_ERROR);
break;
}
break;
case CON_STATE_READ_REQUEST_CONTENT:
case CON_STATE_HANDLE_RESPONSE_HEADER:
TRACE(srv, "%s", "read request/handle response header");
parse_request_body(srv, con);
/* TODO: call plugin content_handler */
switch (action_execute(srv, con)) {
case ACTION_WAIT_FOR_EVENT:
done = TRUE;
break;
case ACTION_GO_ON:
case ACTION_FINISHED:
connection_set_state(srv, con, CON_STATE_WRITE_RESPONSE);
break;
case ACTION_ERROR:
/* action return error */
/* TODO: return 500 instead ? */
connection_set_state(srv, con, CON_STATE_ERROR);
break;
}
break;
case CON_STATE_WRITE_RESPONSE:
if (con->in->is_closed && con->raw_out->is_closed) {
connection_set_state(srv, con, CON_STATE_RESPONSE_END);
break;
}
if (!con->response_headers_sent) {
con->response_headers_sent = TRUE;
TRACE(srv, "%s", "write response headers");
response_send_headers(srv, con);
}
TRACE(srv, "%s", "write response");
parse_request_body(srv, con);
/* TODO: call plugin content_handler */
forward_response_body(srv, con);
if (con->in->is_closed && con->raw_out->is_closed) {
connection_set_state(srv, con, CON_STATE_RESPONSE_END);
break;
}
if (con->state == CON_STATE_WRITE_RESPONSE) done = TRUE;
break;
case CON_STATE_RESPONSE_END:
TRACE(srv, "%s", "response end");
/* TODO: call plugin callbacks */
if (con->keep_alive) {
connection_reset_keep_alive(srv, con);
} else {
con_put(srv, con);
done = TRUE;
}
break;
case CON_STATE_CLOSE:
TRACE(srv, "%s", "connection closed");
/* TODO: call plugin callbacks */
con_put(srv, con);
done = TRUE;
break;
case CON_STATE_ERROR:
TRACE(srv, "%s", "connection closed (error)");
/* TODO: call plugin callbacks */
con_put(srv, con);
done = TRUE;
break;
}
} while (!done);
}
void connection_handle_direct(server *srv, connection *con) {
connection_set_state(srv, con, CON_STATE_WRITE_RESPONSE);
con->out->is_closed = TRUE;
}
void connection_handle_indirect(server *srv, connection *con, plugin *p) {
connection_set_state(srv, con, CON_STATE_READ_REQUEST_CONTENT);
con->content_handler = p;
}

66
src/connection.h

@ -4,13 +4,54 @@
#include "base.h"
typedef enum {
CON_STATE_REQUEST_START, /** after the connect, the request is initialized, keep-alive starts here again */
CON_STATE_READ_REQUEST_HEADER, /** loop in the read-request-header until the full header is received */
CON_STATE_VALIDATE_REQUEST_HEADER, /** validate the request-header */
CON_STATE_HANDLE_RESPONSE, /** find a handler for the request */
CON_STATE_RESPONSE_END, /** successful request, connection closed */
CON_STATE_ERROR, /** fatal error, connection closed */
CON_STATE_CLOSE /** connection reset by peer */
/** after the connect, the request is initialized, keep-alive starts here again */
CON_STATE_REQUEST_START,
/** loop in the read-request-header until the full header is received */
CON_STATE_READ_REQUEST_HEADER,
/** validate the request-header */
CON_STATE_VALIDATE_REQUEST_HEADER,
/** find a handler for the request; there are two ways to produce responses:
* - direct response: for things like errors/auth/redirect
* just set the status code, perhaps fill in some headers,
* append your content (if any) to the queue and do:
* connection_handle_direct(srv, con);
* this moves into the CON_STATE_HANDLE_RESPONSE_HEADER
* request body gets dropped
* - indirect response: you register your plugin as the content handler:
* connection_handle_indirect(srv, con, plugin);
* this moves into the CON_STATE_READ_REQUEST_CONTENT state automatically
* as soon as you build the response headers (e.g. from a backend),
* change to the CON_STATE_HANDLE_RESPONSE_HEADER state:
* connection_set_state(srv, con, CON_STATE_HANDLE_RESPONSE_HEADER);
*/
CON_STATE_HANDLE_REQUEST_HEADER,
/** start forwarding the request content to the handler;
* any filter for the request content must register before a handler
* for the response content registers
*/
CON_STATE_READ_REQUEST_CONTENT,
/** response headers available; this is were compress/deflate should register
* their response content filters
* if all actions are done (or one returns HANDLER_FINISHED) we start
* writing the response content
*/
CON_STATE_HANDLE_RESPONSE_HEADER,
/** start sending content */
CON_STATE_WRITE_RESPONSE,
/** successful request, connection closed - final state */
CON_STATE_RESPONSE_END,
/** connection reset by peer - final state */
CON_STATE_CLOSE,
/** fatal error, connection closed - final state */
CON_STATE_ERROR
} connection_state_t;
struct connection_socket;
@ -25,6 +66,7 @@ struct connection_socket {
struct connection {
guint idx; /** index in connection table */
connection_state_t state;
gboolean response_headers_sent;
chunkqueue *raw_in, *raw_out;
chunkqueue *in, *out;
@ -32,7 +74,7 @@ struct connection {
connection_socket sock;
sock_addr remote_addr, local_addr;
GString *remote_addr_str, *local_addr_str;
gboolean is_ssl;
gboolean is_ssl, keep_alive;
action_stack action_stack;
@ -41,6 +83,10 @@ struct connection {
request request;
physical physical;
response response;
plugin *content_handler;
struct log_t *log;
gint log_level;
};
@ -50,5 +96,9 @@ LI_API void connection_reset(server *srv, connection *con);
LI_API void connection_free(server *srv, connection *con);
LI_API void connection_set_state(server *srv, connection *con, connection_state_t state);
LI_API void connection_state_machine(server *srv, connection *con);
LI_API void connection_handle_direct(server *srv, connection *con);
LI_API void connection_handle_indirect(server *srv, connection *con, plugin *p);
#endif

41
src/http_headers.c

@ -25,73 +25,74 @@ void http_headers_free(http_headers* headers) {
/* Just insert the header (using lokey)
*/
static void header_insert(http_headers *headers, GString *lokey, GString *key, GString *value) {
GString *newval = g_string_sized_new(key->len + value->len + 2);
static void header_insert(http_headers *headers, GString *lokey,
const gchar *key, size_t keylen, const gchar *value, size_t valuelen) {
GString *newval = g_string_sized_new(keylen + valuelen + 2);
g_string_append_len(newval, key->str, key->len);
g_string_append_len(newval, key, keylen);
g_string_append_len(newval, ": ", 2);
g_string_append_len(newval, value->str, value->len);
g_string_append_len(newval, value, valuelen);
g_hash_table_insert(headers->table, lokey, newval);
}
/** If header does not exist, just insert normal header. If it exists, append (", %s", value) */
void http_header_append(http_headers *headers, GString *key, GString *value) {
void http_header_append(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen) {
GString *lokey, *tval;
lokey = g_string_new_len(key->str, key->len);
lokey = g_string_new_len(key, keylen);
g_string_ascii_down(lokey);
tval = (GString*) g_hash_table_lookup(headers->table, lokey);
if (!tval) {
header_insert(headers, lokey, key, value);
header_insert(headers, lokey, key, keylen, value, valuelen);
} else {
g_string_free(lokey, TRUE);
g_string_append_len(tval, ", ", 2);
g_string_append_len(tval, value->str, value->len);
g_string_append_len(tval, value, valuelen);
}
}
/** If header does not exist, just insert normal header. If it exists, append ("\r\n%s: %s", key, value) */
void http_header_insert(http_headers *headers, GString *key, GString *value) {
void http_header_insert(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen) {
GString *lokey, *tval;
lokey = g_string_new_len(key->str, key->len);
lokey = g_string_new_len(key, keylen);
g_string_ascii_down(lokey);
tval = (GString*) g_hash_table_lookup(headers->table, lokey);
if (!tval) {
header_insert(headers, lokey, key, value);
header_insert(headers, lokey, key, keylen, value, valuelen);
} else {
g_string_free(lokey, TRUE);
g_string_append_len(tval, "\r\n", 2);
g_string_append_len(tval, key->str, key->len);
g_string_append_len(tval, key, keylen);
g_string_append_len(tval, ": ", 2);
g_string_append_len(tval, value->str, value->len);
g_string_append_len(tval, value, valuelen);
}
}
/** If header does not exist, just insert normal header. If it exists, overwrite the value */
void http_header_overwrite(http_headers *headers, GString *key, GString *value) {
void http_header_overwrite(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen) {
GString *lokey, *tval;
lokey = g_string_new_len(key->str, key->len);
lokey = g_string_new_len(key, keylen);
g_string_ascii_down(lokey);
tval = (GString*) g_hash_table_lookup(headers->table, lokey);
if (!tval) {
header_insert(headers, lokey, key, value);
header_insert(headers, lokey, key, keylen, value, valuelen);
} else {
g_string_free(lokey, TRUE);
g_string_truncate(tval, 0);
g_string_append_len(tval, key->str, key->len);
g_string_append_len(tval, key, keylen);
g_string_append_len(tval, ": ", 2);
g_string_append_len(tval, value->str, value->len);
g_string_append_len(tval, value, valuelen);
}
}
LI_API gboolean http_header_remove(http_headers *headers, GString *key) {
LI_API gboolean http_header_remove(http_headers *headers, const gchar *key, size_t keylen) {
GString *lokey;
gboolean res;
lokey = g_string_new_len(key->str, key->len);
lokey = g_string_new_len(key, keylen);
g_string_ascii_down(lokey);
res = g_hash_table_remove(headers->table, lokey);
g_string_free(lokey, TRUE);

8
src/http_headers.h

@ -18,11 +18,11 @@ LI_API void http_headers_reset(http_headers* headers);
LI_API void http_headers_free(http_headers* headers);
/** If header does not exist, just insert normal header. If it exists, append (", %s", value) */
LI_API void http_header_append(http_headers *headers, GString *key, GString *value);
LI_API void http_header_append(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen);
/** If header does not exist, just insert normal header. If it exists, append ("\r\n%s: %s", key, value) */
LI_API void http_header_insert(http_headers *headers, GString *key, GString *value);
LI_API void http_header_insert(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen);
/** If header does not exist, just insert normal header. If it exists, overwrite the value */
LI_API void http_header_overwrite(http_headers *headers, GString *key, GString *value);
LI_API gboolean http_header_remove(http_headers *headers, GString *key);
LI_API void http_header_overwrite(http_headers *headers, const gchar *key, size_t keylen, const gchar *value, size_t valuelen);
LI_API gboolean http_header_remove(http_headers *headers, const gchar *key, size_t keylen);
#endif

2
src/http_request_parser.rl

@ -30,7 +30,7 @@
getStringTo(fpc, ctx->h_value);
}
action header {
http_header_insert(ctx->request->headers, ctx->h_key, ctx->h_value);
http_header_insert(ctx->request->headers, GSTR_LEN(ctx->h_key), GSTR_LEN(ctx->h_value));
}
# RFC 2616

4
src/lighttpd.c

@ -116,7 +116,9 @@ int main(int argc, char *argv[]) {
log_warning(srv, NULL, "test %s", "foo2");
log_debug(srv, NULL, "test %s", "message");
log_thread_start(srv);
sleep(3);
server_start(srv);
log_error(srv, NULL, "error %d", 23);
g_atomic_int_set(&srv->rotate_logs, TRUE);
log_warning(srv, NULL, "test %s", "foo3");

4
src/network.c

@ -83,10 +83,11 @@ network_status_t network_read(server *srv, connection *con, int fd, chunkqueue *
const ssize_t blocksize = 16*1024; /* 16k */
const off_t max_read = 16 * blocksize; /* 256k */
ssize_t r;
off_t len;
off_t len = 0;
do {
GString *buf = g_string_sized_new(blocksize);
g_string_set_size(buf, blocksize);
if (-1 == (r = net_read(fd, buf->str, blocksize))) {
g_string_free(buf, TRUE);
switch (errno) {
@ -107,6 +108,7 @@ network_status_t network_read(server *srv, connection *con, int fd, chunkqueue *
}
g_string_truncate(buf, r);
chunkqueue_append_string(cq, buf);
CON_TRACE(srv, con, "read (%i) '%s'", (int) r, buf->str);
len += r;
} while (r == blocksize && len < max_read);

75
src/plugin_core.c

@ -6,6 +6,12 @@ static action* core_list(server *srv, plugin* p, option *opt) {
guint i;
UNUSED(p);
if (opt->type == OPTION_ACTION) {
a = opt->value.opt_action.action;
action_acquire(a);
return a;
}
if (opt->type != OPTION_LIST) {
ERROR(srv, "expected list, got %s", option_type_string(opt->type));
return NULL;
@ -59,7 +65,7 @@ static action_result core_handle_static(server *srv, connection *con, gpointer p
UNUSED(param);
/* TODO */
CON_ERROR(srv, con, "%s", "Not implemented yet");
return HANDLER_ERROR;
return ACTION_ERROR;
}
static action* core_static(server *srv, plugin* p, option *opt) {
@ -72,13 +78,79 @@ static action* core_static(server *srv, plugin* p, option *opt) {
return action_new_function(core_handle_static, NULL, NULL);
}
static action_result core_handle_test(server *srv, connection *con, gpointer param) {
UNUSED(param);
if (con->state != CON_STATE_HANDLE_REQUEST_HEADER) return ACTION_GO_ON;
con->response.http_status = 200;
chunkqueue_append_string(con->out, con->request.uri.uri);
connection_handle_direct(srv, con);
CON_ERROR(srv, con, "%s", "Not implemented yet");
return ACTION_GO_ON;
}
static action* core_test(server *srv, plugin* p, option *opt) {
UNUSED(p);
if (opt) {
ERROR(srv, "%s", "static action doesn't have parameters");
return NULL;
}
return action_new_function(core_handle_test, NULL, NULL);
}
static gboolean core_listen(server *srv, plugin* p, option *opt) {
guint32 ipv4;
guint8 ipv6[16];
UNUSED(p);
if (opt->type != OPTION_STRING) {
ERROR(srv, "%s", "listen expects a string as parameter");
return FALSE;
}
if (parse_ipv4(opt->value.opt_string->str, &ipv4, NULL)) {
int s, val;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipv4;
addr.sin_port = htons(8080);
if (-1 == (s = socket(AF_INET, SOCK_STREAM, 0))) {
ERROR(srv, "Couldn't open socket: %s", g_strerror(errno));
return FALSE;
}
val = 1;
if (-1 == setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
close(s);
ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR) to '%s': %s", inet_ntoa(*(struct in_addr*)&ipv4), g_strerror(errno));
return FALSE;
}
if (-1 == bind(s, &addr, sizeof(addr))) {
close(s);
ERROR(srv, "Couldn't bind socket to '%s': %s", inet_ntoa(*(struct in_addr*)&ipv4), g_strerror(errno));
return FALSE;
}
if (-1 == listen(s, 1000)) {
close(s);
ERROR(srv, "Couldn't listen on '%s': %s", inet_ntoa(*(struct in_addr*)&ipv4), g_strerror(errno));
return FALSE;
}
server_listen(srv, s);
TRACE(srv, "listen to ipv4: '%s'", inet_ntoa(*(struct in_addr*)&ipv4));
#ifdef HAVE_IPV6
} else if (parse_ipv6(opt->value.opt_string->str, ipv6, NULL)) {
/* TODO: IPv6 */
ERROR(srv, "%s", "IPv6 not supported yet");
return FALSE;
#endif
} else {
ERROR(srv, "Invalid ip: '%s'", opt->value.opt_string->str);
return FALSE;
}
TRACE(srv, "will listen to '%s'", opt->value.opt_string->str);
return TRUE;
}
@ -93,6 +165,7 @@ static const plugin_action actions[] = {
{ "list", core_list },
{ "when", core_when },
{ "static", core_static },
{ "test", core_test },
{ NULL, NULL }
};

35
src/request.c

@ -1,6 +1,5 @@
#include "base.h"
#include "request.h"
void request_init(request *req, chunkqueue *in) {
req->http_method = HTTP_METHOD_UNSET;
@ -58,3 +57,37 @@ void request_clear(request *req) {
http_request_parser_clear(&req->parser_ctx);
}
void request_validate_header(server *srv, connection *con) {
switch(con->request.http_method) {
case HTTP_METHOD_GET:
case HTTP_METHOD_HEAD:
/* content-length is forbidden for those */
if (con->request.content_length > 0) {
/* content-length is missing */
CON_ERROR(srv, con, "%s", "GET/HEAD with content-length -> 400");
con->keep_alive = FALSE;
con->response.http_status = 400;
connection_handle_direct(srv, con);
return;
}
con->request.content_length = 0;
break;
case HTTP_METHOD_POST:
/* content-length is required for them */
if (con->request.content_length == -1) {
/* content-length is missing */
CON_ERROR(srv, con, "%s", "POST-request, but content-length missing -> 411");
con->keep_alive = FALSE;
con->response.http_status = 411;
connection_handle_direct(srv, con);
return;
}
break;
default:
/* the may have a content-length */
break;
}
}

2
src/request.h

@ -85,4 +85,6 @@ LI_API void request_init(request *req, chunkqueue *in);
LI_API void request_reset(request *req);
LI_API void request_clear(request *req);
LI_API void request_validate_header(server *srv, connection *con);
#endif

53
src/response.c

@ -0,0 +1,53 @@
#include "base.h"
void response_init(response *resp) {
resp->headers = http_headers_new();
resp->http_status = 0;
resp->transfer_encoding = HTTP_TRANSFER_ENCODING_IDENTITY;
}
void response_reset(response *resp) {
http_headers_reset(resp->headers);
resp->http_status = 0;
resp->transfer_encoding = HTTP_TRANSFER_ENCODING_IDENTITY;
}
void response_clear(response *resp) {
http_headers_free(resp->headers);
resp->http_status = 0;
resp->transfer_encoding = HTTP_TRANSFER_ENCODING_IDENTITY;
}
void response_send_headers(server *srv, connection *con) {
GString *head = g_string_sized_new(4*1024);
if (con->request.http_version == HTTP_VERSION_1_1) {
g_string_append_len(head, CONST_STR_LEN("HTTP/1.1 "));
} else {
g_string_append_len(head, CONST_STR_LEN("HTTP/1.0 "));
}
if (con->response.http_status < 100 || con->response.http_status > 999) {
con->response.http_status = 500;
con->content_handler = NULL;
chunkqueue_reset(con->out);
}
if (0 == con->out->length && con->content_handler == NULL
&& con->response.http_status >= 400 && con->response.http_status < 600) {
chunkqueue_append_mem(con->out, CONST_STR_LEN("Custom error"));
}
if (con->content_handler == NULL) {
con->out->is_closed = TRUE;
}
g_string_append_printf(head, "%i XXX\r\n", con->response.http_status);
/* TODO: append headers */
g_string_append_len(head, CONST_STR_LEN("\r\n"));
chunkqueue_append_string(con->raw_out, head);
}

21
src/response.h

@ -0,0 +1,21 @@
#ifndef _LIGHTTPD_RESPONSE_H_
#define _LIGHTTPD_RESPONSE_H_
struct response;
typedef struct response response;
#include "http_headers.h"
struct response {
http_headers *headers;
int http_status;
transfer_encoding_t transfer_encoding;
};
LI_API void response_init(response *resp);
LI_API void response_reset(response *resp);
LI_API void response_clear(response *resp);
LI_API void response_send_headers(server *srv, connection *con);
#endif

12
src/server.c

@ -79,7 +79,7 @@ static connection* con_get(server *srv) {
return con;
}
static void con_put(server *srv, connection *con) {
void con_put(server *srv, connection *con) {
connection_reset(srv, con);
srv->connections_active--;
if (con->idx != srv->connections_active) {
@ -152,10 +152,18 @@ void server_start(server *srv) {
if (srv->state == SERVER_STOPPING || srv->state == SERVER_RUNNING) return; /* no restart after stop */
srv->state = SERVER_RUNNING;
if (!srv->mainaction) {
ERROR(srv, "%s", "No action handlers defined");
server_stop(srv);
return;
}
for (i = 0; i < srv->sockets->len; i++) {
server_socket *sock = g_array_index(srv->sockets, server_socket*, i);
ev_io_start(srv->loop, &sock->watcher);
}
ev_loop(srv->loop, 0);
}
void server_stop(server *srv) {
@ -170,5 +178,5 @@ void server_stop(server *srv) {
}
void joblist_append(server *srv, connection *con) {
/* TODO */
connection_state_machine(srv, con);
}

1
src/server.h

@ -58,6 +58,7 @@ LI_API void server_free(server* srv);
LI_API void server_listen(server *srv, int fd);
LI_API void server_start(server *srv);
LI_API void server_stop(server *srv);
LI_API void joblist_append(server *srv, connection *con);

1
src/settings.h

@ -135,7 +135,6 @@
typedef enum {
HANDLER_UNSET,
HANDLER_GO_ON,
HANDLER_FINISHED,
HANDLER_COMEBACK,

21
src/utils.c

@ -23,3 +23,24 @@ void fd_init(int fd) {
ioctlsocket(fd, FIONBIO, &i);
#endif
}
void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) {
if ((watcher->events & events) == events) return;
ev_io_stop(loop, watcher);
ev_io_set(watcher, watcher->fd, watcher->events | events);
ev_io_start(loop, watcher);
}
void ev_io_rem_events(struct ev_loop *loop, ev_io *watcher, int events) {
if (0 == (watcher->events & events)) return;
ev_io_stop(loop, watcher);
ev_io_set(watcher, watcher->fd, watcher->events & ~events);
ev_io_start(loop, watcher);
}
void ev_io_set_events(struct ev_loop *loop, ev_io *watcher, int events) {
if (events == (watcher->events & (EV_READ | EV_WRITE))) return;
ev_io_stop(loop, watcher);
ev_io_set(watcher, watcher->fd, (watcher->events & ~(EV_READ | EV_WRITE)) | events);
ev_io_start(loop, watcher);
}

3
src/utils.h

@ -7,5 +7,8 @@ LI_API void fatal(const gchar* msg);
/* set O_NONBLOCK and FD_CLOEXEC */
LI_API void fd_init(int fd);
LI_API void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events);
LI_API void ev_io_rem_events(struct ev_loop *loop, ev_io *watcher, int events);
LI_API void ev_io_set_events(struct ev_loop *loop, ev_io *watcher, int events);
#endif

2
src/wscript

@ -22,6 +22,7 @@ common_source='''
options.c
plugin.c
request.c
response.c
server.c
sys-files.c
sys-socket.c
@ -147,6 +148,7 @@ def build(bld):
tests.source += common_source_lua
tests.target = 'tests'
tests.uselib += 'lighty dl ev openssl pcre lua ' + common_uselib
tests.includes = '.'
def configure(conf):
env = conf.env

Loading…
Cancel
Save