2013-06-03 23:12:35 +00:00
|
|
|
#include <lighttpd/fetch.h>
|
|
|
|
|
|
|
|
struct liFetchDatabase {
|
|
|
|
guint refcount, internal_refcount;
|
|
|
|
GMutex *lock;
|
|
|
|
GHashTable *cache; /* GString -> liFetchEntryP. key is in entry->public.key */
|
|
|
|
GQueue lru_queue, lru_negative_queue;
|
|
|
|
|
|
|
|
gint cache_size, neg_cache_size;
|
|
|
|
const liFetchCallbacks *callbacks;
|
|
|
|
gpointer data;
|
|
|
|
};
|
|
|
|
|
|
|
|
typedef struct liFetchEntryP liFetchEntryP;
|
|
|
|
|
|
|
|
typedef enum {
|
|
|
|
LI_ENTRY_LOOKUP,
|
|
|
|
LI_ENTRY_VALID,
|
|
|
|
LI_ENTRY_REFRESH_OLD,
|
|
|
|
LI_ENTRY_REFRESH_INVALID, /* got invalid while refreshing */
|
|
|
|
LI_ENTRY_REFRESH_NEW,
|
|
|
|
LI_ENTRY_INVALID
|
|
|
|
} liFetchEntryState;
|
|
|
|
|
|
|
|
struct liFetchEntryP {
|
|
|
|
gint refcount;
|
|
|
|
liFetchDatabase *db; /* keeps "internal" reference */
|
|
|
|
liFetchEntry public;
|
|
|
|
|
|
|
|
gint state; /* atomic access, write only with db->lock */
|
2013-06-09 16:09:59 +00:00
|
|
|
GQueue wait_queue; /* <liFetchWaitElement links> */
|
2013-06-03 23:12:35 +00:00
|
|
|
liFetchEntryP *refreshing;
|
|
|
|
GList lru_link; /* only in LI_ENTRY_VALID state */
|
|
|
|
};
|
|
|
|
|
|
|
|
/* struct liFetchWait is not defined; liFetchWait* is used as liFetchEntryP* */
|
|
|
|
|
|
|
|
typedef struct liFetchWaitElement liFetchWaitElement;
|
|
|
|
struct liFetchWaitElement {
|
|
|
|
liFetchWakeupCB wakeup;
|
|
|
|
GList wait_link;
|
|
|
|
};
|
|
|
|
|
|
|
|
static GQueue entry_extract_wait_queue(liFetchEntryP *pentry) {
|
|
|
|
GQueue q = pentry->wait_queue;
|
|
|
|
pentry->wait_queue.head = pentry->wait_queue.tail = NULL;
|
|
|
|
pentry->wait_queue.length = 0;
|
|
|
|
return q;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void wakeup_jobs(GQueue *wait_queue) {
|
|
|
|
GList *wait_link;
|
|
|
|
|
|
|
|
while (NULL != (wait_link = g_queue_pop_head_link(wait_queue))) {
|
|
|
|
liFetchWaitElement *elem = LI_CONTAINER_OF(wait_link, liFetchWaitElement, wait_link);
|
|
|
|
elem->wakeup(elem->wait_link.data);
|
|
|
|
g_slice_free(liFetchWaitElement, elem);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static gboolean wakeup_has_job(GQueue *wait_queue, liFetchWakeupCB wakeup, gpointer wakeup_data) {
|
|
|
|
GList *wait_link;
|
|
|
|
|
|
|
|
for (wait_link = wait_queue->head; NULL != wait_link; wait_link = wait_link->next) {
|
|
|
|
liFetchWaitElement *elem = LI_CONTAINER_OF(wait_link, liFetchWaitElement, wait_link);
|
|
|
|
if (wakeup == elem->wakeup && wakeup_data == elem->wait_link.data) return TRUE;
|
|
|
|
}
|
|
|
|
|
|
|
|
return FALSE;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void wakeup_add_job(GQueue *wait_queue, liFetchWakeupCB wakeup, gpointer wakeup_data) {
|
|
|
|
liFetchWaitElement *elem = g_slice_new0(liFetchWaitElement);
|
|
|
|
elem->wait_link.data = wakeup_data;
|
|
|
|
elem->wakeup = wakeup;
|
|
|
|
g_queue_push_tail_link(wait_queue, &elem->wait_link);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void remove_from_lru(liFetchEntryP *pentry) {
|
|
|
|
g_queue_unlink(
|
|
|
|
NULL != pentry->public.data ? &pentry->db->lru_queue : &pentry->db->lru_negative_queue,
|
|
|
|
&pentry->lru_link);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void append_to_lru(liFetchEntryP *pentry) {
|
|
|
|
liFetchDatabase *db = pentry->db;
|
|
|
|
GQueue *q = NULL != pentry->public.data ? &db->lru_queue : &db->lru_negative_queue;
|
|
|
|
guint limit = NULL != pentry->public.data ? db->cache_size : db->neg_cache_size;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_VALID == pentry->state);
|
|
|
|
g_queue_push_tail_link(q, &pentry->lru_link);
|
|
|
|
while (q->length > limit) {
|
|
|
|
GList *lru_link =g_queue_peek_head_link(q);
|
|
|
|
liFetchEntryP *purge_entry = LI_CONTAINER_OF(lru_link, liFetchEntryP, lru_link);
|
|
|
|
g_hash_table_remove(db->cache, purge_entry->public.key);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void fetch_db_int_acquire(liFetchDatabase* db) {
|
|
|
|
assert(g_atomic_int_get(&db->internal_refcount) > 0);
|
|
|
|
g_atomic_int_inc(&db->internal_refcount);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void fetch_db_int_release(liFetchDatabase* db) {
|
|
|
|
assert(g_atomic_int_get(&db->internal_refcount) > 0);
|
|
|
|
if (g_atomic_int_dec_and_test(&db->internal_refcount)) {
|
|
|
|
db->internal_refcount = 1;
|
|
|
|
assert(NULL == db->cache);
|
|
|
|
|
|
|
|
if (NULL != db->callbacks->free_db) db->callbacks->free_db(db->data);
|
|
|
|
|
|
|
|
g_mutex_free(db->lock);
|
|
|
|
db->lock = NULL;
|
|
|
|
db->data = NULL;
|
|
|
|
db->callbacks = NULL;
|
|
|
|
assert(1 == db->internal_refcount);
|
|
|
|
g_slice_free(liFetchDatabase, db);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_database_acquire(liFetchDatabase* db) {
|
|
|
|
assert(g_atomic_int_get(&db->refcount) > 0);
|
|
|
|
assert(g_atomic_int_get(&db->internal_refcount) > 0);
|
|
|
|
g_atomic_int_inc(&db->refcount);
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_database_release(liFetchDatabase* db) {
|
|
|
|
assert(g_atomic_int_get(&db->refcount) > 0);
|
|
|
|
assert(g_atomic_int_get(&db->internal_refcount) > 0);
|
|
|
|
if (g_atomic_int_dec_and_test(&db->refcount)) {
|
|
|
|
GHashTable *cache = db->cache;
|
|
|
|
db->refcount = 1;
|
|
|
|
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
assert(NULL != cache);
|
|
|
|
db->cache = NULL;
|
|
|
|
g_hash_table_destroy(cache);
|
|
|
|
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
assert(1 == db->refcount);
|
|
|
|
db->refcount = 0;
|
|
|
|
fetch_db_int_release(db);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_acquire(liFetchEntry *entry) {
|
|
|
|
liFetchEntryP *pentry = LI_CONTAINER_OF(entry, liFetchEntryP, public);
|
|
|
|
assert(g_atomic_int_get(&pentry->refcount) > 0);
|
|
|
|
g_atomic_int_inc(&pentry->refcount);
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_release(liFetchEntry *entry) {
|
|
|
|
liFetchEntryP *pentry;
|
|
|
|
if (NULL == entry) return;
|
|
|
|
|
|
|
|
pentry = LI_CONTAINER_OF(entry, liFetchEntryP, public);
|
|
|
|
assert(g_atomic_int_get(&pentry->refcount) > 0);
|
|
|
|
if (g_atomic_int_dec_and_test(&pentry->refcount)) {
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pentry->state);
|
|
|
|
liFetchDatabase *db = pentry->db;
|
|
|
|
pentry->refcount = 1;
|
|
|
|
|
|
|
|
if (NULL != db->callbacks->free_entry) db->callbacks->free_entry(db->data, entry);
|
|
|
|
pentry->public.data = pentry->public.backend_data = NULL;
|
|
|
|
g_string_free(pentry->public.key, TRUE);
|
|
|
|
pentry->public.key = NULL;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_INVALID == state);
|
|
|
|
|
|
|
|
pentry->db = NULL;
|
|
|
|
fetch_db_int_release(db);
|
|
|
|
assert(1 == pentry->refcount);
|
|
|
|
pentry->refcount = 0;
|
|
|
|
g_slice_free(liFetchEntryP, pentry);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_invalidate(liFetchDatabase* db, GString *key) {
|
|
|
|
fetch_db_int_acquire(db);
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
{
|
|
|
|
liFetchEntryP *pentry = g_hash_table_lookup(db->cache, key);
|
|
|
|
liFetchEntryState state;
|
|
|
|
|
|
|
|
if (NULL == pentry) goto out;
|
|
|
|
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW != state && LI_ENTRY_INVALID != state); /* this is never in the cache */
|
|
|
|
switch (state) {
|
|
|
|
case LI_ENTRY_LOOKUP:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_VALID:
|
|
|
|
/* hashtable destroy callback will mark as invalid */
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_OLD:
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_REFRESH_INVALID);
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_REFRESH_INVALID:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_REFRESH_NEW: /* not reachable */
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_INVALID: /* not reachable */
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
g_hash_table_remove(db->cache, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
fetch_db_int_release(db);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* db is already locked */
|
|
|
|
static void cache_delete_data_cb(gpointer data) {
|
|
|
|
liFetchEntryP *pentry = data;
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pentry->state);
|
|
|
|
GQueue wait_queue = G_QUEUE_INIT;
|
|
|
|
liFetchEntryP *new_entry = NULL;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW != state && LI_ENTRY_INVALID != state); /* this is never in the cache */
|
|
|
|
|
|
|
|
switch (state) {
|
|
|
|
case LI_ENTRY_LOOKUP:
|
|
|
|
wait_queue = entry_extract_wait_queue(pentry);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_VALID:
|
|
|
|
remove_from_lru(pentry);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_OLD:
|
|
|
|
new_entry = pentry->refreshing;
|
|
|
|
pentry->refreshing = NULL;
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_INVALID:
|
|
|
|
new_entry = pentry->refreshing;
|
|
|
|
pentry->refreshing = NULL;
|
|
|
|
wait_queue = pentry->wait_queue;
|
|
|
|
g_queue_init(&pentry->wait_queue);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_NEW: /* not reachable */
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_INVALID: /* not reachable */
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_INVALID);
|
|
|
|
|
|
|
|
if (NULL != new_entry) {
|
|
|
|
assert(pentry == new_entry->refreshing);
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW == g_atomic_int_get(&new_entry->state));
|
|
|
|
new_entry->refreshing = NULL;
|
|
|
|
g_atomic_int_set(&new_entry->state, LI_ENTRY_INVALID);
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
li_fetch_entry_release(&new_entry->public);
|
|
|
|
}
|
|
|
|
|
|
|
|
wakeup_jobs(&wait_queue);
|
|
|
|
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
}
|
|
|
|
|
|
|
|
liFetchDatabase* li_fetch_database_new(const liFetchCallbacks *callbacks, gpointer data, guint cache_size, guint neg_cache_size) {
|
|
|
|
liFetchDatabase *db = g_slice_new0(liFetchDatabase);
|
|
|
|
|
|
|
|
db->refcount = 1;
|
|
|
|
db->internal_refcount = 1; /* db->refcount > 0 keeps internal ref */
|
|
|
|
db->lock = g_mutex_new();
|
|
|
|
db->cache = g_hash_table_new_full((GHashFunc) g_string_hash, (GEqualFunc) g_string_equal, NULL, cache_delete_data_cb);
|
|
|
|
db->cache_size = cache_size;
|
|
|
|
db->neg_cache_size = neg_cache_size;
|
|
|
|
db->callbacks = callbacks;
|
|
|
|
db->data = data;
|
|
|
|
|
|
|
|
return db;
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_ready(liFetchEntry *entry) {
|
|
|
|
/* releases entry reference */
|
|
|
|
liFetchEntryP *pentry = LI_CONTAINER_OF(entry, liFetchEntryP, public);
|
|
|
|
liFetchDatabase *db = pentry->db;
|
|
|
|
GQueue wait_queue = G_QUEUE_INIT;
|
|
|
|
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
{
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pentry->state);
|
|
|
|
wait_queue = entry_extract_wait_queue(pentry);
|
|
|
|
|
|
|
|
if (LI_ENTRY_INVALID == state) goto out;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_LOOKUP == state);
|
|
|
|
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_VALID);
|
|
|
|
append_to_lru(pentry);
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
li_fetch_entry_release(entry);
|
|
|
|
|
|
|
|
wakeup_jobs(&wait_queue);
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_refresh(liFetchEntry *entry) {
|
|
|
|
liFetchEntryP *pentry = LI_CONTAINER_OF(entry, liFetchEntryP, public);
|
|
|
|
liFetchDatabase *db = pentry->db;
|
|
|
|
liFetchEntryP *new_entry = NULL;
|
|
|
|
|
|
|
|
li_fetch_entry_acquire(entry);
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
{
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pentry->state);
|
|
|
|
|
|
|
|
if (NULL == db->cache) goto out; /* cache already destroyed, nothing to refresh */
|
|
|
|
|
|
|
|
switch (state) {
|
|
|
|
case LI_ENTRY_LOOKUP:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_VALID:
|
|
|
|
remove_from_lru(pentry);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_OLD:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_REFRESH_INVALID:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_REFRESH_NEW:
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_INVALID:
|
|
|
|
goto out;
|
|
|
|
}
|
|
|
|
|
|
|
|
fetch_db_int_acquire(db);
|
|
|
|
new_entry = g_slice_new0(liFetchEntryP);
|
|
|
|
new_entry->db = db;
|
|
|
|
new_entry->state = LI_ENTRY_REFRESH_NEW;
|
|
|
|
new_entry->refcount = 2; /* one in pentry->refreshing, the other for the callback */
|
|
|
|
new_entry->public.key = g_string_new_len(GSTR_LEN(entry->key));
|
|
|
|
new_entry->public.data = new_entry->public.backend_data = NULL;
|
|
|
|
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_REFRESH_OLD);
|
|
|
|
|
|
|
|
li_fetch_entry_acquire(entry);
|
|
|
|
new_entry->refreshing = pentry;
|
|
|
|
pentry->refreshing = new_entry;
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
if (NULL != new_entry) {
|
|
|
|
db->callbacks->refresh(db, db->data, &pentry->public, &new_entry->public);
|
|
|
|
}
|
|
|
|
|
|
|
|
li_fetch_entry_release(entry);
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_refresh_skip(liFetchEntry *new_entry) {
|
|
|
|
/* releases new_entry reference */
|
|
|
|
liFetchEntryP *pnew_entry = LI_CONTAINER_OF(new_entry, liFetchEntryP, public);
|
|
|
|
liFetchDatabase *db = pnew_entry->db;
|
|
|
|
liFetchEntryP *pentry;
|
|
|
|
gboolean new_lookup = FALSE;
|
|
|
|
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
{
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pnew_entry->state);
|
|
|
|
g_atomic_int_set(&pnew_entry->state, LI_ENTRY_INVALID);
|
|
|
|
|
|
|
|
if (LI_ENTRY_INVALID == state) goto out;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW == state);
|
|
|
|
|
|
|
|
pentry = pnew_entry->refreshing;
|
|
|
|
pnew_entry->refreshing = NULL;
|
|
|
|
assert(pnew_entry == pentry->refreshing);
|
|
|
|
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
assert(LI_ENTRY_REFRESH_OLD == state || LI_ENTRY_REFRESH_INVALID == state);
|
|
|
|
if (LI_ENTRY_REFRESH_OLD == state) {
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_VALID);
|
|
|
|
append_to_lru(pentry);
|
|
|
|
} else { /* LI_ENTRY_REFRESH_INVALID */
|
|
|
|
if (pnew_entry->wait_queue.length > 0) {
|
|
|
|
/* someone could be waiting. trigger a new lookup */
|
|
|
|
g_atomic_int_set(&pnew_entry->state, LI_ENTRY_LOOKUP);
|
|
|
|
new_lookup = TRUE;
|
|
|
|
/* use new entry, as waiting entries are using it */
|
|
|
|
g_hash_table_replace(db->cache, pnew_entry->public.key, pnew_entry);
|
|
|
|
} else {
|
|
|
|
/* no one waiting, just remove the cache entry */
|
|
|
|
g_hash_table_remove(db->cache, pentry->public.key);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
li_fetch_entry_release(&pnew_entry->public);
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
if (new_lookup) {
|
|
|
|
db->callbacks->lookup(db, db->data, &pnew_entry->public);
|
|
|
|
} else {
|
|
|
|
li_fetch_entry_release(&pnew_entry->public);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_entry_refresh_ready(liFetchEntry *new_entry) {
|
|
|
|
/* releases new_entry reference */
|
|
|
|
liFetchEntryP *pnew_entry = LI_CONTAINER_OF(new_entry, liFetchEntryP, public);
|
|
|
|
liFetchDatabase *db = pnew_entry->db;
|
|
|
|
liFetchEntryP *pentry;
|
|
|
|
|
|
|
|
GQueue wait_queue = G_QUEUE_INIT;
|
|
|
|
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
{
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pnew_entry->state);
|
|
|
|
|
|
|
|
if (LI_ENTRY_INVALID == state) goto out;
|
|
|
|
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW == state);
|
|
|
|
|
|
|
|
pentry = pnew_entry->refreshing;
|
|
|
|
pnew_entry->refreshing = NULL;
|
|
|
|
assert(pnew_entry == pentry->refreshing);
|
|
|
|
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
assert(LI_ENTRY_REFRESH_OLD == state || LI_ENTRY_REFRESH_INVALID == state);
|
|
|
|
|
|
|
|
wait_queue = entry_extract_wait_queue(pnew_entry);
|
|
|
|
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
/* not releasing &pnew_entry->public - insert into cache instead */
|
|
|
|
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_INVALID);
|
|
|
|
g_atomic_int_set(&pnew_entry->state, LI_ENTRY_VALID);
|
|
|
|
g_hash_table_replace(db->cache, pentry->public.key, pentry);
|
|
|
|
append_to_lru(pnew_entry);
|
|
|
|
/* old entry wasn't in LRU anymore as it was in a REFRESH, not in a VALID state */
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
li_fetch_entry_release(&pnew_entry->public);
|
|
|
|
|
|
|
|
wakeup_jobs(&wait_queue);
|
|
|
|
}
|
|
|
|
|
|
|
|
gboolean li_fetch_entry_revalidate(liFetchEntry *entry) {
|
|
|
|
liFetchEntryP *pentry = LI_CONTAINER_OF(entry, liFetchEntryP, public);
|
|
|
|
liFetchDatabase *db = pentry->db;
|
|
|
|
liFetchEntryState state = g_atomic_int_get(&pentry->state);
|
|
|
|
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW != state);
|
|
|
|
switch (state) {
|
|
|
|
case LI_ENTRY_LOOKUP:
|
|
|
|
return FALSE;
|
|
|
|
case LI_ENTRY_VALID:
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_OLD:
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_INVALID:
|
|
|
|
return FALSE;
|
|
|
|
case LI_ENTRY_REFRESH_NEW:
|
|
|
|
return FALSE;
|
|
|
|
case LI_ENTRY_INVALID:
|
|
|
|
return FALSE;
|
|
|
|
}
|
|
|
|
|
|
|
|
return db->callbacks->revalidate(db, db->data, entry);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void wakeup_jobref(gpointer data) {
|
|
|
|
liJobRef *jobref = (liJobRef*) data;
|
|
|
|
li_job_async(jobref);
|
|
|
|
li_job_ref_release(jobref);
|
|
|
|
}
|
|
|
|
|
|
|
|
liFetchEntry* li_fetch_get(liFetchDatabase* db, GString *key, liJobRef *jobref, liFetchWait **wait) {
|
|
|
|
liFetchEntry *entry;
|
|
|
|
|
|
|
|
if (NULL == *wait) {
|
|
|
|
li_job_ref_acquire(jobref);
|
|
|
|
}
|
|
|
|
|
|
|
|
entry = li_fetch_get2(db, key, wakeup_jobref, jobref, wait);
|
|
|
|
|
|
|
|
if (NULL == *wait) {
|
|
|
|
li_job_ref_release(jobref);
|
|
|
|
}
|
|
|
|
|
|
|
|
return entry;
|
|
|
|
}
|
|
|
|
|
|
|
|
liFetchEntry* li_fetch_get2(liFetchDatabase* db, GString *key, liFetchWakeupCB wakeup, gpointer wakeup_data, liFetchWait **wait) {
|
|
|
|
liFetchEntryP *pentry, *pentry_new = NULL;
|
|
|
|
liFetchEntryP *result = NULL;
|
|
|
|
liFetchEntryState state;
|
|
|
|
|
|
|
|
assert(NULL != wakeup);
|
|
|
|
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
if (*wait != NULL) {
|
|
|
|
pentry = (liFetchEntryP*) *wait;
|
|
|
|
|
|
|
|
if (wakeup_has_job(&pentry->wait_queue, wakeup, wakeup_data)) goto out; /* not notified yet */
|
|
|
|
|
|
|
|
/* notified, don't care about state - it was good once */
|
|
|
|
*wait = NULL;
|
|
|
|
result = pentry;
|
|
|
|
goto out;
|
|
|
|
}
|
|
|
|
|
|
|
|
pentry = g_hash_table_lookup(db->cache, key);
|
|
|
|
if (NULL != pentry) {
|
|
|
|
li_fetch_entry_acquire(&pentry->public);
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
/* unlock for revalidation and refresh */
|
|
|
|
if (li_fetch_entry_revalidate(&pentry->public)) {
|
|
|
|
/* db already unlocked */
|
|
|
|
return &pentry->public;
|
|
|
|
}
|
|
|
|
li_fetch_entry_refresh(&pentry->public);
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
pentry_new = pentry;
|
|
|
|
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
assert(LI_ENTRY_REFRESH_NEW != state);
|
|
|
|
switch (state) {
|
|
|
|
case LI_ENTRY_LOOKUP:
|
|
|
|
/* lookup in progress, just register wakeup */
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_VALID:
|
|
|
|
/* refresh already done - fresh entry, return */
|
|
|
|
result = pentry;
|
|
|
|
goto out;
|
|
|
|
case LI_ENTRY_REFRESH_OLD:
|
|
|
|
/* refresh in progress, but data is now invalid. register wakeup */
|
|
|
|
g_atomic_int_set(&pentry->state, LI_ENTRY_REFRESH_INVALID);
|
|
|
|
pentry_new = pentry->refreshing; /* wait on refresh "new" entry */
|
|
|
|
li_fetch_entry_acquire(&pentry_new->public);
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_INVALID:
|
|
|
|
/* refresh in progress, already invalid. just register wakeup */
|
|
|
|
pentry_new = pentry->refreshing; /* wait on refresh "new" entry */
|
|
|
|
li_fetch_entry_acquire(&pentry_new->public);
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_REFRESH_NEW: /* not reachable */
|
|
|
|
break;
|
|
|
|
case LI_ENTRY_INVALID:
|
|
|
|
/* entry got removed from cache, can't add wakeup anymore */
|
|
|
|
/* if there is an entry in the cache it is fresh enough,
|
|
|
|
* otherwise do a new lookup */
|
|
|
|
li_fetch_entry_release(&pentry->public);
|
|
|
|
result = g_hash_table_lookup(db->cache, key);
|
|
|
|
if (NULL != result) {
|
|
|
|
li_fetch_entry_acquire(&pentry_new->public);
|
|
|
|
goto out;
|
|
|
|
}
|
|
|
|
goto create_new_entry;
|
|
|
|
}
|
|
|
|
|
|
|
|
wakeup_add_job(&pentry_new->wait_queue, wakeup, wakeup_data);
|
|
|
|
|
|
|
|
*wait = (liFetchWait*) pentry_new;
|
|
|
|
} else {
|
|
|
|
create_new_entry:
|
|
|
|
/* create new entry */
|
|
|
|
fetch_db_int_acquire(db);
|
|
|
|
pentry = g_slice_new0(liFetchEntryP);
|
|
|
|
pentry->db = db;
|
|
|
|
pentry->state = LI_ENTRY_LOOKUP;
|
|
|
|
pentry->refcount = 3; /* one for us, one for cache, one for the callback */
|
|
|
|
pentry->public.key = g_string_new_len(GSTR_LEN(key));
|
|
|
|
pentry->public.data = pentry->public.backend_data = NULL;
|
|
|
|
|
|
|
|
g_hash_table_replace(db->cache, pentry->public.key, pentry);
|
|
|
|
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
/* unlock for lookup */
|
|
|
|
db->callbacks->lookup(db, db->data, &pentry->public);
|
|
|
|
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
if (LI_ENTRY_LOOKUP != state) return &pentry->public;
|
|
|
|
g_mutex_lock(db->lock);
|
|
|
|
|
|
|
|
/* do we need to register wakeup? */
|
|
|
|
|
|
|
|
/* read again */
|
|
|
|
state = g_atomic_int_get(&pentry->state);
|
|
|
|
if (LI_ENTRY_LOOKUP != state) {
|
|
|
|
/* catch especially LI_ENTRY_INVALID which doesn't have a wait_queue anymore.
|
|
|
|
* any entry is fine here, we just asked for the lookup! - don't care how "valid" it is */
|
|
|
|
result = pentry;
|
|
|
|
goto out;
|
|
|
|
}
|
|
|
|
|
2013-06-09 16:09:59 +00:00
|
|
|
wakeup_add_job(&pentry->wait_queue, wakeup, wakeup_data);
|
2013-06-03 23:12:35 +00:00
|
|
|
|
|
|
|
*wait = (liFetchWait*) pentry;
|
|
|
|
}
|
|
|
|
|
|
|
|
out:
|
|
|
|
g_mutex_unlock(db->lock);
|
|
|
|
|
|
|
|
return (NULL != result) ? &result->public : NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
void li_fetch_cancel(liFetchWait **wait) {
|
|
|
|
liFetchEntryP* pentry = (liFetchEntryP*) *wait;
|
|
|
|
if (NULL != pentry) li_fetch_entry_release(&pentry->public);
|
|
|
|
}
|