summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix von Leitner <felix-libowfat@fefe.de>2018-03-27 02:23:00 +0000
committerFelix von Leitner <felix-libowfat@fefe.de>2018-03-27 02:23:00 +0000
commit22408afb0e2117b1e1742e36519f9d18d8bc9c82 (patch)
treebd5cbc1fcd65ddbbbefa8d2f377dc269f4f7f484
parent9edee65561a76cccc8c9a03883db8bdbd75aa565 (diff)
downloadlibowfat-22408afb0e2117b1e1742e36519f9d18d8bc9c82.tar.gz
libowfat-22408afb0e2117b1e1742e36519f9d18d8bc9c82.zip
add experimental iom API for multithreaded I/O multiplexing (in io.h)
-rw-r--r--CHANGES1
-rw-r--r--io.h45
-rw-r--r--io/iom_abort.319
-rw-r--r--io/iom_abort.c10
-rw-r--r--io/iom_add.333
-rw-r--r--io/iom_add.c28
-rw-r--r--io/iom_init.330
-rw-r--r--io/iom_init.c38
-rw-r--r--io/iom_wait.338
-rw-r--r--io/iom_wait.c124
10 files changed, 366 insertions, 0 deletions
diff --git a/CHANGES b/CHANGES
index 552282e..a03e0fe 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,7 @@
fix fmt_ip6 (Erwin Hoffmann)
add MSG_ZEROCOPY support (only used for buffers >8k)
use write in buffer_put for a slight perf improvement
+ add experimental iom API for multithreaded I/O multiplexing (in io.h)
0.31:
special case buffer_get_token with token length 1 through memccpy (almost 4x speedup)
diff --git a/io.h b/io.h
index 3752e1b..5284245 100644
--- a/io.h
+++ b/io.h
@@ -144,6 +144,51 @@ int64 io_mmapwritefile(int64 out,int64 in,uint64 off,uint64 bytes,io_write_callb
* aid in debugging the state machine if a descriptor loops or so */
unsigned int io_debugstring(int64 s,char* buf,unsigned int bufsize);
+#ifdef __dietlibc__
+#include <threads.h>
+#else
+#include <pthread.h>
+#include <semaphore.h>
+#endif
+
+enum { SLOTS=128 };
+typedef struct iomux {
+ int ctx;
+ int working; /* used to synchronize who is filling the queue */
+ unsigned int h,l; /* high, low */
+ struct {
+ int fd, events;
+ } q[SLOTS];
+#ifdef __dietlibc__
+ mtx_t mtx;
+ cnd_t sem;
+#else
+ sem_t sem;
+#endif
+} iomux_t;
+
+
+/* Init master context */
+int iom_init(iomux_t* c);
+
+/* Add socket to iomux */
+enum {
+ IOM_READ=1,
+ IOM_WRITE=2,
+ IOM_ERROR=4
+};
+/* return -1 if error, or | of IOM_READ, IOM_WRITE or IOM_ERROR */
+int iom_add(iomux_t* c,int64 s,unsigned int events);
+
+/* Blocking wait for single event, timeout in milliseconds */
+/* return -1 if error, 0 if ok; s set to fd, revents set to known events on that fd */
+/* when done with the fd, call iom_add on it again! */
+/* This can be called by multiple threads in parallel */
+int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout);
+
+/* Call this to terminate all threads waiting in iom_wait */
+int iom_abort(iomux_t* c);
+
#ifdef __cplusplus
}
#endif
diff --git a/io/iom_abort.3 b/io/iom_abort.3
new file mode 100644
index 0000000..0336f6b
--- /dev/null
+++ b/io/iom_abort.3
@@ -0,0 +1,19 @@
+.TH iom_abort 3
+.SH NAME
+iom_abort \- abort all pending iom_wait calls
+.SH SYNTAX
+.B #include <libowfat/io.h>
+
+int \fBiom_abort\fP(iomux_t* c);
+.SH DESCRIPTION
+\fIiom_abort\fR will cause all currently running instances of
+\fIiom_wait\fR to return immediately with return value -2.
+
+.SH "LINKING"
+You may have to add \fI-lpthread\fR to the command line in the linking
+step.
+
+.SH "RETURN VALUE"
+iom_abort returns 0 on success and -1 on error, setting errno.
+.SH "SEE ALSO"
+iom_init, iom_add, iom_wait
diff --git a/io/iom_abort.c b/io/iom_abort.c
new file mode 100644
index 0000000..89d1406
--- /dev/null
+++ b/io/iom_abort.c
@@ -0,0 +1,10 @@
+#include "io_internal.h"
+
+int iom_abort(iomux_t* c) {
+ c->working=-2;
+#ifdef __dietlibc__
+ return cnd_broadcast(&c->sem);
+#else
+ return sem_post(&c->sem);
+#endif
+}
diff --git a/io/iom_add.3 b/io/iom_add.3
new file mode 100644
index 0000000..760a7d5
--- /dev/null
+++ b/io/iom_add.3
@@ -0,0 +1,33 @@
+.TH iom_add 3
+.SH NAME
+iom_add \- add event to I/O multiplexer
+.SH SYNTAX
+.B #include <libowfat/io.h>
+
+int \fBiom_add\fP(iomux_t* c, int64 fd, unsigned int events);
+.SH DESCRIPTION
+iom_add adds an event you are interested in to an I/O multiplexer.
+
+\fIfd\fR is the file descriptor (usually a socket) you are interested
+in, and \fIevents\fR is the operation you want to do. It can be IOM_READ
+or IOM_WRITE.
+
+If that operation becomes possible on that descriptor, and some thread
+is calling \fIiom_wait\fR at the time, it will return and tell you the
+fd and the event.
+
+Note that the event registration is removed from the iomux_t context if
+it occurs. You will have to call \fIiom_wait\fR again after you handled
+the event, if you are still interested in it.
+
+Closing a file descriptor with registered events will discard the event
+registration.
+
+.SH "LINKING"
+You may have to add \fI-lpthread\fR to the command line in the linking
+step.
+
+.SH "RETURN VALUE"
+iom_add returns 0 on success and -1 on error, setting errno.
+.SH "SEE ALSO"
+iom_init, iom_wait, iom_abort
diff --git a/io/iom_add.c b/io/iom_add.c
new file mode 100644
index 0000000..1d6d06f
--- /dev/null
+++ b/io/iom_add.c
@@ -0,0 +1,28 @@
+#include "io_internal.h"
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifdef HAVE_KQUEUE
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#endif
+
+int iom_add(iomux_t* c,int64 s,unsigned int events) {
+#ifdef HAVE_EPOLL
+ struct epoll_event e = { .events=EPOLLONESHOT, .data.fd=s };
+ if (events & IOM_READ) e.events|=EPOLLIN;
+ if (events & IOM_WRITE) e.events|=EPOLLOUT;
+ return epoll_ctl(c->ctx, EPOLL_CTL_ADD, s, &e);
+#elif defined(HAVE_KQUEUE)
+ struct kevent kev;
+ struct timespec ts = { 0 };
+ EV_SET(&kev, s,
+ (events & IOM_READ ? EVFILT_READ : 0) +
+ (events & IOM_WRITE ? EVFILT_WRITE : 0),
+ EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, (void*)s);
+ return kevent(c->ctx, &kev, 1, 0, 0, &ts);
+#else
+#warning "only epoll and kqueue supported for now"
+#endif
+}
diff --git a/io/iom_init.3 b/io/iom_init.3
new file mode 100644
index 0000000..7d6f9d3
--- /dev/null
+++ b/io/iom_init.3
@@ -0,0 +1,30 @@
+.TH iom_init 3
+.SH NAME
+iom_init \- create new I/O multiplexer
+.SH SYNTAX
+.B #include <libowfat/io.h>
+
+int \fBiom_init\fP(iomux_t* c);
+.SH DESCRIPTION
+iom_init initializes an I/O multiplexer.
+
+An I/O multiplexer is a context that can be used to do I/O multiplexing
+with support for multiple threads. Add events to a multiplexer using
+\fIiom_add\fR, and then get the next available event with
+\fIiom_wait\fR. If you are done and want to signal all the threads
+something, set a volatile global variable to tell the threads to stop
+and then fall \fIiom_abort\fR to tell all pending iom_wait operations in
+all threads to return immediately.
+
+After \fIiom_init\fR is done, \fIiom_add\fR and \fIiom_wait\fR can be
+called from different threads on the same context, and they will
+synchronize internally.
+
+.SH "LINKING"
+You may have to add \fI-lpthread\fR to the command line in the linking
+step.
+
+.SH "RETURN VALUE"
+iom_init returns 0 on success and -1 on error, setting errno.
+.SH "SEE ALSO"
+iom_add, iom_wait, iom_abort
diff --git a/io/iom_init.c b/io/iom_init.c
new file mode 100644
index 0000000..769af8b
--- /dev/null
+++ b/io/iom_init.c
@@ -0,0 +1,38 @@
+#include "io_internal.h"
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifdef HAVE_KQUEUE
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#endif
+
+int iom_init(iomux_t* c) {
+#ifdef HAVE_EPOLL
+ c->ctx = epoll_create1(EPOLL_CLOEXEC);
+#elif defined(HAVE_KQUEUE)
+ if ((c->ctx = kqueue()) != -1) {
+ if (fcntl(c->ctx,F_SETFD,FD_CLOEXEC) == -1) {
+ close(c->ctx);
+ c->ctx=-1;
+ }
+ }
+#else
+#warning "only epoll and kqueue supported for now"
+#endif
+ unsigned int i;
+ c->working=0;
+ c->h=c->l=0; /* no elements in queue */
+ for (i=0; i<SLOTS; ++i) {
+ c->q[i].fd=-1;
+ c->q[i].events=0;
+ }
+#ifdef __dietlibc__
+ mtx_init(&c->mtx, mtx_timed);
+ cnd_init(&c->sem);
+#else
+ sem_init(&c->sem, 0, 1);
+#endif
+ return (c->ctx!=-1);
+}
diff --git a/io/iom_wait.3 b/io/iom_wait.3
new file mode 100644
index 0000000..e332114
--- /dev/null
+++ b/io/iom_wait.3
@@ -0,0 +1,38 @@
+.TH iom_wait 3
+.SH NAME
+iom_wait \- wait for event from I/O multiplexer
+.SH SYNTAX
+.B #include <libowfat/io.h>
+
+int \fBiom_wait\fP(iomux_t* c,
+ int64* fd, unsigned int* events,
+ unsigned long timeout);
+.SH DESCRIPTION
+iom_wait will wait for events registered to the I/O multiplexer with
+\fIiom_add\fR. It will wait \fItimeout\fR milliseconds.
+
+If during that time any of the registered events occur, \fIiom_wait\fR
+will set \fIfd\fR to the file descriptor the event happened on, and
+\fIevents\fR to the sum of IOM_READ, IOM_WRITE and IOM_ERROR, depending
+on what event actually happened, and return 1.
+
+If nothing happens during that time, it will return 0 and leave \fIfd\fR
+and \fIevents\fR alone.
+
+Note that the event registration is removed from the iomux_t context if
+it occurs. You will have to call \fIiom_wait\fR again after you handled
+the event, if you are still interested in it.
+
+Closing a file descriptor with registered events will discard the event
+registration.
+
+.SH "LINKING"
+You may have to add \fI-lpthread\fR to the command line in the linking
+step.
+
+.SH "RETURN VALUE"
+iom_wait returns 1 on success, 0 if there was a timeout, and -1 on
+error, setting errno. If \fIiom_abort\fR was called on the I/O
+multiplexer context, it will return -2.
+.SH "SEE ALSO"
+iom_init, iom_add, iom_abort
diff --git a/io/iom_wait.c b/io/iom_wait.c
new file mode 100644
index 0000000..e9187b0
--- /dev/null
+++ b/io/iom_wait.c
@@ -0,0 +1,124 @@
+#include "io_internal.h"
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifdef HAVE_KQUEUE
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#endif
+#include <errno.h>
+
+int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
+ for (;;) {
+ /* If we have an event in the queue, use that one */
+ int r;
+ if (c->working==-2) return -2; /* iomux was aborted */
+ for (;;) {
+ unsigned int f=c->l;
+ if (f == c->h)
+ break; /* no elements in queue */
+ int n=(f+1)%SLOTS;
+ if (__sync_bool_compare_and_swap(&c->l,f,n)) {
+ /* we got one, and its index is in f */
+ *s=c->q[f].fd;
+ *revents=c->q[f].events;
+ }
+ /* collided with another thread, try again */
+ }
+ /* The queue was empty. If someone else is already calling
+ * epoll_wait/kevent, then use the semaphore */
+ if (__sync_bool_compare_and_swap(&c->working,0,1)) {
+ /* we have the job to fill the struct. */
+ int freeslots = (c->h - c->l);
+ if (!freeslots) freeslots=SLOTS;
+
+#ifdef HAVE_EPOLL
+ struct epoll_event ee[SLOTS];
+ int i;
+ r=epoll_wait(c->ctx, ee, freeslots, timeout);
+ if (r<=0) {
+ /* we ran into a timeout, so let someone else take over */
+ c->working=0;
+#ifdef __dietlibc__
+ cnd_broadcast(&c->sem);
+#else
+ sem_post(&c->sem);
+#endif
+ return r;
+ }
+ for (i=0; i<r; ++i) {
+ /* convert events */
+ int e = ((ee[i].events & (EPOLLIN|EPOLLHUP|EPOLLERR)) ? IOM_READ : 0) |
+ ((ee[i].events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) ? IOM_WRITE : 0) |
+ ((ee[i].events & EPOLLERR) ? IOM_ERROR : 0);
+ if (i+1==r) {
+ /* return last event instead of enqueueing it */
+ *s=ee[i].data.fd;
+ *revents=e;
+ } else {
+ c->q[c->h].fd=ee[i].data.fd;
+ c->q[c->h].events=e;
+ c->h = (c->h + 1) % SLOTS;
+ }
+ }
+#elif defined(HAVE_KQUEUE)
+ struct kevent kev[SLOTS];
+ struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 };
+ int r=kevent(c->ctx, 0, 0, &kev, freeslots, &ts);
+ if (r<=0) {
+ /* we ran into a timeout, so let someone else take over */
+ c->working=0;
+#ifdef __dietlibc__
+ cnd_broadcast(&c->sem);
+#else
+ sem_post(&c->sem);
+#endif
+ return r;
+ }
+ for (i=0; i<r; ++i) {
+ /* convert events */
+ int e = (kev[i].filter == EVFILT_READ ? IOM_READ : 0) |
+ (kev[i].filter == EVFILT_WRITE ? IOM_WRITE : 0);
+ if (i+1==r) {
+ /* return last event instead of enqueueing it */
+ *s=kev.ident;
+ *revents=e;
+ } else {
+ c->q[c->h].fd=kev[i].ident;
+ c->q[c->h].events=e;
+ c->h = (c->h + 1) % SLOTS;
+ }
+ }
+#else
+#warning "only epoll and kqueue supported for now"
+#endif
+ /* We need to signal the other threads.
+ Either there are other events left, or we need one of them to
+ wake up and call epoll_wait/kevent next, because we aren't
+ doing it anymore */
+ c->working=0;
+#ifdef __dietlibc__
+ cnd_signal(&c->sem);
+#else
+ sem_post(&c->sem);
+#endif
+ return 1;
+ } else {
+ /* somebody else has the job to fill the queue */
+ struct timespec ts;
+ ts.tv_sec = timeout / 1000;
+ ts.tv_nsec = (timeout % 1000) * 1000000;
+#ifdef __dietlibc__
+ r=cnd_timedwait(&c->sem,&c->mtx,&ts);
+#else
+ r=sem_timedwait(&c->sem,&ts);
+#endif
+ if (r==-1) {
+ if (errno==ETIMEDOUT) return 0;
+ return -1;
+ }
+ /* fall through into next loop iteration */
+ }
+ }
+}