Browse Source

write a small socket server with io_wait integration and add and debug

io framework enough to support the test program
master
Felix von Leitner 18 years ago
parent
commit
092b5f860b
  1. 2
      array/array_allocate.c
  2. 3
      io.h
  3. 9
      io/io_check.c
  4. 6
      io/io_close.c
  5. 1
      io/io_fd.c
  6. 9
      io/io_tryread.c
  7. 13
      io/io_trywrite.c
  8. 14
      io/io_waituntil2.c
  9. 5
      io_internal.h
  10. 95
      test/io5.c

2
array/array_allocate.c

@ -75,6 +75,6 @@ void* array_allocate(array* x,uint64 membersize,int64 pos) {
x->allocated=wanted;
byte_zero(x->p+x->initialized,x->allocated-x->initialized);
}
x->initialized=pos*membersize;
x->initialized=(pos+1)*membersize;
return x->p+pos*membersize;
}

3
io.h

@ -50,6 +50,9 @@ int64 io_canread();
/* return next descriptor from io_wait that can be written to */
int64 io_canwrite();
/* return next descriptor with expired timeout */
int64 io_timeouted();
/* put d on internal data structure, return 1 on success, 0 on error */
int io_fd(int64 d);

9
io/io_check.c

@ -0,0 +1,9 @@
#include <unistd.h>
#include <sys/time.h>
#include <poll.h>
#include <errno.h>
#include "io_internal.h"
void io_check() {
io_waituntil2(0);
}

6
io/io_close.c

@ -4,5 +4,9 @@
void io_close(int64 d) {
close(d);
io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
if (e) e->inuse=0;
if (e) {
e->inuse=0;
io_dontwantread(d);
io_dontwantwrite(d);
}
}

1
io/io_fd.c

@ -12,5 +12,6 @@ int io_fd(int64 d) {
if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0;
e->inuse=1;
if (r&O_NDELAY) e->nonblock=1;
e->next_read=e->next_write=-1;
return 1;
}

9
io/io_tryread.c

@ -16,7 +16,10 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
p.events=POLLIN;
switch (poll(&p,1,0)) {
case -1: return -3;
case 0: errno=EAGAIN; return -1;
case 0: errno=EAGAIN;
e->canread=0;
e->next_read=-1;
return -1;
}
new.it_interval.tv_usec=0;
new.it_interval.tv_sec=0;
@ -37,5 +40,9 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
if (errno!=EAGAIN)
r=-3;
}
if (r==-1 || r==0) {
e->canread=0;
e->next_read=-1;
}
return r;
}

13
io/io_trywrite.c

@ -16,7 +16,10 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
p.events=POLLOUT;
switch (poll(&p,1,0)) {
case -1: return -3;
case 0: errno=EAGAIN; return -1;
case 0: errno=EAGAIN;
e->canwrite=0;
e->next_write=-1;
return -1;
}
new.it_interval.tv_usec=0;
new.it_interval.tv_sec=0;
@ -32,8 +35,14 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
new.it_value.tv_sec=0;
setitimer(ITIMER_REAL,&new,&old);
}
if (r==-1)
if (r==-1) {
if (errno==EINTR) errno=EAGAIN;
if (errno!=EAGAIN)
r=-3;
}
if (r==-1 || r==0) {
e->canwrite=0;
e->next_write=-1;
}
return r;
}

14
io/io_waituntil2.c

@ -29,10 +29,18 @@ again:
if (errno==EINTR) goto again;
return -1;
}
for (i=0; i<r; ++i) {
for (i=r-1; i>=0; --i) {
io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd);
if (p->revents&POLLIN) e->canread=1;
if (p->revents&POLLOUT) e->canwrite=1;
if (p->revents&POLLIN) {
e->canread=1;
e->next_read=first_readable;
first_readable=p->fd;
}
if (p->revents&POLLOUT) {
e->canwrite=1;
e->next_write=first_writeable;
first_writeable=p->fd;
}
p++;
}
return i;

5
io_internal.h

@ -9,10 +9,15 @@ typedef struct {
unsigned int nonblock:1;
unsigned int inuse:1;
tai6464 timeout;
long next_read;
long next_write;
} io_entry;
array io_fds;
uint64 io_wanted_fds;
array io_pollfds;
unsigned long first_readable;
unsigned long first_writeable;
int64 io_waituntil2(int64 milliseconds);

95
test/io5.c

@ -0,0 +1,95 @@
#include "socket.h"
#include "io.h"
#include "buffer.h"
#include "ip6.h"
#include <errno.h>
main() {
int s=socket_tcp6();
uint32 scope_id;
char ip[16];
uint16 port;
if (socket_bind6_reuse(s,V6any,1234,0)==-1) return 111;
if (socket_listen(s,16)==-1) return 111;
io_nonblock(s);
if (!io_fd(s)) return 111;
io_wantread(s);
buffer_puts(buffer_2,"listening on port 1234 (fd #");
buffer_putulong(buffer_2,s);
buffer_putsflush(buffer_2,")\n");
for (;;) {
int64 i;
io_wait();
buffer_putsflush(buffer_2,"io_wait() returned!\n");
while ((i=io_canread())!=-1) {
if (i==s) {
int n;
while ((n=socket_accept6(s,ip,&port,&scope_id))!=-1) {
char buf[IP6_FMT];
buffer_puts(buffer_2,"accepted new connection from ");
buffer_put(buffer_2,buf,fmt_ip6(buf,ip));
buffer_puts(buffer_2,":");
buffer_putulong(buffer_2,port);
buffer_puts(buffer_2," (fd ");
buffer_putulong(buffer_2,n);
buffer_puts(buffer_2,")");
if (io_fd(n)) {
io_wantread(n);
} else {
buffer_puts(buffer_2,", but io_fd failed.");
io_close(n);
}
buffer_putnlflush(buffer_2);
}
if (errno!=EAGAIN) {
buffer_puts(buffer_2,"socket_accept6: ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
}
} else {
char buf[1024];
int l=io_tryread(i,buf,sizeof buf);
if (l==-1) {
buffer_puts(buffer_2,"io_tryread(");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,"): ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
io_close(i);
} else if (l==0) {
buffer_puts(buffer_2,"eof on fd #");
buffer_putulong(buffer_2,i);
buffer_putnlflush(buffer_2);
io_close(i);
} else {
int r;
switch (r=io_trywrite(i,buf,l)) {
case -1:
buffer_puts(buffer_2,"io_tryread(");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,"): ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
io_close(i);
break;
case 0:
buffer_puts(buffer_2,"write eof on fd #");
buffer_putulong(buffer_2,i);
buffer_putnlflush(buffer_2);
io_close(i);
default:
if (r!=l) {
buffer_puts(buffer_2,"short write on fd #");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,": wrote ");
buffer_putulong(buffer_2,r);
buffer_puts(buffer_2,", wanted to write ");
buffer_putulong(buffer_2,l);
buffer_putsflush(buffer_2,").\n");
}
}
}
}
}
}
}
Loading…
Cancel
Save