An open and free bittorrent tracker https://erdgeist.org/gitweb/opentracker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

167 lines
4.7 KiB

  1. /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
  2. It is considered beerware. Prost. Skol. Cheers or whatever.
  3. $id$ */
  4. /* System */
  5. #include <sys/types.h>
  6. #include <sys/mman.h>
  7. #include <sys/uio.h>
  8. #include <stdio.h>
  9. #include <string.h>
  10. #include <pthread.h>
  11. /* Libowfat */
  12. #include "scan.h"
  13. #include "byte.h"
  14. #include "io.h"
  15. /* Opentracker */
  16. #include "trackerlogic.h"
  17. #include "ot_mutex.h"
  18. #include "ot_sync.h"
  19. #include "ot_stats.h"
  20. #include "ot_iovec.h"
  21. #ifdef WANT_SYNC_BATCH
  22. #define OT_SYNC_CHUNK_SIZE (512*1024)
  23. /* Import Changeset from an external authority
  24. format: d4:syncd[..]ee
  25. [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+
  26. */
  27. int add_changeset_to_tracker( uint8_t *data, size_t len ) {
  28. ot_hash *hash;
  29. uint8_t *end = data + len;
  30. unsigned long peer_count;
  31. /* We do know, that the string is \n terminated, so it cant
  32. overflow */
  33. if( byte_diff( data, 8, "d4:syncd" ) ) return -1;
  34. data += 8;
  35. while( 1 ) {
  36. if( byte_diff( data, 3, "20:" ) ) {
  37. if( byte_diff( data, 2, "ee" ) )
  38. return -1;
  39. return 0;
  40. }
  41. data += 3;
  42. hash = (ot_hash*)data;
  43. data += sizeof( ot_hash );
  44. /* Scan string length indicator */
  45. data += ( len = scan_ulong( (char*)data, &peer_count ) );
  46. /* If no long was scanned, it is not divisible by 8, it is not
  47. followed by a colon or claims to need to much memory, we fail */
  48. if( !len || !peer_count || ( peer_count & 7 ) || ( *data++ != ':' ) || ( data + peer_count > end ) )
  49. return -1;
  50. while( peer_count > 0 ) {
  51. add_peer_to_torrent( hash, (ot_peer*)data, 1 );
  52. data += 8; peer_count -= 8;
  53. }
  54. }
  55. return 0;
  56. }
  57. /* Proposed output format
  58. d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
  59. */
  60. static void sync_make( int *iovec_entries, struct iovec **iovector ) {
  61. int bucket;
  62. char *r, *re;
  63. /* Setup return vector... */
  64. *iovec_entries = 0;
  65. *iovector = NULL;
  66. if( !( r = iovec_increase( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE ) ) )
  67. return;
  68. /* ... and pointer to end of current output buffer.
  69. This works as a low watermark */
  70. re = r + OT_SYNC_CHUNK_SIZE;
  71. memmove( r, "d4:syncd", 8 ); r += 8;
  72. /* For each bucket... */
  73. for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
  74. /* Get exclusive access to that bucket */
  75. ot_vector *torrents_list = mutex_bucket_lock( bucket );
  76. size_t tor_offset;
  77. /* For each torrent in this bucket.. */
  78. for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
  79. /* Address torrents members */
  80. ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
  81. ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
  82. const size_t byte_count = sizeof(ot_peer) * peer_list->changeset.size;
  83. /* If we reached our low watermark in buffer... */
  84. if( re - r <= (ssize_t)(/* strlen( "20:" ) == */ 3 + sizeof( ot_hash ) + /* strlen_max( "%zd" ) == */ 12 + byte_count ) ) {
  85. /* Allocate a fresh output buffer at the end of our buffers list
  86. release bucket and return, if that fails */
  87. if( !( r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SYNC_CHUNK_SIZE ) ) )
  88. return mutex_bucket_unlock( bucket );
  89. /* Adjust new end of output buffer */
  90. re = r + OT_SYNC_CHUNK_SIZE;
  91. }
  92. *r++ = '2'; *r++ = '0'; *r++ = ':';
  93. memmove( r, hash, sizeof( ot_hash ) ); r += sizeof( ot_hash );
  94. r += sprintf( r, "%zd:", byte_count );
  95. memmove( r, peer_list->changeset.data, byte_count ); r += byte_count;
  96. }
  97. /* All torrents done: release lock on currenct bucket */
  98. mutex_bucket_unlock( bucket );
  99. }
  100. /* Close bencoded sync dictionary */
  101. *r++='e'; *r++='e';
  102. /* Release unused memory in current output buffer */
  103. iovec_fixlast( iovec_entries, iovector, r );
  104. }
  105. /* This is the entry point into this worker thread
  106. It grabs tasks from mutex_tasklist and delivers results back
  107. */
  108. static void * sync_worker( void * args) {
  109. int iovec_entries;
  110. struct iovec *iovector;
  111. args = args;
  112. while( 1 ) {
  113. ot_tasktype tasktype = TASK_SYNC_OUT;
  114. ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
  115. sync_make( &iovec_entries, &iovector );
  116. stats_issue_event( EVENT_SYNC_OUT, FLAG_TCP, iovec_length( &iovec_entries, &iovector) );
  117. if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
  118. iovec_free( &iovec_entries, &iovector );
  119. }
  120. return NULL;
  121. }
  122. static pthread_t thread_id;
  123. void sync_init( ) {
  124. pthread_create( &thread_id, NULL, sync_worker, NULL );
  125. }
  126. void sync_deinit( ) {
  127. pthread_cancel( thread_id );
  128. }
  129. void sync_deliver( int64 socket ) {
  130. mutex_workqueue_pushtask( socket, TASK_SYNC_OUT );
  131. }
  132. #endif
  133. const char *g_version_sync_c = "$Source$: $Revision$\n";