Browse Source

implement stat cache

personal/stbuehler/wip
Thomas Porzelt 13 years ago
parent
commit
efab0ca75d
  1. 1
      include/lighttpd/base.h
  2. 91
      include/lighttpd/stat_cache.h
  3. 5
      include/lighttpd/typedefs.h
  4. 4
      include/lighttpd/virtualrequest.h
  5. 2
      include/lighttpd/worker.h
  6. 40
      src/plugin_core.c
  7. 213
      src/stat_cache.c
  8. 15
      src/virtualrequest.c
  9. 4
      src/worker.c
  10. 1
      src/wscript

1
include/lighttpd/base.h

@ -44,6 +44,7 @@
#include <lighttpd/environment.h>
#include <lighttpd/virtualrequest.h>
#include <lighttpd/log.h>
#include <lighttpd/stat_cache.h>
#include <lighttpd/connection.h>

91
include/lighttpd/stat_cache.h

@ -0,0 +1,91 @@
/*
* stat cache - speeding up stat()s
*
* The basic idea behind the stat cache is to reduce calls to stat() which might be slow due to disk io (some ms).
* Each worker thread has its own cache so no locking contention between threads happens which could be slow.
* This means that there will be more stat() calls than there would be with only one shared cache but since there
* should be mostly hits in most cases (few items requested frequently) it will outweight the locking contention.
* To prevent the stat() from blocking all other requests of that worker, we hand it over to another thread.
*
* Entries are removed after 10 seconds (adjustable through stat_cache.ttl setup)
*
* TODO:
* - stat_cache.ttl setup
* - create ETAGs
* - get content type from xattr
* - add support for inotify (linux). TTL for entries can be increased to 60s
*
* Technical details:
* If a stat is requested, the following procedure takes place:
* - a cache lookup is performed
* - in case of a cache HIT:
* - if state is FINISHED and entry is fresh then return entry
* - if state is FINISHED but entry old then reset entry, create new job and return NULL
* - if state is WAITING then add vrequest to entry and return NULL (looks like a cache miss)
* - in case of a cache MISS:
* - a new entry is allocated and inserted into the cache, state is set to WAITING
* - the entry is inserted into the delete queue
* - a new job is created and NULL returned
*
* In the delete queue callback we check if no vrequests are working on that entry. If yes, we free it. If not then we requeue it.
* Locking only happens in two cases: 1) a new job is send to the stat thread 2) the stat thread sends the info back to the worker.
*
*/
#ifndef _LIGHTTPD_STAT_CACHE_H_
#define _LIGHTTPD_STAT_CACHE_H_
#ifndef _LIGHTTPD_BASE_H_
#error Please include <lighttpd/base.h> instead of this file
#endif
struct stat_cache_entry {
GString *path;
GString *etag;
GString *content_type;
struct stat st;
ev_tstamp ts; /* timestamp the entry was created (not when the stat() was done) */
gint err;
gboolean failed;
enum {
STAT_CACHE_ENTRY_WAITING, /* waiting for stat thread to do the work, no info available */
STAT_CACHE_ENTRY_FINISHED, /* stat() done, info available */
} state;
GPtrArray *vrequests; /* vrequests waiting for this info */
guint refcount;
waitqueue_elem queue_elem; /* queue element for the delete_queue */
gboolean in_cache;
};
struct stat_cache {
GHashTable *entries;
GAsyncQueue *job_queue_out; /* elements waiting for stat */
GAsyncQueue *job_queue_in; /* elements with finished stat */
waitqueue delete_queue;
GThread *thread;
ev_async job_watcher;
gdouble ttl;
guint64 hits;
guint64 misses;
guint64 errors;
};
void stat_cache_new(worker *wrk, gdouble ttl);
void stat_cache_free(stat_cache *sc);
void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents);
void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents);
gpointer stat_cache_thread(gpointer data);
void stat_cache_entry_free(stat_cache_entry *sce);
/*
gets a stat_cache_entry for a specified path
returns NULL in case of a cache MISS and you should return HANDLER_WAIT_FOR_EVENT
*/
LI_API stat_cache_entry *stat_cache_entry_get(vrequest *vr, GString *path);
/* release a stat_cache_entry so it can be cleaned up */
LI_API void stat_cache_entry_release(vrequest *vr);
#endif

5
include/lighttpd/typedefs.h

@ -201,4 +201,9 @@ typedef struct filters filters;
struct worker;
typedef struct worker worker;
struct stat_cache_entry;
typedef struct stat_cache_entry stat_cache_entry;
struct stat_cache;
typedef struct stat_cache stat_cache;
#endif

4
include/lighttpd/virtualrequest.h

@ -82,6 +82,8 @@ struct vrequest {
gboolean actions_wait_for_response;
GList *job_queue_link;
stat_cache_entry *stat_cache_entry;
};
#define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \
@ -124,4 +126,6 @@ LI_API void vrequest_joblist_append(vrequest *vr);
LI_API gboolean vrequest_stat(vrequest *vr);
LI_API gboolean vrequest_redirect(vrequest *vr, GString *uri);
#endif

2
include/lighttpd/worker.h

@ -94,6 +94,8 @@ struct worker {
GQueue job_queue;
ev_timer job_queue_watcher;
stat_cache *stat_cache;
};
LI_API worker* worker_new(struct server *srv, struct ev_loop *loop);

40
src/plugin_core.c

@ -186,18 +186,38 @@ static action* core_docroot(server *srv, plugin* p, value *val) {
static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *context) {
int fd;
stat_cache_entry *sce;
UNUSED(param);
UNUSED(context);
if (vr->physical.path->len == 0) return HANDLER_GO_ON;
if (!vr->stat_cache_entry) {
if (vr->physical.path->len == 0) return HANDLER_GO_ON;
if (!vrequest_handle_direct(vr)) return HANDLER_GO_ON;
if (!vrequest_handle_direct(vr)) return HANDLER_GO_ON;
}
sce = stat_cache_entry_get(vr, vr->physical.path);
if (!sce)
return HANDLER_WAIT_FOR_EVENT;
VR_DEBUG(vr, "serving static file: %s", vr->physical.path->str);
fd = open(vr->physical.path->str, O_RDONLY);
if (fd == -1) {
vr->response.http_status = 404;
if (sce->failed) {
/* stat failed */
VR_DEBUG(vr, "stat() failed: %s (%d)", g_strerror(sce->err), sce->err);
switch (errno) {
case ENOENT:
vr->response.http_status = 404; break;
case EACCES:
case EFAULT:
vr->response.http_status = 403; break;
default:
vr->response.http_status = 500;
}
g_print("%d\n", vr->response.http_status);
} else if ((fd = open(vr->physical.path->str, O_RDONLY)) == -1) {
VR_DEBUG(vr, "open() failed: %s (%d)", g_strerror(errno), errno);
switch (errno) {
@ -210,8 +230,6 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont
vr->response.http_status = 500;
}
} else {
struct stat st;
fstat(fd, &st);
#ifdef FD_CLOEXEC
fcntl(fd, F_SETFD, FD_CLOEXEC);
@ -219,7 +237,7 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont
/* redirect to scheme + host + path + / + querystring if directory without trailing slash */
/* TODO: local addr if HTTP 1.0 without host header */
if (S_ISDIR(st.st_mode) && vr->request.uri.orig_path->str[vr->request.uri.orig_path->len-1] != '/') {
if (S_ISDIR(sce->st.st_mode) && vr->request.uri.orig_path->str[vr->request.uri.orig_path->len-1] != '/') {
GString *host = vr->request.uri.authority->len ? vr->request.uri.authority : vr->con->local_addr_str;
GString *uri = g_string_sized_new(
8 /* https:// */ + host->len +
@ -241,7 +259,7 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont
http_header_overwrite(vr->response.headers, CONST_STR_LEN("Location"), GSTR_LEN(uri));
g_string_free(uri, TRUE);
close(fd);
} else if (!S_ISREG(st.st_mode)) {
} else if (!S_ISREG(sce->st.st_mode)) {
vr->response.http_status = 404;
close(fd);
} else {
@ -251,10 +269,12 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont
http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), GSTR_LEN(mime_str));
else
http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), CONST_STR_LEN("application/octet-stream"));
chunkqueue_append_file_fd(vr->out, NULL, 0, st.st_size, fd);
chunkqueue_append_file_fd(vr->out, NULL, 0, sce->st.st_size, fd);
}
}
stat_cache_entry_release(vr);
return HANDLER_GO_ON;
}

213
src/stat_cache.c

@ -0,0 +1,213 @@
#include <lighttpd/base.h>
void stat_cache_new(worker *wrk, gdouble ttl) {
stat_cache *sc;
GError *err;
sc = g_slice_new0(stat_cache);
sc->ttl = ttl;
sc->entries = g_hash_table_new_full((GHashFunc)g_string_hash, (GEqualFunc)g_string_equal, NULL, NULL);
sc->job_queue_in = g_async_queue_new();
sc->job_queue_out = g_async_queue_new();
waitqueue_init(&sc->delete_queue, wrk->loop, stat_cache_delete_cb, ttl, sc);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&sc->job_watcher, stat_cache_job_cb);
sc->job_watcher.data = wrk;
ev_async_start(wrk->loop, &sc->job_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
wrk->stat_cache = sc;
sc->thread = g_thread_create(stat_cache_thread, sc, TRUE, &err);
if (!sc->thread) {
/* failed to create thread */
assert(0);
}
}
void stat_cache_free(stat_cache *sc) {
GHashTableIter iter;
gpointer k, v;
/* wake up thread */
g_async_queue_push(sc->job_queue_out, g_slice_new0(stat_cache_entry));
g_thread_join(sc->thread);
ev_async_stop(sc->delete_queue.loop, &sc->job_watcher);
/* clear cache */
g_hash_table_iter_init(&iter, sc->entries);
while (g_hash_table_iter_next(&iter, &k, &v)) {
stat_cache_entry_free(v);
}
waitqueue_stop(&sc->delete_queue);
g_async_queue_unref(sc->job_queue_in);
g_async_queue_unref(sc->job_queue_out);
g_hash_table_destroy(sc->entries);
g_slice_free(stat_cache, sc);
}
void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents) {
stat_cache *sc = (stat_cache*) w->data;
stat_cache_entry *sce;
waitqueue_elem *wqe;
UNUSED(loop);
UNUSED(revents);
while ((wqe = waitqueue_pop(&sc->delete_queue)) != NULL) {
/* stat cache entry TTL over */
sce = wqe->data;
if (sce->refcount) {
/* if there are still vrequests using this entry just requeue it */
waitqueue_push(&sc->delete_queue, wqe);
} else {
/* no more vrequests using this entry, finally free it */
if (sce->in_cache)
g_hash_table_remove(sc->entries, sce->path);
stat_cache_entry_free(sce);
}
}
waitqueue_update(&sc->delete_queue);
}
void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents) {
guint i;
stat_cache_entry *sce;
stat_cache *sc = ((worker*)w->data)->stat_cache;
vrequest *vr;
UNUSED(loop);
UNUSED(revents);
while ((sce = g_async_queue_try_pop(sc->job_queue_in)) != NULL) {
if (sce->failed)
sc->errors++;
for (i = 0; i < sce->vrequests->len; i++) {
vr = g_ptr_array_index(sce->vrequests, i);
vrequest_joblist_append(vr);
}
g_ptr_array_set_size(sce->vrequests, 0);
}
}
void stat_cache_entry_free(stat_cache_entry *sce) {
assert(sce->vrequests->len == 0);
assert(sce->refcount == 0);
g_string_free(sce->path, TRUE);
g_ptr_array_free(sce->vrequests, TRUE);
g_slice_free(stat_cache_entry, sce);
}
stat_cache_entry *stat_cache_entry_get(vrequest *vr, GString *path) {
stat_cache *sc;
stat_cache_entry *sce;
sc = vr->con->wrk->stat_cache;
/* lookup entry in cache */
sce = g_hash_table_lookup(sc->entries, path);
if (sce) {
/* cache hit, check state */
if (g_atomic_int_get(&sce->state) == STAT_CACHE_ENTRY_FINISHED) {
/* stat info available, check if it is fresh */
if (sce->ts >= (CUR_TS(vr->con->wrk) - (ev_tstamp)sc->ttl)) {
/* entry fresh */
if (!vr->stat_cache_entry) {
sc->hits++;
vr->stat_cache_entry = sce;
sce->refcount++;
}
return sce;
} else {
/* entry old */
if (sce->refcount == 0) {
/* no vrequests working on the entry, reuse it */
} else {
/* there are still vrequests using this entry, replace with a new one */
sce->in_cache = FALSE;
sce = g_slice_new0(stat_cache_entry);
sce->path = g_string_new_len(GSTR_LEN(path));
sce->vrequests = g_ptr_array_sized_new(8);
sce->in_cache = TRUE;
sce->queue_elem.data = sce;
g_hash_table_replace(sc->entries, sce->path, sce);
}
sce->ts = CUR_TS(vr->con->wrk);
vr->stat_cache_entry = sce;
g_ptr_array_add(sce->vrequests, vr);
sce->refcount++;
waitqueue_push(&sc->delete_queue, &sce->queue_elem);
sce->state = STAT_CACHE_ENTRY_WAITING;
g_async_queue_push(sc->job_queue_out, sce);
sc->misses++;
return NULL;
}
} else {
/* stat info not available (state is STAT_CACHE_ENTRY_WAITING) */
vr->stat_cache_entry = sce;
g_ptr_array_add(sce->vrequests, vr);
sce->refcount++;
sc->misses++;
return NULL;
}
} else {
/* cache miss, allocate new entry */
sce = g_slice_new0(stat_cache_entry);
sce->path = g_string_new_len(GSTR_LEN(path));
sce->vrequests = g_ptr_array_sized_new(8);
sce->ts = CUR_TS(vr->con->wrk);
sce->state = STAT_CACHE_ENTRY_WAITING;
sce->in_cache = TRUE;
sce->queue_elem.data = sce;
vr->stat_cache_entry = sce;
g_ptr_array_add(sce->vrequests, vr);
sce->refcount = 1;
waitqueue_push(&sc->delete_queue, &sce->queue_elem);
g_hash_table_insert(sc->entries, sce->path, sce);
g_async_queue_push(sc->job_queue_out, sce);
sc->misses++;
return NULL;
}
}
void stat_cache_entry_release(vrequest *vr) {
vr->stat_cache_entry->refcount--;
vr->stat_cache_entry = NULL;
}
gpointer stat_cache_thread(gpointer data) {
stat_cache *sc = data;
stat_cache_entry *sce;
while (TRUE) {
sce = g_async_queue_pop(sc->job_queue_out);
/* stat cache entry with path == NULL indicates server stop */
if (!sce->path)
break;
if (stat(sce->path->str, &sce->st) == -1) {
sce->failed = TRUE;
sce->err = errno;
} else
sce->failed = FALSE;
g_atomic_int_set(&sce->state, STAT_CACHE_ENTRY_FINISHED);
g_async_queue_push(sc->job_queue_in, sce);
ev_async_send(sc->delete_queue.loop, &sc->job_watcher);
}
return NULL;
}

15
src/virtualrequest.c

@ -119,6 +119,11 @@ void vrequest_reset(vrequest *vr) {
vr->job_queue_link = NULL;
}
if (vr->stat_cache_entry) {
g_ptr_array_remove_fast(vr->stat_cache_entry->vrequests, vr);
vr->stat_cache_entry = NULL;
}
memcpy(vr->options, vr->con->srv->option_def_values->data, vr->con->srv->option_def_values->len * sizeof(option_value));
}
@ -373,3 +378,13 @@ gboolean vrequest_stat(vrequest *vr) {
vr->physical.have_stat = TRUE;
return TRUE;
}
gboolean vrequest_redirect(vrequest *vr, GString *uri) {
if (!vrequest_handle_direct(vr))
return FALSE;
vr->response.http_status = 301;
http_header_overwrite(vr->response.headers, CONST_STR_LEN("Location"), GSTR_LEN(uri));
return TRUE;
}

4
src/worker.c

@ -335,6 +335,8 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0);
wrk->job_queue_watcher.data = wrk;
stat_cache_new(wrk, 10.0);
return wrk;
}
@ -386,6 +388,8 @@ void worker_free(worker *wrk) {
g_string_free(wrk->tmp_str, TRUE);
stat_cache_free(wrk->stat_cache);
g_slice_free(worker, wrk);
}

1
src/wscript

@ -43,6 +43,7 @@ common_src = '''
request.c
response.c
server.c
stat_cache.c
sys-files.c
sys-socket.c
url_parser.rl

Loading…
Cancel
Save