Przeglądaj źródła

add experimental iom API for multithreaded I/O multiplexing (in io.h)

master
Felix von Leitner 2 lat temu
rodzic
commit
22408afb0e
10 zmienionych plików z 366 dodań i 0 usunięć
  1. +1
    -0
      CHANGES
  2. +45
    -0
      io.h
  3. +19
    -0
      io/iom_abort.3
  4. +10
    -0
      io/iom_abort.c
  5. +33
    -0
      io/iom_add.3
  6. +28
    -0
      io/iom_add.c
  7. +30
    -0
      io/iom_init.3
  8. +38
    -0
      io/iom_init.c
  9. +38
    -0
      io/iom_wait.3
  10. +124
    -0
      io/iom_wait.c

+ 1
- 0
CHANGES Wyświetl plik

@@ -4,6 +4,7 @@
fix fmt_ip6 (Erwin Hoffmann)
add MSG_ZEROCOPY support (only used for buffers >8k)
use write in buffer_put for a slight perf improvement
add experimental iom API for multithreaded I/O multiplexing (in io.h)

0.31:
special case buffer_get_token with token length 1 through memccpy (almost 4x speedup)


+ 45
- 0
io.h Wyświetl plik

@@ -144,6 +144,51 @@ int64 io_mmapwritefile(int64 out,int64 in,uint64 off,uint64 bytes,io_write_callb
* aid in debugging the state machine if a descriptor loops or so */
unsigned int io_debugstring(int64 s,char* buf,unsigned int bufsize);

#ifdef __dietlibc__
#include <threads.h>
#else
#include <pthread.h>
#include <semaphore.h>
#endif

enum { SLOTS=128 };
typedef struct iomux {
int ctx;
int working; /* used to synchronize who is filling the queue */
unsigned int h,l; /* high, low */
struct {
int fd, events;
} q[SLOTS];
#ifdef __dietlibc__
mtx_t mtx;
cnd_t sem;
#else
sem_t sem;
#endif
} iomux_t;


/* Init master context */
int iom_init(iomux_t* c);

/* Add socket to iomux */
enum {
IOM_READ=1,
IOM_WRITE=2,
IOM_ERROR=4
};
/* return -1 if error, or | of IOM_READ, IOM_WRITE or IOM_ERROR */
int iom_add(iomux_t* c,int64 s,unsigned int events);

/* Blocking wait for single event, timeout in milliseconds */
/* return -1 if error, 0 if ok; s set to fd, revents set to known events on that fd */
/* when done with the fd, call iom_add on it again! */
/* This can be called by multiple threads in parallel */
int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout);

/* Call this to terminate all threads waiting in iom_wait */
int iom_abort(iomux_t* c);

#ifdef __cplusplus
}
#endif


+ 19
- 0
io/iom_abort.3 Wyświetl plik

@@ -0,0 +1,19 @@
.TH iom_abort 3
.SH NAME
iom_abort \- abort all pending iom_wait calls
.SH SYNTAX
.B #include <libowfat/io.h>

int \fBiom_abort\fP(iomux_t* c);
.SH DESCRIPTION
\fIiom_abort\fR will cause all currently running instances of
\fIiom_wait\fR to return immediately with return value -2.

.SH "LINKING"
You may have to add \fI-lpthread\fR to the command line in the linking
step.

.SH "RETURN VALUE"
iom_abort returns 0 on success and -1 on error, setting errno.
.SH "SEE ALSO"
iom_init, iom_add, iom_wait

+ 10
- 0
io/iom_abort.c Wyświetl plik

@@ -0,0 +1,10 @@
#include "io_internal.h"

int iom_abort(iomux_t* c) {
c->working=-2;
#ifdef __dietlibc__
return cnd_broadcast(&c->sem);
#else
return sem_post(&c->sem);
#endif
}

+ 33
- 0
io/iom_add.3 Wyświetl plik

@@ -0,0 +1,33 @@
.TH iom_add 3
.SH NAME
iom_add \- add event to I/O multiplexer
.SH SYNTAX
.B #include <libowfat/io.h>

int \fBiom_add\fP(iomux_t* c, int64 fd, unsigned int events);
.SH DESCRIPTION
iom_add adds an event you are interested in to an I/O multiplexer.

\fIfd\fR is the file descriptor (usually a socket) you are interested
in, and \fIevents\fR is the operation you want to do. It can be IOM_READ
or IOM_WRITE.

If that operation becomes possible on that descriptor, and some thread
is calling \fIiom_wait\fR at the time, it will return and tell you the
fd and the event.

Note that the event registration is removed from the iomux_t context if
it occurs. You will have to call \fIiom_wait\fR again after you handled
the event, if you are still interested in it.

Closing a file descriptor with registered events will discard the event
registration.

.SH "LINKING"
You may have to add \fI-lpthread\fR to the command line in the linking
step.

.SH "RETURN VALUE"
iom_add returns 0 on success and -1 on error, setting errno.
.SH "SEE ALSO"
iom_init, iom_wait, iom_abort

+ 28
- 0
io/iom_add.c Wyświetl plik

@@ -0,0 +1,28 @@
#include "io_internal.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#endif

int iom_add(iomux_t* c,int64 s,unsigned int events) {
#ifdef HAVE_EPOLL
struct epoll_event e = { .events=EPOLLONESHOT, .data.fd=s };
if (events & IOM_READ) e.events|=EPOLLIN;
if (events & IOM_WRITE) e.events|=EPOLLOUT;
return epoll_ctl(c->ctx, EPOLL_CTL_ADD, s, &e);
#elif defined(HAVE_KQUEUE)
struct kevent kev;
struct timespec ts = { 0 };
EV_SET(&kev, s,
(events & IOM_READ ? EVFILT_READ : 0) +
(events & IOM_WRITE ? EVFILT_WRITE : 0),
EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, (void*)s);
return kevent(c->ctx, &kev, 1, 0, 0, &ts);
#else
#warning "only epoll and kqueue supported for now"
#endif
}

+ 30
- 0
io/iom_init.3 Wyświetl plik

@@ -0,0 +1,30 @@
.TH iom_init 3
.SH NAME
iom_init \- create new I/O multiplexer
.SH SYNTAX
.B #include <libowfat/io.h>

int \fBiom_init\fP(iomux_t* c);
.SH DESCRIPTION
iom_init initializes an I/O multiplexer.

An I/O multiplexer is a context that can be used to do I/O multiplexing
with support for multiple threads. Add events to a multiplexer using
\fIiom_add\fR, and then get the next available event with
\fIiom_wait\fR. If you are done and want to signal all the threads
something, set a volatile global variable to tell the threads to stop
and then fall \fIiom_abort\fR to tell all pending iom_wait operations in
all threads to return immediately.

After \fIiom_init\fR is done, \fIiom_add\fR and \fIiom_wait\fR can be
called from different threads on the same context, and they will
synchronize internally.

.SH "LINKING"
You may have to add \fI-lpthread\fR to the command line in the linking
step.

.SH "RETURN VALUE"
iom_init returns 0 on success and -1 on error, setting errno.
.SH "SEE ALSO"
iom_add, iom_wait, iom_abort

+ 38
- 0
io/iom_init.c Wyświetl plik

@@ -0,0 +1,38 @@
#include "io_internal.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#endif

int iom_init(iomux_t* c) {
#ifdef HAVE_EPOLL
c->ctx = epoll_create1(EPOLL_CLOEXEC);
#elif defined(HAVE_KQUEUE)
if ((c->ctx = kqueue()) != -1) {
if (fcntl(c->ctx,F_SETFD,FD_CLOEXEC) == -1) {
close(c->ctx);
c->ctx=-1;
}
}
#else
#warning "only epoll and kqueue supported for now"
#endif
unsigned int i;
c->working=0;
c->h=c->l=0; /* no elements in queue */
for (i=0; i<SLOTS; ++i) {
c->q[i].fd=-1;
c->q[i].events=0;
}
#ifdef __dietlibc__
mtx_init(&c->mtx, mtx_timed);
cnd_init(&c->sem);
#else
sem_init(&c->sem, 0, 1);
#endif
return (c->ctx!=-1);
}

+ 38
- 0
io/iom_wait.3 Wyświetl plik

@@ -0,0 +1,38 @@
.TH iom_wait 3
.SH NAME
iom_wait \- wait for event from I/O multiplexer
.SH SYNTAX
.B #include <libowfat/io.h>

int \fBiom_wait\fP(iomux_t* c,
int64* fd, unsigned int* events,
unsigned long timeout);
.SH DESCRIPTION
iom_wait will wait for events registered to the I/O multiplexer with
\fIiom_add\fR. It will wait \fItimeout\fR milliseconds.

If during that time any of the registered events occur, \fIiom_wait\fR
will set \fIfd\fR to the file descriptor the event happened on, and
\fIevents\fR to the sum of IOM_READ, IOM_WRITE and IOM_ERROR, depending
on what event actually happened, and return 1.

If nothing happens during that time, it will return 0 and leave \fIfd\fR
and \fIevents\fR alone.

Note that the event registration is removed from the iomux_t context if
it occurs. You will have to call \fIiom_wait\fR again after you handled
the event, if you are still interested in it.

Closing a file descriptor with registered events will discard the event
registration.

.SH "LINKING"
You may have to add \fI-lpthread\fR to the command line in the linking
step.

.SH "RETURN VALUE"
iom_wait returns 1 on success, 0 if there was a timeout, and -1 on
error, setting errno. If \fIiom_abort\fR was called on the I/O
multiplexer context, it will return -2.
.SH "SEE ALSO"
iom_init, iom_add, iom_abort

+ 124
- 0
io/iom_wait.c Wyświetl plik

@@ -0,0 +1,124 @@
#include "io_internal.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#endif
#include <errno.h>

int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
for (;;) {
/* If we have an event in the queue, use that one */
int r;
if (c->working==-2) return -2; /* iomux was aborted */
for (;;) {
unsigned int f=c->l;
if (f == c->h)
break; /* no elements in queue */
int n=(f+1)%SLOTS;
if (__sync_bool_compare_and_swap(&c->l,f,n)) {
/* we got one, and its index is in f */
*s=c->q[f].fd;
*revents=c->q[f].events;
}
/* collided with another thread, try again */
}
/* The queue was empty. If someone else is already calling
* epoll_wait/kevent, then use the semaphore */
if (__sync_bool_compare_and_swap(&c->working,0,1)) {
/* we have the job to fill the struct. */
int freeslots = (c->h - c->l);
if (!freeslots) freeslots=SLOTS;

#ifdef HAVE_EPOLL
struct epoll_event ee[SLOTS];
int i;
r=epoll_wait(c->ctx, ee, freeslots, timeout);
if (r<=0) {
/* we ran into a timeout, so let someone else take over */
c->working=0;
#ifdef __dietlibc__
cnd_broadcast(&c->sem);
#else
sem_post(&c->sem);
#endif
return r;
}
for (i=0; i<r; ++i) {
/* convert events */
int e = ((ee[i].events & (EPOLLIN|EPOLLHUP|EPOLLERR)) ? IOM_READ : 0) |
((ee[i].events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) ? IOM_WRITE : 0) |
((ee[i].events & EPOLLERR) ? IOM_ERROR : 0);
if (i+1==r) {
/* return last event instead of enqueueing it */
*s=ee[i].data.fd;
*revents=e;
} else {
c->q[c->h].fd=ee[i].data.fd;
c->q[c->h].events=e;
c->h = (c->h + 1) % SLOTS;
}
}
#elif defined(HAVE_KQUEUE)
struct kevent kev[SLOTS];
struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 };
int r=kevent(c->ctx, 0, 0, &kev, freeslots, &ts);
if (r<=0) {
/* we ran into a timeout, so let someone else take over */
c->working=0;
#ifdef __dietlibc__
cnd_broadcast(&c->sem);
#else
sem_post(&c->sem);
#endif
return r;
}
for (i=0; i<r; ++i) {
/* convert events */
int e = (kev[i].filter == EVFILT_READ ? IOM_READ : 0) |
(kev[i].filter == EVFILT_WRITE ? IOM_WRITE : 0);
if (i+1==r) {
/* return last event instead of enqueueing it */
*s=kev.ident;
*revents=e;
} else {
c->q[c->h].fd=kev[i].ident;
c->q[c->h].events=e;
c->h = (c->h + 1) % SLOTS;
}
}
#else
#warning "only epoll and kqueue supported for now"
#endif
/* We need to signal the other threads.
Either there are other events left, or we need one of them to
wake up and call epoll_wait/kevent next, because we aren't
doing it anymore */
c->working=0;
#ifdef __dietlibc__
cnd_signal(&c->sem);
#else
sem_post(&c->sem);
#endif
return 1;
} else {
/* somebody else has the job to fill the queue */
struct timespec ts;
ts.tv_sec = timeout / 1000;
ts.tv_nsec = (timeout % 1000) * 1000000;
#ifdef __dietlibc__
r=cnd_timedwait(&c->sem,&c->mtx,&ts);
#else
r=sem_timedwait(&c->sem,&ts);
#endif
if (r==-1) {
if (errno==ETIMEDOUT) return 0;
return -1;
}
/* fall through into next loop iteration */
}
}
}

Ładowanie…
Anuluj
Zapisz