- Added hash, round-robin balancers

- fixed return codes in two cases
- droped some endless loops
- FIXME: re-enabling of dead hosts is missing for now


git-svn-id: svn://svn.lighttpd.net/lighttpd/branches/lighttpd-1.3.x@350 152afb58-edef-0310-8abb-c4023f1b3aa9
svn/tags/lighttpd-1.3.14
Jan Kneschke 18 years ago
parent 510b536669
commit b5e848d5b4

@ -37,7 +37,7 @@ common_src=buffer.c log.c \
fdevent_poll.c fdevent_linux_sysepoll.c \
fdevent_solaris_devpoll.c fdevent_freebsd_kqueue.c \
data_config.c bitset.c configparser.c \
inet_ntop_cache.c \
inet_ntop_cache.c crc32.c \
connections-glue.c \
configfile-glue.c \
http-header-glue.c
@ -149,7 +149,7 @@ mod_access_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
mod_access_la_LIBADD = $(common_libadd)
lib_LTLIBRARIES += mod_compress.la
mod_compress_la_SOURCES = mod_compress.c crc32.c
mod_compress_la_SOURCES = mod_compress.c
mod_compress_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
mod_compress_la_LIBADD = $(Z_LIB) $(BZ_LIB) $(common_libadd)

@ -100,10 +100,13 @@ typedef struct {
buffer *host;
unsigned short port;
int usage;
time_t disable_ts;
int is_disabled;
size_t balance;
int usage; /* fair-balancing needs the no. of connections active on this host */
int last_used_ndx; /* round robin */
} data_fastcgi;
data_fastcgi *data_fastcgi_init(void);

@ -45,6 +45,7 @@ data_fastcgi *data_fastcgi_init(void) {
ds->key = buffer_init();
ds->host = buffer_init();
ds->port = 0;
ds->is_disabled = 0;
ds->free = data_fastcgi_free;
ds->reset = data_fastcgi_reset;

@ -22,6 +22,7 @@
#include "plugin.h"
#include "inet_ntop_cache.h"
#include "crc32.h"
#include <stdio.h>
@ -53,6 +54,12 @@
* - persistent connection with upstream servers
* - HTTP/1.1
*/
typedef enum {
PROXY_BALANCE_UNSET,
PROXY_BALANCE_FAIR,
PROXY_BALANCE_HASH,
PROXY_BALANCE_RR
} proxy_balance_t;
typedef struct {
array *extensions;
@ -63,14 +70,24 @@ typedef struct {
PLUGIN_DATA;
buffer *parse_response;
buffer *balance_buf;
plugin_config **config_storage;
plugin_config conf;
} plugin_data;
typedef enum { PROXY_STATE_INIT, PROXY_STATE_CONNECT, PROXY_STATE_PREPARE_WRITE, PROXY_STATE_WRITE, PROXY_STATE_READ, PROXY_STATE_ERROR } proxy_connection_state_t;
typedef enum {
PROXY_STATE_INIT,
PROXY_STATE_CONNECT,
PROXY_STATE_PREPARE_WRITE,
PROXY_STATE_WRITE,
PROXY_STATE_READ,
PROXY_STATE_ERROR
} proxy_connection_state_t;
enum { PROXY_STDOUT, PROXY_END_REQUEST };
typedef struct {
proxy_connection_state_t state;
time_t state_timestamp;
@ -131,6 +148,7 @@ INIT_FUNC(mod_proxy_init) {
p = calloc(1, sizeof(*p));
p->parse_response = buffer_init();
p->balance_buf = buffer_init();
return p;
}
@ -142,6 +160,7 @@ FREE_FUNC(mod_proxy_free) {
UNUSED(srv);
buffer_free(p->parse_response);
buffer_free(p->balance_buf);
if (p->config_storage) {
size_t i;
@ -239,6 +258,7 @@ SETDEFAULTS_FUNC(mod_proxy_set_defaults) {
config_values_t pcv[] = {
{ "host", NULL, T_CONFIG_STRING, T_CONFIG_SCOPE_CONNECTION }, /* 0 */
{ "port", NULL, T_CONFIG_SHORT, T_CONFIG_SCOPE_CONNECTION }, /* 1 */
{ "balance", NULL, T_CONFIG_STRING, T_CONFIG_SCOPE_CONNECTION }, /* 2 */
{ NULL, NULL, T_CONFIG_UNSET, T_CONFIG_SCOPE_UNSET }
};
@ -257,6 +277,9 @@ SETDEFAULTS_FUNC(mod_proxy_set_defaults) {
pcv[0].destination = df->host;
pcv[1].destination = &(df->port);
pcv[2].destination = p->balance_buf;
buffer_reset(p->balance_buf);
if (0 != config_insert_values_internal(srv, da_host->value, pcv)) {
return HANDLER_ERROR;
@ -280,6 +303,20 @@ SETDEFAULTS_FUNC(mod_proxy_set_defaults) {
"port");
return HANDLER_ERROR;
}
if (buffer_is_empty(p->balance_buf)) {
df->balance = PROXY_BALANCE_FAIR;
} else if (buffer_is_equal_string(p->balance_buf, CONST_STR_LEN("fair"))) {
df->balance = PROXY_BALANCE_FAIR;
} else if (buffer_is_equal_string(p->balance_buf, CONST_STR_LEN("round-robin"))) {
df->balance = PROXY_BALANCE_RR;
} else if (buffer_is_equal_string(p->balance_buf, CONST_STR_LEN("hash"))) {
df->balance = PROXY_BALANCE_HASH;
} else {
log_error_write(srv, __FILE__, __LINE__, "sb",
"proxy.server->balance has to be one of: fair, round-robin, hash, but not:", p->balance_buf);
return HANDLER_ERROR;
}
/* if extension already exists, take it */
@ -684,7 +721,10 @@ static handler_t proxy_write_request(server *srv, handler_ctx *hctx) {
} else {
int socket_error;
socklen_t socket_error_len = sizeof(socket_error);
/* we don't need it anymore */
fdevent_event_del(srv->ev, &(hctx->fde_ndx), hctx->fd);
/* try to finish the connect() */
if (0 != getsockopt(hctx->fd, SOL_SOCKET, SO_ERROR, &socket_error, &socket_error_len)) {
log_error_write(srv, __FILE__, __LINE__, "ss",
@ -709,18 +749,21 @@ static handler_t proxy_write_request(server *srv, handler_ctx *hctx) {
proxy_set_state(srv, hctx, PROXY_STATE_WRITE);
hctx->write_offset = 0;
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_OUT);
/* fall through */
case PROXY_STATE_WRITE:
/* continue with the code after the switch */
if (-1 == (r = write(hctx->fd,
hctx->write_buffer->ptr + hctx->write_offset,
hctx->write_buffer->used - hctx->write_offset))) {
if (errno != EAGAIN) {
if (errno != EAGAIN &&
errno != EINTR) {
log_error_write(srv, __FILE__, __LINE__, "ssd", "write failed:", strerror(errno), r);
return -1;
return HANDLER_ERROR;
} else {
return 0;
return HANDLER_WAIT_FOR_EVENT;
}
}
@ -728,12 +771,15 @@ static handler_t proxy_write_request(server *srv, handler_ctx *hctx) {
if (hctx->write_offset == hctx->write_buffer->used) {
proxy_set_state(srv, hctx, PROXY_STATE_READ);
fdevent_event_del(srv->ev, &(hctx->fde_ndx), hctx->fd);
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
}
break;
return HANDLER_WAIT_FOR_EVENT;
case PROXY_STATE_READ:
/* waiting for a response */
break;
return HANDLER_WAIT_FOR_EVENT;
default:
log_error_write(srv, __FILE__, __LINE__, "s", "(debug) unknown state");
return HANDLER_ERROR;
@ -870,7 +916,6 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
connection *con = hctx->remote_conn;
plugin_data *p = hctx->plugin_data;
joblist_append(srv, con);
if ((revents & FDEVENT_IN) &&
hctx->state == PROXY_STATE_READ) {
@ -888,6 +933,7 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
proxy_connection_cleanup(srv, hctx);
joblist_append(srv, con);
return HANDLER_FINISHED;
case -1:
if (con->file_started == 0) {
@ -900,6 +946,7 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
connection_set_state(srv, con, CON_STATE_ERROR);
}
joblist_append(srv, con);
return HANDLER_FINISHED;
}
}
@ -933,7 +980,8 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
proxy_connection_close(srv, hctx);
# if 0
log_error_write(srv, __FILE__, __LINE__, "sd", "proxy-FDEVENT_HUP", con->fd);
# endif
# endif
joblist_append(srv, con);
return HANDLER_ERROR;
#endif
} else if (revents & FDEVENT_ERR) {
@ -944,6 +992,7 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
#if 1
log_error_write(srv, __FILE__, __LINE__, "s", "proxy-FDEVENT_ERR");
#endif
joblist_append(srv, con);
return HANDLER_ERROR;
}
@ -953,7 +1002,7 @@ static handler_t proxy_handle_fdevent(void *s, void *ctx, int revents) {
static handler_t mod_proxy_check_extension(server *srv, connection *con, void *p_d) {
plugin_data *p = p_d;
size_t s_len;
int used = -1;
int last_max = -1;
int ndx;
size_t k, i;
buffer *fn;
@ -1014,25 +1063,67 @@ static handler_t mod_proxy_check_extension(server *srv, connection *con, void *p
if (k == p->conf.extensions->used) {
return HANDLER_GO_ON;
}
/* get best server */
for (k = 0, ndx = -1; k < extension->value->used; k++) {
data_proxy *host = (data_proxy *)extension->value->data[k];
/* enable the server again, perhaps it is back again */
if ((host->usage == -1) &&
(srv->cur_ts - host->disable_ts > PROXY_RETRY_TIMEOUT)) {
host->usage = 0;
switch(1) {
case 1:
/* hash balancing */
for (k = 0, ndx = -1, last_max = -1; k < extension->value->used; k++) {
data_proxy *host = (data_proxy *)extension->value->data[k];
int url_ndx, host_ndx;
if (host->is_disabled) continue;
log_error_write(srv, __FILE__, __LINE__, "sbd", "proxy-server re-enabled:",
host->host, host->port);
url_ndx = generate_crc32c(CONST_BUF_LEN(con->uri.path));
host_ndx = generate_crc32c(CONST_BUF_LEN(con->server_name));
if ((last_max == -1) ||
(url_ndx + host_ndx > last_max)) {
last_max = url_ndx + host_ndx;
ndx = k;
}
}
break;
case 2:
/* fair balancing */
for (k = 0, ndx = -1, last_max = -1; k < extension->value->used; k++) {
data_proxy *host = (data_proxy *)extension->value->data[k];
if (host->usage != -1 && (used == -1 || host->usage < used)) {
used = host->usage;
if (host->is_disabled) continue;
if (last_max == -1 || host->usage < last_max) {
last_max = host->usage;
ndx = k;
ndx = k;
}
}
break;
case 3:
/* round robin */
for (k = 0, ndx = -1, last_max = -1; k < extension->value->used; k++) {
data_proxy *host = (data_proxy *)extension->value->data[k];
if (host->is_disabled) continue;
/* first usable ndx */
if (last_max == -1) {
last_max = k;
}
/* get next ndx */
if (k > host->last_used_ndx) {
ndx = k;
host->last_used_ndx = k;
break;
}
}
/* wrap to the start */
if (ndx != -1) {
ndx = last_max;
}
break;
}
/* found a server */
@ -1073,32 +1164,6 @@ static handler_t mod_proxy_check_extension(server *srv, connection *con, void *p
return HANDLER_GO_ON;
}
JOBLIST_FUNC(mod_proxy_handle_joblist) {
plugin_data *p = p_d;
handler_ctx *hctx = con->plugin_ctx[p->id];
if (hctx == NULL) return HANDLER_GO_ON;
if (hctx->fd != -1) {
switch (hctx->state) {
case PROXY_STATE_READ:
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_IN);
break;
case PROXY_STATE_CONNECT:
fdevent_event_add(srv->ev, &(hctx->fde_ndx), hctx->fd, FDEVENT_OUT);
break;
default:
log_error_write(srv, __FILE__, __LINE__, "sd", "unhandled proxy.state", hctx->state);
break;
}
}
return HANDLER_GO_ON;
}
static handler_t mod_proxy_connection_close_callback(server *srv, connection *con, void *p_d) {
plugin_data *p = p_d;
@ -1116,7 +1181,6 @@ int mod_proxy_plugin_init(plugin *p) {
p->handle_connection_close = mod_proxy_connection_close_callback;
p->handle_uri_clean = mod_proxy_check_extension;
p->handle_subrequest = mod_proxy_handle_subrequest;
p->handle_joblist = mod_proxy_handle_joblist;
p->data = NULL;

Loading…
Cancel
Save