File | Doc | Category | Size | Date | Package
ExternalSeedReaderImpl.java | API Doc | Azureus 3.0.3.4 | 22826 | Fri Jun 08 12:08:42 BST 2007 | com.aelitis.azureus.plugins.extseed.impl

ExternalSeedReaderImpl

public abstract class ExternalSeedReaderImpl extends Object implements com.aelitis.azureus.plugins.extseed.ExternalSeedReader

Fields Summary
public static final int
RECONNECT_DEFAULT
public static final int
INITIAL_DELAY
public static final int
STALLED_DOWNLOAD_SPEED
public static final int
STALLED_PEER_SPEED
private com.aelitis.azureus.plugins.extseed.ExternalSeedPlugin
plugin
private org.gudy.azureus2.plugins.torrent.Torrent
torrent
private String
status
private boolean
active
private boolean
permanent_fail
private long
last_failed_read
private int
consec_failures
private String
user_agent
private long
peer_manager_change_time
private volatile org.gudy.azureus2.plugins.peers.PeerManager
current_manager
private List
requests
private Thread
request_thread
private org.gudy.azureus2.plugins.utils.Semaphore
request_sem
private org.gudy.azureus2.plugins.utils.Monitor
requests_mon
private ExternalSeedReaderRequest
active_read_request
private int[]
priority_offsets
private int
min_availability
private int
min_speed
private long
valid_until
private boolean
transient_seed
private int
reconnect_delay
private volatile ExternalSeedReaderRequest
current_request
private List
listeners
private org.gudy.azureus2.core3.util.AESemaphore
rate_sem
private int
rate_bytes_read
private int
rate_bytes_permitted
Constructors Summary
/**
 * Creates a reader for a torrent, configured from the external seed's parameter map.
 *
 * Reads the "min_avail", "min_speed", "valid_ms" and "transient" parameters, obtains the
 * request monitor and semaphore from the plugin utilities, works out the HTTP user agent
 * and leaves the reader inactive.
 *
 * @param _plugin	owning plugin, source of the plugin interface
 * @param _torrent	torrent this reader serves
 * @param _params	seed parameters, looked up via getIntParam/getBooleanParam
 */
protected ExternalSeedReaderImpl(com.aelitis.azureus.plugins.extseed.ExternalSeedPlugin _plugin, org.gudy.azureus2.plugins.torrent.Torrent _torrent, Map _params)

		plugin	= _plugin;
		torrent	= _torrent;
		
			// activation thresholds - activation is availability based unless overridden
		
		min_availability	= getIntParam( _params, "min_avail", 1 );
		min_speed			= getIntParam( _params, "min_speed", 0 );
		
			// "valid_ms" is an offset: a positive value is turned into an absolute time
		
		long	validity = getIntParam( _params, "valid_ms", 0 );
		
		valid_until	= validity > 0 ? validity + getSystemTime() : validity;
		
		transient_seed	= getBooleanParam( _params, "transient", false );
		
		PluginInterface	pi = plugin.getPluginInterface();
		
		requests_mon	= pi.getUtilities().getMonitor();
		request_sem		= pi.getUtilities().getSemaphore();
		
			// start from the client name, prefer the generated HTTP user agent when there is one
		
		String	agent = pi.getAzureusName();
		
		try{
			Properties	http_props = new Properties();
			
			pi.getClientIDManager().getGenerator().generateHTTPProperties( http_props );
			
			String	generated = http_props.getProperty( ClientIDGenerator.PR_USER_AGENT );
			
			if ( generated != null ){
				
				agent = generated;
			}
		}catch( Throwable e ){
			
				// deliberately ignored: the client name is kept as the user agent
		}
		
		user_agent = agent;
		
		setActive( false );
	
Methods Summary
/**
 * Adds a listener to the list that informCancelled, informComplete and informFailed notify.
 *
 * @param l	listener to add
 */
public voidaddListener(com.aelitis.azureus.plugins.extseed.ExternalSeedReaderListener l)

		listeners.add( l );
	
/**
 * Appends the given requests to the pending queue, releasing request_sem once per
 * request, and creates the "RequestProcessor" thread if no request thread is recorded.
 *
 * @param new_requests	requests to queue, in order
 */
public voidaddRequests(java.util.List new_requests)

		try{
			requests_mon.enter();
			
			if ( !active ){
				
				Debug.out( "request added when not active!!!!" );
			}
			
			int	index = 0;
			
			while ( index < new_requests.size()){
				
				Object	queued = new_requests.get( index );
				
				requests.add( queued );
				
					// one permit per queued request
				
				request_sem.release();
				
				index++;
			}
			
			if ( request_thread != null ){
				
					// a processor thread is already registered
				
				return;
			}
			
			Runnable	processor =
				new Runnable()
				{
					public void
					run()
					{
						processRequests();
					}
				};
			
			plugin.getPluginInterface().getUtilities().createThread( "RequestProcessor", processor );
			
		}finally{
			
			requests_mon.exit();
		}
	
/**
 * Recomputes priority_offsets from the current allocation state of the pieces.
 *
 * Scans for runs of consecutive fully-allocatable pieces, considering run lengths up to
 * getPieceGroupSize(). If any such run exists, the pieces of the chosen run of the greatest
 * length found get an offset of 10000 decreasing by one per piece. If no piece is fully
 * allocatable, the single piece with the largest positive allocatable request count gets
 * 10000. Otherwise priority_offsets is set to null. Any Throwable is reported through
 * Debug.printStackTrace and also leaves priority_offsets null.
 *
 * @param peer_manager		source of the piece array
 * @param base_priorities	per-piece base priority, indexed by piece number
 */
public voidcalculatePriorityOffsets(org.gudy.azureus2.plugins.peers.PeerManager peer_manager, int[] base_priorities)

		try{
			Piece[]	pieces = peer_manager.getPieces();
			
			int	piece_group_size = getPieceGroupSize();
			
				// slot j describes runs of j+1 consecutive fully-allocatable pieces:
				// the best priority recorded for such a run and the piece it starts at
			
			int[]	contiguous_best_pieces = new int[piece_group_size];
			int[]	contiguous_highest_pri = new int[piece_group_size];
					
			Arrays.fill( contiguous_highest_pri, -1 );
			
				// length of the current run and the highest base priority seen within it
			
			int	contiguous			= 0;
			int	contiguous_best_pri	= -1;
			
				// longest run length recorded so far (never exceeds piece_group_size)
			
			int	max_contiguous	= 0;
			
				// fallback candidate, only maintained while no fully-allocatable piece has been seen
			
			int	max_free_reqs		= 0;
			int max_free_reqs_piece	= -1;
			
			for (int i=0;i<pieces.length;i++){
				
				Piece	piece = pieces[i];
				
				if ( piece.isFullyAllocatable()){
			
					contiguous++;
					
					int	base_pri = base_priorities[i];
					
					if ( base_pri > contiguous_best_pri ){
						
						contiguous_best_pri	= base_pri;
					}
					
						// update every run length (j+1) that ends at piece i; note the priority
						// compared is the best of the whole current run, not just its last j+1 pieces
					
					for (int j=0;j<contiguous && j<contiguous_highest_pri.length;j++){
						
						if ( contiguous_best_pri > contiguous_highest_pri[j] ){
							
							contiguous_highest_pri[j]	= contiguous_best_pri;
							contiguous_best_pieces[j]	= i - j;	// start piece of the run ending at i
						}
						
						if ( j+1 > max_contiguous ){
								
							max_contiguous	= j+1;
						}
					}
		
				}else{
					
						// run broken
					
					contiguous			= 0;
					contiguous_best_pri	= -1;
					
					if ( max_contiguous == 0 ){
						
						int	free_reqs = piece.getAllocatableRequestCount();
						
						if ( free_reqs > max_free_reqs ){
							
							max_free_reqs 		= free_reqs;
							max_free_reqs_piece	= i;
						}
					}
				}
			}
					
			if ( max_contiguous == 0 ){
			
					// no fully-allocatable piece at all: boost the best partial piece, if any
			
				if ( max_free_reqs_piece >= 0 ){
					
					priority_offsets	 = new int[ (int)getTorrent().getPieceCount()];

					priority_offsets[max_free_reqs_piece] = 10000;
					
				}else{
					
					priority_offsets	= null;
				}
			}else{
				
				priority_offsets	 = new int[ (int)getTorrent().getPieceCount()];
				
					// run of the greatest length found, starting at the piece recorded for that length
				
				int	start_piece = contiguous_best_pieces[max_contiguous-1];
				
				for (int i=start_piece;i<start_piece+max_contiguous;i++){
								
						// earlier pieces in the run get the larger offset
								
					priority_offsets[i] = 10000 - (i-start_piece);
				}
			}
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
			
			priority_offsets	= null;
		}
	
/**
 * Cancels every queued request that is not already cancelled, then cancels the read
 * request currently being processed, if there is one. Queued requests stay in the queue;
 * they are only marked cancelled here.
 */
public voidcancelAllRequests()

		try{
			requests_mon.enter();
			
			for (int i=0;i<requests.size();i++){
				
				PeerReadRequest	request = (PeerReadRequest)requests.get(i);
			
				if ( !request.isCancelled()){
	
					request.cancel();
				}
			}	
			
				// active_read_request is assigned and cleared by the request thread without
				// holding requests_mon, so take a single snapshot: testing the field and then
				// dereferencing it again could hit a null written in between
			
			ExternalSeedReaderRequest	in_flight = active_read_request;
			
			if ( in_flight != null ){
				
				in_flight.cancel();
			}
		}finally{
			
			requests_mon.exit();
		}	
	
public voidcancelRequest(org.gudy.azureus2.plugins.peers.PeerReadRequest request)

		try{
			requests_mon.enter();
			
			if ( requests.contains( request ) && !request.isCancelled()){
				
				request.cancel();
			}
			
		}finally{
			
			requests_mon.exit();
		}
	
/**
 * Re-evaluates whether this reader should be active for the given peer manager.
 *
 * A change of peer manager (including to or from null) always records the change time
 * and makes the reader inactive. With an unchanged, non-null manager an active reader is
 * deactivated once INITIAL_DELAY has passed and readyToDeactivate agrees, while an
 * inactive one that is not permanently unavailable is activated when readyToActivate agrees.
 *
 * @param peer_manager	manager being evaluated, may be null
 * @param peer			peer passed through to the readiness checks
 * @return the active flag after evaluation
 */
public booleancheckActivation(org.gudy.azureus2.plugins.peers.PeerManager peer_manager, org.gudy.azureus2.plugins.peers.Peer peer)

		long now = getSystemTime();
		
		if ( peer_manager != current_manager ){
			
				// if the peer manager's changed then we always go inactive for a period to wait for 
				// download status to stabilise a bit
			
			peer_manager_change_time	= now;
			
			current_manager	= peer_manager;
			
			setActive( false );
			
			return( active );
		}
		
			// guard against the recorded change time being ahead of the current time
		
		if ( peer_manager_change_time > now ){
			
			peer_manager_change_time	= now;
		}
		
		long	time_since_started = now - peer_manager_change_time;
		
		if ( peer_manager == null ){
			
			return( active );
		}
		
		if ( active ){
			
			boolean	settled = time_since_started > INITIAL_DELAY;
			
			if ( settled && readyToDeactivate( peer_manager, peer )){
				
				setActive( false );
			}
		}else if ( !isPermanentlyUnavailable() && readyToActivate( peer_manager, peer, time_since_started )){
			
			setActive( true );
		}
		
		return( active );
	
/**
 * Logs the reason and then runs checkActivation with a null peer manager and peer.
 *
 * @param reason	text included in the log line
 */
public voiddeactivate(java.lang.String reason)

		String	message = getName() + ": deactivating (" + reason  + ")";
		
		plugin.log( message );
		
		checkActivation( null, null );
	
protected booleangetBooleanParam(java.util.Map map, java.lang.String name, boolean def)

		return( getIntParam( map, name, def?1:0) != 0 );
	
/**
 * Collects the queued requests that report themselves as expired.
 *
 * @return a new list of the expired requests, or null when none have expired
 */
public java.util.ListgetExpiredRequests()

		List	expired = null;
		
		try{
			requests_mon.enter();
			
			for (int idx=0;idx<requests.size();idx++){
				
				PeerReadRequest	candidate = (PeerReadRequest)requests.get( idx );
				
				if ( !candidate.isExpired()){
					
					continue;
				}
				
					// result list is only created once there is something to put in it
				
				if ( expired == null ){
					
					expired = new ArrayList();
				}
				
				expired.add( candidate );
			}
		}finally{
			
			requests_mon.exit();
		}
		
		return expired;
	
/**
 * @return the current count of consecutive failed reads
 */
protected intgetFailureCount()

		return consec_failures;
	
/**
 * Looks up an integer parameter.
 *
 * Any numeric value (Long, Integer, ...) is accepted and narrowed with intValue(). A null
 * map, a missing key or a non-numeric value yields the default.
 *
 * @param map	parameter map, may be null
 * @param name	parameter name
 * @param def	value returned when no numeric value is present
 * @return the parameter as an int, or def
 */
protected intgetIntParam(java.util.Map map, java.lang.String name, int def)

		if ( map == null ){
			
				// no parameters supplied at all - everything takes its default
			
			return( def );
		}
		
		Object	obj = map.get(name);
		
		if ( obj instanceof Number ){
			
			return(((Number)obj).intValue());
		}
		
		return( def );
	
/**
 * @return the time recorded for the most recent failed read; 0 after a successful read
 */
protected longgetLastFailTime()

		return last_failed_read;
	
/**
 * Reports how many new requests may be queued: none while any request is pending,
 * otherwise as many normal-sized requests as fit in one piece group.
 *
 * @return 0 when the queue is non-empty, else group size * piece size / NORMAL_REQUEST_SIZE
 */
public intgetMaximumNumberOfRequests()

		if ( getRequestCount() != 0 ){
			
			return 0;
		}
		
		return (int)(( getPieceGroupSize() * torrent.getPieceSize() ) / PeerReadRequest.NORMAL_REQUEST_SIZE );
	
/**
 * @return the progress reported by the current request, or 0 when there is none
 */
public intgetPercentDoneOfCurrentIncomingRequest()

			// read the volatile field once so the null test and the call see the same object
		
		ExternalSeedReaderRequest	snapshot = current_request;
		
		return snapshot == null ? 0 : snapshot.getPercentDoneOfCurrentIncomingRequest();
	
/**
 * Returns the number of bytes the reader may currently read.
 *
 * If an allowance is already available it is returned at once. Otherwise this waits on
 * rate_sem via reserve( 1000 ); if that reservation is not obtained a single byte is
 * permitted, else the allowance as it stands after the wait is returned.
 *
 * @return permitted byte count
 */
public intgetPermittedBytes()

		synchronized( rate_sem ){
			
			int	granted = rate_bytes_permitted;
			
			if ( granted > 0 ){
				
				return granted;
			}
		}
		
		boolean	signalled = rate_sem.reserve( 1000 );
		
		if ( signalled ){
			
			return rate_bytes_permitted;
		}
		
		return 1;	// one byte a sec to check for connection liveness
	
/**
 * Supplied by subclasses: the number of pieces treated as one group. Within this class it
 * bounds the run length examined by calculatePriorityOffsets and scales the result of
 * getMaximumNumberOfRequests.
 */
protected abstract intgetPieceGroupSize()

/**
 * @return the offsets last computed by calculatePriorityOffsets; may be null
 */
public int[]getPriorityOffsets()

		return priority_offsets;
	
/**
 * Supplied by subclasses: when this returns false, selectRequests ends a batch at the
 * first request whose piece number differs from the previous one.
 */
protected abstract booleangetRequestCanSpanPieces()

/**
 * @return the number of requests in the pending queue, read under requests_mon
 */
public intgetRequestCount()

		int	pending;
		
		try{
			requests_mon.enter();
			
			pending = requests.size();
			
		}finally{
			
			requests_mon.exit();
		}
		
		return pending;
	
/**
 * @return a new list holding a copy of the pending queue, taken under requests_mon
 */
public java.util.ListgetRequests()

		try{
			requests_mon.enter();
			
			return new ArrayList( requests );
			
		}finally{
			
			requests_mon.exit();
		}
	
/**
 * @return the status text: "Active" or "Idle" as set by setActive, or a "Failed: ..."
 *         message set after a failed read
 */
public java.lang.StringgetStatus()

		return status;
	
/**
 * @return the current system time as reported by the plugin utilities
 */
protected longgetSystemTime()

		return plugin.getPluginInterface().getUtilities().getCurrentSystemTime();
	
/**
 * @return the torrent this reader was constructed for
 */
public org.gudy.azureus2.plugins.torrent.TorrentgetTorrent()

		return torrent;
	
/**
 * @return the user agent string determined in the constructor
 */
protected java.lang.StringgetUserAgent()

		return user_agent;
	
/**
 * Tells every registered listener that a request was cancelled.
 *
 * @param request	the cancelled request
 */
protected voidinformCancelled(org.gudy.azureus2.plugins.peers.PeerReadRequest request)

		for (int i=0;i<listeners.size();i++){
			
			try{
				((ExternalSeedReaderListener)listeners.get(i)).requestCancelled( request );
				
			}catch( Throwable e ){
				
					// a failing listener must not stop the remaining ones being told; report it
					// through Debug like the other error paths in this class
				
				Debug.printStackTrace(e);
			}
		}		
	
/**
 * Wraps the data in a pooled byte buffer and tells every registered listener that the
 * request completed. The same buffer instance is handed to each listener.
 *
 * @param request	the completed request
 * @param buffer	the bytes read for it
 */
protected voidinformComplete(org.gudy.azureus2.plugins.peers.PeerReadRequest request, byte[] buffer)

		PooledByteBuffer pool_buffer = plugin.getPluginInterface().getUtilities().allocatePooledByteBuffer( buffer );
		
		for (int i=0;i<listeners.size();i++){
			
			try{
				((ExternalSeedReaderListener)listeners.get(i)).requestComplete( request, pool_buffer );
				
			}catch( Throwable e ){
				
					// a failing listener must not stop the remaining ones being told; report it
					// through Debug like the other error paths in this class
				
				Debug.printStackTrace(e);
			}
		}		
	
/**
 * Tells every registered listener that a request failed.
 *
 * @param request	the failed request
 */
protected voidinformFailed(org.gudy.azureus2.plugins.peers.PeerReadRequest request)

		for (int i=0;i<listeners.size();i++){
			
			try{
				((ExternalSeedReaderListener)listeners.get(i)).requestFailed( request );
				
			}catch( Throwable e ){
				
					// a failing listener must not stop the remaining ones being told; report it
					// through Debug like the other error paths in this class
				
				Debug.printStackTrace(e);
			}
		}
	
/**
 * @return the active flag as last set by setActive
 */
public booleanisActive()

		return active;
	
/**
 * @return true once a read has failed with an exception flagged as a permanent failure
 */
public booleanisPermanentlyUnavailable()

		return permanent_fail;
	
/**
 * @return the value of the "transient" parameter read in the constructor
 */
public booleanisTransient()

		return transient_seed;
	
/**
 * Forwards a message to the owning plugin's log.
 *
 * @param str	text to log
 */
protected voidlog(java.lang.String str)

		plugin.log( str );
	
/**
 * Main loop of the request-processing thread.
 *
 * Registers the calling thread as request_thread, returning at once if one is already
 * registered. It then repeatedly waits on request_sem: when the wait is not satisfied and
 * the queue is empty the thread deregisters itself and exits; when it is satisfied a batch
 * chosen by selectRequests is taken off the front of the queue and passed to
 * processRequests( List ), or, if the head request was cancelled, that request is reported
 * through informCancelled. Any Throwable raised inside an iteration is printed and the
 * loop carries on.
 */
protected voidprocessRequests()

		try{
			requests_mon.enter();

				// only one processing thread at a time
			
			if ( request_thread != null ){
				
				return;
			}

			request_thread = Thread.currentThread();
			
		}finally{
			
			requests_mon.exit();
		}
		
		while( true ){
			
			try{
					// NOTE(review): 30000 is presumably a timeout in milliseconds - confirm against Semaphore.reserve
				
				if ( !request_sem.reserve(30000)){
					
					try{
						requests_mon.enter();
						
							// nothing arrived and nothing is queued: give up the thread slot and stop
						
						if ( requests.size() == 0 ){
							
							request_thread	= null;
							
							break;
						}
					}finally{
						
						requests_mon.exit();
					}
				}else{
					
					List			selected_requests 	= new ArrayList();
					PeerReadRequest	cancelled_request	= null;
					
					try{
						requests_mon.enter();

							// get an advisory set to process together
						
						int	count = selectRequests( requests );
						
							// fall back to a single request if the advised count makes no sense
						
						if ( count <= 0 || count > requests.size()){
							
							Debug.out( "invalid count" );
							
							count	= 1;
						}
						
						for (int i=0;i<count;i++){
							
							PeerReadRequest	request = (PeerReadRequest)requests.remove(0);
							
							if ( request.isCancelled()){
								
									// if this is the first request then process it, otherwise leave
									// for the next-round
															
								if ( i == 0 ){
									
									cancelled_request = request;
									
								}else{
									
										// put it back at the head; its semaphore permit has not been taken
									
									requests.add( 0, request );
								}
								
								break;
								
							}else{
								
								selected_requests.add( request );
																
								if ( i > 0 ){
								
										// we've only got the sem for the first request, catch up for subsequent
									
									request_sem.reserve();
								}
							}
						}
						
					}finally{
						
						requests_mon.exit();
					}
					
						// listener notification and the read itself happen outside the monitor
					
					if ( cancelled_request != null ){
						
						informCancelled( cancelled_request );

					}else{
						
						processRequests( selected_requests );
					}
				}
			}catch( Throwable e ){
				
				e.printStackTrace();
			}
		}
	
/**
 * Reads the data for one batch of requests.
 *
 * The batch is wrapped in an ExternalSeedReaderRequest, published as both the active and
 * the current request, and handed to readData. On any failure the status text is set to
 * "Failed: ..." and the request is marked failed; an ExternalSeedException that reports a
 * permanent failure also sets permanent_fail. Afterwards the active request is cleared and
 * the failure bookkeeping (last_failed_read, consec_failures) is reset or advanced.
 *
 * @param requests	the batch to read, in order
 */
protected voidprocessRequests(java.util.List requests)

		ExternalSeedReaderRequest	request = new ExternalSeedReaderRequest( this, requests );
		
		active_read_request = request;
		
		current_request = request;
		
		boolean	succeeded = false;
		
		try{
			readData( request );
			
			succeeded = true;
			
		}catch( Throwable e ){
			
				// only the seed-specific exception can flag the seed as permanently gone
			
			if ( e instanceof ExternalSeedException ){
				
				if (((ExternalSeedException)e).isPermanentFailure()){
					
					permanent_fail	= true;
				}
			}
			
			status = "Failed: " + Debug.getNestedExceptionMessage(e);
			
			request.failed();
			
		}finally{
			
			active_read_request = null;
			
			if ( !succeeded ){
				
				last_failed_read	= getSystemTime();
				
				consec_failures++;
				
			}else{
				
				last_failed_read	= 0;
				
				consec_failures		= 0;
			}
		}
	
public intreadBytes(int max)
Rate handling

			// permission to read a bunch of bytes
		
			// we're out of step here due to multiple threads so we have to report what
			// has already happened and prepare for what will
		
		int	res = 0;
		
		synchronized( rate_sem ){
			
			if ( rate_bytes_read > 0 ){
				
				res = rate_bytes_read;
				
				if ( res > max ){
					
					res = max;
				}
				
				rate_bytes_read -= res;
			}
			
			int	rem = max - res;
			
			if ( rem > rate_bytes_permitted ){
				
				if ( rate_bytes_permitted == 0 ){
					
					rate_sem.release();
				}
				
				rate_bytes_permitted = rem;
			}
		}
		
		return( res );
	
/**
 * Supplied by subclasses: performs the actual read for a batch. It is called from
 * processRequests( List ), which treats any Throwable thrown here as a failed read and
 * checks an ExternalSeedException for a permanent failure.
 */
protected abstract voidreadData(ExternalSeedReaderRequest request)

/**
 * Decides whether an inactive reader should become active.
 *
 * Checks, in order: the back-off after consecutive failures, the validity deadline, and
 * whether the download is already seeding - any of these vetoes activation. A transient
 * seed may then activate straight away if no peer and no pending peer at its address is
 * in the way (slow peers at that address may be closed first when the download looks
 * stalled). Finally, once INITIAL_DELAY has passed, poor availability or low download
 * speed triggers activation. A Throwable is reported via Debug and treated as "not ready".
 *
 * The peer argument is not used by this method.
 *
 * @param peer_manager		manager of the download being considered
 * @param peer				unused here
 * @param time_since_start	time since the peer manager last changed
 * @return true if the reader should activate
 */
protected booleanreadyToActivate(org.gudy.azureus2.plugins.peers.PeerManager peer_manager, org.gudy.azureus2.plugins.peers.Peer peer, long time_since_start)

		boolean	early_days = time_since_start < INITIAL_DELAY;
		
		try{

				// first respect failure count 
			
			int	fail_count = getFailureCount();
			
			if ( fail_count > 0 ){
				
					// the delay doubles for every failure after the first and stops growing
					// once it has passed 30*60*1000
				
				int	delay	= reconnect_delay;
				
				for (int i=1;i<fail_count;i++){
					
					delay += delay;
					
					if ( delay > 30*60*1000 ){
						
						break;
					}
				}
				
				long	now = getSystemTime();
				
				long	last_fail = getLastFailTime();
				
					// still inside the back-off window (only applied when the failure time is before now)
				
				if ( last_fail < now && now - last_fail < delay ){
					
					return( false );
				}
			}
	
				// next obvious things like validity and the fact that we're complete
			
			if ( valid_until > 0 && getSystemTime() > valid_until ){
				
				return( false );
			}
			
			if ( peer_manager.getDownload().getState() == Download.ST_SEEDING ){
				
				return( false );
			}
					
				// now the more interesting stuff
			
			if ( transient_seed ){
						
					// kick any existing peers that are running too slowly if the download appears
					// to be stalled		
				
				Peer[]	existing_peers = peer_manager.getPeers( getIP());
						
				int	existing_peer_count = existing_peers.length;
				
				int	global_limit	= TransferSpeedValidator.getGlobalDownloadRateLimitBytesPerSecond();
				
				if ( global_limit > 0 ){
				
						// if we have a global limit in force and we are near it then no point in
						// activating 
					
					int current_down = plugin.getGlobalDownloadRateBytesPerSec();
					
					if ( global_limit - current_down < 5*1024 ){
						
						return( false );
					}
				}
				
				int	download_limit  = peer_manager.getDownloadRateLimitBytesPerSecond();
						
					// the effective limit is the smaller of the global and per-download limits
						
				if ( global_limit > 0 && global_limit < download_limit ){
					
					download_limit = global_limit;
				}
				
					// "stalled": the download is slower than STALLED_DOWNLOAD_SPEED and that is not
					// simply because a rate limit is holding it there
				
				if ( 	( download_limit == 0 || download_limit > STALLED_DOWNLOAD_SPEED + 5*1024 ) &&
						peer_manager.getStats().getDownloadAverage() < STALLED_DOWNLOAD_SPEED ){
					
					for (int i=0;i<existing_peers.length;i++){
					
						Peer	existing_peer = existing_peers[i];
						
							// no point in booting ourselves!
						
						if ( existing_peer instanceof ExternalSeedPeer ){
							
							continue;
						}
						
						PeerStats stats = existing_peer.getStats();
						
							// only peers connected for longer than INITIAL_DELAY are judged
						
						if ( stats.getTimeSinceConnectionEstablished() > INITIAL_DELAY ){
							
							if ( stats.getDownloadAverage() < STALLED_PEER_SPEED ){
								
								existing_peer.close( "Replacing slow peer with web-seed", false, false );
								
								existing_peer_count--;
							}
						}
					}
				}
				
				if ( existing_peer_count == 0 ){
					
					// check to see if we have pending connections to the same address 
								
					if ( peer_manager.getPendingPeers( getIP()).length == 0 ){
						
						log( getName() + ": activating as transient seed and nothing blocking it" );
						
						return( true );
					}
				}
			}
			
				// availability and speed based stuff needs a little time before being applied
			
			if ( !early_days ){
				
				if ( min_availability > 0 ){
										
					float availability = peer_manager.getDownload().getStats().getAvailability();
				
					if ( availability < min_availability){
					
						log( getName() + ": activating as availability is poor" );
						
						return( true );
					}
				}
					
				if ( min_speed > 0 ){
										
					if ( peer_manager.getStats().getDownloadAverage() < min_speed ){
						
						log( getName() + ": activating as speed is slow" );
						
						return( true );
					}
				}	
			}
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
		}
		
		return( false );	
	
/**
 * Decides whether an active reader should go inactive.
 *
 * Deactivation is due when the validity deadline has passed or the download is seeding.
 * A transient seed is otherwise never deactivated here. For other seeds, deactivation is
 * due when availability has reached min_availability + 1, or when the download speed
 * excluding this peer's contribution exceeds twice min_speed. A Throwable is reported via
 * Debug and treated as "not ready".
 *
 * @param peer_manager	manager of the download being considered
 * @param peer			the peer whose download average is subtracted in the speed check
 * @return true if the reader should deactivate
 */
protected booleanreadyToDeactivate(org.gudy.azureus2.plugins.peers.PeerManager peer_manager, org.gudy.azureus2.plugins.peers.Peer peer)

		try{
				// obvious stuff first
			
			boolean	expired = valid_until > 0 && getSystemTime() > valid_until;
			
			if ( expired || peer_manager.getDownload().getState() == Download.ST_SEEDING ){
				
				return true;
			}
			
				// more interesting stuff
			
			if ( transient_seed ){
				
				return false;
			}
			
			if ( min_availability > 0 ){
				
				float	current_availability = peer_manager.getDownload().getStats().getAvailability();
				
				if ( current_availability >= min_availability + 1 ){
					
					log( getName() + ": deactivating as availability is good" );
					
					return true;
				}
			}
			
			if ( min_speed > 0 ){
				
				long	own_rate	= peer.getStats().getDownloadAverage();
				
				long	total_rate	= peer_manager.getStats().getDownloadAverage();
				
				long	rate_without_us = total_rate - own_rate;
				
				if ( rate_without_us > 2 * min_speed ){
					
					log( getName() + ": deactivating as speed is good" );
					
					return true;
				}
			}
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
		}
		
		return false;
	
/**
 * Removes a listener from the list notified by the inform* methods.
 *
 * @param l	listener to remove
 */
public voidremoveListener(com.aelitis.azureus.plugins.extseed.ExternalSeedReaderListener l)

		listeners.remove( l );
	
/**
 * Records that bytes have been read: adds them to the unreported total and deducts them
 * from the outstanding allowance, which never drops below zero.
 *
 * @param num	number of bytes just read
 */
public voidreportBytesRead(int num)

		synchronized( rate_sem ){
			
			rate_bytes_read += num;
			
			rate_bytes_permitted = Math.max( 0, rate_bytes_permitted - num );
		}
	
/**
 * Works out how many requests from the front of the list can be handled as one batch.
 *
 * The batch ends at the first request that moves to a different piece when
 * getRequestCanSpanPieces() is false, or at the first request that does not start exactly
 * where the previous one finished.
 *
 * @param requests	candidate requests, in queue order
 * @return size of the leading batch; the list size when every request qualifies
 */
protected intselectRequests(java.util.List requests)

		long	expected_start	= -1;
		
		int		previous_piece	= -1;
		
		int		index			= 0;
		
		while ( index < requests.size()){
			
			PeerReadRequest	candidate = (PeerReadRequest)requests.get( index );
			
			int	piece = candidate.getPieceNumber();
			
			boolean	piece_changed = previous_piece != -1 && previous_piece != piece;
			
			if ( piece_changed && !getRequestCanSpanPieces()){
				
				return index;
			}
			
				// absolute position of this request within the torrent data
			
			long	start = piece * torrent.getPieceSize() + candidate.getOffset();
			
			if ( expected_start != -1 && start != expected_start ){
				
					// not contiguous with the previous request
				
				return index;
			}
			
			expected_start	= start + candidate.getLength();
			
			previous_piece	= piece;
			
			index++;
		}
		
		return requests.size();
	
/**
 * Sets the active flag and the matching status text ("Active" or "Idle") under requests_mon.
 *
 * @param _active	new value of the active flag
 */
protected voidsetActive(boolean _active)

		try{
			requests_mon.enter();
			
			active	= _active;
			
			if ( active ){
				
				status = "Active";
				
			}else{
				
				status = "Idle";
			}
		}finally{
			
			requests_mon.exit();
		}
	
/**
 * Sets the base delay that readyToActivate uses for its failure back-off, optionally
 * clearing the consecutive failure count.
 *
 * @param delay				new base reconnect delay
 * @param reset_failures	when true, consec_failures is reset to 0
 */
protected voidsetReconnectDelay(int delay, boolean reset_failures)

		reconnect_delay = delay;
		
		if ( reset_failures ){
			
			consec_failures = 0;
		}