diff --git a/libtransmission/peer-common.h b/libtransmission/peer-common.h index df8e8967b..3cebcf520 100644 --- a/libtransmission/peer-common.h +++ b/libtransmission/peer-common.h @@ -47,8 +47,7 @@ typedef enum TR_PEER_PEER_PROGRESS, TR_PEER_ERROR, TR_PEER_CANCEL, - TR_PEER_UPLOAD_ONLY, - TR_PEER_NEED_REQ + TR_PEER_UPLOAD_ONLY } PeerEventType; diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index a64c377c1..dc90383e6 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -1,4 +1,3 @@ - /* * This file Copyright (C) 2007-2009 Charles Kerr * @@ -47,7 +46,13 @@ enum RECHOKE_PERIOD_MSEC = ( 10 * 1000 ), /* minimum interval for refilling peers' request lists */ - REFILL_PERIOD_MSEC = 333, + REFILL_PERIOD_MSEC = 400, + + /* how frequently to reallocate bandwidth */ + BANDWIDTH_PERIOD_MSEC = 500, + + /* how frequently to decide which peers live and die */ + RECONNECT_PERIOD_MSEC = 500, /* when many peers are available, keep idle ones this long */ MIN_UPLOAD_IDLE_SECS = ( 30 ), @@ -55,12 +60,6 @@ enum /* when few peers are available, keep idle ones this long */ MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ), - /* how frequently to decide which peers live and die */ - RECONNECT_PERIOD_MSEC = ( 2 * 1000 ), - - /* how frequently to reallocate bandwidth */ - BANDWIDTH_PERIOD_MSEC = 500, - /* max # of peers to ask fer per torrent per reconnect pulse */ MAX_RECONNECTIONS_PER_PULSE = 16, @@ -74,6 +73,7 @@ enum /* use for bitwise operations w/peer_atom.myflags */ MYFLAG_BANNED = 1, + /* use for bitwise operations w/peer_atom.myflags */ /* unreachable for now... but not banned. * if they try to connect to us it's okay */ MYFLAG_UNREACHABLE = 2, @@ -126,9 +126,6 @@ typedef struct tr_torrent_peers tr_ptrArray pool; /* struct peer_atom */ tr_ptrArray peers; /* tr_peer */ tr_ptrArray webseeds; /* tr_webseed */ - tr_timer * reconnectTimer; - tr_timer * rechokeTimer; - tr_timer * refillTimer; tr_torrent * tor; tr_peer * optimistic; /* the optimistic peer, or NULL if none */ @@ -141,6 +138,9 @@ struct tr_peerMgr tr_session * session; tr_ptrArray incomingHandshakes; /* tr_handshake */ tr_timer * bandwidthTimer; + tr_timer * rechokeTimer; + tr_timer * refillTimer; + tr_timer * reconnectTimer; }; #define tordbg( t, ... ) \ @@ -380,10 +380,6 @@ torrentDestructor( void * vt ) memcpy( hash, t->hash, SHA_DIGEST_LENGTH ); - tr_timerFree( &t->reconnectTimer ); - tr_timerFree( &t->rechokeTimer ); - tr_timerFree( &t->refillTimer ); - tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree ); tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free ); tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL ); @@ -424,8 +420,10 @@ torrentConstructor( tr_peerMgr * manager, } -static int bandwidthPulse( void * vmgr ); - +static int bandwidthPulse ( void * vmgr ); +static int rechokePulse ( void * vmgr ); +static int refillPulse ( void * vmgr ); +static int reconnectPulse ( void * vmgr ); tr_peerMgr* tr_peerMgrNew( tr_session * session ) @@ -435,6 +433,12 @@ tr_peerMgrNew( tr_session * session ) m->session = session; m->incomingHandshakes = TR_PTR_ARRAY_INIT; m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC ); + m->rechokeTimer = tr_timerNew( session, rechokePulse, m, RECHOKE_PERIOD_MSEC ); + m->refillTimer = tr_timerNew( session, refillPulse, m, REFILL_PERIOD_MSEC ); + m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC ); + + rechokePulse( m ); + return m; } @@ -443,6 +447,9 @@ tr_peerMgrFree( tr_peerMgr * manager ) { managerLock( manager ); + tr_timerFree( &manager->reconnectTimer ); + tr_timerFree( &manager->refillTimer ); + tr_timerFree( &manager->rechokeTimer ); tr_timerFree( &manager->bandwidthTimer ); /* free the handshakes. Abort invokes handshakeDoneCB(), which removes @@ -727,8 +734,8 @@ getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b ) return (uint32_t)( blockPos - piecePos ); } -static int -refillPulse( void * vtorrent ) +static void +refillTorrent( Torrent * t ) { tr_block_index_t block; int peerCount; @@ -736,15 +743,13 @@ refillPulse( void * vtorrent ) tr_peer ** peers; tr_webseed ** webseeds; struct tr_blockIterator * blockIterator; - Torrent * t = vtorrent; tr_torrent * tor = t->tor; if( !t->isRunning ) - return TRUE; + return; if( tr_torrentIsSeed( t->tor ) ) - return TRUE; + return; - torrentLock( t ); tordbg( t, "Refilling Request Buffers..." ); blockIterator = blockIteratorNew( t ); @@ -816,10 +821,21 @@ refillPulse( void * vtorrent ) blockIteratorFree( blockIterator ); tr_free( webseeds ); tr_free( peers ); +} - t->refillTimer = NULL; - torrentUnlock( t ); - return FALSE; +static int +refillPulse( void * vmgr ) +{ + tr_torrent * tor = NULL; + tr_peerMgr * mgr = vmgr; + managerLock( mgr ); + + while(( tor = tr_torrentNext( mgr->session, tor ))) + if( tor->isRunning && !tr_torrentIsSeed( tor ) ) + refillTorrent( tor->torrentPeers ); + + managerUnlock( mgr ); + return TRUE; } static void @@ -868,15 +884,6 @@ gotBadPiece( Torrent * t, tor->downloadedCur -= MIN( tor->downloadedCur, byteCount ); } -static void -refillSoon( Torrent * t ) -{ - if( t->refillTimer == NULL ) - t->refillTimer = tr_timerNew( t->manager->session, - refillPulse, t, - REFILL_PERIOD_MSEC ); -} - static void peerSuggestedPiece( Torrent * t UNUSED, tr_peer * peer UNUSED, @@ -944,10 +951,6 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt ) } break; - case TR_PEER_NEED_REQ: - refillSoon( t ); - break; - case TR_PEER_CANCEL: decrementPieceRequests( t, e->pieceIndex ); break; @@ -1506,42 +1509,10 @@ tr_peerMgrGetPeers( tr_torrent * tor, return peersReturning; } -static int reconnectPulse( void * vtorrent ); - -static int rechokePulse( void * vtorrent ); - void tr_peerMgrStartTorrent( tr_torrent * tor ) { - Torrent * t = tor->torrentPeers; - - managerLock( t->manager ); - - assert( t ); - assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) ); - assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) ); - - if( !t->isRunning ) - { - t->isRunning = 1; - - t->reconnectTimer = tr_timerNew( t->manager->session, - reconnectPulse, t, - RECONNECT_PERIOD_MSEC ); - - t->rechokeTimer = tr_timerNew( t->manager->session, - rechokePulse, t, - RECHOKE_PERIOD_MSEC ); - - reconnectPulse( t ); - - rechokePulse( t ); - - if( !tr_ptrArrayEmpty( &t->webseeds ) ) - refillSoon( t ); - } - - managerUnlock( t->manager ); + tor->torrentPeers->isRunning = TRUE; } static void @@ -1549,9 +1520,7 @@ stopTorrent( Torrent * t ) { assert( torrentIsLocked( t ) ); - t->isRunning = 0; - tr_timerFree( &t->rechokeTimer ); - tr_timerFree( &t->reconnectTimer ); + t->isRunning = FALSE; /* disconnect the peers. */ tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor ); @@ -1867,7 +1836,7 @@ isSame( const tr_peer * peer ) **/ static void -rechoke( Torrent * t ) +rechokeTorrent( Torrent * t ) { int i, size, unchokedInterested; const int peerCount = tr_ptrArraySize( &t->peers ); @@ -1967,13 +1936,17 @@ rechoke( Torrent * t ) } static int -rechokePulse( void * vtorrent ) +rechokePulse( void * vmgr ) { - Torrent * t = vtorrent; + tr_torrent * tor = NULL; + tr_peerMgr * mgr = vmgr; + managerLock( mgr ); - torrentLock( t ); - rechoke( t ); - torrentUnlock( t ); + while(( tor = tr_torrentNext( mgr->session, tor ))) + if( tor->isRunning ) + rechokeTorrent( tor->torrentPeers ); + + managerUnlock( mgr ); return TRUE; } @@ -2215,16 +2188,13 @@ closePeer( Torrent * t, tr_peer * peer ) removePeer( t, peer ); } -static int -reconnectPulse( void * vtorrent ) +static void +reconnectTorrent( Torrent * t ) { - Torrent * t = vtorrent; static time_t prevTime = 0; static int newConnectionsThisSecond = 0; time_t now; - torrentLock( t ); - now = time( NULL ); if( prevTime != now ) { @@ -2326,8 +2296,20 @@ reconnectPulse( void * vtorrent ) tr_free( mustClose ); tr_free( canClose ); } +} - torrentUnlock( t ); +static int +reconnectPulse( void * vmgr ) +{ + tr_torrent * tor = NULL; + tr_peerMgr * mgr = vmgr; + managerLock( mgr ); + + while(( tor = tr_torrentNext( mgr->session, tor ))) + if( tor->isRunning ) + reconnectTorrent( tor->torrentPeers ); + + managerUnlock( mgr ); return TRUE; } diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index b8b347155..7bdece765 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -419,14 +419,6 @@ fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly ) publish( msgs, &e ); } -static void -fireNeedReq( tr_peermsgs * msgs ) -{ - tr_peer_event e = blankEvent; - e.eventType = TR_PEER_NEED_REQ; - publish( msgs, &e ); -} - static void firePeerProgress( tr_peermsgs * msgs ) { @@ -653,8 +645,6 @@ updateInterest( tr_peermsgs * msgs ) if( i != msgs->peer->clientIsInterested ) sendInterest( msgs, i ); - if( i ) - fireNeedReq( msgs ); } static int @@ -819,9 +809,6 @@ pumpRequestQueue( tr_peermsgs * msgs, const time_t now ) if( sent ) dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued", sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len ); - - if( len < max ) - fireNeedReq( msgs ); } static TR_INLINE int @@ -1366,7 +1353,6 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) case BT_UNCHOKE: dbgmsg( msgs, "got Unchoke" ); msgs->peer->clientIsChoked = 0; - fireNeedReq( msgs ); break; case BT_INTERESTED: @@ -1392,13 +1378,10 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) break; case BT_BITFIELD: - { dbgmsg( msgs, "got a bitfield" ); tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen ); updatePeerProgress( msgs ); - fireNeedReq( msgs ); break; - } case BT_REQUEST: { diff --git a/libtransmission/session.c b/libtransmission/session.c index ba5ecb5e1..6dc49f15e 100644 --- a/libtransmission/session.c +++ b/libtransmission/session.c @@ -359,28 +359,66 @@ tr_sessionSaveSettings( tr_session * session, const char * configDir, tr_benc * static void metainfoLookupRescan( tr_session * ); static void tr_sessionInitImpl( void * ); +struct init_data +{ + tr_session * session; + const char * configDir; + tr_bool messageQueuingEnabled; + tr_benc * clientSettings; +}; + tr_session * tr_sessionInit( const char * tag, const char * configDir, tr_bool messageQueuingEnabled, tr_benc * clientSettings ) { - int64_t i; - int64_t j; - tr_bool found; - const char * str; - tr_benc settings; tr_session * session; - char * filename; + struct init_data data; assert( tr_bencIsDict( clientSettings ) ); + /* initialize the bare skeleton of the session object */ session = tr_new0( tr_session, 1 ); session->bandwidth = tr_bandwidthNew( session, NULL ); session->lock = tr_lockNew( ); session->tag = tr_strdup( tag ); session->magicNumber = SESSION_MAGIC_NUMBER; + /* start the libtransmission thread */ + tr_netInit( ); /* must go before tr_eventInit */ + tr_eventInit( session ); + assert( session->events != NULL ); + + /* run the rest in the libtransmission thread */ + session->isWaiting = TRUE; + data.session = session; + data.configDir = configDir; + data.messageQueuingEnabled = messageQueuingEnabled; + data.clientSettings = clientSettings; + tr_runInEventThread( session, tr_sessionInitImpl, &data ); + while( session->isWaiting ) + tr_wait( 100 ); + + return session; +} + +static void +tr_sessionInitImpl( void * vdata ) +{ + int64_t i; + int64_t j; + tr_bool found; + const char * str; + tr_benc settings; + char * filename; + struct init_data * data = vdata; + tr_benc * clientSettings = data->clientSettings; + tr_session * session = data->session; + + assert( tr_amInEventThread( session ) ); + assert( tr_bencIsDict( clientSettings ) ); + dbgmsg( "tr_sessionInit: the session's top-level bandwidth object is %p", session->bandwidth ); tr_bencInitDict( &settings, 0 ); @@ -399,7 +437,7 @@ tr_sessionInit( const char * tag, found = tr_bencDictFindInt( &settings, TR_PREFS_KEY_MSGLEVEL, &i ); assert( found ); tr_setMessageLevel( i ); - tr_setMessageQueuing( messageQueuingEnabled ); + tr_setMessageQueuing( data->messageQueuingEnabled ); found = tr_bencDictFindInt( &settings, TR_PREFS_KEY_PEX_ENABLED, &i ); @@ -455,11 +493,9 @@ tr_sessionInit( const char * tag, session->so_sndbuf = 1500 * 3; /* 3x MTU for most ethernet/wireless */ session->so_rcvbuf = 8192; - tr_setConfigDir( session, configDir ); + tr_setConfigDir( session, data->configDir ); - tr_netInit( ); /* must go before tr_eventInit */ - tr_eventInit( session ); - assert( session->events != NULL ); + tr_trackerSessionInit( session ); session->peerMgr = tr_peerMgrNew( session ); @@ -531,18 +567,6 @@ tr_sessionInit( const char * tag, tr_bencFree( &settings ); - session->isWaiting = TRUE; - tr_runInEventThread( session, tr_sessionInitImpl, session ); - while( session->isWaiting ) - tr_wait( 100 ); - - return session; -} -static void -tr_sessionInitImpl( void * vsession ) -{ - tr_session * session = vsession; - assert( tr_isSession( session ) ); /* first %s is the application name diff --git a/libtransmission/tracker.c b/libtransmission/tracker.c index 7582d5f51..49921de8f 100644 --- a/libtransmission/tracker.c +++ b/libtransmission/tracker.c @@ -841,17 +841,14 @@ struct tr_tracker_handle static int trackerPulse( void * vsession ); -static void -ensureGlobalsExist( tr_session * session ) +void +tr_trackerSessionInit( tr_session * session ) { - if( session->tracker == NULL ) - { - session->tracker = tr_new0( struct tr_tracker_handle, 1 ); - session->tracker->pulseTimer = - tr_timerNew( session, trackerPulse, session, - PULSE_INTERVAL_MSEC ); - dbgmsg( NULL, "creating tracker timer" ); - } + assert( tr_isSession( session ) ); + + session->tracker = tr_new0( struct tr_tracker_handle, 1 ); + session->tracker->pulseTimer = tr_timerNew( session, trackerPulse, session, PULSE_INTERVAL_MSEC ); + dbgmsg( NULL, "creating tracker timer" ); } void @@ -1035,8 +1032,6 @@ tr_trackerNew( const tr_torrent * torrent ) const tr_info * info = &torrent->info; tr_tracker * t; - ensureGlobalsExist( torrent->session ); - t = tr_new0( tr_tracker, 1 ); t->publisher = TR_PUBLISHER_INIT; t->session = torrent->session; diff --git a/libtransmission/tracker.h b/libtransmission/tracker.h index 8d8d1bd35..374d17781 100644 --- a/libtransmission/tracker.h +++ b/libtransmission/tracker.h @@ -31,6 +31,12 @@ tr_tracker * tr_trackerNew( const tr_torrent * ); void tr_trackerFree( tr_tracker * ); +/** +*** +**/ + +void tr_trackerSessionInit( tr_session * ); + void tr_trackerSessionClose( tr_session * ); /** diff --git a/libtransmission/trevent.c b/libtransmission/trevent.c index 4435f37ab..45d641f31 100644 --- a/libtransmission/trevent.c +++ b/libtransmission/trevent.c @@ -191,19 +191,6 @@ readFromPipe( int fd, break; } - case 't': /* create timer */ - { - tr_timer * timer; - const size_t nwant = sizeof( timer ); - const ssize_t ngot = piperead( fd, &timer, nwant ); - if( !eh->die && ( ngot == (ssize_t)nwant ) ) - { - dbgmsg( "adding timer in libevent thread" ); - evtimer_add( &timer->event, &timer->tv ); - } - break; - } - case '\0': /* eof */ { dbgmsg( "pipe eof reached... removing event listener" ); @@ -294,7 +281,7 @@ tr_bool tr_amInEventThread( tr_session * session ) { assert( tr_isSession( session ) ); - assert( session->events ); + assert( session->events != NULL ); return tr_amInThread( session->events->thread ); } @@ -341,38 +328,23 @@ tr_timerFree( tr_timer ** ptimer ) } tr_timer* -tr_timerNew( tr_session * session, - timer_func func, - void * user_data, - uint64_t interval_milliseconds ) +tr_timerNew( tr_session * session, + timer_func func, + void * user_data, + uint64_t interval_milliseconds ) { tr_timer * timer; - assert( tr_isSession( session ) ); - assert( session->events != NULL ); + assert( tr_amInEventThread( session ) ); timer = tr_new0( tr_timer, 1 ); - tr_timevalMsec( interval_milliseconds, &timer->tv ); timer->func = func; timer->user_data = user_data; timer->eh = session->events; + + tr_timevalMsec( interval_milliseconds, &timer->tv ); evtimer_set( &timer->event, timerCallback, timer ); - - if( tr_amInThread( session->events->thread ) ) - { - evtimer_add( &timer->event, &timer->tv ); - } - else - { - const char ch = 't'; - int fd = session->events->fds[1]; - tr_lock * lock = session->events->lock; - - tr_lockLock( lock ); - pipewrite( fd, &ch, 1 ); - pipewrite( fd, &timer, sizeof( timer ) ); - tr_lockUnlock( lock ); - } + evtimer_add( &timer->event, &timer->tv ); return timer; } diff --git a/libtransmission/webseed.c b/libtransmission/webseed.c index 078827f16..21d860cd0 100644 --- a/libtransmission/webseed.c +++ b/libtransmission/webseed.c @@ -61,14 +61,6 @@ publish( tr_webseed * w, w->callback( NULL, e, w->callback_userdata ); } -static void -fireNeedReq( tr_webseed * w ) -{ - tr_peer_event e = blankEvent; - e.eventType = TR_PEER_NEED_REQ; - publish( w, &e ); -} - static void fireClientGotBlock( tr_webseed * w, uint32_t pieceIndex, uint32_t offset, uint32_t length ) { @@ -180,10 +172,8 @@ webResponseFunc( tr_session * session, w->busy = 0; if( w->dead ) tr_webseedFree( w ); - else { + else fireClientGotBlock( w, w->pieceIndex, w->pieceOffset, w->byteCount ); - fireNeedReq( w ); - } } } }