An open and free bittorrent tracker https://erdgeist.org/gitweb/opentracker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

845 lines
27 KiB

  1. /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
  2. It is considered beerware. Prost. Skol. Cheers or whatever.
  3. $Id$ */
  4. /* System */
  5. #include <stdint.h>
  6. #include <stdlib.h>
  7. #include <string.h>
  8. #include <arpa/inet.h>
  9. #include <sys/socket.h>
  10. #include <unistd.h>
  11. #include <errno.h>
  12. #include <signal.h>
  13. #include <stdio.h>
  14. #include <pwd.h>
  15. #include <ctype.h>
  16. #include <pthread.h>
  17. /* Libowfat */
  18. #include "socket.h"
  19. #include "io.h"
  20. #include "iob.h"
  21. #include "byte.h"
  22. #include "scan.h"
  23. #include "ip6.h"
  24. #include "ndelay.h"
  25. /* Opentracker */
  26. #include "trackerlogic.h"
  27. #include "ot_vector.h"
  28. #include "ot_mutex.h"
  29. #include "ot_stats.h"
  30. #ifndef WANT_SYNC_LIVE
  31. #define WANT_SYNC_LIVE
  32. #endif
  33. #include "ot_livesync.h"
  /* ---- Process-wide configuration and live sync (multicast) state ---- */
  /* Local address/port the outgoing stream connections are bound to in handle_reconnects() */
  34. ot_ip6 g_serverip;
  35. uint16_t g_serverport = 9009;
  /* Identifier of this instance: randomised in main(), exchanged during the
     stream handshake and placed at the start of every multicast packet */
  36. uint32_t g_tracker_id;
  /* IPv4 multicast group joined for incoming and addressed for outgoing live sync packets */
  37. char groupip_1[4] = { 224,0,23,5 };
  /* NOTE(review): not referenced anywhere in the visible part of this file */
  38. int g_self_pipe[2];
  39. /* If you have more than 10 peers, don't use this proxy.
     Use 20 slots for 10 peers to have room for 10 incoming connection slots.
   */
  42. #define MAX_PEERS 20
  /* Size of the receive buffer for multicast packets */
  43. #define LIVESYNC_INCOMING_BUFFSIZE (256*256)
  44. #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
  /* Size of the buffer in which outgoing multicast packets are assembled */
  45. #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
  /* Space one hash+peer entry needs; used to compute the flush threshold below */
  46. #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
  /* Delay added to the current time whenever the next-packet deadline is (re)armed */
  47. #define LIVESYNC_MAXDELAY 15 /* seconds */
  48. /* The amount of time a complete sync cycle should take */
  49. #define OT_SYNC_INTERVAL_MINUTES 2
  50. /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
  51. #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
  /* Packet type written after the tracker id in every multicast packet */
  52. enum { OT_SYNC_PEER };
  /* Cookie value that marks a listening socket in the io event loop */
  53. enum { FLAG_SERVERSOCKET = 1 };
  54. /* For incoming packets */
  55. static int64 g_socket_in = -1;
  56. static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
  57. /* For outgoing packets */
  58. static int64 g_socket_out = -1;
  59. static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
  /* Write position inside g_peerbuffer_start */
  60. static uint8_t *g_peerbuffer_pos;
  /* Once the write position reaches this mark, livesync_proxytell() sends the packet */
  61. static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
  /* Deadline checked by livesync_ticker(); re-armed by livesync_issue_peersync() */
  62. static ot_time g_next_packet_time;
  /* Forward declarations for functions defined further down in this file */
  63. static void * livesync_worker( void * args );
  64. static void * streamsync_worker( void * args );
  65. static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
  /* Report a fatal error: write message followed by a newline to stderr and
   * terminate the process with exit status 111. Does not return. */
  void exerr( char * message ) {
    fputs( message, stderr );
    fputc( '\n', stderr );
    exit( 111 );
  }
  70. void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
  71. (void) event;
  72. (void) proto;
  73. (void) event_data;
  74. }
  75. void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
  76. char tmpip[4] = {0,0,0,0};
  77. char *v4ip;
  78. if( !ip6_isv4mapped(ip))
  79. exerr("v6 mcast support not yet available.");
  80. v4ip = ip+12;
  81. if( g_socket_in != -1 )
  82. exerr("Error: Livesync listen ip specified twice.");
  83. if( ( g_socket_in = socket_udp4( )) < 0)
  84. exerr("Error: Cant create live sync incoming socket." );
  85. ndelay_off(g_socket_in);
  86. if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
  87. exerr("Error: Cant bind live sync incoming socket." );
  88. if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
  89. exerr("Error: Cant make live sync incoming socket join mcast group.");
  90. if( ( g_socket_out = socket_udp4()) < 0)
  91. exerr("Error: Cant create live sync outgoing socket." );
  92. if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
  93. exerr("Error: Cant bind live sync outgoing socket." );
  94. socket_mcttl4(g_socket_out, 1);
  95. socket_mcloop4(g_socket_out, 1);
  96. }
  97. size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
  98. int exactmatch;
  99. ot_torrent *torrent;
  100. ot_peer *peer_dest;
  101. ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
  102. torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
  103. if( !torrent )
  104. return -1;
  105. if( !exactmatch ) {
  106. /* Create a new torrent entry, then */
  107. memcpy( torrent->hash, hash, sizeof(ot_hash) );
  108. if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
  109. vector_remove_torrent( torrents_list, torrent );
  110. mutex_bucket_unlock_by_hash( hash, 0 );
  111. return -1;
  112. }
  113. byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
  114. }
  115. /* Check for peer in torrent */
  116. peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch );
  117. if( !peer_dest ) {
  118. mutex_bucket_unlock_by_hash( hash, 0 );
  119. return -1;
  120. }
  121. /* Tell peer that it's fresh */
  122. OT_PEERTIME( peer ) = 0;
  123. /* If we hadn't had a match create peer there */
  124. if( !exactmatch ) {
  125. torrent->peer_list->peer_count++;
  126. if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING )
  127. torrent->peer_list->seed_count++;
  128. }
  129. memcpy( peer_dest, peer, sizeof(ot_peer) );
  130. mutex_bucket_unlock_by_hash( hash, 0 );
  131. return 0;
  132. }
  133. size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) {
  134. int exactmatch;
  135. ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
  136. ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
  137. if( exactmatch ) {
  138. ot_peerlist *peer_list = torrent->peer_list;
  139. switch( vector_remove_peer( &peer_list->peers, peer ) ) {
  140. case 2: peer_list->seed_count--; /* Fall throughs intended */
  141. case 1: peer_list->peer_count--; /* Fall throughs intended */
  142. default: break;
  143. }
  144. }
  145. mutex_bucket_unlock_by_hash( hash, 0 );
  146. return 0;
  147. }
  148. void free_peerlist( ot_peerlist *peer_list ) {
  149. if( peer_list->peers.data ) {
  150. if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
  151. ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data);
  152. while( peer_list->peers.size-- )
  153. free( bucket_list++->data );
  154. }
  155. free( peer_list->peers.data );
  156. }
  157. free( peer_list );
  158. }
  159. static void livesync_handle_peersync( ssize_t datalen ) {
  160. int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
  161. fprintf( stderr, "." );
  162. while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
  163. ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash));
  164. ot_hash *hash = (ot_hash*)(g_inbuffer + off);
  165. if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED )
  166. remove_peer_from_torrent_proxy( *hash, peer );
  167. else
  168. add_peer_to_torrent_proxy( *hash, peer );
  169. off += sizeof( ot_hash ) + sizeof( ot_peer );
  170. }
  171. }
  /* Print the command line synopsis to stderr.
   *
   * self  program name inserted into the synopsis
   *
   * Always returns 0. */
  int usage( char *self ) {
    fprintf( stderr,
             "Usage: %s -L <livesync_iface_ip>"
             " -l <listenip>:<listenport>"
             " -c <connectip>:<connectport>\n",
             self );
    return 0;
  }
  /* Connection state of a proxy_peer.
   *
   * FLAG_OUTGOING is a direction bit that marks connections we initiate; the
   * remaining values are handshake stages selected with FLAG_MASK. */
  enum {
    FLAG_OUTGOING      = 0x80,
    FLAG_DISCONNECTED  = 0x00,
    FLAG_CONNECTING    = 0x01,
    FLAG_WAITTRACKERID = 0x02,
    FLAG_CONNECTED     = 0x03,
    FLAG_MASK          = 0x07
  };

  /* True for an outgoing slot that currently is disconnected */
  #define PROXYPEER_NEEDSCONNECT(flag)     ((flag)==FLAG_OUTGOING)
  /* True once the handshake completed, regardless of direction */
  #define PROXYPEER_ISCONNECTED(flag)      (((flag)&FLAG_MASK)==FLAG_CONNECTED)

  /* State setters: replace the stage but keep the direction bit. Each
   * expansion is wrapped in parentheses as a whole so that it still is a
   * single operand when used inside a larger expression. */
  #define PROXYPEER_SETDISCONNECTED(flag)  ((flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED))
  #define PROXYPEER_SETCONNECTING(flag)    ((flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING))
  #define PROXYPEER_SETWAITTRACKERID(flag) ((flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID))
  #define PROXYPEER_SETCONNECTED(flag)     ((flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED))
  /* Bookkeeping for one stream sync connection; the instances live in the
     g_connections table and are attached to their socket as io cookie. */
  190. typedef struct {
  191. int state; /* Whether we want to connect, how far our handshake is, etc. (FLAG_* values) */
  192. ot_ip6 ip; /* The peer to connect to */
  193. uint16_t port; /* The peers port */
  194. uint8_t indata[8192*16]; /* Any data not processed yet */
  195. size_t indata_length; /* Length of unprocessed data */
  196. uint32_t tracker_id; /* How the other end greeted */
  197. int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
  198. io_batch outdata; /* The iobatch containing our sync data */
  199. size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
  200. uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
  201. uint8_t packet_type; /* Type of current packet */
  202. uint32_t packet_tid; /* Tracker id for current packet */
  203. } proxy_peer;
  /* Parses the bytes buffered in peer->indata; defined further down */
  204. static void process_indata( proxy_peer * peer );
  205. void reset_info_block( proxy_peer * peer ) {
  206. peer->indata_length = 0;
  207. peer->tracker_id = 0;
  208. peer->fd = -1;
  209. peer->packet_tcount = 0;
  210. iob_reset( &peer->outdata );
  211. PROXYPEER_SETDISCONNECTED( peer->state );
  212. }
  213. /* Number of connections to peers
   * If a peer's IP is set, we try to reconnect when the connection drops.
   * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it.
   * Multiple connections to/from the same ip are okay if the tracker_id doesn't match.
   * Reconnect attempts occur only twice a minute.
   */
  /* Number of outgoing peers configured with -c; they occupy the first slots of g_connections */
  219. static int g_connection_count;
  /* Earliest time of the next reconnect round, see handle_reconnects() */
  220. static ot_time g_connection_reconn;
  /* Slot table shared by outgoing and incoming connections */
  221. static proxy_peer g_connections[MAX_PEERS];
  222. static void handle_reconnects( void ) {
  223. int i;
  224. for( i=0; i<g_connection_count; ++i )
  225. if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
  226. int64 newfd = socket_tcp6( );
  227. fprintf( stderr, "(Re)connecting to peer..." );
  228. if( newfd < 0 ) continue; /* No socket for you */
  229. io_fd(newfd);
  230. if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
  231. io_close( newfd );
  232. continue;
  233. }
  234. if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
  235. errno != EINPROGRESS && errno != EWOULDBLOCK ) {
  236. close(newfd);
  237. continue;
  238. }
  239. io_wantwrite(newfd); /* So we will be informed when it is connected */
  240. io_setcookie(newfd,g_connections+i);
  241. /* Prepare connection info block */
  242. reset_info_block( g_connections+i );
  243. g_connections[i].fd = newfd;
  244. PROXYPEER_SETCONNECTING( g_connections[i].state );
  245. }
  246. g_connection_reconn = time(NULL) + 30;
  247. }
  248. /* Handle incoming connection requests, check against whitelist */
  249. static void handle_accept( int64 serversocket ) {
  250. int64 newfd;
  251. ot_ip6 ip;
  252. uint16 port;
  253. while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
  254. /* XXX some access control */
  255. /* Put fd into a non-blocking mode */
  256. io_nonblock( newfd );
  257. if( !io_fd( newfd ) )
  258. io_close( newfd );
  259. else {
  260. /* Find a new home for our incoming connection */
  261. int i;
  262. for( i=0; i<MAX_PEERS; ++i )
  263. if( g_connections[i].state == FLAG_DISCONNECTED )
  264. break;
  265. if( i == MAX_PEERS ) {
  266. fprintf( stderr, "No room for incoming connection." );
  267. close( newfd );
  268. continue;
  269. }
  270. /* Prepare connection info block */
  271. reset_info_block( g_connections+i );
  272. PROXYPEER_SETCONNECTING( g_connections[i].state );
  273. g_connections[i].port = port;
  274. g_connections[i].fd = newfd;
  275. io_setcookie( newfd, g_connections + i );
  276. /* We expect the connecting side to begin with its tracker_id */
  277. io_wantread( newfd );
  278. }
  279. }
  280. return;
  281. }
  282. /* New sync data on the stream */
  283. static void handle_read( int64 peersocket ) {
  284. int i;
  285. int64 datalen;
  286. uint32_t tracker_id;
  287. proxy_peer *peer = io_getcookie( peersocket );
  288. if( !peer ) {
  289. /* Can't happen ;) */
  290. io_close( peersocket );
  291. return;
  292. }
  293. switch( peer->state & FLAG_MASK ) {
  294. case FLAG_DISCONNECTED:
  295. io_close( peersocket );
  296. break; /* Shouldnt happen */
  297. case FLAG_CONNECTING:
  298. case FLAG_WAITTRACKERID:
  299. /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
  300. This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
  301. if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
  302. goto close_socket;
  303. /* See, if we already have a connection to that peer */
  304. for( i=0; i<MAX_PEERS; ++i )
  305. if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
  306. g_connections[i].tracker_id == tracker_id ) {
  307. fprintf( stderr, "Peer already connected. Closing connection.\n" );
  308. goto close_socket;
  309. }
  310. /* Also no need for soliloquy */
  311. if( tracker_id == g_tracker_id )
  312. goto close_socket;
  313. /* The new connection is good, send our tracker_id on incoming connections */
  314. if( peer->state == FLAG_CONNECTING )
  315. if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) )
  316. goto close_socket;
  317. peer->tracker_id = tracker_id;
  318. PROXYPEER_SETCONNECTED( peer->state );
  319. if( peer->state & FLAG_OUTGOING )
  320. fprintf( stderr, "succeeded.\n" );
  321. else
  322. fprintf( stderr, "Incoming connection successful.\n" );
  323. break;
  324. close_socket:
  325. fprintf( stderr, "Handshake incomplete, closing socket\n" );
  326. io_close( peersocket );
  327. reset_info_block( peer );
  328. break;
  329. case FLAG_CONNECTED:
  330. /* Here we acutally expect data from peer
  331. indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
  332. datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
  333. if( !datalen || datalen < -1 ) {
  334. fprintf( stderr, "Connection closed by remote peer.\n" );
  335. io_close( peersocket );
  336. reset_info_block( peer );
  337. } else if( datalen > 0 ) {
  338. peer->indata_length += datalen;
  339. process_indata( peer );
  340. }
  341. break;
  342. }
  343. }
  344. /* Can write new sync data to the stream */
  345. static void handle_write( int64 peersocket ) {
  346. proxy_peer *peer = io_getcookie( peersocket );
  347. if( !peer ) {
  348. /* Can't happen ;) */
  349. io_close( peersocket );
  350. return;
  351. }
  352. switch( peer->state & FLAG_MASK ) {
  353. case FLAG_DISCONNECTED:
  354. default: /* Should not happen */
  355. io_close( peersocket );
  356. break;
  357. case FLAG_CONNECTING:
  358. /* Ensure that the connection is established and handle connection error */
  359. if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
  360. fprintf( stderr, "failed\n" );
  361. reset_info_block( peer );
  362. io_close( peersocket );
  363. break;
  364. }
  365. if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) {
  366. PROXYPEER_SETWAITTRACKERID( peer->state );
  367. io_dontwantwrite( peersocket );
  368. io_wantread( peersocket );
  369. } else {
  370. fprintf( stderr, "Handshake incomplete, closing socket\n" );
  371. io_close( peersocket );
  372. reset_info_block( peer );
  373. }
  374. break;
  375. case FLAG_CONNECTED:
  376. switch( iob_send( peersocket, &peer->outdata ) ) {
  377. case 0: /* all data sent */
  378. io_dontwantwrite( peersocket );
  379. break;
  380. case -3: /* an error occured */
  381. io_close( peersocket );
  382. reset_info_block( peer );
  383. break;
  384. default: /* Normal operation or eagain */
  385. break;
  386. }
  387. break;
  388. }
  389. return;
  390. }
  391. static void server_mainloop() {
  392. int64 sock;
  393. /* inlined livesync_init() */
  394. memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
  395. g_peerbuffer_pos = g_peerbuffer_start;
  396. memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
  397. uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
  398. g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
  399. g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
  400. while(1) {
  401. /* See, if we need to connect to anyone */
  402. if( time(NULL) > g_connection_reconn )
  403. handle_reconnects( );
  404. /* Wait for io events until next approx reconn check time */
  405. io_waituntil2( 30*1000 );
  406. /* Loop over readable sockets */
  407. while( ( sock = io_canread( ) ) != -1 ) {
  408. const void *cookie = io_getcookie( sock );
  409. if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
  410. handle_accept( sock );
  411. else
  412. handle_read( sock );
  413. }
  414. /* Loop over writable sockets */
  415. while( ( sock = io_canwrite( ) ) != -1 )
  416. handle_write( sock );
  417. livesync_ticker( );
  418. }
  419. }
  /* Print "<routine>: <description of errno>" to stderr and terminate the
   * process with exit status 111. Does not return. */
  static void panic( const char *routine ) {
    const char *reason = strerror( errno );

    fprintf( stderr, "%s: %s\n", routine, reason );
    exit( 111 );
  }
  424. static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
  425. int64 sock = socket_tcp6( );
  426. if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
  427. panic( "socket_bind6_reuse" );
  428. if( socket_listen( sock, SOMAXCONN) == -1 )
  429. panic( "socket_listen" );
  430. if( !io_fd( sock ) )
  431. panic( "io_fd" );
  432. io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
  433. io_wantread( sock );
  434. return sock;
  435. }
  436. static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
  437. const char *s = src;
  438. int off, bracket = 0;
  439. while( isspace(*s) ) ++s;
  440. if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
  441. if( !(off = scan_ip6( s, ip ) ) )
  442. return 0;
  443. s += off;
  444. if( *s == 0 || isspace(*s)) return s-src;
  445. if( *s == ']' && bracket ) ++s;
  446. if( !ip6_isv4mapped(ip)){
  447. if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
  448. s++;
  449. } else {
  450. if( *(s++) != ':' ) return 0;
  451. }
  452. if( !(off = scan_ushort (s, port ) ) )
  453. return 0;
  454. return off+s-src;
  455. }
  456. int main( int argc, char **argv ) {
  457. static pthread_t sync_in_thread_id;
  458. static pthread_t sync_out_thread_id;
  459. ot_ip6 serverip;
  460. uint16_t tmpport;
  461. int scanon = 1, lbound = 0, sbound = 0;
  462. srandom( time(NULL) );
  463. g_tracker_id = random();
  464. noipv6=1;
  465. while( scanon ) {
  466. switch( getopt( argc, argv, ":l:c:L:h" ) ) {
  467. case -1: scanon = 0; break;
  468. case 'l':
  469. tmpport = 0;
  470. if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
  471. ot_try_bind( serverip, tmpport );
  472. ++sbound;
  473. break;
  474. case 'c':
  475. if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
  476. tmpport = 0;
  477. if( !scan_ip6_port( optarg,
  478. g_connections[g_connection_count].ip,
  479. &g_connections[g_connection_count].port ) ||
  480. !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
  481. g_connections[g_connection_count++].state = FLAG_OUTGOING;
  482. break;
  483. case 'L':
  484. tmpport = 9696;
  485. if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
  486. livesync_bind_mcast( serverip, tmpport); ++lbound; break;
  487. default:
  488. case '?': usage( argv[0] ); exit( 1 );
  489. }
  490. }
  491. if( !lbound ) exerr( "No livesync port bound." );
  492. if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
  493. pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
  494. pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
  495. server_mainloop();
  496. return 0;
  497. }
  /* Thread body: endlessly walks all torrent buckets, serialises the peers
     collected in each bucket into one buffer, empties the bucket and queues a
     copy of the buffer on every established stream connection. After each
     bucket the thread sleeps for OT_SYNC_SLEEP microseconds.

     The buffer holds up to three consecutive blocks, each starting with a four
     byte header (type, shared prefix byte, 16 bit torrent count, high byte first):
       type 1 - torrents with exactly one peer:  19 hash bytes + 1 peer
       type 2 - torrents with exactly two peers: 19 hash bytes + 2 peers
       type 0 - all other torrents: 19 hash bytes + peer count in 7 bit groups + peers
     Every peer takes OT_IP_SIZE + 3 bytes.
     NOTE(review): the torrent counts are stored in 16 bits only - confirm that a
     bucket can never hold more torrents than that.
     NOTE(review): the outdata batches are also used by the main thread in
     handle_write(); no locking around them is visible here - confirm. */
  498. static void * streamsync_worker( void * args ) {
  499. (void)args;
  500. while( 1 ) {
  501. int bucket;
  502. /* For each bucket... */
  503. for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
  504. /* Get exclusive access to that bucket */
  505. ot_vector *torrents_list = mutex_bucket_lock( bucket );
  506. size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
  507. size_t mem, mem_a = 0, mem_b = 0;
  /* ptr stays 0 when nothing was allocated; ptr_a/b/c are write cursors for blocks 1/2/0 */
  508. uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
  509. if( !torrents_list->size ) goto unlock_continue;
  /* First pass: count torrents per class and the peers of the default class */
  510. /* For each torrent in this bucket.. */
  511. for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
  512. /* Address torrents members */
  513. ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
  514. switch( peer_list->peer_count ) {
  515. case 2: count_two++; break;
  516. case 1: count_one++; break;
  517. case 0: break;
  518. default: count_def++;
  519. count_peers += peer_list->peer_count;
  520. }
  521. }
  522. /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
  523. mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) +
  524. ( count_one + 2 * count_two + count_peers ) * 7;
  525. fprintf( stderr, "Mem: %zd\n", mem );
  526. ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
  /* On allocation failure the bucket is left untouched */
  527. if( !ptr ) goto unlock_continue;
  /* A separate type 1 block is written for more than four single peer torrents
     or when there is no default block; otherwise those torrents are counted
     into the default block */
  528. if( count_one > 4 || !count_def ) {
  529. mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 );
  530. ptr_b += mem_a; ptr_c += mem_a;
  531. ptr_a[0] = 1; /* Offset 0: packet type 1 */
  532. ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
  533. ptr_a[2] = count_one >> 8;
  534. ptr_a[3] = count_one & 255;
  535. ptr_a += 4;
  536. } else
  537. count_def += count_one;
  /* Same decision for torrents with two peers */
  538. if( count_two > 4 || !count_def ) {
  539. mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 );
  540. ptr_c += mem_b;
  541. ptr_b[0] = 2; /* Offset 0: packet type 2 */
  542. ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
  543. ptr_b[2] = count_two >> 8;
  544. ptr_b[3] = count_two & 255;
  545. ptr_b += 4;
  546. } else
  547. count_def += count_two;
  548. if( count_def ) {
  549. ptr_c[0] = 0; /* Offset 0: packet type 0 */
  550. ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
  551. ptr_c[2] = count_def >> 8;
  552. ptr_c[3] = count_def & 255;
  553. ptr_c += 4;
  554. }
  /* Second pass: serialise every torrent into its block and free its peer list */
  555. /* For each torrent in this bucket.. */
  556. for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
  557. /* Address torrents members */
  558. ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
  559. ot_peerlist *peer_list = torrent->peer_list;
  560. ot_peer *peers = (ot_peer*)(peer_list->peers.data);
  561. uint8_t **dst;
  562. /* Determine destination slot */
  563. count_peers = peer_list->peer_count;
  564. switch( count_peers ) {
  /* NOTE(review): a torrent without peers is skipped before free_peerlist(),
     yet the torrent array is released below - looks like its peer list is
     never freed; confirm */
  565. case 0: continue;
  566. case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
  567. case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
  568. default: dst = &ptr_c; break;
  569. }
  570. /* Copy tail of info_hash, advance pointer */
  571. memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1);
  572. *dst += sizeof( ot_hash ) - 1;
  /* Only the default block carries an explicit count: 7 bits per byte, lowest
     group first, bit 7 set on all but the last byte */
  573. /* Encode peer count */
  574. if( dst == &ptr_c )
  575. while( count_peers ) {
  576. if( count_peers <= 0x7f )
  577. *(*dst)++ = count_peers;
  578. else
  579. *(*dst)++ = 0x80 | ( count_peers & 0x7f );
  580. count_peers >>= 7;
  581. }
  582. /* Copy peers */
  583. count_peers = peer_list->peer_count;
  584. while( count_peers-- ) {
  585. memcpy( *dst, peers++, OT_IP_SIZE + 3 );
  586. *dst += OT_IP_SIZE + 3;
  587. }
  588. free_peerlist(peer_list);
  589. }
  /* The bucket has been drained: release the torrent array and reset the vector */
  590. free( torrents_list->data );
  591. memset( torrents_list, 0, sizeof(*torrents_list ) );
  592. unlock_continue:
  593. mutex_bucket_unlock( bucket, 0 );
  594. if( ptr ) {
  595. int i;
  /* The blocks are laid out back to back, so the largest cursor marks the end of the data */
  596. if( ptr_b > ptr_c ) ptr_c = ptr_b;
  597. if( ptr_a > ptr_c ) ptr_c = ptr_a;
  598. mem = ptr_c - ptr;
  /* Every established connection gets its own heap copy, handed over to its output batch */
  599. for( i=0; i < MAX_PEERS; ++i ) {
  600. if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) {
  601. void *tmp = malloc( mem );
  602. if( tmp ) {
  603. memcpy( tmp, ptr, mem );
  604. iob_addbuf_free( &g_connections[i].outdata, tmp, mem );
  605. io_wantwrite( g_connections[i].fd );
  606. }
  607. }
  608. }
  609. free( ptr );
  610. }
  611. usleep( OT_SYNC_SLEEP );
  612. }
  613. }
  614. return 0;
  615. }
  616. static void livesync_issue_peersync( ) {
  617. socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
  618. groupip_1, LIVESYNC_PORT);
  619. g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
  620. g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
  621. }
  622. void livesync_ticker( ) {
  623. /* livesync_issue_peersync sets g_next_packet_time */
  624. if( time(NULL) > g_next_packet_time &&
  625. g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
  626. livesync_issue_peersync();
  627. }
  628. static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
  629. // unsigned int i;
  630. *g_peerbuffer_pos = prefix;
  631. memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
  632. memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
  633. #if 0
  634. /* Dump info_hash */
  635. for( i=0; i<sizeof(ot_hash); ++i )
  636. printf( "%02X", g_peerbuffer_pos[i] );
  637. putchar( ':' );
  638. #endif
  639. g_peerbuffer_pos += sizeof(ot_hash);
  640. #if 0
  641. printf( "%hhu.%hhu.%hhu.%hhu:%hu (%02X %02X)\n", g_peerbuffer_pos[0], g_peerbuffer_pos[1], g_peerbuffer_pos[2], g_peerbuffer_pos[3],
  642. g_peerbuffer_pos[4] | ( g_peerbuffer_pos[5] << 8 ), g_peerbuffer_pos[6], g_peerbuffer_pos[7] );
  643. #endif
  644. g_peerbuffer_pos += sizeof(ot_peer);
  645. if( g_peerbuffer_pos >= g_peerbuffer_highwater )
  646. livesync_issue_peersync();
  647. }
  648. static void process_indata( proxy_peer * peer ) {
  649. size_t consumed, peers;
  650. uint8_t *data = peer->indata, *hash;
  651. uint8_t *dataend = data + peer->indata_length;
  652. while( 1 ) {
  653. /* If we're not inside of a packet, make a new one */
  654. if( !peer->packet_tcount ) {
  655. /* Ensure the header is complete or postpone processing */
  656. if( data + 4 > dataend ) break;
  657. peer->packet_type = data[0];
  658. peer->packet_tprefix = data[1];
  659. peer->packet_tcount = data[2] * 256 + data[3];
  660. data += 4;
  661. printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount );
  662. }
  663. /* Ensure size for a minimal torrent block */
  664. if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break;
  665. /* Advance pointer to peer count or peers */
  666. hash = data;
  667. data += sizeof(ot_hash) - 1;
  668. /* Type 0 has peer count encoded before each peers */
  669. peers = peer->packet_type;
  670. if( !peers ) {
  671. int shift = 0;
  672. do peers |= ( 0x7f & *data ) << ( 7 * shift );
  673. while ( *(data++) & 0x80 && shift++ < 6 );
  674. }
  675. #if 0
  676. printf( "peers: %zd\n", peers );
  677. #endif
  678. /* Ensure enough data being read to hold all peers */
  679. if( data + (OT_IP_SIZE + 3) * peers > dataend ) {
  680. data = hash;
  681. break;
  682. }
  683. while( peers-- ) {
  684. livesync_proxytell( peer->packet_tprefix, hash, data );
  685. data += OT_IP_SIZE + 3;
  686. }
  687. --peer->packet_tcount;
  688. }
  689. consumed = data - peer->indata;
  690. memmove( peer->indata, data, peer->indata_length - consumed );
  691. peer->indata_length -= consumed;
  692. }
  693. static void * livesync_worker( void * args ) {
  694. (void)args;
  695. while( 1 ) {
  696. ot_ip6 in_ip; uint16_t in_port;
  697. size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
  698. /* Expect at least tracker id and packet type */
  699. if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
  700. continue;
  701. if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
  702. /* drop packet coming from ourselves */
  703. continue;
  704. }
  705. switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) {
  706. case OT_SYNC_PEER:
  707. livesync_handle_peersync( datalen );
  708. break;
  709. default:
  710. // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
  711. break;
  712. }
  713. }
  714. return 0;
  715. }