Fix handling closing sockets (read buffer until EOF, error or timeout)
parent
de499089ff
commit
f221bac60e
|
@ -14,17 +14,56 @@ struct worker_closing_socket {
|
|||
liWorker *wrk;
|
||||
GList *link;
|
||||
int fd;
|
||||
ev_tstamp close_timeout;
|
||||
};
|
||||
|
||||
static void worker_close_socket_now(worker_closing_socket *scs) {
|
||||
liWorker *wrk = scs->wrk;
|
||||
|
||||
shutdown(scs->fd, SHUT_RD);
|
||||
close(scs->fd);
|
||||
g_queue_delete_link(&wrk->closing_sockets, scs->link);
|
||||
g_slice_free(worker_closing_socket, scs);
|
||||
}
|
||||
|
||||
static void worker_closing_socket_cb(int revents, void* arg) {
|
||||
worker_closing_socket *scs = (worker_closing_socket*) arg;
|
||||
liWorker *wrk = scs->wrk;
|
||||
ssize_t r;
|
||||
UNUSED(revents);
|
||||
|
||||
/* Whatever happend: we just close the socket */
|
||||
shutdown(scs->fd, SHUT_RD);
|
||||
close(scs->fd);
|
||||
g_queue_delete_link(&scs->wrk->closing_sockets, scs->link);
|
||||
g_slice_free(worker_closing_socket, scs);
|
||||
/* empty the input buffer, wait for EOF or timeout or a socket error to close it */
|
||||
g_string_set_size(wrk->tmp_str, 1024);
|
||||
for (;;) {
|
||||
r = read(scs->fd, wrk->tmp_str->str, wrk->tmp_str->len);
|
||||
if (0 == r) break; /* got EOF */
|
||||
if (0 > r) { /* error */
|
||||
switch (errno) {
|
||||
case EINTR:
|
||||
/* call read again */
|
||||
continue;
|
||||
case EAGAIN:
|
||||
#if EWOULDBLOCK != EAGAIN
|
||||
case EWOULDBLOCK:
|
||||
#endif
|
||||
/* check timeout: */
|
||||
if (!(revents & EV_TIMEOUT)) {
|
||||
/* wait again */
|
||||
ev_once(wrk->loop, scs->fd, EV_READ, scs->close_timeout - ev_now(wrk->loop), worker_closing_socket_cb, scs);
|
||||
return;
|
||||
}
|
||||
/* timeout reached, break switch and loop */
|
||||
break;
|
||||
default:
|
||||
/* real error (probably ECONNRESET or similar): break switch and loop */
|
||||
/* no logging: there is no context anymore for the socket */
|
||||
break;
|
||||
}
|
||||
break; /* end loop */
|
||||
}
|
||||
}
|
||||
|
||||
worker_close_socket_now(scs);
|
||||
}
|
||||
|
||||
void li_worker_add_closing_socket(liWorker *wrk, int fd) {
|
||||
|
@ -42,13 +81,14 @@ void li_worker_add_closing_socket(liWorker *wrk, int fd) {
|
|||
scs->fd = fd;
|
||||
g_queue_push_tail(&wrk->closing_sockets, scs);
|
||||
scs->link = g_queue_peek_tail_link(&wrk->closing_sockets);
|
||||
scs->close_timeout = ev_now(wrk->loop) + 10.0;
|
||||
|
||||
ev_once(wrk->loop, fd, EV_READ, 10.0, worker_closing_socket_cb, scs);
|
||||
}
|
||||
|
||||
/* Kill it - frees fd */
|
||||
static void worker_rem_closing_socket(liWorker *wrk, worker_closing_socket *scs) {
|
||||
ev_feed_fd_event(wrk->loop, scs->fd, EV_READ);
|
||||
ev_feed_fd_event(wrk->loop, scs->fd, EV_TIMEOUT);
|
||||
}
|
||||
|
||||
/* Keep alive */
|
||||
|
@ -387,7 +427,7 @@ void li_worker_free(liWorker *wrk) {
|
|||
{ /* force closing sockets */
|
||||
GList *iter;
|
||||
for (iter = g_queue_peek_head_link(&wrk->closing_sockets); iter; iter = g_list_next(iter)) {
|
||||
worker_closing_socket_cb(EV_TIMEOUT, (worker_closing_socket*) iter->data);
|
||||
worker_close_socket_now((worker_closing_socket*) iter->data);
|
||||
}
|
||||
g_queue_clear(&wrk->closing_sockets);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue