better encapsulation of platform-specific constructs: tr_thread_t, tr_cond_t, tr_lock_t

This commit is contained in:
Charles Kerr
2007-07-30 15:27:52 +00:00
parent 7ba03d40b7
commit ebb141b232
13 changed files with 501 additions and 466 deletions
+7 -7
View File
@@ -35,7 +35,7 @@
struct tr_choking_s
{
tr_lock_t lock;
tr_lock_t * lock;
tr_handle_t * h;
int slots;
};
@@ -47,14 +47,14 @@ tr_choking_t * tr_chokingInit( tr_handle_t * h )
c = tr_new0( tr_choking_t, 1 );
c->h = h;
c->slots = 4242;
tr_lockInit( &c->lock );
c->lock = tr_lockNew( );
return c;
}
void tr_chokingSetLimit( tr_choking_t * c, int limit )
{
tr_lockLock( &c->lock );
tr_lockLock( c->lock );
if( limit < 0 )
c->slots = 4242;
else
@@ -65,7 +65,7 @@ void tr_chokingSetLimit( tr_choking_t * c, int limit )
50 KB/s -> 10 * 5.00 KB/s
100 KB/s -> 14 * 7.14 KB/s */
c->slots = lrintf( sqrt( 2 * limit ) );
tr_lockUnlock( &c->lock );
tr_lockUnlock( c->lock );
}
#define sortPeersAscending(a,ac,z,zc,n,nc) sortPeers(a,ac,z,zc,n,nc,0)
@@ -137,7 +137,7 @@ void tr_chokingPulse( tr_choking_t * c )
tr_torrent_t * tor;
uint64_t now = tr_date();
tr_lockLock( &c->lock );
tr_lockLock( c->lock );
/* Lock all torrents and get the total number of peers */
peersTotalCount = 0;
@@ -310,11 +310,11 @@ void tr_chokingPulse( tr_choking_t * c )
for( tor = c->h->torrentList; tor; tor = tor->next )
tr_torrentWriterUnlock( tor );
tr_lockUnlock( &c->lock );
tr_lockUnlock( c->lock );
}
void tr_chokingClose( tr_choking_t * c )
{
tr_lockClose( &c->lock );
tr_lockFree( c->lock );
tr_free( c );
}
+26 -26
View File
@@ -63,8 +63,8 @@ tr_openFile_t;
typedef struct tr_fd_s
{
tr_lock_t lock;
tr_cond_t cond;
tr_lock_t * lock;
tr_cond_t * cond;
int reserved;
@@ -100,8 +100,8 @@ void tr_fdInit()
gFd = calloc( 1, sizeof( tr_fd_t ) );
/* Init lock and cond */
tr_lockInit( &gFd->lock );
tr_condInit( &gFd->cond );
gFd->lock = tr_lockNew( );
gFd->cond = tr_condNew( );
/* Detect the maximum number of open files or sockets */
for( i = 0; i < 4096; i++ )
@@ -143,7 +143,7 @@ int tr_fdFileOpen( const char * folder, const char * name, int write )
int i, winner, ret;
uint64_t date;
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
/* Is it already open? */
for( i = 0; i < TR_MAX_OPEN_FILES; i++ )
@@ -158,7 +158,7 @@ int tr_fdFileOpen( const char * folder, const char * name, int write )
{
/* File is being closed by another thread, wait until
* it's done before we reopen it */
tr_condWait( &gFd->cond, &gFd->lock );
tr_condWait( gFd->cond, gFd->lock );
i = -1;
continue;
}
@@ -209,13 +209,13 @@ int tr_fdFileOpen( const char * folder, const char * name, int write )
}
/* All used! Wait a bit and try again */
tr_condWait( &gFd->cond, &gFd->lock );
tr_condWait( gFd->cond, gFd->lock );
}
open:
if( ( ret = OpenFile( winner, folder, name, write ) ) )
{
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
return ret;
}
snprintf( gFd->open[winner].folder, MAX_PATH_LENGTH, "%s", folder );
@@ -225,7 +225,7 @@ open:
done:
gFd->open[winner].status = STATUS_USED;
gFd->open[winner].date = tr_date();
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
return gFd->open[winner].file;
}
@@ -236,7 +236,7 @@ done:
void tr_fdFileRelease( int file )
{
int i;
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
for( i = 0; i < TR_MAX_OPEN_FILES; i++ )
{
@@ -247,8 +247,8 @@ void tr_fdFileRelease( int file )
}
}
tr_condSignal( &gFd->cond );
tr_lockUnlock( &gFd->lock );
tr_condSignal( gFd->cond );
tr_lockUnlock( gFd->lock );
}
/***********************************************************************
@@ -258,7 +258,7 @@ void tr_fdFileClose( const char * folder, const char * name )
{
int i;
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
for( i = 0; i < TR_MAX_OPEN_FILES; i++ )
{
@@ -273,7 +273,7 @@ void tr_fdFileClose( const char * folder, const char * name )
}
}
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
}
@@ -333,7 +333,7 @@ int tr_fdSocketCreate( int type, int priority )
{
int s = -1;
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
if( priority && gFd->reserved >= TR_RESERVED_FDS )
priority = FALSE;
@@ -350,7 +350,7 @@ int tr_fdSocketCreate( int type, int priority )
else
gFd->normal++;
}
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
return s;
}
@@ -361,7 +361,7 @@ int tr_fdSocketAccept( int b, struct in_addr * addr, in_port_t * port )
unsigned len;
struct sockaddr_in sock;
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
if( gFd->normal < gFd->normalMax )
{
len = sizeof( sock );
@@ -380,7 +380,7 @@ int tr_fdSocketAccept( int b, struct in_addr * addr, in_port_t * port )
}
gFd->normal++;
}
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
return s;
}
@@ -390,7 +390,7 @@ int tr_fdSocketAccept( int b, struct in_addr * addr, in_port_t * port )
**********************************************************************/
void tr_fdSocketClose( int s )
{
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
#ifdef BEOS_NETSERVER
closesocket( s );
#else
@@ -400,7 +400,7 @@ void tr_fdSocketClose( int s )
gFd->reserved--;
else
gFd->normal--;
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
}
/***********************************************************************
@@ -408,8 +408,8 @@ void tr_fdSocketClose( int s )
**********************************************************************/
void tr_fdClose()
{
tr_lockClose( &gFd->lock );
tr_condClose( &gFd->cond );
tr_lockFree( gFd->lock );
tr_condFree( gFd->cond );
free( gFd );
}
@@ -504,7 +504,7 @@ static void CloseFile( int i )
* it is done */
while( file->status & STATUS_CLOSING )
{
tr_condWait( &gFd->cond, &gFd->lock );
tr_condWait( gFd->cond, gFd->lock );
}
if( file->status & STATUS_INVALID )
{
@@ -518,10 +518,10 @@ static void CloseFile( int i )
}
tr_dbg( "Closing %s in %s (%d)", file->name, file->folder, file->write );
file->status = STATUS_CLOSING;
tr_lockUnlock( &gFd->lock );
tr_lockUnlock( gFd->lock );
close( file->file );
tr_lockLock( &gFd->lock );
tr_lockLock( gFd->lock );
file->status = STATUS_INVALID;
tr_condSignal( &gFd->cond );
tr_condSignal( gFd->cond );
}
+2 -2
View File
@@ -193,8 +193,8 @@ struct tr_torrent_s
tr_bitfield_t * uncheckedPieces;
run_status_t runStatus;
cp_status_t cpStatus;
tr_thread_t thread;
tr_rwlock_t lock;
tr_thread_t * thread;
tr_rwlock_t * lock;
tr_tracker_t * tracker;
tr_io_t * io;
-1
View File
@@ -23,7 +23,6 @@ void tr_list_free ( tr_list_t* );
tr_list_t* tr_list_append ( tr_list_t*, void * data );
tr_list_t* tr_list_prepend ( tr_list_t*, void * data );
tr_list_t* tr_list_remove_data ( tr_list_t*, const void * data );
tr_list_t* tr_list_pop ( tr_list_t*, void ** setme );
typedef int (*TrListCompareFunc)(const void * a, const void * b);
tr_list_t* tr_list_find ( tr_list_t*, TrListCompareFunc func, const void * b );
+6 -13
View File
@@ -407,20 +407,15 @@ static void tr_realMakeMetaInfo ( tr_metainfo_builder_t * builder )
static tr_metainfo_builder_t * queue = NULL;
static int workerIsRunning = 0;
static tr_thread_t workerThread;
static tr_thread_t * workerThread = NULL;
static tr_lock_t* getQueueLock( tr_handle_t * h )
{
static tr_lock_t * lock = NULL;
tr_sharedLock( h->shared );
if( lock == NULL )
{
lock = tr_new0( tr_lock_t, 1 );
tr_lockInit( lock );
}
if( !lock )
lock = tr_lockNew( );
tr_sharedUnlock( h->shared );
return lock;
@@ -450,7 +445,7 @@ static void workerFunc( void * user_data )
tr_realMakeMetaInfo ( builder );
}
workerIsRunning = 0;
workerThread = NULL;
}
void
@@ -480,10 +475,8 @@ tr_makeMetaInfo( tr_metainfo_builder_t * builder,
tr_lockLock( lock );
builder->nextBuilder = queue;
queue = builder;
if( !workerIsRunning ) {
workerIsRunning = 1;
tr_threadCreate( &workerThread, workerFunc, builder->handle, "makeMeta" );
}
if( !workerThread )
workerThread = tr_threadNew( workerFunc, builder->handle, "makeMeta" );
tr_lockUnlock( lock );
}
+23 -24
View File
@@ -56,10 +56,10 @@ int tr_netResolve( const char * address, struct in_addr * addr )
return ( addr->s_addr == 0xFFFFFFFF );
}
static tr_thread_t resolveThread;
static tr_lock_t resolveLock;
static tr_cond_t resolveCond;
static volatile int resolveDie;
static tr_thread_t * resolveThread;
static tr_lock_t * resolveLock;
static tr_cond_t * resolveCond;
static volatile int resolveDie;
static tr_resolve_t * resolveQueue;
static void resolveRelease ( tr_resolve_t * );
@@ -85,9 +85,9 @@ void tr_netResolveThreadInit()
{
resolveDie = 0;
resolveQueue = NULL;
tr_lockInit( &resolveLock );
tr_condInit( &resolveCond );
tr_threadCreate( &resolveThread, resolveFunc, NULL, "resolve" );
resolveLock = tr_lockNew( );
resolveCond = tr_condNew( );
resolveThread = tr_threadNew( resolveFunc, NULL, "resolve" );
}
/***********************************************************************
@@ -99,10 +99,10 @@ void tr_netResolveThreadInit()
**********************************************************************/
void tr_netResolveThreadClose()
{
tr_lockLock( &resolveLock );
tr_lockLock( resolveLock );
resolveDie = 1;
tr_lockUnlock( &resolveLock );
tr_condSignal( &resolveCond );
tr_lockUnlock( resolveLock );
tr_condSignal( resolveCond );
tr_wait( 200 );
}
@@ -113,15 +113,13 @@ void tr_netResolveThreadClose()
**********************************************************************/
tr_resolve_t * tr_netResolveInit( const char * address )
{
tr_resolve_t * r;
r = malloc( sizeof( tr_resolve_t ) );
tr_resolve_t * r = tr_new0( tr_resolve_t, 1 );
r->status = TR_NET_WAIT;
r->address = strdup( address );
r->refcount = 2;
r->next = NULL;
tr_lockLock( &resolveLock );
tr_lockLock( resolveLock );
if( !resolveQueue )
{
resolveQueue = r;
@@ -132,8 +130,8 @@ tr_resolve_t * tr_netResolveInit( const char * address )
for( iter = resolveQueue; iter->next; iter = iter->next );
iter->next = r;
}
tr_lockUnlock( &resolveLock );
tr_condSignal( &resolveCond );
tr_lockUnlock( resolveLock );
tr_condSignal( resolveCond );
return r;
}
@@ -147,13 +145,13 @@ tr_tristate_t tr_netResolvePulse( tr_resolve_t * r, struct in_addr * addr )
{
tr_tristate_t ret;
tr_lockLock( &resolveLock );
tr_lockLock( resolveLock );
ret = r->status;
if( ret == TR_NET_OK )
{
*addr = r->addr;
}
tr_lockUnlock( &resolveLock );
tr_lockUnlock( resolveLock );
return ret;
}
@@ -196,20 +194,20 @@ static void resolveFunc( void * arg UNUSED )
tr_resolve_t * r;
struct hostent * host;
tr_lockLock( &resolveLock );
tr_lockLock( resolveLock );
while( !resolveDie )
{
if( !( r = resolveQueue ) )
{
tr_condWait( &resolveCond, &resolveLock );
tr_condWait( resolveCond, resolveLock );
continue;
}
/* Blocking resolution */
tr_lockUnlock( &resolveLock );
tr_lockUnlock( resolveLock );
host = gethostbyname( r->address );
tr_lockLock( &resolveLock );
tr_lockLock( resolveLock );
if( host )
{
@@ -226,8 +224,9 @@ static void resolveFunc( void * arg UNUSED )
}
/* Clean up */
tr_lockUnlock( &resolveLock );
tr_lockClose( &resolveLock );
tr_lockUnlock( resolveLock );
tr_lockFree( resolveLock );
resolveLock = NULL;
while( ( r = resolveQueue ) )
{
resolveQueue = r->next;
+1 -1
View File
@@ -34,7 +34,7 @@ int tr_netResolve( const char *, struct in_addr * );
typedef struct tr_resolve_s tr_resolve_t;
void tr_netResolveThreadInit();
void tr_netResolveThreadClose();
tr_resolve_t * tr_netResolveInit( const char * );
tr_resolve_t * tr_netResolveInit( const char * address );
tr_tristate_t tr_netResolvePulse( tr_resolve_t *, struct in_addr * );
void tr_netResolveClose( tr_resolve_t * );
+373 -280
View File
@@ -30,13 +30,382 @@
#ifdef SYS_BEOS
#include <fs_info.h>
#include <FindDirectory.h>
#include <kernel/OS.h>
#define BEOS_MAX_THREADS 256
#else
#include <pthread.h>
#endif
#include <sys/types.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h> /* getuid getpid close */
#include "transmission.h"
#include "utils.h"
/***
**** THREADS
***/
struct tr_thread_s
{
void (* func ) ( void * );
void * arg;
char * name;
#ifdef SYS_BEOS
thread_id thread;
#else
pthread_t thread;
#endif
};
static void
ThreadFunc( void * _t )
{
tr_thread_t * t = _t;
char* name = tr_strdup( t->name );
#ifdef SYS_BEOS
/* This is required because on BeOS, SIGINT is sent to each thread,
which kills them not nicely */
signal( SIGINT, SIG_IGN );
#endif
tr_dbg( "Thread '%s' started", name );
t->func( t->arg );
tr_dbg( "Thread '%s' exited", name );
tr_free( name );
}
tr_thread_t *
tr_threadNew( void (*func)(void *),
void * arg,
const char * name )
{
tr_thread_t * t = tr_new0( tr_thread_t, 1 );
t->func = func;
t->arg = arg;
t->name = tr_strdup( name );
#ifdef SYS_BEOS
t->thread = spawn_thread( (void*)ThreadFunc, name, B_NORMAL_PRIORITY, t );
resume_thread( t->thread );
#else
pthread_create( &t->thread, NULL, (void * (*) (void *)) ThreadFunc, t );
#endif
return t;
}
void
tr_threadJoin( tr_thread_t * t )
{
if( t != NULL )
{
#ifdef SYS_BEOS
long exit;
wait_for_thread( t->thread, &exit );
#else
pthread_join( t->thread, NULL );
#endif
tr_dbg( "Thread '%s' joined", t->name );
tr_free( t->name );
t->name = NULL;
t->func = NULL;
tr_free( t );
}
}
/***
**** LOCKS
***/
struct tr_lock_s
{
#ifdef SYS_BEOS
sem_id lock;
#else
pthread_mutex_t lock;
#endif
};
tr_lock_t*
tr_lockNew( void )
{
tr_lock_t * l = tr_new0( tr_lock_t, 1 );
#ifdef SYS_BEOS
l->lock = create_sem( 1, "" );
#else
pthread_mutex_init( &l->lock, NULL );
#endif
return l;
}
void
tr_lockFree( tr_lock_t * l )
{
#ifdef SYS_BEOS
delete_sem( l->lock );
#else
pthread_mutex_destroy( &l->lock );
#endif
tr_free( l );
}
int
tr_lockTryLock( tr_lock_t * l )
{
#ifdef SYS_BEOS
return acquire_sem_etc( l->lock, 1, B_RELATIVE_TIMEOUT, 0 );
#else
/* success on zero! */
return pthread_mutex_trylock( &l->lock );
#endif
}
void
tr_lockLock( tr_lock_t * l )
{
#ifdef SYS_BEOS
acquire_sem( l->lock );
#else
pthread_mutex_lock( &l->lock );
#endif
}
void
tr_lockUnlock( tr_lock_t * l )
{
#ifdef SYS_BEOS
release_sem( l->lock );
#else
pthread_mutex_unlock( &l->lock );
#endif
}
/***
**** RW LOCK
***/
struct tr_rwlock_s
{
tr_lock_t * lock;
tr_cond_t * readCond;
tr_cond_t * writeCond;
size_t readCount;
size_t wantToRead;
size_t wantToWrite;
int haveWriter;
};
static void
tr_rwSignal( tr_rwlock_t * rw )
{
if ( rw->wantToWrite )
tr_condSignal( rw->writeCond );
else if ( rw->wantToRead )
tr_condBroadcast( rw->readCond );
}
tr_rwlock_t*
tr_rwNew ( void )
{
tr_rwlock_t * rw = tr_new0( tr_rwlock_t, 1 );
rw->lock = tr_lockNew( );
rw->readCond = tr_condNew( );
rw->writeCond = tr_condNew( );
return rw;
}
void
tr_rwReaderLock( tr_rwlock_t * rw )
{
tr_lockLock( rw->lock );
rw->wantToRead++;
while( rw->haveWriter || rw->wantToWrite )
tr_condWait( rw->readCond, rw->lock );
rw->wantToRead--;
rw->readCount++;
tr_lockUnlock( rw->lock );
}
int
tr_rwReaderTrylock( tr_rwlock_t * rw )
{
int ret = FALSE;
tr_lockLock( rw->lock );
if ( !rw->haveWriter && !rw->wantToWrite ) {
rw->readCount++;
ret = TRUE;
}
tr_lockUnlock( rw->lock );
return ret;
}
void
tr_rwReaderUnlock( tr_rwlock_t * rw )
{
tr_lockLock( rw->lock );
--rw->readCount;
if( !rw->readCount )
tr_rwSignal( rw );
tr_lockUnlock( rw->lock );
}
void
tr_rwWriterLock( tr_rwlock_t * rw )
{
tr_lockLock( rw->lock );
rw->wantToWrite++;
while( rw->haveWriter || rw->readCount )
tr_condWait( rw->writeCond, rw->lock );
rw->wantToWrite--;
rw->haveWriter = TRUE;
tr_lockUnlock( rw->lock );
}
int
tr_rwWriterTrylock( tr_rwlock_t * rw )
{
int ret = FALSE;
tr_lockLock( rw->lock );
if( !rw->haveWriter && !rw->readCount )
ret = rw->haveWriter = TRUE;
tr_lockUnlock( rw->lock );
return ret;
}
void
tr_rwWriterUnlock( tr_rwlock_t * rw )
{
tr_lockLock( rw->lock );
rw->haveWriter = FALSE;
tr_rwSignal( rw );
tr_lockUnlock( rw->lock );
}
void
tr_rwFree( tr_rwlock_t * rw )
{
tr_condFree( rw->writeCond );
tr_condFree( rw->readCond );
tr_lockFree( rw->lock );
tr_free( rw );
}
/***
**** COND
***/
struct tr_cond_s
{
#ifdef SYS_BEOS
sem_id sem;
thread_id theads[BEOS_MAX_THREADS];
int start, end;
#else
pthread_cond_t cond;
#endif
};
tr_cond_t*
tr_condNew( void )
{
tr_cond_t * c = tr_new0( tr_cond_t, 1 );
#ifdef SYS_BEOS
c->sem = create_sem( 1, "" );
c->start = 0;
c->end = 0;
#else
pthread_cond_init( &c->cond, NULL );
#endif
return c;
}
void tr_condWait( tr_cond_t * c, tr_lock_t * l )
{
#ifdef SYS_BEOS
/* Keep track of that thread */
acquire_sem( c->sem );
c->threads[c->end] = find_thread( NULL );
c->end = ( c->end + 1 ) % BEOS_MAX_THREADS;
assert( c->end != c->start ); /* We hit BEOS_MAX_THREADS, arggh */
release_sem( c->sem );
release_sem( *l );
suspend_thread( find_thread( NULL ) ); /* Wait for signal */
acquire_sem( *l );
#else
pthread_cond_wait( &c->cond, &l->lock );
#endif
}
#ifdef SYS_BEOS
static int condTrySignal( tr_cond_t * c )
{
if( c->start == c->end )
return 1;
for( ;; )
{
thread_info info;
get_thread_info( c->threads[c->start], &info );
if( info.state == B_THREAD_SUSPENDED )
{
resume_thread( c->threads[c->start] );
c->start = ( c->start + 1 ) % BEOS_MAX_THREADS;
break;
}
/* The thread is not suspended yet, which can happen since
* tr_condWait does not atomically suspends after releasing
* the semaphore. Wait a bit and try again. */
snooze( 5000 );
}
return 0;
}
#endif
void tr_condSignal( tr_cond_t * c )
{
#ifdef SYS_BEOS
acquire_sem( c->sem );
condTrySignal( c );
release_sem( c->sem );
#else
pthread_cond_signal( &c->cond );
#endif
}
void tr_condBroadcast( tr_cond_t * c )
{
#ifdef SYS_BEOS
acquire_sem( c->sem );
while( !condTrySignal( c ) );
release_sem( c->sem );
#else
pthread_cond_broadcast( &c->cond );
#endif
}
void
tr_condFree( tr_cond_t * c )
{
#ifdef SYS_BEOS
delete_sem( c->sem );
#else
pthread_cond_destroy( &c->cond );
#endif
tr_free( c );
}
/***
**** PATHS
***/
#if !defined( SYS_BEOS ) && !defined( __AMIGAOS4__ )
@@ -204,190 +573,11 @@ tr_getTorrentsDirectory( void )
return buf;
}
static void ThreadFunc( void * _t )
{
tr_thread_t * t = _t;
char* name = tr_strdup( t->name );
/***
**** SOCKETS
***/
#ifdef SYS_BEOS
/* This is required because on BeOS, SIGINT is sent to each thread,
which kills them not nicely */
signal( SIGINT, SIG_IGN );
#endif
tr_dbg( "Thread '%s' started", name );
t->func( t->arg );
tr_dbg( "Thread '%s' exited", name );
tr_free( name );
}
void tr_threadCreate( tr_thread_t * t,
void (*func)(void *), void * arg,
const char * name )
{
t->func = func;
t->arg = arg;
t->name = tr_strdup( name );
#ifdef SYS_BEOS
t->thread = spawn_thread( (void *) ThreadFunc, name,
B_NORMAL_PRIORITY, t );
resume_thread( t->thread );
#else
pthread_create( &t->thread, NULL, (void * (*) (void *)) ThreadFunc, t );
#endif
}
const tr_thread_t THREAD_EMPTY = { NULL, NULL, NULL, 0 };
void tr_threadJoin( tr_thread_t * t )
{
if( t->func != NULL )
{
#ifdef SYS_BEOS
long exit;
wait_for_thread( t->thread, &exit );
#else
pthread_join( t->thread, NULL );
#endif
tr_dbg( "Thread '%s' joined", t->name );
tr_free( t->name );
t->name = NULL;
t->func = NULL;
}
}
void tr_lockInit( tr_lock_t * l )
{
#ifdef SYS_BEOS
*l = create_sem( 1, "" );
#else
pthread_mutex_init( l, NULL );
#endif
}
void tr_lockClose( tr_lock_t * l )
{
#ifdef SYS_BEOS
delete_sem( *l );
#else
pthread_mutex_destroy( l );
#endif
}
int tr_lockTryLock( tr_lock_t * l )
{
#ifdef SYS_BEOS
return acquire_sem_etc( *l, 1, B_RELATIVE_TIMEOUT, 0 );
#else
/* success on zero! */
return pthread_mutex_trylock( l );
#endif
}
void tr_lockLock( tr_lock_t * l )
{
#ifdef SYS_BEOS
acquire_sem( *l );
#else
pthread_mutex_lock( l );
#endif
}
void tr_lockUnlock( tr_lock_t * l )
{
#ifdef SYS_BEOS
release_sem( *l );
#else
pthread_mutex_unlock( l );
#endif
}
void tr_condInit( tr_cond_t * c )
{
#ifdef SYS_BEOS
c->sem = create_sem( 1, "" );
c->start = 0;
c->end = 0;
#else
pthread_cond_init( c, NULL );
#endif
}
void tr_condWait( tr_cond_t * c, tr_lock_t * l )
{
#ifdef SYS_BEOS
/* Keep track of that thread */
acquire_sem( c->sem );
c->threads[c->end] = find_thread( NULL );
c->end = ( c->end + 1 ) % BEOS_MAX_THREADS;
assert( c->end != c->start ); /* We hit BEOS_MAX_THREADS, arggh */
release_sem( c->sem );
release_sem( *l );
suspend_thread( find_thread( NULL ) ); /* Wait for signal */
acquire_sem( *l );
#else
pthread_cond_wait( c, l );
#endif
}
#ifdef SYS_BEOS
static int condTrySignal( tr_cond_t * c )
{
if( c->start == c->end )
return 1;
for( ;; )
{
thread_info info;
get_thread_info( c->threads[c->start], &info );
if( info.state == B_THREAD_SUSPENDED )
{
resume_thread( c->threads[c->start] );
c->start = ( c->start + 1 ) % BEOS_MAX_THREADS;
break;
}
/* The thread is not suspended yet, which can happen since
* tr_condWait does not atomically suspends after releasing
* the semaphore. Wait a bit and try again. */
snooze( 5000 );
}
return 0;
}
#endif
void tr_condSignal( tr_cond_t * c )
{
#ifdef SYS_BEOS
acquire_sem( c->sem );
condTrySignal( c );
release_sem( c->sem );
#else
pthread_cond_signal( c );
#endif
}
void tr_condBroadcast( tr_cond_t * c )
{
#ifdef SYS_BEOS
acquire_sem( c->sem );
while( !condTrySignal( c ) );
release_sem( c->sem );
#else
pthread_cond_broadcast( c );
#endif
}
void tr_condClose( tr_cond_t * c )
{
#ifdef SYS_BEOS
delete_sem( c->sem );
#else
pthread_cond_destroy( c );
#endif
}
#if defined( BSD )
#ifdef BSD
#include <sys/types.h>
#include <sys/socket.h>
@@ -759,100 +949,3 @@ tr_getDefaultRoute( struct in_addr * addr UNUSED )
}
#endif
/***
****
***/
static void
tr_rwSignal( tr_rwlock_t * rw )
{
if ( rw->wantToWrite )
tr_condSignal( &rw->writeCond );
else if ( rw->wantToRead )
tr_condBroadcast( &rw->readCond );
}
void
tr_rwInit ( tr_rwlock_t * rw )
{
memset( rw, 0, sizeof(tr_rwlock_t) );
tr_lockInit( &rw->lock );
tr_condInit( &rw->readCond );
tr_condInit( &rw->writeCond );
}
void
tr_rwReaderLock( tr_rwlock_t * rw )
{
tr_lockLock( &rw->lock );
rw->wantToRead++;
while( rw->haveWriter || rw->wantToWrite )
tr_condWait( &rw->readCond, &rw->lock );
rw->wantToRead--;
rw->readCount++;
tr_lockUnlock( &rw->lock );
}
int
tr_rwReaderTrylock( tr_rwlock_t * rw )
{
int ret = FALSE;
tr_lockLock( &rw->lock );
if ( !rw->haveWriter && !rw->wantToWrite ) {
rw->readCount++;
ret = TRUE;
}
tr_lockUnlock( &rw->lock );
return ret;
}
void
tr_rwReaderUnlock( tr_rwlock_t * rw )
{
tr_lockLock( &rw->lock );
--rw->readCount;
if( !rw->readCount )
tr_rwSignal( rw );
tr_lockUnlock( &rw->lock );
}
void
tr_rwWriterLock( tr_rwlock_t * rw )
{
tr_lockLock( &rw->lock );
rw->wantToWrite++;
while( rw->haveWriter || rw->readCount )
tr_condWait( &rw->writeCond, &rw->lock );
rw->wantToWrite--;
rw->haveWriter = TRUE;
tr_lockUnlock( &rw->lock );
}
int
tr_rwWriterTrylock( tr_rwlock_t * rw )
{
int ret = FALSE;
tr_lockLock( &rw->lock );
if( !rw->haveWriter && !rw->readCount )
ret = rw->haveWriter = TRUE;
tr_lockUnlock( &rw->lock );
return ret;
}
void
tr_rwWriterUnlock( tr_rwlock_t * rw )
{
tr_lockLock( &rw->lock );
rw->haveWriter = FALSE;
tr_rwSignal( rw );
tr_lockUnlock( &rw->lock );
}
void
tr_rwClose( tr_rwlock_t * rw )
{
tr_condClose( &rw->writeCond );
tr_condClose( &rw->readCond );
tr_lockClose( &rw->lock );
}
+25 -64
View File
@@ -22,85 +22,46 @@
* DEALINGS IN THE SOFTWARE.
*****************************************************************************/
#ifndef TR_PLATFORM_H
#define TR_PLATFORM_H 1
#define TR_PLATFORM_H
#ifdef SYS_BEOS
#include <kernel/OS.h>
#define BEOS_MAX_THREADS 256
typedef thread_id tr_thread_id_t;
typedef sem_id tr_lock_t;
typedef struct
{
sem_id sem;
thread_id threads[BEOS_MAX_THREADS];
int start;
int end;
} tr_cond_t;
#else
#include <pthread.h>
typedef pthread_t tr_thread_id_t;
typedef pthread_mutex_t tr_lock_t;
typedef pthread_cond_t tr_cond_t;
#endif
typedef struct tr_thread_s
{
void (* func ) ( void * );
void * arg;
char * name;
tr_thread_id_t thread;
}
tr_thread_t;
typedef struct tr_lock_s tr_lock_t;
typedef struct tr_cond_s tr_cond_t;
typedef struct tr_thread_s tr_thread_t;
const char * tr_getHomeDirectory( void );
const char * tr_getCacheDirectory( void );
const char * tr_getTorrentsDirectory( void );
/**
* When instantiating a thread with a deferred call to tr_threadCreate(),
* initializing it to THREAD_EMPTY makes calls tr_threadJoin() safe.
*/
const tr_thread_t THREAD_EMPTY;
tr_thread_t* tr_threadNew ( void (*func)(void *), void * arg, const char * name );
void tr_threadJoin ( tr_thread_t * );
void tr_threadCreate ( tr_thread_t *, void (*func)(void *),
void * arg, const char * name );
void tr_threadJoin ( tr_thread_t * );
void tr_lockInit ( tr_lock_t * );
void tr_lockClose ( tr_lock_t * );
int tr_lockTryLock ( tr_lock_t * );
void tr_lockLock ( tr_lock_t * );
void tr_lockUnlock ( tr_lock_t * );
tr_lock_t * tr_lockNew ( void );
void tr_lockFree ( tr_lock_t * );
int tr_lockTryLock ( tr_lock_t * );
void tr_lockLock ( tr_lock_t * );
void tr_lockUnlock ( tr_lock_t * );
void tr_condInit ( tr_cond_t * );
void tr_condSignal ( tr_cond_t * );
void tr_condBroadcast ( tr_cond_t * );
void tr_condClose ( tr_cond_t * );
void tr_condWait ( tr_cond_t *, tr_lock_t * );
tr_cond_t * tr_condNew ( void );
void tr_condFree ( tr_cond_t * );
void tr_condSignal ( tr_cond_t * );
void tr_condBroadcast ( tr_cond_t * );
void tr_condWait ( tr_cond_t *, tr_lock_t * );
/***
**** RW lock:
**** The lock can be had by one writer or any number of readers.
***/
typedef struct tr_rwlock_s
{
tr_lock_t lock;
tr_cond_t readCond;
tr_cond_t writeCond;
size_t readCount;
size_t wantToRead;
size_t wantToWrite;
int haveWriter;
}
tr_rwlock_t;
typedef struct tr_rwlock_s tr_rwlock_t;
void tr_rwInit ( tr_rwlock_t * );
void tr_rwClose ( tr_rwlock_t * );
void tr_rwReaderLock ( tr_rwlock_t * );
int tr_rwReaderTrylock ( tr_rwlock_t * );
void tr_rwReaderUnlock ( tr_rwlock_t * );
void tr_rwWriterLock ( tr_rwlock_t * );
int tr_rwWriterTrylock ( tr_rwlock_t * );
void tr_rwWriterUnlock ( tr_rwlock_t * );
tr_rwlock_t* tr_rwNew ( void );
void tr_rwFree ( tr_rwlock_t * );
void tr_rwReaderLock ( tr_rwlock_t * );
int tr_rwReaderTrylock ( tr_rwlock_t * );
void tr_rwReaderUnlock ( tr_rwlock_t * );
void tr_rwWriterLock ( tr_rwlock_t * );
int tr_rwWriterTrylock ( tr_rwlock_t * );
void tr_rwWriterUnlock ( tr_rwlock_t * );
struct in_addr; /* forward declaration to calm gcc down */
+13 -13
View File
@@ -41,7 +41,7 @@ tr_transfer_t;
struct tr_ratecontrol_s
{
tr_rwlock_t lock;
tr_rwlock_t * lock;
int limit;
int newest;
tr_transfer_t transfers[HISTORY_SIZE];
@@ -77,7 +77,7 @@ tr_rcInit( void )
{
tr_ratecontrol_t * r = tr_new0( tr_ratecontrol_t, 1 );
r->limit = 0;
tr_rwInit( &r->lock );
r->lock = tr_rwNew( );
return r;
}
@@ -85,7 +85,7 @@ void
tr_rcClose( tr_ratecontrol_t * r )
{
tr_rcReset( r );
tr_rwClose( &r->lock );
tr_rwFree( r->lock );
tr_free( r );
}
@@ -97,11 +97,11 @@ int
tr_rcCanTransfer( const tr_ratecontrol_t * r )
{
int ret;
tr_rwReaderLock( (tr_rwlock_t*)&r->lock );
tr_rwReaderLock( (tr_rwlock_t*)r->lock );
ret = rateForInterval( r, SHORT_INTERVAL_MSEC ) < r->limit;
tr_rwReaderUnlock( (tr_rwlock_t*)&r->lock );
tr_rwReaderUnlock( (tr_rwlock_t*)r->lock );
return ret;
}
@@ -109,11 +109,11 @@ float
tr_rcRate( const tr_ratecontrol_t * r )
{
float ret;
tr_rwReaderLock( (tr_rwlock_t*)&r->lock );
tr_rwReaderLock( (tr_rwlock_t*)r->lock );
ret = rateForInterval( r, LONG_INTERVAL_MSEC );
tr_rwReaderUnlock( (tr_rwlock_t*)&r->lock );
tr_rwReaderUnlock( (tr_rwlock_t*)r->lock );
return ret;
}
@@ -129,7 +129,7 @@ tr_rcTransferred( tr_ratecontrol_t * r, int size )
if( size < 100 ) /* don't count small messages */
return;
tr_rwWriterLock( &r->lock );
tr_rwWriterLock( r->lock );
now = tr_date ();
if( r->transfers[r->newest].date + GRANULARITY_MSEC >= now )
@@ -140,24 +140,24 @@ tr_rcTransferred( tr_ratecontrol_t * r, int size )
r->transfers[r->newest].size = size;
}
tr_rwWriterUnlock( &r->lock );
tr_rwWriterUnlock( r->lock );
}
void
tr_rcReset( tr_ratecontrol_t * r )
{
tr_rwWriterLock( &r->lock );
tr_rwWriterLock( r->lock );
r->newest = 0;
memset( r->transfers, 0, sizeof(tr_transfer_t) * HISTORY_SIZE );
tr_rwWriterUnlock( &r->lock );
tr_rwWriterUnlock( r->lock );
}
void
tr_rcSetLimit( tr_ratecontrol_t * r, int limit )
{
tr_rwWriterLock( &r->lock );
tr_rwWriterLock( r->lock );
r->limit = limit;
tr_rwWriterUnlock( &r->lock );
tr_rwWriterUnlock( r->lock );
}
int
+11 -14
View File
@@ -46,9 +46,9 @@
struct tr_shared_s
{
tr_handle_t * h;
volatile int die;
tr_thread_t thread;
tr_lock_t lock;
volatile int die;
tr_thread_t * thread;
tr_lock_t * lock;
/* Incoming connections */
int publicPort;
@@ -84,19 +84,16 @@ tr_shared_t * tr_sharedInit( tr_handle_t * h )
{
tr_shared_t * s = calloc( 1, sizeof( tr_shared_t ) );
s->h = h;
tr_lockInit( &s->lock );
s->h = h;
s->lock = tr_lockNew( );
s->publicPort = -1;
s->bindPort = -1;
s->bindSocket = -1;
s->natpmp = tr_natpmpInit();
s->upnp = tr_upnpInit();
s->choking = tr_chokingInit( h );
/* Launch the thread */
s->die = 0;
tr_threadCreate( &s->thread, SharedLoop, s, "shared" );
s->die = 0;
s->thread = tr_threadNew( SharedLoop, s, "shared" );
return s;
}
@@ -112,7 +109,7 @@ void tr_sharedClose( tr_shared_t * s )
/* Stop the thread */
s->die = 1;
tr_threadJoin( &s->thread );
tr_threadJoin( s->thread );
/* Clean up */
for( ii = 0; ii < s->peerCount; ii++ )
@@ -123,7 +120,7 @@ void tr_sharedClose( tr_shared_t * s )
{
tr_netClose( s->bindSocket );
}
tr_lockClose( &s->lock );
tr_lockFree( s->lock );
tr_natpmpClose( s->natpmp );
tr_upnpClose( s->upnp );
tr_chokingClose( s->choking );
@@ -137,11 +134,11 @@ void tr_sharedClose( tr_shared_t * s )
**********************************************************************/
void tr_sharedLock( tr_shared_t * s )
{
tr_lockLock( &s->lock );
tr_lockLock( s->lock );
}
void tr_sharedUnlock( tr_shared_t * s )
{
tr_lockUnlock( &s->lock );
tr_lockUnlock( s->lock );
}
/***********************************************************************
+12 -16
View File
@@ -46,25 +46,25 @@
void
tr_torrentReaderLock( const tr_torrent_t * tor )
{
tr_rwReaderLock ( (tr_rwlock_t*)&tor->lock );
tr_rwReaderLock ( (tr_rwlock_t*)tor->lock );
}
void
tr_torrentReaderUnlock( const tr_torrent_t * tor )
{
tr_rwReaderUnlock ( (tr_rwlock_t*)&tor->lock );
tr_rwReaderUnlock ( (tr_rwlock_t*)tor->lock );
}
void
tr_torrentWriterLock( tr_torrent_t * tor )
{
tr_rwWriterLock ( &tor->lock );
tr_rwWriterLock ( tor->lock );
}
void
tr_torrentWriterUnlock( tr_torrent_t * tor )
{
tr_rwWriterUnlock ( &tor->lock );
tr_rwWriterUnlock ( tor->lock );
}
/***
@@ -259,8 +259,7 @@ torrentRealInit( tr_handle_t * h,
tr_torrentInitFilePieces( tor );
tor->thread = THREAD_EMPTY;
tr_rwInit( &tor->lock );
tor->lock = tr_rwNew( );
tor->upload = tr_rcInit();
tor->download = tr_rcInit();
@@ -305,7 +304,7 @@ torrentRealInit( tr_handle_t * h,
tr_sharedUnlock( h->shared );
snprintf( name, sizeof( name ), "torrent %p (%s)", tor, tor->info.name );
tr_threadCreate( &tor->thread, torrentThreadLoop, tor, name );
tor->thread = tr_threadNew( torrentThreadLoop, tor, name );
}
static int
@@ -1042,7 +1041,7 @@ tr_torrentFree( tr_torrent_t * tor )
tr_sharedLock( h->shared );
tr_rwClose( &tor->lock );
tr_rwFree( tor->lock );
tr_cpClose( tor->completion );
tr_rcClose( tor->upload );
@@ -1097,15 +1096,12 @@ recheckCpState( tr_torrent_t * tor )
static void
torrentThreadLoop ( void * _tor )
{
static tr_lock_t checkFilesLock;
static int checkFilesLockInited = FALSE;
static tr_lock_t * checkFilesLock = NULL;
tr_torrent_t * tor = _tor;
/* create the check-files mutex */
if( !checkFilesLockInited ) {
checkFilesLockInited = TRUE;
tr_lockInit( &checkFilesLock );
}
if( !checkFilesLock )
checkFilesLock = tr_lockNew( );
/* loop until the torrent is being deleted */
while( ! ( tor->dieFlag && (tor->runStatus == TR_RUN_STOPPED) ) )
@@ -1173,7 +1169,7 @@ torrentThreadLoop ( void * _tor )
/* do we need to check files? */
if( tor->uncheckedPieces )
{
if( !tr_lockTryLock( &checkFilesLock ) )
if( !tr_lockTryLock( checkFilesLock ) )
{
run_status_t realStatus;
@@ -1189,7 +1185,7 @@ torrentThreadLoop ( void * _tor )
tor->cpStatus = tr_cpGetStatus( tor->completion );
tr_torrentWriterUnlock( tor );
tr_lockUnlock( &checkFilesLock );
tr_lockUnlock( checkFilesLock );
}
continue;
}
+2 -5
View File
@@ -50,11 +50,8 @@ static tr_msg_list_t ** messageQueueTail = &messageQueue;
void tr_msgInit( void )
{
if( NULL == messageLock )
{
messageLock = calloc( 1, sizeof( *messageLock ) );
tr_lockInit( messageLock );
}
if( !messageLock )
messageLock = tr_lockNew( );
}
void tr_setMessageLevel( int level )