Browse Source

Network handling code skeleton stands, is untested and no configure code there, yet.

master
Dirk Engling 12 years ago
parent
commit
fd149f843e
  1. 183
      proxy.c

183
proxy.c

@ -1,7 +1,7 @@
/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
It is considered beerware. Prost. Skol. Cheers or whatever.
$Id: proxy.c,v 1.3 2009/09/29 06:03:39 erdgeist Exp $ */
$Id: proxy.c,v 1.4 2009/10/01 17:16:15 erdgeist Exp $ */
/* System */
#include <stdint.h>
@ -30,9 +30,11 @@
#include "trackerlogic.h"
#include "ot_vector.h"
#include "ot_mutex.h"
#include "ot_livesync.h"
#include "ot_stats.h"
#define WANT_SYNC_LIVE
#include "ot_livesync.h"
ot_ip6 g_serverip;
uint16_t g_serverport = 9009;
uint32_t g_tracker_id;
@ -49,6 +51,7 @@ int g_self_pipe[2];
#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
#define LIVESYNC_MAXDELAY 15 /* seconds */
/* The amount of time a complete sync cycle should take */
#define OT_SYNC_INTERVAL_MINUTES 2
@ -65,10 +68,14 @@ static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
/* For outgoing packets */
static int64 g_socket_out = -1;
//static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE];
static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
static uint8_t *g_peerbuffer_pos;
static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
static ot_time g_next_packet_time;
static void * livesync_worker( void * args );
static void * streamsync_worker( void * args );
static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
void exerr( char * message ) {
fprintf( stderr, "%s\n", message );
@ -226,15 +233,31 @@ enum {
#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
typedef struct {
int state; /* Whether we want to connect, how far our handshake is, etc. */
ot_ip6 ip; /* The peer to connect to */
uint16_t port; /* The peers port */
uint8_t *indata; /* Any data not processed yet */
size_t indata_length; /* Length of unprocessed data */
uint32_t tracker_id; /* How the other end greeted */
int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
io_batch outdata; /* The iobatch containing our sync data */
int state; /* Whether we want to connect, how far our handshake is, etc. */
ot_ip6 ip; /* The peer to connect to */
uint16_t port; /* The peers port */
uint8_t indata[8192*16]; /* Any data not processed yet */
size_t indata_length; /* Length of unprocessed data */
uint32_t tracker_id; /* How the other end greeted */
int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
io_batch outdata; /* The iobatch containing our sync data */
int packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
char packet_tprefix; /* Prefix byte for all torrents in current packet */
char packet_type; /* Type of current packet */
uint32_t packet_tid; /* Tracker id for current packet */
} proxy_peer;
static void process_indata( proxy_peer * peer );
void reset_info_block( proxy_peer * peer ) {
peer->indata_length = 0;
peer->tracker_id = 0;
peer->fd = -1;
peer->packet_tcount = 0;
iob_reset( &peer->outdata );
PROXYPEER_SETDISCONNECTED( peer->state );
}
/* Number of connections to peers
* If a peer's IP is set, we try to reconnect, when the connection drops
@ -266,12 +289,8 @@ static void handle_reconnects( void ) {
io_setcookie(newfd,g_connections+i);
/* Prepare connection info block */
free( g_connections[i].indata );
g_connections[i].indata = 0;
g_connections[i].indata_length = 0;
reset_info_block( g_connections+i );
g_connections[i].fd = newfd;
g_connections[i].tracker_id = 0;
iob_reset( &g_connections[i].outdata );
PROXYPEER_SETCONNECTING( g_connections[i].state );
}
g_connection_reconn = time(NULL) + 30;
@ -305,16 +324,10 @@ static void handle_accept( int64 serversocket ) {
}
/* Prepare connection info block */
free( g_connections[i].indata );
g_connections[i].indata = 0;
g_connections[i].indata_length = 0;
g_connections[i].port = port;
g_connections[i].fd = newfd;
g_connections[i].tracker_id = 0;
iob_reset( &g_connections[i].outdata );
g_connections[i].tracker_id = 0;
reset_info_block( g_connections+i );
PROXYPEER_SETCONNECTING( g_connections[i].state );
g_connections[i].port = port;
g_connections[i].fd = newfd;
io_setcookie( newfd, g_connections + i );
@ -328,19 +341,25 @@ static void handle_accept( int64 serversocket ) {
/* New sync data on the stream */
static void handle_read( int64 peersocket ) {
int i;
int64 datalen;
uint32_t tracker_id;
proxy_peer *peer = io_getcookie( peersocket );
if( !peer ) {
/* Can't happen ;) */
close( peersocket );
io_close( peersocket );
return;
}
switch( peer->state & FLAG_MASK ) {
case FLAG_DISCONNECTED: break; /* Shouldnt happen */
case FLAG_DISCONNECTED:
io_close( peersocket );
break; /* Shouldnt happen */
case FLAG_CONNECTING:
case FLAG_WAITTRACKERID:
/* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */
if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
/* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
goto close_socket;
/* See, if we already have a connection to that peer */
@ -363,12 +382,20 @@ static void handle_read( int64 peersocket ) {
break;
close_socket:
io_close( peersocket );
PROXYPEER_SETDISCONNECTED( peer->state );
reset_info_block( peer );
break;
case FLAG_CONNECTED:
/* Here we acutally expect data from peer
indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
if( !datalen || datalen < -1 ) {
io_close( peersocket );
reset_info_block( peer );
} else if( datalen > 0 ) {
peer->indata_length += datalen;
process_indata( peer );
}
break;
}
}
@ -377,13 +404,22 @@ static void handle_write( int64 peersocket ) {
proxy_peer *peer = io_getcookie( peersocket );
if( !peer ) {
/* Can't happen ;) */
close( peersocket );
io_close( peersocket );
return;
}
switch( peer->state & FLAG_MASK ) {
case FLAG_DISCONNECTED: break; /* Shouldnt happen */
case FLAG_DISCONNECTED:
default: /* Should not happen */
io_close( peersocket );
break;
case FLAG_CONNECTING:
/* Ensure that the connection is established and handle connection error */
if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
io_close( peersocket );
break;
}
io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
PROXYPEER_SETWAITTRACKERID( peer->state );
io_dontwantwrite( peersocket );
@ -396,15 +432,12 @@ static void handle_write( int64 peersocket ) {
break;
case -3: /* an error occured */
io_close( peersocket );
PROXYPEER_SETDISCONNECTED( peer->state );
iob_reset( &peer->outdata );
free( peer->indata );
reset_info_block( peer );
break;
default: /* Normal operation or eagain */
break;
}
break;
default:
break;
}
return;
@ -414,6 +447,14 @@ static void server_mainloop() {
int64 sock;
tai6464 now;
/* inlined livesync_init() */
memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
g_peerbuffer_pos = g_peerbuffer_start;
memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
while(1) {
/* See, if we need to connect to anyone */
if( time(NULL) > g_connection_reconn )
@ -436,6 +477,8 @@ static void server_mainloop() {
/* Loop over writable sockets */
while( ( sock = io_canwrite( ) ) != -1 )
handle_write( sock );
livesync_ticker( );
}
}
@ -600,6 +643,68 @@ unlock_continue:
return 0;
}
static void livesync_issue_peersync( ) {
socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
groupip_1, LIVESYNC_PORT);
g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
}
void livesync_ticker( ) {
/* livesync_issue_peersync sets g_next_packet_time */
if( time(NULL) > g_next_packet_time &&
g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
livesync_issue_peersync();
}
static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
*g_peerbuffer_pos = prefix;
memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
g_peerbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer);
if( g_peerbuffer_pos >= g_peerbuffer_highwater )
livesync_issue_peersync();
}
static void process_indata( proxy_peer * peer ) {
int ensuremem, consumed, peers;
uint8_t *data = peer->indata, *hash;
uint8_t *dataend = data + peer->indata_length;
while( 1 ) {
/* If we're not inside of a packet, make a new one */
if( !peer->packet_tcount ) {
/* Ensure the header is complete or postpone processing */
if( data + 8 > dataend ) break;
memcpy( &peer->packet_tid, data, sizeof(peer->packet_tid) );
peer->packet_type = data[4];
peer->packet_tprefix = data[5];
peer->packet_tcount = data[6] * 256 + data[7];
data += 8;
}
/* ensure size for the complete torrent block */
if( data + 26 > dataend ) break;
peers = peer->packet_type ? peer->packet_type : data[19];
ensuremem = 19 + ( peer->packet_type == 0 ) + 7 * peers;
if( data + ensuremem > dataend ) break;
hash = data;
data += 19 + ( peer->packet_type == 0 );
while( peers-- ) {
livesync_proxytell( peer->packet_tprefix, hash, data );
data += 7;
}
}
consumed = data - peer->indata;
memmove( peer->indata, data, peer->indata_length - consumed );
peer->indata_length -= consumed;
}
static void * livesync_worker( void * args ) {
(void)args;
while( 1 ) {

Loading…
Cancel
Save