Browse Source

Removed HANDLER_FINISHED, implemented real job queue and added some basic balancer structs

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
025f0b5f84
  1. 28
      include/lighttpd/actions.h
  2. 1
      include/lighttpd/typedefs.h
  3. 2
      include/lighttpd/virtualrequest.h
  4. 3
      include/lighttpd/worker.h
  5. 1
      src/CMakeLists.txt
  6. 34
      src/actions.c
  7. 7
      src/connection.c
  8. 2
      src/filter_chunked.c
  9. 40
      src/modules/mod_balancer.c
  10. 27
      src/virtualrequest.c
  11. 21
      src/worker.c

28
include/lighttpd/actions.h

@ -10,11 +10,19 @@ typedef enum {
ACTION_TSETTING,
ACTION_TFUNCTION,
ACTION_TCONDITION,
ACTION_TLIST
ACTION_TLIST,
ACTION_TBALANCER
} action_type;
typedef enum {
BACKEND_OVERLOAD,
BACKEND_DEAD
} backend_error;
struct action_stack {
GArray* stack;
gboolean handle_backend_fail;
backend_error backend_error;
};
/* param is the param registered with the callbacks;
@ -35,6 +43,22 @@ struct action_func {
};
typedef struct action_func action_func;
typedef handler_t (*BackendSelect)(vrequest *vr, gpointer param, gpointer *context);
typedef handler_t (*BackendFallback)(vrequest *vr, gpointer param, gpointer *context);
typedef handler_t (*BackendFinished)(vrequest *vr, gpointer param, gpointer context);
typedef handler_t (*BalancerFree)(server *srv, gpointer param);
struct balancer_func {
BackendSelect select;
BackendFallback fallback;
BackendFinished finished;
BalancerFree free;
gpointer param;
gboolean provide_backlog;
};
typedef struct balancer_func balancer_func;
struct action {
gint refcount;
action_type type;
@ -51,6 +75,8 @@ struct action {
action_func function;
GArray* list; /** array of (action*) */
balancer_func balancer;
} data;
};

1
include/lighttpd/typedefs.h

@ -8,7 +8,6 @@ typedef enum {
typedef enum {
HANDLER_GO_ON,
HANDLER_FINISHED,
HANDLER_COMEBACK,
HANDLER_WAIT_FOR_EVENT,
HANDLER_ERROR,

2
include/lighttpd/virtualrequest.h

@ -75,6 +75,8 @@ struct vrequest {
action_stack action_stack;
gboolean actions_wait_for_response;
GList *job_queue_link;
};
LI_API vrequest* vrequest_new(struct connection *con, vrequest_handler handle_response_headers, vrequest_handler handle_response_body, vrequest_handler handle_response_error, vrequest_handler handle_request_headers);

3
include/lighttpd/worker.h

@ -85,6 +85,9 @@ struct worker {
/* collect framework */
ev_async collect_watcher;
GAsyncQueue *collect_queue;
GQueue job_queue;
ev_timer job_queue_watcher;
};
LI_API worker* worker_new(struct server *srv, struct ev_loop *loop);

1
src/CMakeLists.txt

@ -336,6 +336,7 @@ SET(COMMON_CFLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS
ADD_AND_INSTALL_LIBRARY(mod_fortune "modules/mod_fortune.c")
ADD_AND_INSTALL_LIBRARY(mod_status "modules/mod_status.c")
ADD_AND_INSTALL_LIBRARY(mod_balancer "modules/mod_balancer.c")
ADD_TARGET_PROPERTIES(lighttpd LINK_FLAGS ${COMMON_LDFLAGS})
ADD_TARGET_PROPERTIES(lighttpd COMPILE_FLAGS ${COMMON_CFLAGS})

34
src/actions.c

@ -99,8 +99,20 @@ action *action_new_condition(condition *cond, action *target, action *target_els
static void action_stack_element_release(server *srv, vrequest *vr, action_stack_element *ase) {
action *a = ase->act;
if (!ase || !a) return;
if (a->type == ACTION_TFUNCTION && ase->data.context && a->data.function.cleanup) {
a->data.function.cleanup(vr, a->data.function.param, ase->data.context);
switch (a->type) {
case ACTION_TSETTING:
break;
case ACTION_TFUNCTION:
if (ase->data.context && a->data.function.cleanup) {
a->data.function.cleanup(vr, a->data.function.param, ase->data.context);
}
break;
case ACTION_TCONDITION:
case ACTION_TLIST:
break;
case ACTION_TBALANCER:
a->data.balancer.finished(vr, a->data.balancer.param, ase->data.context);
break;
}
action_release(srv, ase->act);
ase->act = NULL;
@ -175,7 +187,6 @@ handler_t action_execute(vrequest *vr) {
res = a->data.function.func(vr, a->data.function.param, &ase->data.context);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
ase->finished = TRUE;
break;
case HANDLER_ERROR:
@ -191,7 +202,6 @@ handler_t action_execute(vrequest *vr) {
res = condition_check(vr, a->data.condition.cond, &condres);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
action_stack_pop(srv, vr, as);
if (condres) {
action_enter(vr, a->data.condition.target);
@ -216,7 +226,21 @@ handler_t action_execute(vrequest *vr) {
ase->data.pos++;
}
break;
case ACTION_TBALANCER:
res = a->data.balancer.select(vr, a->data.balancer.param, &ase->data.context);
switch (res) {
case HANDLER_GO_ON:
ase->finished = TRUE;
break;
case HANDLER_ERROR:
action_stack_reset(vr, as);
case HANDLER_COMEBACK:
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_WAIT_FOR_FD:
return res;
}
break;
}
}
return HANDLER_FINISHED;
return HANDLER_GO_ON;
}

7
src/connection.c

@ -140,7 +140,6 @@ static gboolean connection_handle_read(connection *con) {
VR_DEBUG(vr, "%s", "reading request header");
}
switch(http_request_parse(con->mainvr, &con->req_parser_ctx)) {
case HANDLER_FINISHED:
case HANDLER_GO_ON:
break; /* go on */
case HANDLER_WAIT_FOR_EVENT:
@ -271,7 +270,7 @@ static handler_t mainvr_handle_response_headers(vrequest *vr) {
static handler_t mainvr_handle_response_body(vrequest *vr) {
connection *con = vr->con;
if (check_response_done(con)) return HANDLER_FINISHED;
if (check_response_done(con)) return HANDLER_GO_ON;
if (CORE_OPTION(CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "%s", "write response");
@ -280,14 +279,14 @@ static handler_t mainvr_handle_response_body(vrequest *vr) {
parse_request_body(con);
forward_response_body(con);
if (check_response_done(con)) return HANDLER_FINISHED;
if (check_response_done(con)) return HANDLER_GO_ON;
return HANDLER_GO_ON;
}
static handler_t mainvr_handle_response_error(vrequest *vr) {
connection_internal_error(vr->con);
return HANDLER_FINISHED;
return HANDLER_GO_ON;
}
static handler_t mainvr_handle_request_headers(vrequest *vr) {

2
src/filter_chunked.c

@ -37,7 +37,7 @@ handler_t filter_chunked_encode(connection *con, chunkqueue *out, chunkqueue *in
chunkqueue_append_mem(out, CONST_STR_LEN("0\r\n"));
out->is_closed = TRUE;
}
return HANDLER_FINISHED;
return HANDLER_GO_ON;
}
return HANDLER_GO_ON;
}

40
src/modules/mod_balancer.c

@ -0,0 +1,40 @@
#include <lighttpd/base.h>
static const plugin_option options[] = {
{ NULL, 0, NULL, NULL, NULL }
};
static const plugin_action actions[] = {
// { "balancer.rr", status_page },
{ NULL, NULL }
};
static const plugin_setup setups[] = {
{ NULL, NULL }
};
static void plugin_init(server *srv, plugin *p) {
UNUSED(srv);
p->options = options;
p->actions = actions;
p->setups = setups;
}
LI_API gboolean mod_balancer_init(modules *mods, module *mod) {
MODULE_VERSION_CHECK(mods);
mod->config = plugin_register(mods->main, "mod_balancer", plugin_init);
return mod->config != NULL;
}
LI_API gboolean mod_balancer_free(modules *mods, module *mod) {
if (mod->config)
plugin_free(mods->main, mod->config);
return TRUE;
}

27
src/virtualrequest.c

@ -77,6 +77,11 @@ void vrequest_free(vrequest* vr) {
action_stack_clear(vr, &vr->action_stack);
if (vr->job_queue_link) {
g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link);
vr->job_queue_link = NULL;
}
g_slice_free1(vr->con->srv->option_def_values->len * sizeof(option_value), vr->options);
g_slice_free(vrequest, vr);
@ -96,6 +101,11 @@ void vrequest_reset(vrequest *vr) {
action_stack_reset(vr, &vr->action_stack);
if (vr->job_queue_link) {
g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link);
vr->job_queue_link = NULL;
}
memcpy(vr->options, vr->con->srv->option_def_values->data, vr->con->srv->option_def_values->len * sizeof(option_value));
}
@ -161,7 +171,6 @@ static gboolean vrequest_do_handle_actions(vrequest *vr) {
handler_t res = action_execute(vr);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
if (vr->state == VRS_HANDLE_REQUEST_HEADERS) {
VR_ERROR(vr, "%s", "actions didn't handle request");
/* request not handled */
@ -192,7 +201,6 @@ static gboolean vrequest_do_handle_read(vrequest *vr) {
res = vr->handle_request_body(vr);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
break;
case HANDLER_COMEBACK:
vrequest_joblist_append(vr); /* come back later */
@ -218,7 +226,6 @@ static gboolean vrequest_do_handle_write(vrequest *vr) {
res = vr->handle_response_body(vr);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
break;
case HANDLER_COMEBACK:
vrequest_joblist_append(vr); /* come back later */
@ -250,7 +257,6 @@ void vrequest_state_machine(vrequest *vr) {
res = vr->handle_request_headers(vr);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
break;
case HANDLER_COMEBACK:
vrequest_joblist_append(vr); /* come back later */
@ -258,7 +264,7 @@ void vrequest_state_machine(vrequest *vr) {
break;
case HANDLER_WAIT_FOR_FD: /* TODO: wait for fd */
case HANDLER_WAIT_FOR_EVENT:
done = TRUE;
done = (vr->state == VRS_HANDLE_REQUEST_HEADERS);
break;
case HANDLER_ERROR:
vrequest_error(vr);
@ -275,7 +281,6 @@ void vrequest_state_machine(vrequest *vr) {
res = vr->handle_response_headers(vr);
switch (res) {
case HANDLER_GO_ON:
case HANDLER_FINISHED:
vr->state = VRS_WRITE_CONTENT;
break;
case HANDLER_COMEBACK:
@ -284,7 +289,7 @@ void vrequest_state_machine(vrequest *vr) {
break;
case HANDLER_WAIT_FOR_FD: /* TODO: wait for fd */
case HANDLER_WAIT_FOR_EVENT:
done = TRUE;
done = (vr->state == VRS_HANDLE_REQUEST_HEADERS);
break;
case HANDLER_ERROR:
vrequest_error(vr);
@ -306,6 +311,10 @@ void vrequest_state_machine(vrequest *vr) {
}
void vrequest_joblist_append(vrequest *vr) {
/* TODO */
vrequest_state_machine(vr);
GQueue *const q = &vr->con->wrk->job_queue;
worker *wrk = vr->con->wrk;
if (vr->job_queue_link) return; /* already in queue */
g_queue_push_tail(q, vr);
vr->job_queue_link = g_queue_peek_tail_link(q);
ev_timer_start(wrk->loop, &wrk->job_queue_watcher);
}

21
src/worker.c

@ -135,6 +135,22 @@ static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
waitqueue_update(&wrk->throttle_queue);
}
/* run vreqest state machine */
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
GQueue q = wrk->job_queue;
vrequest *vr;
UNUSED(loop);
UNUSED(revents);
g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */
while (NULL != (vr = g_queue_pop_head(&q))) {
vr->job_queue_link = NULL;
vrequest_state_machine(vr);
}
}
/* cache timestamp */
GString *worker_current_timestamp(worker *wrk) {
time_t cur_ts = (time_t)CUR_TS(wrk);
@ -299,6 +315,11 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
waitqueue_init(&wrk->throttle_queue, wrk->loop, worker_throttle_cb, 0.5, wrk);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
/* job queue */
g_queue_init(&wrk->job_queue);
ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0);
wrk->job_queue_watcher.data = wrk;
return wrk;
}

Loading…
Cancel
Save