|
|
|
@ -43,6 +43,27 @@ void stream_start(server *srv, stream *s) {
|
|
|
|
|
ev_io_add_events(srv->loop, &s->watcher, events);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
gssize stream_closed(server *srv, stream *s) {
|
|
|
|
|
ev_io_stop(srv->loop, &s->watcher);
|
|
|
|
|
if (s->fd != -1) {
|
|
|
|
|
shutdown(s->fd, SHUT_RDWR);
|
|
|
|
|
close(s->fd);
|
|
|
|
|
s->fd = -1;
|
|
|
|
|
}
|
|
|
|
|
s->closed = TRUE;
|
|
|
|
|
g_string_truncate(s->buffer, 0);
|
|
|
|
|
if (s->other->closed) return -1;
|
|
|
|
|
if (s->other->buffer->len == 0) {
|
|
|
|
|
/* nothing to send on the other side, so close that too. */
|
|
|
|
|
return stream_closed(srv, s->other);
|
|
|
|
|
} else {
|
|
|
|
|
/* we can't send the data, so we don't read it */
|
|
|
|
|
/* other stream gets closed if its buffer gets empty */
|
|
|
|
|
ev_io_rem_events(srv->loop, &s->other->watcher, EV_READ);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* -1: connection got closed, 0: nothing to read, n: read n bytes */
|
|
|
|
|
gssize stream_read(server *srv, stream *s, char *buf, gssize bufsize) {
|
|
|
|
|
gssize len;
|
|
|
|
@ -55,8 +76,7 @@ gssize stream_read(server *srv, stream *s, char *buf, gssize bufsize) {
|
|
|
|
|
/* nothing to read */
|
|
|
|
|
return 0;
|
|
|
|
|
case ECONNRESET:
|
|
|
|
|
stream_close(srv, s, s->other);
|
|
|
|
|
return -1;
|
|
|
|
|
return stream_closed(srv, s);
|
|
|
|
|
case EINTR: /* try again */
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
@ -66,13 +86,16 @@ gssize stream_read(server *srv, stream *s, char *buf, gssize bufsize) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (0 == len) { /* connection closed */
|
|
|
|
|
stream_close(srv, s, s->other);
|
|
|
|
|
return -1;
|
|
|
|
|
return stream_closed(srv, s);
|
|
|
|
|
}
|
|
|
|
|
return len;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void stream_append(server *srv, stream *s, char *buf, gssize bufsize) {
|
|
|
|
|
if (s->closed) {
|
|
|
|
|
ev_io_rem_events(srv->loop, &s->other->watcher, EV_READ);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
g_string_append_len(s->buffer, buf, bufsize);
|
|
|
|
|
if (s->buffer->len > 0) ev_io_add_events(srv->loop, &s->watcher, EV_WRITE);
|
|
|
|
|
if (s->buffer->len > MAX_STREAM_BUF_SIZE) ev_io_rem_events(srv->loop, &s->other->watcher, EV_READ);
|
|
|
|
@ -91,15 +114,21 @@ gssize stream_write(server *srv, stream *s) {
|
|
|
|
|
return 0;
|
|
|
|
|
case ECONNRESET:
|
|
|
|
|
case EPIPE:
|
|
|
|
|
stream_close(srv, s, s->other);
|
|
|
|
|
return -1;
|
|
|
|
|
return stream_closed(srv, s);
|
|
|
|
|
case EINTR: /* try again */
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
g_message("read error: %s", g_strerror(errno));
|
|
|
|
|
g_message("write error: %s", g_strerror(errno));
|
|
|
|
|
stream_close(srv, s, s->other);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
g_string_erase(s->buffer, 0, len);
|
|
|
|
|
if (s->buffer->len == 0) {
|
|
|
|
|
if (s->other->closed) return stream_closed(srv, s);
|
|
|
|
|
ev_io_rem_events(srv->loop, &s->watcher, EV_WRITE);
|
|
|
|
|
}
|
|
|
|
|
if (s->buffer->len < MAX_STREAM_BUF_SIZE && !s->other->closed)
|
|
|
|
|
ev_io_add_events(srv->loop, &s->other->watcher, EV_READ);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|