An open and free bittorrent tracker https://erdgeist.org/gitweb/opentracker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

217 lines
6.4 KiB

  1. /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
  2. It is considered beerware. Prost. Skol. Cheers or whatever.
  3. $id$ */
  4. /* System */
  5. #include <sys/types.h>
  6. #include <sys/uio.h>
  7. #include <string.h>
  8. #include <pthread.h>
  9. #include <unistd.h>
  10. #include <stdlib.h>
  11. /* Libowfat */
  12. #include "socket.h"
  13. #include "ndelay.h"
  14. #include "byte.h"
  15. #include "ip6.h"
  16. /* Opentracker */
  17. #include "trackerlogic.h"
  18. #include "ot_livesync.h"
  19. #include "ot_accesslist.h"
  20. #include "ot_stats.h"
  21. #include "ot_mutex.h"
  22. #ifdef WANT_SYNC_LIVE
  23. char groupip_1[4] = { 224,0,23,5 };
  24. #define LIVESYNC_INCOMING_BUFFSIZE (256*256)
  25. #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
  26. #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
  27. #define LIVESYNC_MAXDELAY 15 /* seconds */
  28. enum { OT_SYNC_PEER };
  29. /* Forward declaration */
  30. static void * livesync_worker( void * args );
  31. /* For outgoing packets */
  32. static int64 g_socket_in = -1;
  33. /* For incoming packets */
  34. static int64 g_socket_out = -1;
  35. static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
  36. char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
  37. static size_t g_outbuf_data;
  38. static ot_time g_next_packet_time;
  39. static pthread_t thread_id;
  40. void livesync_init( ) {
  41. if( g_socket_in == -1 )
  42. exerr( "No socket address for live sync specified." );
  43. /* Prepare outgoing peers buffer */
  44. memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) );
  45. uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER);
  46. g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
  47. g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
  48. pthread_create( &thread_id, NULL, livesync_worker, NULL );
  49. }
  50. void livesync_deinit() {
  51. if( g_socket_in != -1 )
  52. close( g_socket_in );
  53. if( g_socket_out != -1 )
  54. close( g_socket_out );
  55. pthread_cancel( thread_id );
  56. }
  57. void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
  58. char tmpip[4] = {0,0,0,0};
  59. char *v4ip;
  60. if( !ip6_isv4mapped(ip))
  61. exerr("v6 mcast support not yet available.");
  62. v4ip = ip+12;
  63. if( g_socket_in != -1 )
  64. exerr("Error: Livesync listen ip specified twice.");
  65. if( ( g_socket_in = socket_udp4( )) < 0)
  66. exerr("Error: Cant create live sync incoming socket." );
  67. ndelay_off(g_socket_in);
  68. if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
  69. exerr("Error: Cant bind live sync incoming socket." );
  70. if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
  71. exerr("Error: Cant make live sync incoming socket join mcast group.");
  72. if( ( g_socket_out = socket_udp4()) < 0)
  73. exerr("Error: Cant create live sync outgoing socket." );
  74. if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
  75. exerr("Error: Cant bind live sync outgoing socket." );
  76. socket_mcttl4(g_socket_out, 1);
  77. socket_mcloop4(g_socket_out, 0);
  78. }
  79. /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
  80. static void livesync_issue_peersync( ) {
  81. char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
  82. size_t data = g_outbuf_data;
  83. memcpy( mycopy, g_outbuf, data );
  84. g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
  85. g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
  86. /* From now this thread has a local copy of the buffer and
  87. has modified the protected element */
  88. pthread_mutex_unlock(&g_outbuf_mutex);
  89. socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT);
  90. }
  91. static void livesync_handle_peersync( struct ot_workstruct *ws ) {
  92. int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
  93. /* Now basic sanity checks have been done on the live sync packet
  94. We might add more testing and logging. */
  95. while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) {
  96. memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) );
  97. ws->hash = (ot_hash*)(ws->request + off);
  98. if( !g_opentracker_running ) return;
  99. if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED )
  100. remove_peer_from_torrent( FLAG_MCA, ws );
  101. else
  102. add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
  103. off += sizeof( ot_hash ) + sizeof( ot_peer );
  104. }
  105. stats_issue_event(EVENT_SYNC, 0,
  106. (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
  107. ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer )));
  108. }
  109. /* Tickle the live sync module from time to time, so no events get
  110. stuck when there's not enough traffic to fill udp packets fast
  111. enough */
  112. void livesync_ticker( ) {
  113. /* livesync_issue_peersync sets g_next_packet_time */
  114. pthread_mutex_lock(&g_outbuf_mutex);
  115. if( g_now_seconds > g_next_packet_time &&
  116. g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
  117. livesync_issue_peersync();
  118. else
  119. pthread_mutex_unlock(&g_outbuf_mutex);
  120. }
  121. /* Inform live sync about whats going on. */
  122. void livesync_tell( struct ot_workstruct *ws ) {
  123. pthread_mutex_lock(&g_outbuf_mutex);
  124. memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) );
  125. memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) );
  126. g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer);
  127. if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
  128. livesync_issue_peersync();
  129. else
  130. pthread_mutex_unlock(&g_outbuf_mutex);
  131. }
  132. static void * livesync_worker( void * args ) {
  133. struct ot_workstruct ws;
  134. ot_ip6 in_ip; uint16_t in_port;
  135. (void)args;
  136. /* Initialize our "thread local storage" */
  137. ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE );
  138. ws.outbuf = ws.reply = 0;
  139. memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) );
  140. while( 1 ) {
  141. ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
  142. /* Expect at least tracker id and packet type */
  143. if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
  144. continue;
  145. if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
  146. continue;
  147. if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
  148. /* TODO: log packet coming from ourselves */
  149. continue;
  150. }
  151. switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
  152. case OT_SYNC_PEER:
  153. livesync_handle_peersync( &ws );
  154. break;
  155. default:
  156. break;
  157. }
  158. }
  159. /* Never returns. */
  160. return NULL;
  161. }
  162. #endif
  163. const char *g_version_livesync_c = "$Source: /home/cvsroot/opentracker/ot_livesync.c,v $: $Revision: 1.20 $\n";