summaryrefslogtreecommitdiff
path: root/ot_mutex.c
diff options
context:
space:
mode:
authorDirk Engling <erdgeist@erdgeist.org>2007-11-14 12:59:58 +0000
committerDirk Engling <erdgeist@erdgeist.org>2007-11-14 12:59:58 +0000
commitadac3bb2ab2b65915d56738225c87c9071829307 (patch)
tree89fa01799305b1590b87044b65c3db0810097e14 /ot_mutex.c
parent181afb7892e8a82c688fbef134c1b94dd12a933c (diff)
downloadopentracker-adac3bb2ab2b65915d56738225c87c9071829307.tar.gz
opentracker-adac3bb2ab2b65915d56738225c87c9071829307.zip
Introducing the workqueue
Diffstat (limited to 'ot_mutex.c')
-rw-r--r--ot_mutex.c156
1 files changed, 156 insertions, 0 deletions
diff --git a/ot_mutex.c b/ot_mutex.c
index bb82f46..b39baaa 100644
--- a/ot_mutex.c
+++ b/ot_mutex.c
@@ -4,6 +4,8 @@
/* System */
#include <pthread.h>
#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
/* Libowfat */
#include "byte.h"
@@ -15,6 +17,7 @@
/* Our global all torrents list */
static ot_vector all_torrents[OT_BUCKET_COUNT];
+/* Bucket Magic */
static int bucket_locklist[ OT_MAX_THREADS ];
static int bucket_locklist_count = 0;
static pthread_mutex_t bucket_mutex;
@@ -91,6 +94,159 @@ void mutex_bucket_unlock_by_hash( ot_hash *hash ) {
mutex_bucket_unlock( bucket );
}
+/* TaskQueue Magic */
+
+struct ot_task {
+ ot_taskid taskid;
+ ot_tasktype tasktype;
+ int64 socket;
+ int iovec_entries;
+ struct iovec *iovec;
+ struct ot_task *next;
+};
+
+static ot_taskid next_free_taskid = 1;
+static struct ot_task *tasklist = NULL;
+static pthread_mutex_t tasklist_mutex;
+static pthread_cond_t tasklist_being_filled;
+
+int mutex_workqueue_pushtask( int64 socket, ot_tasktype tasktype ) {
+ struct ot_task ** tmptask, * task;
+
+ /* Want exclusive access to tasklist */
+ pthread_mutex_lock( &tasklist_mutex );
+
+ task = malloc(sizeof( struct ot_task));
+ if( !task ) {
+ pthread_mutex_unlock( &tasklist_mutex );
+ return -1;
+ }
+
+ /* Skip to end of list */
+ tmptask = &tasklist;
+ while( *tmptask )
+ tmptask = &(*tmptask)->next;
+ *tmptask = task;
+
+ task->taskid = 0;
+ task->tasktype = tasktype;
+ task->socket = socket;
+ task->iovec_entries = 0;
+ task->iovec = NULL;
+ task->next = 0;
+
+ /* Inform waiting workers and release lock */
+ pthread_cond_broadcast( &tasklist_being_filled );
+ pthread_mutex_unlock( &tasklist_mutex );
+ return 0;
+}
+
+void mutex_workqueue_canceltask( int64 socket ) {
+ struct ot_task ** task;
+
+ /* Want exclusive access to tasklist */
+ pthread_mutex_lock( &tasklist_mutex );
+
+ task = &tasklist;
+ while( *task && ( (*task)->socket != socket ) )
+ *task = (*task)->next;
+
+ if( *task && ( (*task)->socket == socket ) ) {
+ struct iovec *iovec = (*task)->iovec;
+ struct ot_task *ptask = *task;
+ int i;
+
+ /* Free task's iovec */
+ for( i=0; i<(*task)->iovec_entries; ++i )
+ munmap( iovec[i].iov_base , iovec[i].iov_len );
+
+ *task = (*task)->next;
+ free( ptask );
+ }
+
+ /* Release lock */
+ pthread_mutex_unlock( &tasklist_mutex );
+}
+
+ot_taskid mutex_workqueue_poptask( ot_tasktype tasktype ) {
+ struct ot_task * task;
+ ot_taskid taskid = 0;
+
+ /* Want exclusive access to tasklist */
+ pthread_mutex_lock( &tasklist_mutex );
+
+ while( !taskid ) {
+ /* Skip to the first unassigned task this worker wants to do */
+ task = tasklist;
+ while( task && ( task->tasktype != tasktype ) && ( task->taskid ) )
+ task = task->next;
+
+ /* If we found an outstanding task, assign a taskid to it
+ and leave the loop */
+ if( task ) {
+ task->taskid = taskid = ++next_free_taskid;
+ break;
+ }
+
+ /* Wait until the next task is being fed */
+ pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex );
+ }
+
+ /* Release lock */
+ pthread_mutex_unlock( &tasklist_mutex );
+
+ return taskid;
+}
+
+int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
+ struct ot_task * task;
+ /* Want exclusive access to tasklist */
+ pthread_mutex_lock( &tasklist_mutex );
+
+ task = tasklist;
+ while( task && ( task->taskid != taskid ) )
+ task = task->next;
+
+ if( task ) {
+ task->iovec_entries = iovec_entries;
+ task->iovec = iovec;
+ task->tasktype = OT_TASKTYPE_DONE;
+ }
+
+ /* Release lock */
+ pthread_mutex_unlock( &tasklist_mutex );
+
+ /* Indicate whether the worker has to throw away results */
+ return task ? 0 : -1;
+}
+
+int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) {
+ struct ot_task ** task;
+ int64 socket = -1;
+
+ /* Want exclusive access to tasklist */
+ pthread_mutex_lock( &tasklist_mutex );
+
+ task = &tasklist;
+ while( *task && ( (*task)->tasktype != OT_TASKTYPE_DONE ) )
+ *task = (*task)->next;
+
+ if( *task && ( (*task)->tasktype == OT_TASKTYPE_DONE ) ) {
+ struct ot_task *ptask = *task;
+
+ *iovec_entries = (*task)->iovec_entries;
+ *iovec = (*task)->iovec;
+ socket = (*task)->socket;
+
+ *task = (*task)->next;
+ free( ptask );
+ }
+
+ /* Release lock */
+ pthread_mutex_unlock( &tasklist_mutex );
+ return socket;
+}
+
void mutex_init( ) {
pthread_mutex_init(&bucket_mutex, NULL);
pthread_cond_init (&bucket_being_unlocked, NULL);