An open and free bittorrent tracker https://erdgeist.org/gitweb/opentracker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
4.7 KiB

13 years ago
14 years ago
14 years ago
14 years ago
14 years ago
14 years ago
  1. /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
  2. It is considered beerware. Prost. Skol. Cheers or whatever.
  3. $id$ */
  4. /* System */
  5. #include <sys/types.h>
  6. #include <sys/mman.h>
  7. #include <sys/uio.h>
  8. #include <stdio.h>
  9. #include <string.h>
  10. #include <pthread.h>
  11. /* Libowfat */
  12. #include "scan.h"
  13. #include "byte.h"
  14. #include "io.h"
  15. /* Opentracker */
  16. #include "trackerlogic.h"
  17. #include "ot_mutex.h"
  18. #include "ot_sync.h"
  19. #include "ot_stats.h"
  20. #include "ot_iovec.h"
  21. #ifdef WANT_SYNC_BATCH
  22. #define OT_SYNC_CHUNK_SIZE (512*1024)
  23. /* Import Changeset from an external authority
  24. format: d4:syncd[..]ee
  25. [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+
  26. */
  27. int add_changeset_to_tracker( uint8_t *data, size_t len ) {
  28. ot_hash *hash;
  29. uint8_t *end = data + len;
  30. unsigned long peer_count;
  31. /* We do know, that the string is \n terminated, so it cant
  32. overflow */
  33. if( byte_diff( data, 8, "d4:syncd" ) ) return -1;
  34. data += 8;
  35. while( 1 ) {
  36. if( byte_diff( data, 3, "20:" ) ) {
  37. if( byte_diff( data, 2, "ee" ) )
  38. return -1;
  39. return 0;
  40. }
  41. data += 3;
  42. hash = (ot_hash*)data;
  43. data += sizeof( ot_hash );
  44. /* Scan string length indicator */
  45. data += ( len = scan_ulong( (char*)data, &peer_count ) );
  46. /* If no long was scanned, it is not divisible by 8, it is not
  47. followed by a colon or claims to need to much memory, we fail */
  48. if( !len || !peer_count || ( peer_count & 7 ) || ( *data++ != ':' ) || ( data + peer_count > end ) )
  49. return -1;
  50. while( peer_count > 0 ) {
  51. add_peer_to_torrent( hash, (ot_peer*)data, 1 );
  52. data += 8; peer_count -= 8;
  53. }
  54. }
  55. return 0;
  56. }
  57. /* Proposed output format
  58. d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
  59. */
  60. static void sync_make( int *iovec_entries, struct iovec **iovector ) {
  61. int bucket;
  62. char *r, *re;
  63. /* Setup return vector... */
  64. *iovec_entries = 0;
  65. *iovector = NULL;
  66. if( !( r = iovec_increase( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE ) ) )
  67. return;
  68. /* ... and pointer to end of current output buffer.
  69. This works as a low watermark */
  70. re = r + OT_SYNC_CHUNK_SIZE;
  71. memmove( r, "d4:syncd", 8 ); r += 8;
  72. /* For each bucket... */
  73. for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
  74. /* Get exclusive access to that bucket */
  75. ot_vector *torrents_list = mutex_bucket_lock( bucket );
  76. size_t tor_offset;
  77. /* For each torrent in this bucket.. */
  78. for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
  79. /* Address torrents members */
  80. ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
  81. ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
  82. const size_t byte_count = sizeof(ot_peer) * peer_list->changeset.size;
  83. /* If we reached our low watermark in buffer... */
  84. if( re - r <= (ssize_t)(/* strlen( "20:" ) == */ 3 + sizeof( ot_hash ) + /* strlen_max( "%zd" ) == */ 12 + byte_count ) ) {
  85. /* Allocate a fresh output buffer at the end of our buffers list
  86. release bucket and return, if that fails */
  87. if( !( r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SYNC_CHUNK_SIZE ) ) )
  88. return mutex_bucket_unlock( bucket );
  89. /* Adjust new end of output buffer */
  90. re = r + OT_SYNC_CHUNK_SIZE;
  91. }
  92. *r++ = '2'; *r++ = '0'; *r++ = ':';
  93. memmove( r, hash, sizeof( ot_hash ) ); r += sizeof( ot_hash );
  94. r += sprintf( r, "%zd:", byte_count );
  95. memmove( r, peer_list->changeset.data, byte_count ); r += byte_count;
  96. }
  97. /* All torrents done: release lock on currenct bucket */
  98. mutex_bucket_unlock( bucket );
  99. }
  100. /* Close bencoded sync dictionary */
  101. *r++='e'; *r++='e';
  102. /* Release unused memory in current output buffer */
  103. iovec_fixlast( iovec_entries, iovector, r );
  104. }
  105. /* This is the entry point into this worker thread
  106. It grabs tasks from mutex_tasklist and delivers results back
  107. */
  108. static void * sync_worker( void * args) {
  109. int iovec_entries;
  110. struct iovec *iovector;
  111. args = args;
  112. while( 1 ) {
  113. ot_tasktype tasktype = TASK_SYNC_OUT;
  114. ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
  115. sync_make( &iovec_entries, &iovector );
  116. stats_issue_event( EVENT_SYNC_OUT, FLAG_TCP, iovec_length( &iovec_entries, &iovector) );
  117. if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
  118. iovec_free( &iovec_entries, &iovector );
  119. }
  120. return NULL;
  121. }
  122. static pthread_t thread_id;
  123. void sync_init( ) {
  124. pthread_create( &thread_id, NULL, sync_worker, NULL );
  125. }
  126. void sync_deinit( ) {
  127. pthread_cancel( thread_id );
  128. }
  129. void sync_deliver( int64 socket ) {
  130. mutex_workqueue_pushtask( socket, TASK_SYNC_OUT );
  131. }
  132. #endif
  133. const char *g_version_sync_c = "$Source$: $Revision$\n";