Browse Source

Add liProc and liErrorPipe for angel

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent
commit
766fcd4bfc
  1. 9
      include/lighttpd/angel_base.h
  2. 45
      include/lighttpd/angel_proc.h
  3. 2
      include/lighttpd/angel_server.h
  4. 15
      include/lighttpd/angel_typedefs.h
  5. 1
      src/CMakeLists.txt
  6. 192
      src/angel/angel_proc.c
  7. 82
      src/angel/angel_server.c

9
include/lighttpd/angel_base.h

@ -9,12 +9,7 @@
#include <lighttpd/module.h>
/* angel_server.h */
typedef struct liServer liServer;
typedef struct liInstance liInstance;
typedef struct liInstanceConf liInstanceConf;
#include <lighttpd/angel_typedefs.h>
#include <lighttpd/angel_value.h>
#include <lighttpd/angel_data.h>
@ -23,6 +18,8 @@ typedef struct liInstanceConf liInstanceConf;
#include <lighttpd/angel_plugin.h>
#include <lighttpd/angel_server.h>
#include <lighttpd/angel_proc.h>
#include <lighttpd/utils.h>
#endif

45
include/lighttpd/angel_proc.h

@ -0,0 +1,45 @@
#ifndef _LIGHTTPD_ANGEL_PROC_H_
#define _LIGHTTPD_ANGEL_PROC_H_
#ifndef _LIGHTTPD_ANGEL_BASE_H_
#error Please include <lighttpd/angel_base.h> instead of this file
#endif
/* The callback is not allowed to close the epipe */
typedef void (*liErrorPipeCB)(liServer *srv, liErrorPipe *epipe, GString *msg);
typedef void (*liProcSetupCB)(gpointer ctx);
struct liErrorPipe {
liServer *srv;
gpointer ctx;
liErrorPipeCB cb;
int fds[2];
ev_io fd_watcher;
};
struct liProc {
liServer *srv;
pid_t child_pid;
liErrorPipe *epipe;
gchar *appname;
};
LI_API liErrorPipe* li_error_pipe_new(liServer *srv, liErrorPipeCB cb, gpointer ctx);
LI_API void li_error_pipe_free(liErrorPipe *epipe);
/** closes out-fd */
LI_API void li_error_pipe_activate(liErrorPipe *epipe);
/** closes in-fd, moves out-fd to dest_fd */
LI_API void li_error_pipe_use(liErrorPipe *epipe, int dest_fd);
/** read remaining data from in-fd */
LI_API void li_error_pipe_flush(liErrorPipe *epipe);
LI_API liProc* li_proc_new(liServer *srv, gchar **args, gchar **env, uid_t uid, gid_t gid, gchar *username, liProcSetupCB cb, gpointer ctx);
LI_API void li_proc_free(liProc *proc);
#endif

2
include/lighttpd/angel_server.h

@ -32,7 +32,7 @@ struct liInstance {
liServer *srv;
liInstanceConf *ic;
pid_t pid;
liProc *proc;
ev_child child_watcher;
liInstanceState s_cur, s_dest;

15
include/lighttpd/angel_typedefs.h

@ -0,0 +1,15 @@
#ifndef _LIGHTTPD_ANGEL_TYPEDEFS_H_
#define _LIGHTTPD_ANGEL_TYPEDEFS_H_
/* angel_proc.h */
typedef struct liErrorPipe liErrorPipe;
typedef struct liProc liProc;
/* angel_server.h */
typedef struct liServer liServer;
typedef struct liInstance liInstance;
typedef struct liInstanceConf liInstanceConf;
#endif

1
src/CMakeLists.txt

@ -248,6 +248,7 @@ SET(ANGEL_SHARED_SRC
angel_log.c
angel_plugin.c
angel_plugin_core.c
angel_proc.c
angel_server.c
angel_value.c
)

192
src/angel/angel_proc.c

@ -0,0 +1,192 @@
#include <lighttpd/angel_base.h>
#include <grp.h>
static void read_pipe(liServer *srv, liErrorPipe *epipe, gboolean flush) {
const ssize_t max_read = 1024;
ssize_t r, toread;
GString *buf;
int count = 10;
if (-1 == epipe->fds[0]) return;
for (;;) {
if (ioctl(epipe->fds[0], FIONREAD, &toread) || toread == 0) {
toread = 256;
} else {
if (toread > max_read) toread = max_read;
}
buf = g_string_sized_new(toread);
g_string_set_size(buf, toread);
r = read(epipe->fds[0], buf->str, toread);
if (r < 0) {
g_string_free(buf, TRUE);
switch (errno) {
case EINTR: continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return; /* come back later */
case ECONNRESET:
goto close_epipe;
default:
ERROR(srv, "read error: %s", g_strerror(errno));
goto close_epipe;
}
} else if (r == 0) { /* EOF */
g_string_free(buf, TRUE);
goto close_epipe;
}
g_string_set_size(buf, r);
epipe->cb(srv, epipe, buf);
g_string_free(buf, TRUE);
if (!flush) break;
if (--count <= 0) {
buf = g_string_new("error while trying to flush error-pipe: didn't see EOF. closing");
epipe->cb(srv, epipe, buf);
g_string_free(buf, TRUE);
return;
}
}
return;
close_epipe:
ev_io_stop(srv->loop, &epipe->fd_watcher);
close(epipe->fds[0]);
epipe->fds[0] = -1;
}
static void error_pipe_cb(struct ev_loop *loop, ev_io *w, int revents) {
liErrorPipe *epipe = w->data;
UNUSED(loop);
UNUSED(revents);
read_pipe(epipe->srv, epipe, FALSE);
}
liErrorPipe* li_error_pipe_new(liServer *srv, liErrorPipeCB cb, gpointer ctx) {
liErrorPipe *epipe;
int fds[2];
if (-1 == pipe(fds)) {
ERROR(srv, "Couldn't create pipe: %s", g_strerror(errno));
return NULL;
}
epipe = g_slice_new0(liErrorPipe);
epipe->srv = srv;
epipe->cb = cb;
epipe->ctx = ctx;
ev_io_init(&epipe->fd_watcher, error_pipe_cb, fds[0], EV_READ);
epipe->fd_watcher.data = epipe;
epipe->fds[0] = fds[0];
epipe->fds[1] = fds[1];
li_fd_init(fds[0]);
return epipe;
}
void li_error_pipe_free(liErrorPipe *epipe) {
liServer *srv = epipe->srv;
ev_io_stop(srv->loop, &epipe->fd_watcher);
li_error_pipe_flush(epipe);
if (-1 != epipe->fds[0]) close(epipe->fds[0]);
if (-1 != epipe->fds[1]) close(epipe->fds[1]);
g_slice_free(liErrorPipe, epipe);
}
/** closes out-fd */
void li_error_pipe_activate(liErrorPipe *epipe) {
liServer *srv = epipe->srv;
if (-1 != epipe->fds[1]) close(epipe->fds[1]);
ev_io_start(srv->loop, &epipe->fd_watcher);
}
/** closes in-fd, moves out-fd to dest_fd */
void li_error_pipe_use(liErrorPipe *epipe, int dest_fd) {
if (-1 != epipe->fds[0]) {
close(epipe->fds[0]);
epipe->fds[0] = -1;
}
if (epipe->fds[1] != dest_fd) {
dup2(epipe->fds[1], dest_fd);
close(epipe->fds[1]);
epipe->fds[1] = dest_fd;
}
}
void li_error_pipe_flush(liErrorPipe *epipe) {
read_pipe(epipe->srv, epipe, TRUE);
}
static void proc_epipe_cb(liServer *srv, liErrorPipe *epipe, GString *msg) {
liProc *proc = epipe->ctx;
ERROR(srv, "%s (pid: %i): %s", proc->appname, proc->child_pid, msg->str);
}
liProc* li_proc_new(liServer *srv, gchar **args, gchar **env, uid_t uid, gid_t gid, gchar *username, liProcSetupCB cb, gpointer ctx) {
liProc *proc;
pid_t pid;
proc = g_slice_new0(liProc);
proc->srv = srv;
proc->child_pid = -1;
proc->epipe = li_error_pipe_new(srv, proc_epipe_cb, proc);
proc->appname = g_strdup(args[0]);
switch (pid = fork()) {
case 0:
li_error_pipe_use(proc->epipe, STDERR_FILENO);
setsid();
if (gid != (gid_t) -1) {
setgid(gid);
setgroups(0, NULL);
if (username) initgroups(username, gid);
}
if (cb) cb(ctx);
if (uid != (uid_t) -1) {
setuid(uid);
}
if (NULL == env) env = environ;
execve(args[0], args, env);
g_printerr("exec('%s') failed: %s\n", args[0], g_strerror(errno));
exit(-1);
break;
case -1:
ERROR(srv, "fork failed: %s", g_strerror(errno));
li_proc_free(proc);
return NULL;
default:
proc->child_pid = pid;
li_error_pipe_activate(proc->epipe);
break;
}
return proc;
}
void li_proc_free(liProc *proc) {
li_error_pipe_free(proc->epipe);
g_free(proc->appname);
g_slice_free(liProc, proc);
}

82
src/angel/angel_server.c

@ -1,8 +1,6 @@
#include <lighttpd/angel_base.h>
#include <grp.h>
static void instance_state_machine(liInstance *i);
static void jobqueue_callback(struct ev_loop *loop, ev_async *w, int revents) {
@ -98,13 +96,14 @@ static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
liInstance *i = (liInstance*) w->data;
if (i->s_cur == LI_INSTANCE_LOADING) {
ERROR(i->srv, "spawning child %i failed, not restarting", i->pid);
ERROR(i->srv, "spawning child %i failed, not restarting", i->proc->child_pid);
i->s_dest = i->s_cur = LI_INSTANCE_DOWN; /* TODO: retry spawn later? */
} else {
ERROR(i->srv, "child %i died", i->pid);
ERROR(i->srv, "child %i died", i->proc->child_pid);
i->s_cur = LI_INSTANCE_DOWN;
}
i->pid = -1;
li_proc_free(i->proc);
i->proc = NULL;
li_angel_connection_free(i->acon);
i->acon = NULL;
ev_child_stop(loop, w);
@ -112,6 +111,17 @@ static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
li_instance_release(i);
}
static void instance_spawn_setup(gpointer ctx) {
int *confd = ctx;
if (confd[1] != 0) {
dup2(confd[1], 0);
close(confd[1]);
}
dup2(STDERR_FILENO, STDOUT_FILENO);
}
static void instance_spawn(liInstance *i) {
int confd[2];
if (-1 == socketpair(AF_UNIX, SOCK_STREAM, 0, confd)) {
@ -122,38 +132,16 @@ static void instance_spawn(liInstance *i) {
li_fd_no_block(confd[1]);
i->acon = li_angel_connection_new(i->srv->loop, confd[0], i, instance_angel_call_cb, instance_angel_close_cb);
i->pid = fork();
switch (i->pid) {
case 0: {
gchar **args;
setsid(); /* lead session, so we don't recieve the signals for the angel */
if (getuid() == 0 && (i->ic->uid != (uid_t) -1) && (i->ic->gid != (gid_t) -1)) {
setgid(i->ic->gid);
setgroups(0, NULL);
initgroups(i->ic->username->str, i->ic->gid);
setuid(i->ic->uid);
}
i->proc = li_proc_new(i->srv, i->ic->cmd, NULL, i->ic->uid, i->ic->gid, i->ic->username->str, instance_spawn_setup, confd);
if (confd[1] != 0) {
dup2(confd[1], 0);
close(confd[1]);
}
/* TODO: close stdout/stderr ? */
execvp(i->ic->cmd[0], i->ic->cmd);
g_printerr("exec('%s') failed: %s\n", i->ic->cmd[0], g_strerror(errno));
exit(-1);
}
case -1:
break;
default:
close(confd[1]);
ev_child_set(&i->child_watcher, i->pid, 0);
ev_child_start(i->srv->loop, &i->child_watcher);
i->s_cur = LI_INSTANCE_LOADING;
li_instance_acquire(i);
ERROR(i->srv, "Instance (%i) spawned: %s", i->pid, i->ic->cmd[0]);
break;
}
if (!i->proc) return;
close(confd[1]);
ev_child_set(&i->child_watcher, i->proc->child_pid, 0);
ev_child_start(i->srv->loop, &i->child_watcher);
i->s_cur = LI_INSTANCE_LOADING;
li_instance_acquire(i);
DEBUG(i->srv, "Instance (%i) spawned: %s", i->proc->child_pid, i->ic->cmd[0]);
}
liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) {
@ -164,7 +152,6 @@ liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) {
i->srv = srv;
li_instance_conf_acquire(ic);
i->ic = ic;
i->pid = -1;
i->s_cur = i->s_dest = LI_INSTANCE_DOWN;
ev_child_init(&i->child_watcher, instance_child_cb, -1, 0);
i->child_watcher.data = i;
@ -190,10 +177,10 @@ void li_instance_set_state(liInstance *i, liInstanceState s) {
i->s_dest = s;
if (s == LI_INSTANCE_DOWN) {
if (i->s_cur != LI_INSTANCE_DOWN) {
kill(i->pid, SIGTERM);
kill(i->proc->child_pid, SIGTERM);
}
} else {
if (i->pid == (pid_t) -1) {
if (!i->proc) {
instance_spawn(i);
return;
} else {
@ -222,28 +209,28 @@ static void instance_state_machine(liInstance *i) {
olds = i->s_cur;
switch (i->s_dest) {
case LI_INSTANCE_DOWN:
if (i->pid == (pid_t) -1) {
if (!i->proc) {
i->s_cur = LI_INSTANCE_DOWN;
break;
}
kill(i->pid, SIGINT);
kill(i->proc->child_pid, SIGINT);
return;
case LI_INSTANCE_LOADING:
break;
case LI_INSTANCE_WARMUP:
if (i->pid == (pid_t) -1) {
if (!i->proc) {
instance_spawn(i);
return;
}
break;
case LI_INSTANCE_ACTIVE:
if (i->pid == (pid_t) -1) {
if (!i->proc) {
instance_spawn(i);
return;
}
break;
case LI_INSTANCE_SUSPEND:
if (i->pid == (pid_t) -1) {
if (!i->proc) {
instance_spawn(i);
return;
}
@ -259,10 +246,11 @@ void li_instance_release(liInstance *i) {
assert(g_atomic_int_get(&i->refcount) > 0);
if (!g_atomic_int_dec_and_test(&i->refcount)) return;
srv = i->srv;
if (i->pid != (pid_t) -1) {
if (i->proc) {
ev_child_stop(srv->loop, &i->child_watcher);
kill(i->pid, SIGTERM);
i->pid = -1;
kill(i->proc->child_pid, SIGTERM);
li_proc_free(i->proc);
i->proc = NULL;
i->s_cur = LI_INSTANCE_DOWN;
li_angel_connection_free(i->acon);
i->acon = NULL;

Loading…
Cancel
Save