From f4b4ddd231a38937e223ac744d41dabd0dabc7ff Mon Sep 17 00:00:00 2001 From: Jordan Lee Date: Thu, 17 Mar 2011 18:51:31 +0000 Subject: [PATCH] (trunk libT) better shutdown management of libutp and UDP trackers in tr_sessionClose(). This is a little overlapping since the utp code can be closed more-or-less immediately, but the udp manager needs to stay open in order to process the udp tracker connection requests before sending out event=stopped. Moreover DNS resolver can be shut down after the UDP tracker is shutdown. --- libtransmission/announcer-common.h | 2 +- libtransmission/announcer-udp.c | 80 ++++++++++++++++++++++++++++-- libtransmission/announcer.c | 26 ++++------ libtransmission/announcer.h | 11 ++++ libtransmission/session.c | 30 +++++++++-- libtransmission/tr-udp.c | 2 - libtransmission/tr-utp.c | 2 +- libtransmission/trevent.c | 5 +- libtransmission/web.c | 14 +++++- 9 files changed, 139 insertions(+), 33 deletions(-) diff --git a/libtransmission/announcer-common.h b/libtransmission/announcer-common.h index 938b85aba..f9f8c3b1c 100644 --- a/libtransmission/announcer-common.h +++ b/libtransmission/announcer-common.h @@ -238,6 +238,6 @@ void tr_tracker_udp_announce( tr_session * session, tr_announce_response_func response_func, void * user_data ); -void tr_tracker_udp_upkeep( tr_session * session ); +void tr_tracker_udp_start_shutdown( tr_session * session ); #endif /* _TR_ANNOUNCER_COMMON_H_ */ diff --git a/libtransmission/announcer-udp.c b/libtransmission/announcer-udp.c index a308b7665..d54593598 100644 --- a/libtransmission/announcer-udp.c +++ b/libtransmission/announcer-udp.c @@ -19,6 +19,7 @@ #include #include "transmission.h" +#include "announcer.h" #include "announcer-common.h" #include "crypto.h" #include "peer-io.h" @@ -435,13 +436,14 @@ struct tau_tracker tau_connection_t connection_id; tau_transaction_t connection_transaction_id; + time_t close_at; + tr_ptrArray announces; tr_ptrArray scrapes; }; static void tau_tracker_upkeep( struct tau_tracker * ); -#if 0 static void tau_tracker_free( struct tau_tracker * t ) { @@ -453,7 +455,6 @@ tau_tracker_free( struct tau_tracker * t ) tr_free( t->key ); tr_free( t ); } -#endif static void tau_tracker_fail_all( struct tau_tracker * tracker, @@ -523,6 +524,13 @@ tau_tracker_send_request( struct tau_tracker * tracker, evbuffer_free( buf ); } +static tr_bool +tau_tracker_is_empty( const struct tau_tracker * tracker ) +{ + return tr_ptrArrayEmpty( &tracker->announces ) + && tr_ptrArrayEmpty( &tracker->scrapes ); +} + static void tau_tracker_upkeep( struct tau_tracker * tracker ) { @@ -540,8 +548,7 @@ tau_tracker_upkeep( struct tau_tracker * tracker ) } /* are there any requests pending? */ - if( tr_ptrArrayEmpty( &tracker->announces ) && - tr_ptrArrayEmpty( &tracker->scrapes ) ) + if( tau_tracker_is_empty( tracker ) ) return; /* if we don't have an address yet, try & get one now. */ @@ -583,14 +590,21 @@ tau_tracker_upkeep( struct tau_tracker * tracker ) reqs = &tracker->announces; for( i=0, n=tr_ptrArraySize(reqs); isent_at ) { dbgmsg( tracker->key, "Sending an announce request" ); req->sent_at = now; tau_tracker_send_request( tracker, req->payload, req->payload_len ); + remove_request = req->callback == NULL; } else if( req->created_at + TAU_REQUEST_TTL < now ) { tau_announce_request_fail( tracker->session, req, FALSE, TRUE, NULL ); + remove_request = TRUE; + } + if( tracker->close_at && ( tracker->close_at <= time(NULL) ) ) + remove_request = TRUE; + if( remove_request ) { tau_announce_request_free( req ); tr_ptrArrayRemove( reqs, i ); --i; @@ -602,14 +616,21 @@ tau_tracker_upkeep( struct tau_tracker * tracker ) reqs = &tracker->scrapes; for( i=0, n=tr_ptrArraySize(reqs); isent_at ) { dbgmsg( tracker->key, "Sending a scrape request" ); req->sent_at = now; tau_tracker_send_request( tracker, req->payload, req->payload_len ); + remove_request = req->callback == NULL; } else if( req->created_at + TAU_REQUEST_TTL < now ) { tau_scrape_request_fail( tracker->session, req, FALSE, TRUE, NULL ); + remove_request = TRUE; + } + if( tracker->close_at && ( tracker->close_at <= time(NULL) ) ) + remove_request = TRUE; + if( remove_request ) { tau_scrape_request_free( req ); tr_ptrArrayRemove( reqs, i ); --i; @@ -741,6 +762,57 @@ tr_tracker_udp_upkeep( tr_session * session ) (PtrArrayForeachFunc)tau_tracker_upkeep ); } +tr_bool +tr_tracker_udp_is_empty( const tr_session * session ) +{ + int i; + int n; + struct tr_announcer_udp * tau = session->announcer_udp; + + if( tau != NULL ) + for( i=0, n=tr_ptrArraySize(&tau->trackers); itrackers, i ) ) ) + return FALSE; + + return TRUE; +} + +/* drop dead now. */ +void +tr_tracker_udp_close( tr_session * session ) +{ + struct tr_announcer_udp * tau = session->announcer_udp; + + if( tau != NULL ) + { + session->announcer_udp = NULL; + tr_ptrArrayDestruct( &tau->trackers, (PtrArrayForeachFunc)tau_tracker_free ); + tr_free( tau ); + } + +} + +/* start shutting down. + This doesn't destroy everything if there are requests, + but sets a deadline on how much longer to wait for the remaining ones */ +void +tr_tracker_udp_start_shutdown( tr_session * session ) +{ + const time_t now = time( NULL ); + struct tr_announcer_udp * tau = session->announcer_udp; + + if( tau != NULL ) + { + int i, n; + for( i=0, n=tr_ptrArraySize(&tau->trackers); itrackers, i ); + tracker->close_at = now + 3; + tau_tracker_upkeep( tracker ); + } + } +} + /* @brief process an incoming udp message if it's a tracker response. * @return true if msg was a tracker response; false otherwise */ tr_bool diff --git a/libtransmission/announcer.c b/libtransmission/announcer.c index eb6cab7e7..347db9c9f 100644 --- a/libtransmission/announcer.c +++ b/libtransmission/announcer.c @@ -124,7 +124,6 @@ typedef struct tr_announcer struct event * upkeepTimer; int slotsAvailable; int key; - time_t lpdUpkeepAt; time_t tauUpkeepAt; } tr_announcer; @@ -135,14 +134,6 @@ tr_announcerHasBacklog( const struct tr_announcer * announcer ) return announcer->slotsAvailable < 1; } -static inline time_t -jitterize( const int val ) -{ - const double jitter = 0.1; - assert( val > 0 ); - return val + tr_cryptoWeakRandInt((int)(1 + val * jitter)); -} - static void onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer ); @@ -158,7 +149,6 @@ tr_announcerInit( tr_session * session ) a->key = tr_cryptoRandInt( INT_MAX ); a->session = session; a->slotsAvailable = MAX_CONCURRENT_TASKS; - a->lpdUpkeepAt = tr_time() + jitterize(5); a->upkeepTimer = evtimer_new( session->event_base, onUpkeepTimer, a ); tr_timerAdd( a->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 ); @@ -174,6 +164,8 @@ tr_announcerClose( tr_session * session ) flushCloseMessages( announcer ); + tr_tracker_udp_start_shutdown( session ); + event_free( announcer->upkeepTimer ); announcer->upkeepTimer = NULL; @@ -1491,26 +1483,30 @@ announceMore( tr_announcer * announcer ) static void onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer ) { - const time_t now = tr_time( ); tr_announcer * announcer = vannouncer; - tr_sessionLock( announcer->session ); + tr_session * session = announcer->session; + const tr_bool is_closing = session->isClosed; + const time_t now = tr_time( ); + + tr_sessionLock( session ); /* maybe send out some "stopped" messages for closed torrents */ flushCloseMessages( announcer ); /* maybe send out some announcements to trackers */ - announceMore( announcer ); + if( !is_closing ) + announceMore( announcer ); /* TAU upkeep */ if( announcer->tauUpkeepAt <= now ) { announcer->tauUpkeepAt = now + TAU_UPKEEP_INTERVAL_SECS; - tr_tracker_udp_upkeep( announcer->session ); + tr_tracker_udp_upkeep( session ); } /* set up the next timer */ tr_timerAdd( announcer->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 ); - tr_sessionUnlock( announcer->session ); + tr_sessionUnlock( session ); } /*** diff --git a/libtransmission/announcer.h b/libtransmission/announcer.h index 10862b4f6..a42f47222 100644 --- a/libtransmission/announcer.h +++ b/libtransmission/announcer.h @@ -104,5 +104,16 @@ tr_tracker_stat * tr_announcerStats( const tr_torrent * torrent, void tr_announcerStatsFree( tr_tracker_stat * trackers, int trackerCount ); +/*** +**** +***/ + +void tr_tracker_udp_upkeep( tr_session * session ); + +void tr_tracker_udp_close( tr_session * session ); + +tr_bool tr_tracker_udp_is_empty( const tr_session * session ); + + #endif /* _TR_ANNOUNCER_H_ */ diff --git a/libtransmission/session.c b/libtransmission/session.c index 669606650..9e05aabee 100644 --- a/libtransmission/session.c +++ b/libtransmission/session.c @@ -21,6 +21,7 @@ #include /* stat */ #include /* opendir */ +#include /* evdns_base_free() */ #include //#define TR_SHOW_DEPRECATED @@ -1733,7 +1734,7 @@ sessionCloseImpl( void * vsession ) tr_lpdUninit( session ); tr_utpClose( session ); - tr_udpUninit( session ); + tr_dhtUninit( session ); event_free( session->saveTimer ); session->saveTimer = NULL; @@ -1758,12 +1759,33 @@ sessionCloseImpl( void * vsession ) tr_torrentFree( torrents[i] ); tr_free( torrents ); + /* Close the announcer *after* closing the torrents + so that all the &event=stopped messages will be + queued to be sent by tr_announcerClose() */ + tr_announcerClose( session ); + + /* and this goes *after* announcer close so that + it won't be idle until the announce events are sent... */ + tr_webClose( session, TR_WEB_CLOSE_WHEN_IDLE ); + tr_cacheFree( session->cache ); session->cache = NULL; - tr_announcerClose( session ); + + /* gotta keep udp running long enough to send out all + the &event=stopped UDP tracker messages */ + while( !tr_tracker_udp_is_empty( session ) ) { + tr_tracker_udp_upkeep( session ); + tr_wait_msec( 100 ); + } + + /* we had to wait until UDP trackers were closed before closing these: */ + evdns_base_free( session->evdns_base, 0 ); + session->evdns_base = NULL; + tr_tracker_udp_close( session ); + tr_udpUninit( session ); + tr_statsClose( session ); tr_peerMgrFree( session->peerMgr ); - tr_webClose( session, TR_WEB_CLOSE_WHEN_IDLE ); closeBlocklists( session ); @@ -1801,7 +1823,7 @@ tr_sessionClose( tr_session * session ) * so we need to keep the transmission thread alive * for a bit while they tell the router & tracker * that we're closing now */ - while( ( session->shared || session->web || session->announcer ) + while( ( session->shared || session->web || session->announcer || session->announcer_udp ) && !deadlineReached( deadline ) ) { dbgmsg( "waiting on port unmap (%p) or announcer (%p)... now %zu deadline %zu", diff --git a/libtransmission/tr-udp.c b/libtransmission/tr-udp.c index 2beeb1ed2..d30c955d5 100644 --- a/libtransmission/tr-udp.c +++ b/libtransmission/tr-udp.c @@ -289,8 +289,6 @@ tr_udpInit(tr_session *ss) void tr_udpUninit(tr_session *ss) { - tr_dhtUninit(ss); - if(ss->udp_socket >= 0) { tr_netCloseSocket( ss->udp_socket ); ss->udp_socket = -1; diff --git a/libtransmission/tr-utp.c b/libtransmission/tr-utp.c index eb688ec95..138c2de1d 100644 --- a/libtransmission/tr-utp.c +++ b/libtransmission/tr-utp.c @@ -169,7 +169,7 @@ tr_utpPacket(const unsigned char *buf, size_t buflen, const struct sockaddr *from, socklen_t fromlen, tr_session *ss) { - if(utp_timer == NULL) + if( !ss->isClosed && !utp_timer ) { utp_timer = evtimer_new( ss->event_base, timer_callback, ss ); if(utp_timer == NULL) diff --git a/libtransmission/trevent.c b/libtransmission/trevent.c index 947f02e0c..2d3bf7489 100644 --- a/libtransmission/trevent.c +++ b/libtransmission/trevent.c @@ -224,7 +224,6 @@ static void libeventThreadFunc( void * veh ) { struct event_base * base; - struct evdns_base * evdns_base; tr_event_handle * eh = veh; #ifndef WIN32 @@ -234,12 +233,11 @@ libeventThreadFunc( void * veh ) /* create the libevent bases */ base = event_base_new( ); - evdns_base = evdns_base_new( base, TRUE ); /* set the struct's fields */ eh->base = base; eh->session->event_base = base; - eh->session->evdns_base = evdns_base; + eh->session->evdns_base = evdns_base_new( base, TRUE ); eh->session->events = eh; /* listen to the pipe's read fd */ @@ -253,7 +251,6 @@ libeventThreadFunc( void * veh ) /* shut down the thread */ tr_lockFree( eh->lock ); - evdns_base_free( evdns_base, FALSE ); event_base_free( base ); eh->session->events = NULL; tr_free( eh ); diff --git a/libtransmission/web.c b/libtransmission/web.c index d40768bb1..6262f2915 100644 --- a/libtransmission/web.c +++ b/libtransmission/web.c @@ -330,7 +330,7 @@ tr_webThreadFunc( void * vsession ) if( web->close_mode == TR_WEB_CLOSE_NOW ) break; - if( ( web->close_mode == TR_WEB_CLOSE_WHEN_IDLE ) && !taskCount ) + if( ( web->close_mode == TR_WEB_CLOSE_WHEN_IDLE ) && ( web->tasks == NULL ) ) break; /* add tasks from the queue */ @@ -349,6 +349,8 @@ tr_webThreadFunc( void * vsession ) curl_multi_timeout( multi, &msec ); if( msec < 0 ) msec = THREADFUNC_MAX_SLEEP_MSEC; + if( session->isClosed ) + msec = 100; /* on shutdown, call perform() more frequently */ if( msec > 0 ) { int usec; @@ -368,7 +370,6 @@ tr_webThreadFunc( void * vsession ) usec = msec * 1000; t.tv_sec = usec / 1000000; t.tv_usec = usec % 1000000; - tr_select( max_fd+1, &r_fd_set, &w_fd_set, &c_fd_set, &t ); } @@ -399,6 +400,15 @@ tr_webThreadFunc( void * vsession ) --taskCount; } } + +#if 0 +{ +tr_list * l; +for( l=web->tasks; l!=NULL; l=l->next ) + fprintf( stderr, "still pending: %s\n", ((struct tr_web_task*)l->data)->url ); +} +fprintf( stderr, "loop is ending... web is closing\n" ); +#endif } /* Discard any remaining tasks.