Browse Source

Live sync is now handled in its own thread. Therefore it now creates and handles its own sockets.

master
Dirk Engling 13 years ago
parent
commit
0ded14be37
  1. 136
      ot_livesync.c
  2. 18
      ot_livesync.h

136
ot_livesync.c

@ -7,9 +7,11 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <string.h>
#include <pthread.h>
/* Libowfat */
#include "socket.h"
#include "ndelay.h"
/* Opentracker */
#include "trackerlogic.h"
@ -17,10 +19,23 @@
#include "ot_accesslist.h"
#ifdef WANT_SYNC_LIVE
char groupip_1[4] = { LIVESYNC_MCASTDOMAIN_1 };
char groupip_1[4] = { 224,0,23,42 };
#define LIVESYNC_BUFFINSIZE (256*256)
#define LIVESYNC_BUFFSIZE 1504
#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
#define LIVESYNC_MAXDELAY 15
/* Forward declaration */
static void * livesync_worker( void * args );
/* For outgoing packets */
int64 g_livesync_socket = -1;
static int64 g_livesync_socket_in = -1;
/* For incoming packets */
static int64 g_livesync_socket_out = -1;
static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE];
static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ];
@ -28,34 +43,49 @@ static uint8_t *livesync_outbuffer_pos;
static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER;
static ot_time livesync_lastpacket_time;
static pthread_t thread_id;
void livesync_init( ) {
if( g_livesync_socket == -1 )
if( g_livesync_socket_in == -1 )
exerr( "No socket address for live sync specified." );
livesync_outbuffer_pos = livesync_outbuffer_start;
memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
livesync_outbuffer_pos += sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
pthread_create( &thread_id, NULL, livesync_worker, NULL );
}
void livesync_deinit() {
pthread_cancel( thread_id );
}
void livesync_bind_mcast( char *ip, uint16_t port) {
char tmpip[4] = {0,0,0,0};
if( g_livesync_socket != -1 )
exerr("Livesync listen ip specified twice.");
if( socket_mcjoin4( ot_try_bind(tmpip, port, FLAG_MCA ), groupip_1, ip ) )
exerr("Cant join mcast group.");
g_livesync_socket = ot_try_bind( ip, port, FLAG_UDP );
io_dontwantread(g_livesync_socket);
socket_mcttl4(g_livesync_socket, 1);
socket_mcloop4(g_livesync_socket, 0);
if( g_livesync_socket_in != -1 )
exerr("Error: Livesync listen ip specified twice.");
if( ( g_livesync_socket_in = socket_udp4( )) < 0)
exerr("Error: Cant create live sync incoming socket." );
ndelay_off(g_livesync_socket_in);
if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 )
exerr("Error: Cant bind live sync incoming socket." );
if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) )
exerr("Error: Cant make live sync incoming socket join mcast group.");
if( ( g_livesync_socket_out = socket_udp4()) < 0)
exerr("Error: Cant create live sync outgoing socket." );
if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 )
exerr("Error: Cant bind live sync outgoing socket." );
socket_mcttl4(g_livesync_socket_out, 1);
socket_mcloop4(g_livesync_socket_out, 0);
}
static void livesync_issuepacket( ) {
socket_send4(g_livesync_socket, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
groupip_1, LIVESYNC_PORT);
livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
@ -81,42 +111,52 @@ void livesync_ticker( ) {
livesync_issuepacket();
}
/* Handle an incoming live sync packet */
void handle_livesync( int64 serversocket ) {
static void * livesync_worker( void * args ) {
uint8_t in_ip[4]; uint16_t in_port;
ssize_t datalen = socket_recv4(serversocket, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
int off = 4;
if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
// TODO: log invalid sync packet
return;
}
if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
// TODO: log invalid sync packet
return;
}
if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
// TODO: log packet coming from ourselves
return;
}
// Now basic sanity checks have been done on the live sync packet
// We might add more testing and logging.
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
else
add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
off += sizeof( ot_hash ) + sizeof( ot_peer );
}
ssize_t datalen;
int off;
args = args;
while( 1 ) {
datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
off = 4;
if( datalen <= 0 )
continue;
if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
// TODO: log invalid sync packet
continue;
}
if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
// TODO: log invalid sync packet
continue;
}
if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
// TODO: log packet coming from ourselves
continue;
}
// Now basic sanity checks have been done on the live sync packet
// We might add more testing and logging.
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
else
add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
off += sizeof( ot_hash ) + sizeof( ot_peer );
}
}
/* Never returns. */
return NULL;
}
#endif
const char *g_version_livesync_c = "$Source: /home/cvsroot/opentracker/ot_livesync.c,v $: $Revision: 1.1 $\n";
const char *g_version_livesync_c = "$Source: /home/cvsroot/opentracker/ot_livesync.c,v $: $Revision: 1.2 $\n";

18
ot_livesync.h

@ -10,14 +10,14 @@
#include "trackerlogic.h"
/*
Syncing is done as udp packets in the multicast domain 224.23.42.N port 9696
Syncing is done as udp packets in the multicast domain 224.0.42.N port 9696
Each tracker should join the multicast group and send its live sync packets
to that group, using a ttl of 1
Format of a live sync packet is straight forward and depends on N:
For N == 1: (simple tracker2tracker sync)
For N == 23: (simple tracker2tracker sync)
0x0000 0x04 id of tracker instance
[ 0x0004 0x14 info_hash
0x0018 0x04 peer's ipv4 address
@ -25,7 +25,7 @@
0x0020 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
]*
For N == 2: (aggregator syncs)
For N == 24: (aggregator syncs)
0x0000 0x04 id of tracker instance
[ 0x0004 0x14 info_hash
0x0018 0x01 number of peers
@ -41,18 +41,6 @@
#ifdef WANT_SYNC_LIVE
#define LIVESYNC_PORT 9696
#define LIVESYNC_MCASTDOMAIN_1 224,23,42,1
#define LIVESYNC_MCASTDOMAIN_2 224,23,42,2
extern char groupip_1[4];
extern char groupip_2[4];
extern int64 g_livesync_socket;
#define LIVESYNC_BUFFINSIZE (256*256)
#define LIVESYNC_BUFFSIZE 1504
#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
#define LIVESYNC_MAXDELAY 15
void livesync_init();
void livesync_deinit();

Loading…
Cancel
Save