|
|
|
@ -15,7 +15,7 @@ struct liBackendWait {
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
liBackendConnection_p *con;
|
|
|
|
|
GList link; /* link in wait_queue (either pool or wpool) */
|
|
|
|
|
GList wait_queue_link; /* link in wait_queue (either pool or wpool) */
|
|
|
|
|
|
|
|
|
|
gboolean failed;
|
|
|
|
|
liVRequest *vr;
|
|
|
|
@ -264,7 +264,7 @@ static void S_backend_pool_failed(liBackendWorkerPool *wpool) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (NULL != (elem = g_queue_pop_head_link(&pool->wait_queue))) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(elem, liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(elem, liBackendWait, wait_queue_link);
|
|
|
|
|
bwait->failed = TRUE;
|
|
|
|
|
li_job_async(bwait->vr_ref);
|
|
|
|
|
}
|
|
|
|
@ -272,7 +272,7 @@ static void S_backend_pool_failed(liBackendWorkerPool *wpool) {
|
|
|
|
|
for (guint i = 0, len = wpool->wrk->srv->worker_count; i < len; ++i) {
|
|
|
|
|
liBackendWorkerPool *_wpool = &pool->worker_pools[i];
|
|
|
|
|
while (NULL != (elem = g_queue_pop_head_link(&_wpool->wait_queue))) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(elem, liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(elem, liBackendWait, wait_queue_link);
|
|
|
|
|
bwait->failed = TRUE;
|
|
|
|
|
li_job_async(bwait->vr_ref);
|
|
|
|
|
}
|
|
|
|
@ -440,7 +440,7 @@ static void S_backend_pool_distribute(liBackendPool_p *pool, liWorker *wrk) {
|
|
|
|
|
if (0 == wpool->wait_queue.length) return;
|
|
|
|
|
|
|
|
|
|
while (wpool->wait_queue.length > 0 && wpool->idle > 0) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
liBackendConnection_p *con = g_ptr_array_index(wpool->connections, wpool->active + wpool->reserved);
|
|
|
|
|
|
|
|
|
|
bwait->con = con;
|
|
|
|
@ -481,9 +481,9 @@ static void S_backend_pool_distribute(liBackendPool_p *pool, liWorker *wrk) {
|
|
|
|
|
{
|
|
|
|
|
guint i;
|
|
|
|
|
for (i = use; i > 0; --i) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&pool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&pool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
liBackendWorkerPool *wpool = &pool->worker_pools[bwait->vr->wrk->ndx];
|
|
|
|
|
g_queue_push_tail_link(&wpool->wait_queue, &bwait->link);
|
|
|
|
|
g_queue_push_tail_link(&wpool->wait_queue, &bwait->wait_queue_link);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -495,7 +495,7 @@ static void S_backend_pool_distribute(liBackendPool_p *pool, liWorker *wrk) {
|
|
|
|
|
ERROR(wrk->srv, "pool %i: queue: %i, idle: %i", i, wpool->wait_queue.length, wpool->idle);
|
|
|
|
|
|
|
|
|
|
while (wpool->wait_queue.length > 0 && wpool->idle > 0) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
liBackendConnection_p *con = g_ptr_array_index(wpool->connections, wpool->active + wpool->reserved);
|
|
|
|
|
|
|
|
|
|
bwait->con = con;
|
|
|
|
@ -521,7 +521,7 @@ static void S_backend_pool_distribute(liBackendPool_p *pool, liWorker *wrk) {
|
|
|
|
|
liBackendWorkerPool *wpool = &pool->worker_pools[i];
|
|
|
|
|
|
|
|
|
|
while (wpool->wait_queue.length > 0) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_pop_head_link(&wpool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
liBackendConnection_p *con;
|
|
|
|
|
liBackendWorkerPool *srcpool;
|
|
|
|
|
|
|
|
|
@ -562,35 +562,35 @@ static void S_backend_pool_distribute(liBackendPool_p *pool, liWorker *wrk) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void S_backend_wait_queue_unshift(GQueue *queue, GList *link) {
|
|
|
|
|
static void S_backend_wait_queue_unshift(GQueue *queue, GList *lnk) {
|
|
|
|
|
if (0 == queue->length) {
|
|
|
|
|
g_queue_push_head_link(queue, link);
|
|
|
|
|
g_queue_push_head_link(queue, lnk);
|
|
|
|
|
} else {
|
|
|
|
|
liBackendWait *link_wait = LI_CONTAINER_OF(link, liBackendWait, link);
|
|
|
|
|
liBackendWait *link_wait = LI_CONTAINER_OF(lnk, liBackendWait, wait_queue_link);
|
|
|
|
|
GList *cursor = queue->head;
|
|
|
|
|
liBackendWait *wait = LI_CONTAINER_OF(cursor, liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(cursor, liBackendWait, wait_queue_link);
|
|
|
|
|
|
|
|
|
|
if (wait->ts_started > link_wait->ts_started) {
|
|
|
|
|
g_queue_push_head_link(queue, link);
|
|
|
|
|
if (bwait->ts_started > link_wait->ts_started) {
|
|
|
|
|
g_queue_push_head_link(queue, lnk);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
cursor = cursor->next;
|
|
|
|
|
if (NULL == cursor) {
|
|
|
|
|
g_queue_push_tail_link(queue, link);
|
|
|
|
|
g_queue_push_tail_link(queue, lnk);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
wait = LI_CONTAINER_OF(cursor, liBackendWait, link);
|
|
|
|
|
} while (wait->ts_started < link_wait->ts_started);
|
|
|
|
|
bwait = LI_CONTAINER_OF(cursor, liBackendWait, wait_queue_link);
|
|
|
|
|
} while (bwait->ts_started < link_wait->ts_started);
|
|
|
|
|
|
|
|
|
|
/* insert link before cursor; link will neither be the first nor the last element,
|
|
|
|
|
/* insert lnk before cursor; lnk will neither be the first nor the last element,
|
|
|
|
|
* so we don't have to udpate queue->head/tail
|
|
|
|
|
*/
|
|
|
|
|
link->next = cursor;
|
|
|
|
|
link->prev = cursor->prev;
|
|
|
|
|
cursor->prev->next = link;
|
|
|
|
|
cursor->prev = link;
|
|
|
|
|
lnk->next = cursor;
|
|
|
|
|
lnk->prev = cursor->prev;
|
|
|
|
|
cursor->prev->next = lnk;
|
|
|
|
|
cursor->prev = lnk;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -604,9 +604,9 @@ static void backend_connection_close(liBackendPool_p *pool, liBackendConnection_
|
|
|
|
|
if (NULL != con->wait) {
|
|
|
|
|
con->wait->con = NULL;
|
|
|
|
|
if (pool->public.config->max_connections <= 0) {
|
|
|
|
|
S_backend_wait_queue_unshift(&wpool->wait_queue, &con->wait->link);
|
|
|
|
|
S_backend_wait_queue_unshift(&wpool->wait_queue, &con->wait->wait_queue_link);
|
|
|
|
|
} else {
|
|
|
|
|
S_backend_wait_queue_unshift(&pool->wait_queue, &con->wait->link);
|
|
|
|
|
S_backend_wait_queue_unshift(&pool->wait_queue, &con->wait->wait_queue_link);
|
|
|
|
|
}
|
|
|
|
|
S_backend_pool_distribute(pool, con->wait->vr->wrk);
|
|
|
|
|
con->wait = NULL;
|
|
|
|
@ -728,7 +728,7 @@ static void S_backend_pool_update_wait_queue_timer(liBackendWorkerPool *wpool) {
|
|
|
|
|
|
|
|
|
|
if (pool->wait_queue.length > 0) {
|
|
|
|
|
li_tstamp now = li_cur_ts(wpool->wrk);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_peek_head_link(&pool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_peek_head_link(&pool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
li_tstamp repeat = bwait->ts_started + pool->public.config->wait_timeout - now;
|
|
|
|
|
|
|
|
|
|
if (repeat < 0.05) repeat = 0.05;
|
|
|
|
@ -750,7 +750,7 @@ static void backend_pool_wait_queue_timeout(liEventBase *watcher, int events) {
|
|
|
|
|
g_mutex_lock(pool->lock);
|
|
|
|
|
|
|
|
|
|
while (pool->wait_queue.length > 0) {
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_peek_head_link(&pool->wait_queue), liBackendWait, link);
|
|
|
|
|
liBackendWait *bwait = LI_CONTAINER_OF(g_queue_peek_head_link(&pool->wait_queue), liBackendWait, wait_queue_link);
|
|
|
|
|
|
|
|
|
|
if (bwait->ts_started <= due) {
|
|
|
|
|
g_queue_pop_head_link(&pool->wait_queue);
|
|
|
|
@ -953,9 +953,9 @@ liBackendResult li_backend_get(liVRequest *vr, liBackendPool *bpool, liBackendCo
|
|
|
|
|
*pbwait = bwait;
|
|
|
|
|
|
|
|
|
|
if (pool->public.config->max_connections <= 0) {
|
|
|
|
|
g_queue_push_tail_link(&wpool->wait_queue, &bwait->link);
|
|
|
|
|
g_queue_push_tail_link(&wpool->wait_queue, &bwait->wait_queue_link);
|
|
|
|
|
} else {
|
|
|
|
|
g_queue_push_tail_link(&pool->wait_queue, &bwait->link);
|
|
|
|
|
g_queue_push_tail_link(&pool->wait_queue, &bwait->wait_queue_link);
|
|
|
|
|
S_backend_pool_update_wait_queue_timer(wpool);
|
|
|
|
|
}
|
|
|
|
|
S_backend_pool_distribute(pool, vr->wrk);
|
|
|
|
@ -1038,9 +1038,9 @@ void li_backend_wait_stop(liVRequest *vr, liBackendPool *bpool, liBackendWait **
|
|
|
|
|
bwait->con = NULL;
|
|
|
|
|
} else if (pool->public.config->max_connections <= 0) {
|
|
|
|
|
liBackendWorkerPool *wpool = &pool->worker_pools[bwait->vr->wrk->ndx];
|
|
|
|
|
g_queue_unlink(&wpool->wait_queue, &bwait->link);
|
|
|
|
|
g_queue_unlink(&wpool->wait_queue, &bwait->wait_queue_link);
|
|
|
|
|
} else {
|
|
|
|
|
g_queue_unlink(&pool->wait_queue, &bwait->link);
|
|
|
|
|
g_queue_unlink(&pool->wait_queue, &bwait->wait_queue_link);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|