first round of ev_async

This commit is contained in:
Marc Alexander Lehmann 2008-01-31 13:10:56 +00:00
parent 98d802caf4
commit 14f38f5fa8
6 changed files with 255 additions and 76 deletions

223
ev.c
View File

@ -1,7 +1,7 @@
/*
* libev event processing core, watcher management
*
* Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
* Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modifica-
@ -293,7 +293,7 @@ typedef ev_watcher_time *WT;
#if EV_USE_MONOTONIC
/* sig_atomic_t is used to avoid per-thread variables or locking but still */
/* giving it a reasonably high chance of working on typical architetcures */
static sig_atomic_t have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */
static EV_ATOMIC_T have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */
#endif
#ifdef _WIN32
@ -765,15 +765,13 @@ adjustheap (WT *heap, int N, int k)
typedef struct
{
WL head;
sig_atomic_t volatile gotsig;
EV_ATOMIC_T gotsig;
} ANSIG;
static ANSIG *signals;
static int signalmax;
static int sigpipe [2];
static sig_atomic_t volatile gotsig;
static ev_io sigev;
static EV_ATOMIC_T gotsig;
void inline_size
signals_init (ANSIG *base, int count)
@ -787,22 +785,99 @@ signals_init (ANSIG *base, int count)
}
}
/*****************************************************************************/
void inline_speed
fd_intern (int fd)
{
#ifdef _WIN32
int arg = 1;
ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg);
#else
fcntl (fd, F_SETFD, FD_CLOEXEC);
fcntl (fd, F_SETFL, O_NONBLOCK);
#endif
}
static void noinline
evpipe_init (EV_P)
{
if (!ev_is_active (&pipeev))
{
while (pipe (evpipe))
syserr ("(libev) error creating signal/async pipe");
fd_intern (evpipe [0]);
fd_intern (evpipe [1]);
ev_io_set (&pipeev, evpipe [0], EV_READ);
ev_io_start (EV_A_ &pipeev);
ev_unref (EV_A); /* child watcher should not keep loop alive */
}
}
void inline_size
evpipe_write (EV_P_ int sig, int async)
{
if (!(gotasync || gotsig))
{
int old_errno = errno;
if (sig) gotsig = 1;
if (async) gotasync = 1;
write (evpipe [1], &old_errno, 1);
errno = old_errno;
}
}
static void
pipecb (EV_P_ ev_io *iow, int revents)
{
{
int dummy;
read (evpipe [0], &dummy, 1);
}
if (gotsig)
{
int signum;
gotsig = 0;
for (signum = signalmax; signum--; )
if (signals [signum].gotsig)
ev_feed_signal_event (EV_A_ signum + 1);
}
if (gotasync)
{
int i;
gotasync = 0;
for (i = asynccnt; i--; )
if (asyncs [i]->sent)
{
asyncs [i]->sent = 0;
ev_feed_event (EV_A_ asyncs [i], EV_ASYNC);
}
}
}
/*****************************************************************************/
static void
sighandler (int signum)
{
#if EV_MULTIPLICITY
struct ev_loop *loop = &default_loop_struct;
#endif
#if _WIN32
signal (signum, sighandler);
#endif
signals [signum - 1].gotsig = 1;
if (!gotsig)
{
int old_errno = errno;
gotsig = 1;
write (sigpipe [1], &signum, 1);
errno = old_errno;
}
evpipe_write (EV_A_ 1, 0);
}
void noinline
@ -825,42 +900,6 @@ ev_feed_signal_event (EV_P_ int signum)
ev_feed_event (EV_A_ (W)w, EV_SIGNAL);
}
static void
sigcb (EV_P_ ev_io *iow, int revents)
{
int signum;
read (sigpipe [0], &revents, 1);
gotsig = 0;
for (signum = signalmax; signum--; )
if (signals [signum].gotsig)
ev_feed_signal_event (EV_A_ signum + 1);
}
void inline_speed
fd_intern (int fd)
{
#ifdef _WIN32
int arg = 1;
ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg);
#else
fcntl (fd, F_SETFD, FD_CLOEXEC);
fcntl (fd, F_SETFL, O_NONBLOCK);
#endif
}
static void noinline
siginit (EV_P)
{
fd_intern (sigpipe [0]);
fd_intern (sigpipe [1]);
ev_io_set (&sigev, sigpipe [0], EV_READ);
ev_io_start (EV_A_ &sigev);
ev_unref (EV_A); /* child watcher should not keep loop alive */
}
/*****************************************************************************/
static WL childs [EV_PID_HASHSIZE];
@ -1086,8 +1125,8 @@ loop_init (EV_P_ unsigned int flags)
if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif
ev_init (&sigev, sigcb);
ev_set_priority (&sigev, EV_MAXPRI);
ev_init (&pipeev, pipecb);
ev_set_priority (&pipeev, EV_MAXPRI);
}
}
@ -1096,6 +1135,15 @@ loop_destroy (EV_P)
{
int i;
if (ev_is_active (&pipeev))
{
ev_ref (EV_A); /* signal watcher */
ev_io_stop (EV_A_ &pipeev);
close (evpipe [0]); evpipe [0] = 0;
close (evpipe [1]); evpipe [1] = 0;
}
#if EV_USE_INOTIFY
if (fs_fd >= 0)
close (fs_fd);
@ -1163,20 +1211,19 @@ loop_fork (EV_P)
infy_fork (EV_A);
#endif
if (ev_is_active (&sigev))
if (ev_is_active (&pipeev))
{
/* default loop */
/* this "locks" the handlers against writing to the pipe */
gotsig = gotasync = 1;
ev_ref (EV_A);
ev_io_stop (EV_A_ &sigev);
close (sigpipe [0]);
close (sigpipe [1]);
ev_io_stop (EV_A_ &pipeev);
close (evpipe [0]);
close (evpipe [1]);
while (pipe (sigpipe))
syserr ("(libev) error creating pipe");
siginit (EV_A);
sigcb (EV_A_ &sigev, EV_READ);
evpipe_init (EV_A);
/* now iterate over everything */
evcb (EV_A_ &pipeev, EV_READ);
}
postfork = 0;
@ -1221,10 +1268,6 @@ int
ev_default_loop (unsigned int flags)
#endif
{
if (sigpipe [0] == sigpipe [1])
if (pipe (sigpipe))
return 0;
if (!ev_default_loop_ptr)
{
#if EV_MULTIPLICITY
@ -1237,8 +1280,6 @@ ev_default_loop (unsigned int flags)
if (ev_backend (EV_A))
{
siginit (EV_A);
#ifndef _WIN32
ev_signal_init (&childev, childcb, SIGCHLD);
ev_set_priority (&childev, EV_MAXPRI);
@ -1265,12 +1306,6 @@ ev_default_destroy (void)
ev_signal_stop (EV_A_ &childev);
#endif
ev_ref (EV_A); /* signal watcher */
ev_io_stop (EV_A_ &sigev);
close (sigpipe [0]); sigpipe [0] = 0;
close (sigpipe [1]); sigpipe [1] = 0;
loop_destroy (EV_A);
}
@ -1867,6 +1902,8 @@ ev_signal_start (EV_P_ ev_signal *w)
assert (("ev_signal_start called with illegal signal number", w->signum > 0));
evpipe_init (EV_A);
{
#ifndef _WIN32
sigset_t full, prev;
@ -2389,6 +2426,44 @@ ev_fork_stop (EV_P_ ev_fork *w)
}
#endif
#if EV_ASYNC_ENABLE
void
ev_async_start (EV_P_ ev_async *w)
{
if (expect_false (ev_is_active (w)))
return;
evpipe_init (EV_A);
ev_start (EV_A_ (W)w, ++asynccnt);
array_needsize (ev_async *, asyncs, asyncmax, asynccnt, EMPTY2);
asyncs [asynccnt - 1] = w;
}
void
ev_async_stop (EV_P_ ev_async *w)
{
clear_pending (EV_A_ (W)w);
if (expect_false (!ev_is_active (w)))
return;
{
int active = ((W)w)->active;
asyncs [active - 1] = asyncs [--asynccnt];
((W)asyncs [active - 1])->active = active;
}
ev_stop (EV_A_ (W)w);
}
void
ev_async_send (EV_P_ ev_async *w)
{
w->sent = 1;
evpipe_write (EV_A_ 0, 1);
}
#endif
/*****************************************************************************/
struct ev_once

34
ev.h
View File

@ -1,7 +1,7 @@
/*
* libev native API header
*
* Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
* Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modifica-
@ -78,6 +78,15 @@ typedef double ev_tstamp;
# define EV_EMBED_ENABLE 1
#endif
#ifndef EV_ASYNC_ENABLE
# define EV_ASYNC_ENABLE 1
#endif
#ifndef EV_ATOMIC_T
# include <signal.h>
# define EV_ATOMIC_T sig_atomic_t volatile
#endif
/*****************************************************************************/
#if EV_STAT_ENABLE
@ -120,6 +129,7 @@ struct ev_loop;
#define EV_CHECK 0x00008000L /* event loop finished poll */
#define EV_EMBED 0x00010000L /* embedded event loop needs sweep */
#define EV_FORK 0x00020000L /* event loop resumed in child */
#define EV_ASYNC 0x00040000L /* async intra-loop signal */
#define EV_ERROR 0x80000000L /* sent when an error occurs */
/* can be used to add custom fields to all watchers, while losing binary compatibility */
@ -307,6 +317,17 @@ typedef struct ev_embed
} ev_embed;
#endif
#if EV_ASYNC_ENABLE
/* invoked when somebody calls ev_async_send on the watcher */
/* revent EV_ASYNC */
typedef struct ev_async
{
EV_WATCHER (ev_async)
EV_ATOMIC_T sent; /* private */
} ev_async;
#endif
/* the presence of this union forces similar struct layout */
union ev_any_watcher
{
@ -332,6 +353,9 @@ union ev_any_watcher
#if EV_EMBED_ENABLE
struct ev_embed embed;
#endif
#if EV_ASYND_ENABLE
struct ev_async async;
#endif
};
/* bits for ev_default_loop and ev_loop_new */
@ -465,6 +489,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent
#define ev_check_set(ev) /* nop, yes, this is a serious in-joke */
#define ev_embed_set(ev,other_) do { (ev)->other = (other_); } while (0)
#define ev_fork_set(ev) /* nop, yes, this is a serious in-joke */
#define ev_async_set(ev) do { (ev)->gotasync = 0; } while (0)
#define ev_io_init(ev,cb,fd,events) do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0)
#define ev_timer_init(ev,cb,after,repeat) do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0)
@ -477,6 +502,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent
#define ev_check_init(ev,cb) do { ev_init ((ev), (cb)); ev_check_set ((ev)); } while (0)
#define ev_embed_init(ev,cb,other) do { ev_init ((ev), (cb)); ev_embed_set ((ev),(other)); } while (0)
#define ev_fork_init(ev,cb) do { ev_init ((ev), (cb)); ev_fork_set ((ev)); } while (0)
#define ev_async_init(ev,cb) do { ev_init ((ev), (cb)); ev_async_set ((ev)); } while (0)
#define ev_is_pending(ev) (0 + ((ev_watcher *)(void *)(ev))->pending) /* ro, true when watcher is waiting for callback invocation */
#define ev_is_active(ev) (0 + ((ev_watcher *)(void *)(ev))->active) /* ro, true when the watcher has been started */
@ -552,6 +578,12 @@ void ev_embed_stop (EV_P_ ev_embed *w);
void ev_embed_sweep (EV_P_ ev_embed *w);
# endif
# if EV_ASYNC_ENABLE
void ev_async_start (EV_P_ ev_async *w);
void ev_async_stop (EV_P_ ev_async *w);
void ev_async_send (EV_P_ ev_async *w);
# endif
#endif
#ifdef __cplusplus

49
ev.pod
View File

@ -776,6 +776,10 @@ The embedded event loop specified in the C<ev_embed> watcher needs attention.
The event loop has been resumed in the child process after fork (see
C<ev_fork>).
=item C<EV_ASYNC>
The given async watcher has been asynchronously notified (see C<ev_async>).
=item C<EV_ERROR>
An unspecified error has occured, the watcher has been stopped. This might
@ -2048,6 +2052,51 @@ believe me.
=back
=head2 C<ev_async> - how to wake up another event loop
In general, you cannot use an C<ev_loop> from multiple threads or other
asynchronous sources such as signal handlers (as opposed to multiple event
loops - those are of course safe to use in different threads).
Sometimes, however, you need to wake up another event loop you do not
control, for example because it belongs to another thread. This is what
C<ev_async> watchers do: as long as the C<ev_async> watcher is active, you
can signal it by calling C<ev_async_send>, which is thread- and signal
safe.
This functionality is very similar to C<ev_signal> watchers, as signals,
too, are asynchronous in nature, and signals, too, will be compressed
(i.e. the number of callback invocations may be less than the number of
C<ev_async_sent> calls).
Unlike C<ev_signal> watchers, C<ev_async> works with any event loop, not
just the default loop.
=head3 Watcher-Specific Functions and Data Members
=over 4
=item ev_async_init (ev_async *, callback)
Initialises and configures the async watcher - it has no parameters of any
kind. There is a C<ev_asynd_set> macro, but using it is utterly pointless,
believe me.
=item ev_async_send (loop, ev_async *)
Sends/signals/activates the given C<ev_async> watcher, that is, feeds
an C<EV_ASYNC> event on the watcher into the event loop. Unlike
C<ev_feed_event>, this call is safe to do in other threads, signal or
similar contexts (see the dicusssion of C<EV_ATOMIC_T> in the embedding
section below on what exactly this means).
This call incurs the overhead of a syscall only once per loop iteration,
so while the overhead might be noticable, it doesn't apply to repeated
calls to C<ev_async_send>.
=back
=head1 OTHER FUNCTIONS
There are some other functions of possible interest. Described. Here. Now.

View File

@ -1,7 +1,7 @@
/*
* loop member variable declarations
*
* Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
* Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modifica-
@ -55,6 +55,9 @@ VARx(ev_tstamp, backend_fudge) /* assumed typical timer resolution */
VAR (backend_modify, void (*backend_modify)(EV_P_ int fd, int oev, int nev))
VAR (backend_poll , void (*backend_poll)(EV_P_ ev_tstamp timeout))
VAR (evpipe, int evpipe [2])
VARx(ev_io, pipeev)
#if !defined(_WIN32) || EV_GENWRAP
VARx(pid_t, curpid)
#endif
@ -137,6 +140,13 @@ VARx(int, forkmax)
VARx(int, forkcnt)
#endif
#if EV_ASYNC_ENABLE || EV_GENWRAP
VARx(EV_ATOMIC_T, gotasync)
VARx(struct ev_async **, asyncs)
VARx(int, asyncmax)
VARx(int, asynccnt)
#endif
#if EV_USE_INOTIFY || EV_GENWRAP
VARx(int, fs_fd)
VARx(ev_io, fs_w)

View File

@ -13,6 +13,8 @@
#define backend_fudge ((loop)->backend_fudge)
#define backend_modify ((loop)->backend_modify)
#define backend_poll ((loop)->backend_poll)
#define evpipe ((loop)->evpipe)
#define pipeev ((loop)->pipeev)
#define curpid ((loop)->curpid)
#define postfork ((loop)->postfork)
#define vec_ri ((loop)->vec_ri)
@ -61,6 +63,10 @@
#define forks ((loop)->forks)
#define forkmax ((loop)->forkmax)
#define forkcnt ((loop)->forkcnt)
#define gotasync ((loop)->gotasync)
#define asyncs ((loop)->asyncs)
#define asyncmax ((loop)->asyncmax)
#define asynccnt ((loop)->asynccnt)
#define fs_fd ((loop)->fs_fd)
#define fs_w ((loop)->fs_w)
#define fs_hash ((loop)->fs_hash)
@ -78,6 +84,8 @@
#undef backend_fudge
#undef backend_modify
#undef backend_poll
#undef evpipe
#undef pipeev
#undef curpid
#undef postfork
#undef vec_ri
@ -126,6 +134,10 @@
#undef forks
#undef forkmax
#undef forkcnt
#undef gotasync
#undef asyncs
#undef asyncmax
#undef asynccnt
#undef fs_fd
#undef fs_w
#undef fs_hash

View File

@ -1,5 +1,6 @@
/*
* Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
* Copyright (c) 2008 Marc Alexander Lehmann <libev@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without