(libT) #1549: support fast exensions' "reject" and "have all/none" messages

This commit is contained in:
Charles Kerr
2008-12-02 17:10:54 +00:00
parent 3ce59ff0c6
commit f927ea5d63
9 changed files with 556 additions and 138 deletions

View File

@@ -109,6 +109,7 @@ TESTS = \
bencode-test \
clients-test \
json-test \
peer-msgs-test \
rpc-test \
test-peer-id \
utils-test
@@ -154,6 +155,10 @@ test_peer_id_SOURCES = test-peer-id.c
test_peer_id_LDADD = ${apps_ldadd}
test_peer_id_LDFLAGS = ${apps_ldflags}
peer_msgs_test_SOURCES = peer-msgs-test.c
peer_msgs_test_LDADD = ${apps_ldadd}
peer_msgs_test_LDFLAGS = ${apps_ldflags}
utils_test_SOURCES = utils-test.c
utils_test_LDADD = ${apps_ldadd}
utils_test_LDFLAGS = ${apps_ldflags}

View File

@@ -33,6 +33,8 @@
/* enable LibTransmission extension protocol */
#define ENABLE_LTEP * /
/* fast extensions */
#define ENABLE_FAST * /
/***
****
@@ -69,6 +71,14 @@ enum
#define HANDSHAKE_SET_LTEP( bits ) ( (void)0 )
#endif
#ifdef ENABLE_FAST
#define HANDSHAKE_HAS_FASTEXT( bits ) ( ( ( bits )[7] & 0x04 ) ? 1 : 0 )
#define HANDSHAKE_SET_FASTEXT( bits ) ( ( bits )[7] |= 0x04 )
#else
#define HANDSHAKE_HAS_FASTEXT( bits ) ( 0 )
#define HANDSHAKE_SET_FASTEXT( bits ) ( (void)0 )
#endif
/* http://www.azureuswiki.com/index.php/Extension_negotiation_protocol
these macros are to be used if both extended messaging and the
azureus protocol is supported, they indicate which protocol is preferred */
@@ -203,6 +213,7 @@ buildHandshakeMessage( tr_handshake * handshake,
walk += HANDSHAKE_NAME_LEN;
memset( walk, 0, HANDSHAKE_FLAGS_LEN );
HANDSHAKE_SET_LTEP( walk );
HANDSHAKE_SET_FASTEXT( walk );
walk += HANDSHAKE_FLAGS_LEN;
memcpy( walk, torrentHash, SHA_DIGEST_LENGTH );
@@ -279,11 +290,9 @@ parseHandshake( tr_handshake * handshake,
*** Extensions
**/
if( HANDSHAKE_HAS_LTEP( reserved ) )
{
tr_peerIoEnableLTEP( handshake->io, 1 );
dbgmsg( handshake, "using ltep" );
}
tr_peerIoEnableLTEP( handshake->io, HANDSHAKE_HAS_LTEP( reserved ) );
tr_peerIoEnableFEXT( handshake->io, HANDSHAKE_HAS_FASTEXT( reserved ) );
return HANDSHAKE_OK;
}
@@ -643,14 +652,12 @@ readHandshake( tr_handshake * handshake,
tr_peerIoReadBytes( handshake->io, inbuf, reserved, sizeof( reserved ) );
/**
*** Extension negotiation
*** Extensions
**/
if( HANDSHAKE_HAS_LTEP( reserved ) )
{
tr_peerIoEnableLTEP( handshake->io, 1 );
dbgmsg( handshake, "using ltep" );
}
tr_peerIoEnableLTEP( handshake->io, HANDSHAKE_HAS_LTEP( reserved ) );
tr_peerIoEnableFEXT( handshake->io, HANDSHAKE_HAS_FASTEXT( reserved ) );
/* torrent hash */
tr_peerIoReadBytes( handshake->io, inbuf, hash, sizeof( hash ) );

View File

@@ -42,6 +42,8 @@ typedef enum
{
TR_PEER_CLIENT_GOT_BLOCK,
TR_PEER_CLIENT_GOT_DATA,
TR_PEER_CLIENT_GOT_ALLOWED_FAST,
TR_PEER_CLIENT_GOT_SUGGEST,
TR_PEER_PEER_GOT_DATA,
TR_PEER_PEER_PROGRESS,
TR_PEER_ERROR,
@@ -53,7 +55,7 @@ PeerEventType;
typedef struct
{
PeerEventType eventType;
uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */
uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL, ALLOWED, SUGGEST */
uint32_t offset; /* for GOT_BLOCK */
uint32_t length; /* for GOT_BLOCK + GOT_DATA */
float progress; /* for PEER_PROGRESS */

View File

@@ -83,6 +83,7 @@ struct tr_peerIo
tr_bool isIncoming;
tr_bool peerIdIsSet;
tr_bool extendedProtocolSupported;
tr_bool fastExtensionSupported;
int magicNumber;
@@ -236,6 +237,12 @@ isPeerIo( const tr_peerIo * io )
return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
}
static int
isFlag( int flag )
{
return( ( flag == 0 ) || ( flag == 1 ) );
}
static tr_peerIo*
tr_peerIoNew( tr_session * session,
const tr_address * addr,
@@ -486,13 +493,37 @@ tr_peerIoGetPeersId( const tr_peerIo * io )
***
**/
void
tr_peerIoEnableFEXT( tr_peerIo * io,
int flag )
{
assert( isPeerIo( io ) );
assert( isFlag( flag ) );
dbgmsg( io, "setting FEXT support flag to %d", (flag?1:0) );
io->fastExtensionSupported = flag;
}
int
tr_peerIoSupportsFEXT( const tr_peerIo * io )
{
assert( isPeerIo( io ) );
return io->fastExtensionSupported;
}
/**
***
**/
void
tr_peerIoEnableLTEP( tr_peerIo * io,
int flag )
{
assert( isPeerIo( io ) );
assert( flag == 0 || flag == 1 );
assert( isFlag( flag ) );
dbgmsg( io, "setting LTEP support flag to %d", (flag?1:0) );
io->extendedProtocolSupported = flag;
}

View File

@@ -49,11 +49,14 @@ void tr_peerIoFree ( tr_peerIo * io );
***
**/
void tr_peerIoEnableLTEP( tr_peerIo * io,
int flag );
void tr_peerIoEnableLTEP( tr_peerIo * io, int flag );
int tr_peerIoSupportsLTEP( const tr_peerIo * io );
void tr_peerIoEnableFEXT( tr_peerIo * io, int flag );
int tr_peerIoSupportsFEXT( const tr_peerIo * io );
/**
***
**/

View File

@@ -337,10 +337,12 @@ static void
peerDestructor( tr_peer * peer )
{
assert( peer );
assert( peer->msgs );
tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
tr_peerMsgsFree( peer->msgs );
if( peer->msgs != NULL )
{
tr_peerMsgsUnsubscribe( peer->msgs, peer->msgsTag );
tr_peerMsgsFree( peer->msgs );
}
tr_peerIoFree( peer->io );
@@ -810,8 +812,7 @@ refillPulse( void * vtorrent )
/* find a peer who can ask for this block */
for( j=0; !handled && j<peerCount; )
{
const int val = tr_peerMsgsAddRequest( peers[j]->msgs,
index, offset, length );
const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, offset, length, FALSE );
switch( val )
{
case TR_ADDREQ_FULL:
@@ -920,6 +921,52 @@ refillSoon( Torrent * t )
REFILL_PERIOD_MSEC );
}
static void
peerSuggestedPiece( Torrent * t,
tr_peer * peer,
tr_piece_index_t pieceIndex,
int isFastAllowed )
{
assert( t );
assert( peer );
assert( peer->msgs );
/* is this a valid piece? */
if( pieceIndex >= t->tor->info.pieceCount )
return;
/* don't ask for it if we've already got it */
if( tr_cpPieceIsComplete( t->tor->completion, pieceIndex ) )
return;
/* don't ask for it if they don't have it */
if( !tr_bitfieldHas( peer->have, pieceIndex ) )
return;
/* don't ask for it if we're choked and it's not fast */
if( !isFastAllowed && peer->clientIsChoked )
return;
/* request the blocks that we don't have in this piece */
{
tr_block_index_t block;
const tr_torrent * tor = t->tor;
const tr_block_index_t start = tr_torPieceFirstBlock( tor, pieceIndex );
const tr_block_index_t end = start + tr_torPieceCountBlocks( tor, pieceIndex );
for( block=start; block<end; ++block )
{
if( !tr_cpBlockIsComplete( tor->completion, block ) )
{
const uint32_t offset = getBlockOffsetInPiece( tor, block );
const uint32_t length = tr_torBlockCountBytes( tor, block );
tr_peerMsgsAddRequest( peer->msgs, pieceIndex, offset, length, TRUE );
incrementPieceRequests( t, pieceIndex );
}
}
}
}
static void
peerCallbackFunc( void * vpeer,
void * vevent,
@@ -964,6 +1011,16 @@ peerCallbackFunc( void * vpeer,
break;
}
case TR_PEER_CLIENT_GOT_SUGGEST:
if( peer )
peerSuggestedPiece( t, peer, e->pieceIndex, FALSE );
break;
case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
if( peer )
peerSuggestedPiece( t, peer, e->pieceIndex, TRUE );
break;
case TR_PEER_CLIENT_GOT_DATA:
{
const time_t now = time( NULL );

View File

@@ -0,0 +1,55 @@
#include <stdio.h>
#include "transmission.h"
#include "net.h"
#include "peer-msgs.h"
#include "utils.h"
#define VERBOSE 0
#define check( A ) \
{ \
++test; \
if( A ){ \
if( VERBOSE ) \
fprintf( stderr, "PASS test #%d (%s, %d)\n", test, __FILE__,\
__LINE__ );\
} else { \
if( VERBOSE ) \
fprintf( stderr, "FAIL test #%d (%s, %d)\n", test, __FILE__,\
__LINE__ );\
return test; \
} \
}
int
main( void )
{
uint32_t i;
int test = 0;
uint8_t infohash[SHA_DIGEST_LENGTH];
struct tr_address addr;
tr_piece_index_t pieceCount = 1313;
size_t numwant;
size_t numgot;
tr_piece_index_t pieces[] = { 1059, 431, 808, 1217, 287, 376, 1188, 353, 508 };
tr_piece_index_t buf[16];
for( i = 0; i < SHA_DIGEST_LENGTH; ++i )
infohash[i] = 0xaa;
tr_pton( "80.4.4.200", &addr );
numwant = 7;
numgot = tr_generateAllowedSet( buf, numwant, pieceCount, infohash, &addr );
check( numgot == numwant );
for( i=0; i<numgot; ++i )
check( buf[i] == pieces[i] );
numwant = 9;
numgot = tr_generateAllowedSet( buf, numwant, pieceCount, infohash, &addr );
check( numgot == numwant );
for( i=0; i<numgot; ++i )
check( buf[i] == pieces[i] );
return 0;
}

View File

@@ -54,16 +54,21 @@ enum
BT_PIECE = 7,
BT_CANCEL = 8,
BT_PORT = 9,
BT_SUGGEST = 13,
BT_HAVE_ALL = 14,
BT_HAVE_NONE = 15,
BT_REJECT = 16,
BT_FEXT_SUGGEST = 13,
BT_FEXT_HAVE_ALL = 14,
BT_FEXT_HAVE_NONE = 15,
BT_FEXT_REJECT = 16,
BT_FEXT_ALLOWED_FAST = 17,
BT_LTEP = 20,
LTEP_HANDSHAKE = 0,
TR_LTEP_PEX = 1,
MIN_CHOKE_PERIOD_SEC = ( 10 ),
/* idle seconds before we send a keepalive */
@@ -90,7 +95,10 @@ enum
/* number of pieces to remove from the bitfield when
* lazy bitfields are turned on */
LAZY_PIECE_COUNT = 26
LAZY_PIECE_COUNT = 26,
/* number of pieces we'll allow in our fast set */
MAX_FAST_SET_SIZE = 3
};
/**
@@ -258,6 +266,7 @@ struct tr_peermsgs
tr_bool peerSupportsPex;
tr_bool clientSentLtepHandshake;
tr_bool peerSentLtepHandshake;
tr_bool haveFastSet;
uint8_t state;
uint8_t ut_pex_id;
@@ -265,6 +274,9 @@ struct tr_peermsgs
uint16_t minActiveRequests;
uint16_t maxActiveRequests;
size_t fastsetSize;
tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
/* how long the outMessages batch should be allowed to grow before
* it's flushed -- some messages (like requests >:) should be sent
* very quickly; others aren't as urgent. */
@@ -356,11 +368,35 @@ pokeBatchPeriod( tr_peermsgs * msgs,
}
}
static void
dbgOutMessageLen( tr_peermsgs * msgs )
{
dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( msgs->outMessages ) );
}
static void
protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
{
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
assert( tr_peerIoSupportsFEXT( msgs->io ) );
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
tr_peerIoWriteUint32( io, out, req->index );
tr_peerIoWriteUint32( io, out, req->offset );
tr_peerIoWriteUint32( io, out, req->length );
dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
dbgOutMessageLen( msgs );
}
static void
protocolSendRequest( tr_peermsgs * msgs,
const struct peer_request * req )
{
tr_peerIo * io = msgs->io;
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
@@ -368,8 +404,9 @@ protocolSendRequest( tr_peermsgs * msgs,
tr_peerIoWriteUint32( io, out, req->index );
tr_peerIoWriteUint32( io, out, req->offset );
tr_peerIoWriteUint32( io, out, req->length );
dbgmsg( msgs, "requesting %u:%u->%u... outMessage size is now %d",
req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
}
@@ -377,7 +414,7 @@ static void
protocolSendCancel( tr_peermsgs * msgs,
const struct peer_request * req )
{
tr_peerIo * io = msgs->io;
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 3 * sizeof( uint32_t ) );
@@ -385,8 +422,9 @@ protocolSendCancel( tr_peermsgs * msgs,
tr_peerIoWriteUint32( io, out, req->index );
tr_peerIoWriteUint32( io, out, req->offset );
tr_peerIoWriteUint32( io, out, req->length );
dbgmsg( msgs, "cancelling %u:%u->%u... outMessage size is now %d",
req->index, req->offset, req->length, (int)EVBUFFER_LENGTH( out ) );
dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
@@ -394,29 +432,78 @@ static void
protocolSendHave( tr_peermsgs * msgs,
uint32_t index )
{
tr_peerIo * io = msgs->io;
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + sizeof( uint32_t ) );
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
tr_peerIoWriteUint8 ( io, out, BT_HAVE );
tr_peerIoWriteUint32( io, out, index );
dbgmsg( msgs, "sending Have %u.. outMessage size is now %d",
index, (int)EVBUFFER_LENGTH( out ) );
dbgmsg( msgs, "sending Have %u...", index );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
}
static void
protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
{
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
assert( tr_peerIoSupportsFEXT( msgs->io ) );
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
tr_peerIoWriteUint32( io, out, pieceIndex );
dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
dbgOutMessageLen( msgs );
}
static void
protocolSendChoke( tr_peermsgs * msgs,
int choke )
{
tr_peerIo * io = msgs->io;
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
dbgmsg( msgs, "sending %s... outMessage size is now %d",
( choke ? "Choke" : "Unchoke" ),
(int)EVBUFFER_LENGTH( out ) );
dbgmsg( msgs, "sending %s...", choke ? "Choke" : "Unchoke" );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
static void
protocolSendHaveAll( tr_peermsgs * msgs )
{
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
assert( tr_peerIoSupportsFEXT( msgs->io ) );
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );
dbgmsg( msgs, "sending HAVE_ALL..." );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
static void
protocolSendHaveNone( tr_peermsgs * msgs )
{
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outMessages;
assert( tr_peerIoSupportsFEXT( msgs->io ) );
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );
dbgmsg( msgs, "sending HAVE_NONE..." );
dbgOutMessageLen( msgs );
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
@@ -489,6 +576,24 @@ fireClientGotData( tr_peermsgs * msgs,
publish( msgs, &e );
}
static void
fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
e.pieceIndex = pieceIndex;
publish( msgs, &e );
}
static void
fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
e.pieceIndex = pieceIndex;
publish( msgs, &e );
}
static void
firePeerGotData( tr_peermsgs * msgs,
uint32_t length,
@@ -514,6 +619,86 @@ fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
publish( msgs, &e );
}
/**
*** ALLOWED FAST SET
*** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
**/
size_t
tr_generateAllowedSet( tr_piece_index_t * setmePieces,
size_t desiredSetSize,
size_t pieceCount,
const uint8_t * infohash,
const tr_address * addr )
{
size_t setSize = 0;
assert( setmePieces );
assert( desiredSetSize <= pieceCount );
assert( desiredSetSize );
assert( pieceCount );
assert( infohash );
assert( addr );
if( addr->type == TR_AF_INET )
{
uint8_t w[SHA_DIGEST_LENGTH + 4];
uint8_t x[SHA_DIGEST_LENGTH];
*(uint32_t*)w = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 ); /* (1) */
memcpy( w + 4, infohash, SHA_DIGEST_LENGTH ); /* (2) */
tr_sha1( x, w, sizeof( w ), NULL ); /* (3) */
while( setSize<desiredSetSize )
{
int i;
for( i=0; i<5 && setSize<desiredSetSize; ++i ) /* (4) */
{
size_t k;
uint32_t j = i * 4; /* (5) */
uint32_t y = ntohl( *( uint32_t* )( x + j ) ); /* (6) */
uint32_t index = y % pieceCount; /* (7) */
for( k=0; k<setSize; ++k ) /* (8) */
if( setmePieces[k] == index )
break;
if( k == setSize )
setmePieces[setSize++] = index; /* (9) */
}
tr_sha1( x, x, sizeof( x ), NULL ); /* (3) */
}
}
return setSize;
}
static void
updateFastSet( tr_peermsgs * msgs UNUSED )
{
#if 0
const int fext = tr_peerIoSupportsFEXT( msgs->io );
const int peerIsNeedy = msgs->info->progress < 0.10;
if( fext && peerIsNeedy && !msgs->haveFastSet )
{
size_t i;
const struct tr_address * addr = tr_peerIoGetAddress( msgs->io, NULL );
const tr_info * inf = &msgs->torrent->info;
const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
/* build the fast set */
msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
msgs->haveFastSet = 1;
/* send it to the peer */
for( i=0; i<msgs->fastsetSize; ++i )
protocolSendAllowedFast( msgs, msgs->fastset[i] );
}
#endif
}
/**
*** INTEREST
**/
@@ -590,10 +775,20 @@ updateInterest( tr_peermsgs * msgs )
fireNeedReq( msgs );
}
static int
popNextRequest( tr_peermsgs * msgs,
struct peer_request * setme )
{
return reqListPop( &msgs->peerAskedFor, setme );
}
static void
cancelAllRequestsToClient( tr_peermsgs * msgs )
{
reqListClear( &msgs->peerAskedFor );
struct peer_request req;
while( popNextRequest( msgs, &req ) )
protocolSendReject( msgs, &req );
}
void
@@ -661,28 +856,30 @@ expireOldRequests( tr_peermsgs * msgs, const time_t now )
int i;
time_t oldestAllowed;
struct request_list tmp = REQUEST_LIST_INIT;
const int fext = tr_peerIoSupportsFEXT( msgs->io );
/* cancel requests that have been queued for too long */
oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
reqListCopy( &tmp, &msgs->clientWillAskFor );
for( i = 0; i < tmp.count; ++i )
{
for( i = 0; i < tmp.count; ++i ) {
const struct peer_request * req = &tmp.requests[i];
if( req->time_requested < oldestAllowed )
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
}
reqListClear( &tmp );
/* cancel requests that were sent too long ago */
oldestAllowed = now - SENT_REQUEST_TTL_SECS;
reqListCopy( &tmp, &msgs->clientAskedFor );
for( i = 0; i < tmp.count; ++i )
{
const struct peer_request * req = &tmp.requests[i];
if( req->time_requested < oldestAllowed )
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
/* if the peer doesn't support "Reject Request",
* cancel requests that were sent too long ago. */
if( !fext ) {
oldestAllowed = now - SENT_REQUEST_TTL_SECS;
reqListCopy( &tmp, &msgs->clientAskedFor );
for( i=0; i<tmp.count; ++i ) {
const struct peer_request * req = &tmp.requests[i];
if( req->time_requested < oldestAllowed )
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
}
reqListClear( &tmp );
}
reqListClear( &tmp );
}
static void
@@ -744,7 +941,8 @@ tr_addreq_t
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
uint32_t index,
uint32_t offset,
uint32_t length )
uint32_t length,
int doForce )
{
struct peer_request req;
@@ -757,18 +955,17 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
**/
/* don't send requests to choked clients */
if( msgs->info->clientIsChoked )
{
if( !doForce && msgs->info->clientIsChoked ) {
dbgmsg( msgs, "declining request because they're choking us" );
return TR_ADDREQ_CLIENT_CHOKED;
}
/* peer doesn't have this piece */
if( !tr_bitfieldHas( msgs->info->have, index ) )
if( !doForce && !tr_bitfieldHas( msgs->info->have, index ) )
return TR_ADDREQ_MISSING;
/* peer's queue is full */
if( requestQueueIsFull( msgs ) ) {
if( !doForce && requestQueueIsFull( msgs ) ) {
dbgmsg( msgs, "declining request because we're full" );
return TR_ADDREQ_FULL;
}
@@ -777,13 +974,15 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
req.index = index;
req.offset = offset;
req.length = length;
if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
if( !doForce ) {
if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
}
/**
@@ -851,6 +1050,7 @@ tr_peerMsgsCancel( tr_peermsgs * msgs,
}
}
/**
***
**/
@@ -1077,9 +1277,9 @@ readBtId( tr_peermsgs * msgs,
static void
updatePeerProgress( tr_peermsgs * msgs )
{
msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have )
/ (float)msgs->torrent->info.pieceCount;
msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
dbgmsg( msgs, "peer progress is %f", msgs->info->progress );
updateFastSet( msgs );
updateInterest( msgs );
firePeerProgress( msgs );
}
@@ -1088,27 +1288,26 @@ static void
peerMadeRequest( tr_peermsgs * msgs,
const struct peer_request * req )
{
const int fext = tr_peerIoSupportsFEXT( msgs->io );
const int reqIsValid = requestIsValid( msgs, req );
const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete(
msgs->torrent->completion, req->index );
const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
const int peerIsChoked = msgs->info->peerIsChoked;
if( !reqIsValid ) /* bad request */
{
int allow = FALSE;
if( !reqIsValid )
dbgmsg( msgs, "rejecting an invalid request." );
}
else if( !clientHasPiece ) /* we don't have it */
{
else if( !clientHasPiece )
dbgmsg( msgs, "rejecting request for a piece we don't have." );
}
else if( peerIsChoked ) /* doesn't he know he's choked? */
{
tr_peerMsgsSetChoke( msgs, 1 );
}
else /* YAY */
{
else if( peerIsChoked )
dbgmsg( msgs, "rejecting request from choked peer" );
else
allow = TRUE;
if( allow )
reqListAppend( &msgs->peerAskedFor, req );
}
else if( fext )
protocolSendReject( msgs, req );
}
static int
@@ -1122,12 +1321,13 @@ messageLengthIsCorrect( const tr_peermsgs * msg,
case BT_UNCHOKE:
case BT_INTERESTED:
case BT_NOT_INTERESTED:
case BT_HAVE_ALL:
case BT_HAVE_NONE:
case BT_FEXT_HAVE_ALL:
case BT_FEXT_HAVE_NONE:
return len == 1;
case BT_HAVE:
case BT_SUGGEST:
case BT_FEXT_SUGGEST:
case BT_FEXT_ALLOWED_FAST:
return len == 5;
case BT_BITFIELD:
@@ -1135,7 +1335,7 @@ messageLengthIsCorrect( const tr_peermsgs * msg,
case BT_REQUEST:
case BT_CANCEL:
case BT_REJECT:
case BT_FEXT_REJECT:
return len == 13;
case BT_PIECE:
@@ -1229,6 +1429,7 @@ readBtMessage( tr_peermsgs * msgs,
uint32_t msglen = msgs->incoming.length;
const uint8_t id = msgs->incoming.id;
const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
const int fext = tr_peerIoSupportsFEXT( msgs->io );
--msglen; /* id length */
@@ -1252,7 +1453,8 @@ readBtMessage( tr_peermsgs * msgs,
case BT_CHOKE:
dbgmsg( msgs, "got Choke" );
msgs->info->clientIsChoked = 1;
cancelAllRequestsToPeer( msgs );
if( !fext )
cancelAllRequestsToPeer( msgs );
cancelAllRequestsToClient( msgs );
break;
@@ -1314,7 +1516,8 @@ readBtMessage( tr_peermsgs * msgs,
tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
reqListRemove( &msgs->peerAskedFor, &r );
if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
protocolSendReject( msgs, &r );
break;
}
@@ -1327,36 +1530,64 @@ readBtMessage( tr_peermsgs * msgs,
tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
break;
case BT_SUGGEST:
{
dbgmsg( msgs, "Got a BT_SUGGEST" );
case BT_FEXT_SUGGEST:
dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
/* we don't do anything with this yet */
if( fext )
fireClientGotSuggest( msgs, ui32 );
else {
fireError( msgs, EPROTO );
return READ_ERR;
}
break;
}
case BT_HAVE_ALL:
dbgmsg( msgs, "Got a BT_HAVE_ALL" );
tr_bitfieldAddRange( msgs->info->have, 0,
msgs->torrent->info.pieceCount );
updatePeerProgress( msgs );
case BT_FEXT_ALLOWED_FAST:
dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
if( fext )
fireClientGotAllowedFast( msgs, ui32 );
else {
fireError( msgs, EPROTO );
return READ_ERR;
}
break;
case BT_FEXT_HAVE_ALL:
dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
if( fext ) {
tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
updatePeerProgress( msgs );
} else {
fireError( msgs, EPROTO );
return READ_ERR;
}
break;
case BT_HAVE_NONE:
dbgmsg( msgs, "Got a BT_HAVE_NONE" );
tr_bitfieldClear( msgs->info->have );
updatePeerProgress( msgs );
case BT_FEXT_HAVE_NONE:
dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
if( fext ) {
tr_bitfieldClear( msgs->info->have );
updatePeerProgress( msgs );
} else {
fireError( msgs, EPROTO );
return READ_ERR;
}
break;
case BT_REJECT:
case BT_FEXT_REJECT:
{
struct peer_request r;
dbgmsg( msgs, "Got a BT_REJECT" );
dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
tr_peerIoReadUint32( msgs->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &r.length );
reqListRemove( &msgs->clientAskedFor, &r );
if( fext )
reqListRemove( &msgs->clientAskedFor, &r );
else {
fireError( msgs, EPROTO );
return READ_ERR;
}
break;
}
@@ -1531,19 +1762,13 @@ ratePulse( void * vpeer )
return TRUE;
}
static int
popNextRequest( tr_peermsgs * msgs,
struct peer_request * setme )
{
return reqListPop( &msgs->peerAskedFor, setme );
}
static size_t
fillOutputBuffer( tr_peermsgs * msgs, time_t now )
{
size_t bytesWritten = 0;
struct peer_request req;
const int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
const int fext = tr_peerIoSupportsFEXT( msgs->io );
/**
*** Protocol messages
@@ -1571,32 +1796,38 @@ fillOutputBuffer( tr_peermsgs * msgs, time_t now )
**/
if( ( tr_peerIoGetWriteBufferSpace( msgs->io ) >= msgs->torrent->blockSize )
&& popNextRequest( msgs, &req )
&& requestIsValid( msgs, &req )
&& tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
&& popNextRequest( msgs, &req ) )
{
/* send a block */
uint8_t * buf = tr_new( uint8_t, req.length );
const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
if( err ) {
fireError( msgs, err );
bytesWritten = 0;
msgs = NULL;
} else {
tr_peerIo * io = msgs->io;
struct evbuffer * out = evbuffer_new( );
dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
tr_peerIoWriteUint8 ( io, out, BT_PIECE );
tr_peerIoWriteUint32( io, out, req.index );
tr_peerIoWriteUint32( io, out, req.offset );
tr_peerIoWriteBytes ( io, out, buf, req.length );
tr_peerIoWriteBuf( io, out, TRUE );
bytesWritten += EVBUFFER_LENGTH( out );
evbuffer_free( out );
msgs->clientSentAnythingAt = now;
if( requestIsValid( msgs, &req )
&& tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
{
/* send a block */
uint8_t * buf = tr_new( uint8_t, req.length );
const int err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf );
if( err ) {
fireError( msgs, err );
bytesWritten = 0;
msgs = NULL;
} else {
tr_peerIo * io = msgs->io;
struct evbuffer * out = evbuffer_new( );
dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
tr_peerIoWriteUint8 ( io, out, BT_PIECE );
tr_peerIoWriteUint32( io, out, req.index );
tr_peerIoWriteUint32( io, out, req.offset );
tr_peerIoWriteBytes ( io, out, buf, req.length );
tr_peerIoWriteBuf( io, out, TRUE );
bytesWritten += EVBUFFER_LENGTH( out );
evbuffer_free( out );
msgs->clientSentAnythingAt = now;
}
tr_free( buf );
}
else if( fext ) /* peer needs a reject message */
{
protocolSendReject( msgs, &req );
}
tr_free( buf );
}
/**
@@ -1706,6 +1937,25 @@ sendBitfield( tr_peermsgs * msgs )
tr_bitfieldFree( field );
}
static void
tellPeerWhatWeHave( tr_peermsgs * msgs )
{
const int fext = tr_peerIoSupportsFEXT( msgs->io );
if( fext && ( tr_cpGetStatus( msgs->torrent->completion ) == TR_SEED ) )
{
protocolSendHaveAll( msgs );
}
else if( fext && ( tr_cpHaveValid( msgs->torrent->completion ) == 0 ) )
{
protocolSendHaveNone( msgs );
}
else
{
sendBitfield( msgs );
}
}
/**
***
**/
@@ -1912,7 +2162,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
if( tr_peerIoSupportsLTEP( m->io ) )
sendLtepHandshake( m );
sendBitfield( m );
tellPeerWhatWeHave( m );
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
inactivity */

View File

@@ -53,9 +53,17 @@ void tr_peerMsgsFree( tr_peermsgs* );
tr_addreq_t tr_peerMsgsAddRequest( tr_peermsgs * peer,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length );
uint32_t length,
int doForce );
void tr_peerMsgsUnsubscribe( tr_peermsgs * peer,
tr_publisher_tag tag );
size_t tr_generateAllowedSet( tr_piece_index_t * setmePieces,
size_t desiredSetSize,
size_t pieceCount,
const uint8_t * infohash,
const tr_address * addr );
void tr_peerMsgsUnsubscribe( tr_peermsgs * peer,
tr_publisher_tag tag );
#endif