|
|
|
@ -207,34 +207,21 @@ GString *li_worker_current_timestamp(liWorker *wrk, liTimeFunc timefunc, guint f
|
|
|
|
|
return wts->str;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* loop prepare watcher */
|
|
|
|
|
static void li_worker_loop_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
|
|
|
|
|
static void li_worker_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
|
|
|
|
|
liWorker *wrk = (liWorker*) w->data;
|
|
|
|
|
liServer *srv = wrk->srv;
|
|
|
|
|
GList *lnk;
|
|
|
|
|
UNUSED(loop);
|
|
|
|
|
UNUSED(revents);
|
|
|
|
|
|
|
|
|
|
/* are there pending log entries? */
|
|
|
|
|
if (g_queue_get_length(&wrk->log_queue)) {
|
|
|
|
|
//g_print("pending log entries: %d\n", g_queue_get_length(&wrk->log_queue));
|
|
|
|
|
/* take log entries from local queue, insert into global queue and notify log thread */
|
|
|
|
|
g_static_mutex_lock(&srv->logs.write_queue_mutex);
|
|
|
|
|
|
|
|
|
|
/* have to concatenate the queues by hand as g_queue_push_tail_link() cannot handle this simple task */
|
|
|
|
|
lnk = g_queue_peek_head_link(&wrk->log_queue);
|
|
|
|
|
lnk->prev = srv->logs.write_queue.tail;
|
|
|
|
|
if (srv->logs.write_queue.tail)
|
|
|
|
|
srv->logs.write_queue.tail->next = lnk;
|
|
|
|
|
else
|
|
|
|
|
srv->logs.write_queue.head = lnk;
|
|
|
|
|
srv->logs.write_queue.tail = g_queue_peek_tail_link(&wrk->log_queue);
|
|
|
|
|
srv->logs.write_queue.length++;
|
|
|
|
|
li_g_queue_merge(&srv->logs.write_queue, &wrk->log_queue);
|
|
|
|
|
|
|
|
|
|
g_static_mutex_unlock(&srv->logs.write_queue_mutex);
|
|
|
|
|
ev_async_send(srv->logs.loop, &srv->logs.watcher);
|
|
|
|
|
/* clear local worker queue */
|
|
|
|
|
g_queue_init(&wrk->log_queue);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -385,6 +372,11 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
|
|
|
|
|
g_array_index(wrk->timestamps_local, liWorkerTS, i).str = g_string_sized_new(255);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ev_init(&wrk->loop_prepare, li_worker_prepare_cb);
|
|
|
|
|
wrk->loop_prepare.data = wrk;
|
|
|
|
|
ev_prepare_start(wrk->loop, &wrk->loop_prepare);
|
|
|
|
|
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
|
|
|
|
|
|
|
|
|
|
ev_init(&wrk->worker_exit_watcher, li_worker_exit_cb);
|
|
|
|
|
wrk->worker_exit_watcher.data = wrk;
|
|
|
|
|
ev_async_start(wrk->loop, &wrk->worker_exit_watcher);
|
|
|
|
@ -477,6 +469,8 @@ void li_worker_free(liWorker *wrk) {
|
|
|
|
|
li_collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0);
|
|
|
|
|
g_async_queue_unref(wrk->collect_queue);
|
|
|
|
|
|
|
|
|
|
li_ev_safe_ref_and_stop(ev_prepare_stop, wrk->loop, &wrk->loop_prepare);
|
|
|
|
|
|
|
|
|
|
g_string_free(wrk->tmp_str, TRUE);
|
|
|
|
|
|
|
|
|
|
li_stat_cache_free(wrk->stat_cache);
|
|
|
|
|