FileDocCategorySizeDatePackage
EnhancedDownloadManager.javaAPI DocAzureus 3.0.3.459248Mon Oct 01 18:23:46 BST 2007com.aelitis.azureus.core.download

EnhancedDownloadManager

public class EnhancedDownloadManager extends Object

Fields Summary
public static int
DEFAULT_MINIMUM_INITIAL_BUFFER_SECS_FOR_ETA
public static int
WMP_MINIMUM_INITIAL_BUFFER_SECS_FOR_ETA
public static int
MINIMUM_INITIAL_BUFFER_SECS
public static final int
SPEED_CONTROL_INITIAL_DELAY
public static final int
SPEED_INCREASE_GRACE_PERIOD
public static final int
PEER_INJECT_GRACE_PERIOD
public static final int
IDLE_PEER_DISCONNECT_PERIOD
public static final int
IDLE_SEED_DISCONNECT_PERIOD
public static final int
MIN_SEED_CONNECTION_TIME
public static final int
IDLE_SEED_DISCONNECT_SECS
public static final int
CACHE_RECONNECT_MIN_PERIOD
public static final int
CACHE_REQUERY_MIN_PERIOD
public static final int
TARGET_SPEED_EXCESS_MARGIN
public static final int
DISCONNECT_CHECK_PERIOD
public static final int
DISCONNECT_CHECK_TICKS
public static final int
REACTIVATE_PROVIDER_PERIOD
public static final int
REACTIVATE_PROVIDER_PERIOD_TICKS
public static final int
LOG_PROG_STATS_PERIOD
public static final int
LOG_PROG_STATS_TICKS
private static final String
TRACKER_PROG_PREFIX
private static final String
PM_SEED_TIME_KEY
private static final String
PEER_CACHE_KEY
private static int
internal_content_stream_bps_increase_ratio
private static int
internal_content_stream_bps_increase_absolute
private DownloadManagerEnhancer
enhancer
private org.gudy.azureus2.core3.download.DownloadManager
download_manager
private boolean
platform_content
private transient com.aelitis.azureus.core.peermanager.piecepicker.PiecePicker
current_piece_pickler
private boolean
progressive_active
private long
content_min_delivery_bps
private int
minimum_initial_buffer_secs_for_eta
private int
explicit_minimum_buffer_bytes
private bufferETAProvider
buffer_provider
private boostETAProvider
boost_provider
private progressiveStats
progressive_stats
private boolean
progressive_informed
private long
time_download_started
private com.aelitis.azureus.core.util.average.Average
download_speed_average
private boolean
marked_active
private boolean
destroyed
private org.gudy.azureus2.core3.download.DownloadManagerListener
dmListener
private static final int
STALLED_TIMEOUT
private boolean
publish_handling_complete
private long
publish_sent
private long
publish_sent_time
private EnhancedDownloadManagerFile[]
enhanced_files
private EnhancedDownloadManagerFile
primary_file
private long
last_speed_increase
private long
last_peer_inject
private long
last_lookup_time
private LinkedList
new_peers
private List
cache_peers
private List
disconnected_cache_peers
private com.aelitis.azureus.core.peer.cache.CachePeer[]
lookup_peers
Constructors Summary
protected EnhancedDownloadManager(DownloadManagerEnhancer _enhancer, org.gudy.azureus2.core3.download.DownloadManager _download_manager)

			// Wraps a core download manager: picks the initial-buffer ETA threshold from the
			// file names, builds the per-file wrappers, selects the primary file, creates the
			// progressive stats and registers a peer listener that tracks the piece picker
			// and the new/cache peer lists.
			//
			// _enhancer			owning enhancer
			// _download_manager	the core download being wrapped

		enhancer			= _enhancer;
		download_manager	= _download_manager;

		DiskManagerFileInfo[] files = download_manager.getDiskManagerFileInfo();
		
			// hack - we don't know the actual player until play starts so we just use the file name
		
			// TODO: we can probably read the registry to work out what player is associated with
			// the file extension?
		
		boolean	found_wmv = false;
		
		for (int i=0;i<files.length;i++){
		
			String	file_name = files[i].getFile(true).getName().toLowerCase();
			
			if ( file_name.endsWith( ".wmv" )){
				
				found_wmv = true;
				
				break;
			}
		}
		
		if ( found_wmv ){
			
			minimum_initial_buffer_secs_for_eta = WMP_MINIMUM_INITIAL_BUFFER_SECS_FOR_ETA;

		}else{
		
			minimum_initial_buffer_secs_for_eta = DEFAULT_MINIMUM_INITIAL_BUFFER_SECS_FOR_ETA;
		}
		
		TOTorrent	torrent = download_manager.getTorrent();

		if ( torrent != null ){
			
			content_min_delivery_bps = PlatformTorrentUtils.getContentMinimumSpeedBps( torrent );
			
			platform_content = PlatformTorrentUtils.isContent( torrent, true );
						
			enhanced_files = new EnhancedDownloadManagerFile[files.length];
			
			Map meta_data = PlatformTorrentUtils.getFileMetaData( torrent );

			Map files_info = meta_data==null?null:(Map)meta_data.get( "files" );
			
				// running byte offset of each file within the torrent
			
			long	offset = 0;
			
			for (int i=0;i<files.length;i++){
				
				DiskManagerFileInfo f = files[i];
				
					// per-file meta data is keyed by the file index rendered as a string
				
				Map file_info = files_info==null?null:(Map)files_info.get( "" + i );
				
				enhanced_files[i] = new EnhancedDownloadManagerFile( f, offset, file_info );
				
				offset += f.getLength();
			}
				
			int	primary_index = PlatformTorrentUtils.getContentPrimaryFileIndex( torrent );
								
			if ( primary_index >= 0 && primary_index < enhanced_files.length ){
					
				primary_file = enhanced_files[primary_index];
					
			}else if ( enhanced_files.length > 0 ){
				
					// no usable primary index - fall back to the first file
				
				primary_file = enhanced_files[0];
			}
			
				// with no files at all primary_file stays null, exactly as in the
				// torrent-less case below, rather than failing on enhanced_files[0]
			
		}else{
			
			enhanced_files = new EnhancedDownloadManagerFile[0];
		}
			
		progressive_stats	= createProgressiveStats( download_manager, primary_file );

		download_manager.addPeerListener(
			new DownloadManagerPeerListener()
			{
       			public void
    			peerManagerWillBeAdded(
    				PEPeerManager	peer_manager )
       			{
       			}
       			
				public void
				peerManagerAdded(
					PEPeerManager	manager )
				{
					synchronized( EnhancedDownloadManager.this ){
					
						time_download_started = SystemTime.getCurrentTime();
						
						current_piece_pickler = manager.getPiecePicker();
						
							// progressive mode was requested before the peer manager existed
						
						if ( progressive_active && current_piece_pickler != null ){
							
							buffer_provider.activate( current_piece_pickler );
							
							boost_provider.activate( current_piece_pickler );
						}
						
						resetVars();
					}
				}
				
				public void
				peerManagerRemoved(
					PEPeerManager	manager )
				{
					synchronized( EnhancedDownloadManager.this ){

						time_download_started 	= 0;

						progressive_active		= false;
						
						if ( current_piece_pickler != null ){
					
							buffer_provider.deactivate(  current_piece_pickler );
							
							boost_provider.deactivate(  current_piece_pickler );
							
							current_piece_pickler	= null;	
						}
						
						resetVars();
					}
				}
				
				public void
				peerAdded(
					PEPeer 	peer )
				{
						// new peers are queued here and categorised later by updateStats
					
					if ( platform_content ){
												
						synchronized( EnhancedDownloadManager.this ){
							
							if ( new_peers == null ){
								
								new_peers = new LinkedList();
							}
							
							new_peers.add( peer );
						}
					}
				}
					
				public void
				peerRemoved(
					PEPeer	peer )
				{
					if ( platform_content ){
							
						synchronized( EnhancedDownloadManager.this ){
							
								// the peer lists are dropped back to null once empty
							
							if ( new_peers != null ){
							
								new_peers.remove( peer );
								
								if ( new_peers.size() == 0 ){
									
									new_peers = null;
								}
							}
							
							if ( cache_peers != null ){
								
								cache_peers.remove( peer );
								
								if ( cache_peers.size() == 0 ){
									
									cache_peers = null;
								}
							}
							
							CachePeer	cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY );

							if ( cache_peer == null ){
								
									// we can disconnect before getting peer-id etc
									// so fall back to matching the lookup results on ip + port
								
								if ( lookup_peers != null ){
									
									for (int i=0;i<lookup_peers.length;i++){
										
										CachePeer cp = lookup_peers[i];
										                            
										if ( 	cp.getAddress().getHostAddress().equals( peer.getIp()) &&
												cp.getPort() == peer.getPort()){
										
											cache_peer = cp;
										}
									}
								}
							}
							
							if ( 	cache_peer != null && 
									cache_peer.getType() == CachePeer.PT_CACHE_LOGIC &&
									(	disconnected_cache_peers == null || !disconnected_cache_peers.contains( cache_peer ))){
								
									// lost connection very early - sign that the cache doesn't support
									// us
								
								if ( !peer.hasReceivedBitField()){
									
									cache_peer.setAutoReconnect( false );
								}
							}
						}
					}
				}
					
				public void
				pieceAdded(
					PEPiece 	piece )
				{
				}
					
				public void
				pieceRemoved(
					PEPiece		piece )
				{
				}
			});
	
Methods Summary
protected voidaddToDisconnectedCachePeers(com.aelitis.azureus.core.peer.cache.CachePeer cache_peer)

			// Remembers a cache peer that we disconnected, creating the list on demand.
			// A peer for which an existing entry reports sameAs() is not added again.

		if ( disconnected_cache_peers == null ){

			disconnected_cache_peers = new ArrayList();
		}

		Iterator	existing = disconnected_cache_peers.iterator();

		while( existing.hasNext()){

			CachePeer	known = (CachePeer)existing.next();

			if ( known.sameAs( cache_peer )){

					// already recorded

				return;
			}
		}

		disconnected_cache_peers.add( cache_peer );
	
protected voidcheckPublishing()

			// Drives the "publish" life-cycle of a download until it is flagged as handled.
			// While a published download is not yet complete and the scrape shows fewer than
			// two seeds, a stalled upload triggers a transport switch (UDP preference toggled,
			// all peers dropped, announce forced) and an empty peer list forces an announce.

		if ( publish_handling_complete ){

			return;
		}

		if ( !PublishUtils.isPublished( download_manager )){

				// we've only got to handle the possible small delay here between a download being
				// added and the flag being set

			if ( SystemTime.getCurrentTime() - time_download_started > 120*1000 ){

				publish_handling_complete = true;
			}

			return;
		}

		if ( PublishUtils.isPublishComplete( download_manager )){

			publish_handling_complete = true;

			return;
		}

		TRTrackerScraperResponse scrape_response = download_manager.getTrackerScrapeResponse();

		if ( scrape_response == null || scrape_response.getStatus() != TRTrackerScraperResponse.ST_ONLINE ){

				// no usable scrape yet, try again on a later call

			return;
		}

		if ( scrape_response.getSeeds() >= 2 ){

			PublishUtils.setPublishComplete( download_manager );

			publish_handling_complete = true;

			return;
		}

		PEPeerManager peer_manager = download_manager.getPeerManager();

		if ( peer_manager == null ){

			return;
		}

		long	current_time = SystemTime.getCurrentTime();

		long	total_sent = download_manager.getStats().getTotalDataBytesSent();

		if ( total_sent != publish_sent ){

				// upload progressed since the last check - restart the stall timer

			publish_sent = total_sent;

			publish_sent_time = current_time;
		}

		if ( publish_sent_time > current_time ){

				// clock went backwards

			publish_sent_time = current_time;
		}

		if ( current_time - publish_sent_time > STALLED_TIMEOUT ){

			publish_sent_time = current_time;

			log( "Publish: upload stalled - switching transports" );

				// no data uploaded recently.

			peer_manager.setPreferUDP( !peer_manager.getPreferUDP());

			List connected = peer_manager.getPeers();

			for (int idx=0;idx<connected.size();idx++){

				PEPeer to_drop = (PEPeer)connected.get(idx);

				peer_manager.removePeer( to_drop, "Transport switch" );
			}

			download_manager.requestTrackerAnnounce( true );

		}else if ( peer_manager.getNbPeers() == 0 ){

			log( "Publish: no connected peers, forcing announce" );

			download_manager.requestTrackerAnnounce( true );
		}
	
protected com.aelitis.azureus.core.download.EnhancedDownloadManager$progressiveStatscreateProgressiveStats(org.gudy.azureus2.core3.download.DownloadManager dm, EnhancedDownloadManagerFile file)

		TOTorrent torrent = download_manager.getTorrent();
		
		if ( torrent != null && PlatformTorrentUtils.useEMP( torrent )){
			
			return( new progressiveStatsInternal( dm, file ));
			
		}else{
			
			return( new progressiveStatsExternal( dm, file ));

		}
	
protected voiddestroy()

			// Releases the real-time task marker (if held) and flags this instance as
			// destroyed; setRTA refuses to re-mark a destroyed instance as active.

		synchronized( this ){

				// must run before the flag is set purely for readability - setRTA( false )
				// performs its release step ahead of its own destroyed check

			setRTA( false );

			destroyed = true;
		}
	
public longgetContiguousAvailableBytes(org.gudy.azureus2.core3.disk.DiskManagerFileInfo file)

		return( getContiguousAvailableBytes( file, 0 ));
	
public longgetContiguousAvailableBytes(org.gudy.azureus2.core3.disk.DiskManagerFileInfo file, int file_start_offset)

			// Counts the bytes available without a gap starting at file_start_offset within
			// the given file, using piece "done" state and per-block "written" flags.
			//
			// Returns -1 when the download has no disk manager; otherwise the count, capped
			// at the number of bytes between file_start_offset and the end of the file.

		DiskManager disk_manager = download_manager.getDiskManager();

		if ( disk_manager == null ){

			return( -1 );
		}

		int	piece_length = disk_manager.getPieceLength();

		DiskManagerFileInfo[]	 all_files = disk_manager.getFiles();

			// translate the file-relative offset into a torrent-relative one by adding the
			// lengths of all files that precede the target

		long	torrent_offset = file_start_offset;

		for (int f=0;f<all_files.length;f++){

			if ( all_files[f].getIndex() == file.getIndex()){

				break;
			}

			torrent_offset += all_files[f].getLength();
		}

		int	start_piece 		= (int)( torrent_offset / piece_length );
		int	offset_in_piece		= (int)( torrent_offset % piece_length );
		int	end_piece			= file.getLastPieceNumber();

		DiskManagerPiece[]	all_pieces = disk_manager.getPieces();

		DiskManagerPiece	head = all_pieces[start_piece];

		long	available = 0;

		if ( head.isDone()){

				// whole first piece is there: take its tail, then walk the following pieces

			available = head.getLength() - offset_in_piece;

			for (int p=start_piece+1;p<=end_piece;p++){

				DiskManagerPiece current = all_pieces[p];

				if ( current.isDone()){

					available += current.getLength();

					continue;
				}

					// first incomplete piece: count its leading written blocks and stop

				boolean[] written = current.getWritten();

				if ( written == null ){

						// state is re-checked as the original did

					if ( current.isDone()){

						available += current.getLength();
					}
				}else{

					for (int b=0;b<written.length;b++){

						if ( !written[b] ){

							break;
						}

						available += current.getBlockSize( b );
					}
				}

				break;
			}
		}else{

			boolean[] written = head.getWritten();

			if ( written == null ){

				if ( head.isDone()){

					available = head.getLength() - offset_in_piece;
				}
			}else{

					// walk the written blocks from the start of the piece; bytes only count
					// once the run of written blocks extends past the requested offset

				int	written_end = 0;

				for (int b=0;b<written.length;b++){

					if ( !written[b] ){

						break;
					}

					int	block_len = head.getBlockSize( b );

					written_end += block_len;

					if ( available != 0 ){

						available += block_len;

					}else if ( written_end > offset_in_piece ){

						available = written_end - offset_in_piece;
					}
				}
			}
		}

			// the last piece may extend into the next file, so cap at what is left of this one

		long	remaining_in_file = file.getLength() - file_start_offset;

		if ( available > remaining_in_file ){

			available = remaining_in_file;
		}

		return( available );
	
protected longgetCurrentSpeed()

			// gets instantaneous speed instead of longer term average: the sum of the
			// data receive rates of all current peers, or 0 when there is no peer manager

		long	total_rate = 0;

		PEPeerManager	peer_manager = download_manager.getPeerManager();

		if ( peer_manager == null ){

			return( total_rate );
		}

		for ( Iterator peers = peer_manager.getPeers().iterator(); peers.hasNext(); ){

			PEPeer	peer = (PEPeer)peers.next();

			total_rate += peer.getStats().getDataReceiveRate();
		}

		return( total_rate );
	
public org.gudy.azureus2.core3.download.DownloadManagergetDownloadManager()

			// Returns the core download manager this instance was constructed around.

		return( download_manager );
	
public EnhancedDownloadManagerFile[]getFiles()

			// Returns the internal array of per-file wrappers (not a copy); it is empty
			// when the download had no torrent at construction time.

		return enhanced_files;
	
public java.lang.StringgetName()

			// Returns the display name of the underlying download.

		String	display_name = download_manager.getDisplayName();

		return( display_name );
	
public org.gudy.azureus2.core3.disk.DiskManagerFileInfogetPrimaryFile()

			// Returns the disk-manager file behind the primary content file, or null when
			// no primary file was selected (the constructor leaves primary_file unset for
			// a download without a torrent) instead of failing with a NullPointerException.

		EnhancedDownloadManagerFile	primary = primary_file;

		if ( primary == null ){

			return( null );
		}

		return( primary.getFile());
	
public booleangetProgressiveMode()

			// Reports whether progressive mode is currently active (read without locking).

		return progressive_active;
	
public longgetProgressivePlayETA()

		progressiveStats stats = getProgressiveStats();
		
		long	eta = stats.getETA();
				
		return( eta );
	
protected com.aelitis.azureus.core.download.EnhancedDownloadManager$progressiveStatsgetProgressiveStats()

			// Returns a copy of the current progressive stats, taken while holding this
			// instance's monitor so it cannot interleave with a synchronized update.

		progressiveStats	snapshot;

		synchronized( this ){

			snapshot = progressive_stats.getCopy();
		}

		return( snapshot );
	
protected longgetTargetSpeed()

			// Target download speed: the content's minimum delivery rate, raised to the
			// progressive stream's maximum bytes/sec when progressive mode is active and
			// that value is higher.

		if ( !progressive_active ){

			return( content_min_delivery_bps );
		}

		long	stream_max = progressive_stats.getStreamBytesPerSecondMax();

		return( Math.max( stream_max, content_min_delivery_bps ));
	
protected longgetTimeRunning()

			// Milliseconds since the peer manager was added, or 0 when the download is not
			// running (start time of 0).

		if ( time_download_started != 0 ){

			long	current_time = SystemTime.getCurrentTime();

			if ( current_time < time_download_started ){

					// clock moved backwards - restart the measurement from now

				time_download_started	= current_time;
			}

			return( current_time - time_download_started );
		}

		return( 0 );
	
public booleanisPlatform()

			// True when the download has a torrent and PlatformTorrentUtils.isContent
			// accepts it; evaluated on every call rather than using the cached flag.

		TOTorrent	current_torrent = download_manager.getTorrent();

		return( current_torrent != null && PlatformTorrentUtils.isContent( current_torrent, true ));
	
protected voidlog(java.lang.String str)

		log( str, true );
	
protected voidlog(java.lang.String str, boolean to_file)

			// Logs a message against this instance's own download manager.

		DownloadManager	owner = download_manager;

		log( owner, str, to_file );
	
protected voidlog(org.gudy.azureus2.core3.download.DownloadManager dm, java.lang.String str, boolean to_file)

			// Prefixes the message with the download's display name, then writes it to the
			// "v3.Stream" diagnostics logger when to_file is set and to stdout when
			// Constants.DIAG_TO_STDOUT is set.

		String	line = dm.getDisplayName() + ": " + str;

		if ( to_file ){

			AEDiagnostics.getLogger("v3.Stream").log( line );
		}

		if ( Constants.DIAG_TO_STDOUT ){

			String	thread_name = Thread.currentThread().getName();

			System.out.println( thread_name + "|" + System.currentTimeMillis() + "] " + line );
		}
	
protected voidrefreshMetaData()

			// Asks the current progressive stats object to refresh its meta data.

		progressiveStats	stats = progressive_stats;

		stats.refreshMetaData();
	
private voidresetVars()

			// Clears all per-session speed-control state: peer lists and the timestamps of
			// the last speed increase, peer injection and cache lookup.

		lookup_peers				= null;
		disconnected_cache_peers	= null;
		cache_peers					= null;
		new_peers					= null;

		last_lookup_time		= 0;
		last_peer_inject		= 0;
		last_speed_increase		= 0;
	
public static voidsetInternalContentStreamBPSIncreaseAbsolute(java.lang.String caller_id, int abs)

			// Overrides the class-wide absolute stream-rate increase; caller_id is accepted
			// but not used here.

		EnhancedDownloadManager.internal_content_stream_bps_increase_absolute	= abs;
	
public static voidsetInternalContentStreamBPSIncreaseRatio(java.lang.String caller_id, int ratio)

			// Overrides the class-wide stream-rate increase ratio; caller_id is accepted but
			// not used here.
			//
			// these are here to allow other components (e.g. a plugin) to modify behaviour
			// while we verify that things work ok

		EnhancedDownloadManager.internal_content_stream_bps_increase_ratio	= ratio;
	
public voidsetMinimumBufferBytes(int min)

			// Records an explicitly requested minimum buffer size in bytes, logging the
			// new value first.

		String	message = "Explicit min buffer set to " + min;

		log( message );

		explicit_minimum_buffer_bytes	= min;
	
protected voidsetPeerSpeed(org.gudy.azureus2.core3.peer.PEPeer peer, int speed, long time)

		CachePeer	cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY );

		cache_peer.setSpeedChangeTime( time );
		
		peer.getStats().setDownloadRateLimitBytesPerSecond( speed );
	
public voidsetProgressiveMode(boolean active)

			// Switches progressive (play-while-downloading) mode on or off.
			//
			// Turning it on pauses other incomplete, non-ad downloads that are downloading
			// or queued, turns off their progressive mode, and moves this download to
			// position 1; turning it off resumes downloads. The first activation also adds
			// a tracker extension and forces an announce. Does nothing when there is no
			// torrent, when the mode is unchanged, or when activation is not supported.

		if ( download_manager.getTorrent() == null ){

			return;
		}

		synchronized( this ){

			if ( progressive_active == active ){

				return;
			}

			if ( active && !supportsProgressiveMode()){

				return;
			}

			log( "Progressive mode changed to " + active );

			final GlobalManager global_manager = download_manager.getGlobalManager();

			if ( !active ){

				download_manager.removeListener( dmListener );

				global_manager.resumeDownloads();

			}else{

				if ( dmListener == null ){

						// once this download completes the paused ones are resumed

					dmListener = new DownloadManagerAdapter() {
						public void downloadComplete(DownloadManager manager) {
							global_manager.resumeDownloads();
						}
					};
				}

				download_manager.addListener( dmListener );

					// Check existing downloading torrents and turn off any
					// existing progressive/downloading

				Object[] all_downloads = global_manager.getDownloadManagers().toArray();

				for (int idx = 0; idx < all_downloads.length; idx++){

					DownloadManager other = (DownloadManager)all_downloads[idx];

					if ( other.equals( download_manager )){

						continue;
					}

					boolean	incomplete_non_ad =
						!other.isDownloadComplete( false ) &&
						PlatformTorrentUtils.getAdId( other.getTorrent()) == null;

					if ( !incomplete_non_ad ){

						continue;
					}

					int other_state = other.getState();

					if ( 	other_state == DownloadManager.STATE_DOWNLOADING ||
							other_state == DownloadManager.STATE_QUEUED ){

						other.pause();
					}

					EnhancedDownloadManager other_enhanced = enhancer.getEnhancedDownload( other );

					if ( other_enhanced != null && other_enhanced.getProgressiveMode()){

						other_enhanced.setProgressiveMode( false );
					}
				}

				if ( download_manager.isPaused()){

					download_manager.resume();
				}

					// Make sure download can start by moving out of stop state
					// and putting at top

				if ( download_manager.getState() == DownloadManager.STATE_STOPPED ){

					download_manager.setStateWaiting();
				}

				if ( download_manager.getPosition() != 1 ){

					download_manager.getGlobalManager().moveTo( download_manager, 1 );
				}
			}

			progressive_active	= active;

			if ( current_piece_pickler != null && progressive_active ){

				buffer_provider.activate( current_piece_pickler );

				boost_provider.activate( current_piece_pickler );

				progressive_stats.update( 0 );

			}else{

				if ( current_piece_pickler != null ){

					buffer_provider.deactivate( current_piece_pickler );

					boost_provider.deactivate( current_piece_pickler );
				}

					// start again from fresh stats

				progressive_stats = createProgressiveStats( download_manager, primary_file );
			}
		}

		if ( active && !progressive_informed ){

			progressive_informed	= true;

				// tell tracker we're progressive so it can, if required, schedule more seeds

			Download	wrapped_download = PluginCoreUtils.wrap( download_manager );

			DownloadUtils.addTrackerExtension( wrapped_download, TRACKER_PROG_PREFIX, "y" );

			download_manager.requestTrackerAnnounce( true );
		}
	
protected voidsetRTA(boolean active)

			// Adds or removes this download's real-time task registration with the
			// ConcurrentHasher singleton, tracking the current state in marked_active so
			// each transition registers or unregisters only once. A destroyed instance can
			// still be unmarked but is never marked active again.

		synchronized( this ){

			if ( active ){

				if ( !destroyed && !marked_active ){

					marked_active = true;

					ConcurrentHasher.getSingleton().addRealTimeTask();
				}

			}else if ( marked_active ){

				marked_active = false;

				ConcurrentHasher.getSingleton().removeRealTimeTask();
			}
		}
	
public voidsetViewerPosition(org.gudy.azureus2.core3.disk.DiskManagerFileInfo file_info, long bytes)

		int	index = file_info.getIndex();
		
		if ( index < enhanced_files.length ){
			
			bytes += enhanced_files[index].getByteOffestInTorrent();
		}
		
		progressive_stats.setViewerBytePosition( bytes );
	
public booleansupportsProgressiveMode()

			// Progressive mode is supported only when the download has a torrent, the
			// enhancer reports progressive as available, and the torrent is flagged as
			// progressive content.

		TOTorrent	current_torrent = download_manager.getTorrent();

		return( 	current_torrent != null &&
					enhancer.isProgressiveAvailable() &&
					PlatformTorrentUtils.isContentProgressive( current_torrent ));
	
protected voidupdateProgressiveStats(int tick_count)

			// Per-tick update of the progressive stats; a no-op unless progressive mode is
			// active. Every REACTIVATE_PROVIDER_PERIOD_TICKS ticks the buffer provider is
			// also asked to re-check its activation against the current piece picker.

		if ( progressive_active ){

			synchronized( this ){

				boolean	recheck_due = tick_count % REACTIVATE_PROVIDER_PERIOD_TICKS == 0;

				if ( recheck_due ){

					PiecePicker picker = current_piece_pickler;

					if ( picker != null ){

						buffer_provider.checkActivation( picker );
					}
				}

				progressive_stats.update( tick_count );
			}
		}
	
protected voidupdateStats(int tick_count)

			// Periodic tick handler. After updating the progressive stats it, for platform
			// content that is downloading or seeding with a peer manager:
			//   1. categorises newly connected peers and tracks the cache peers among them
			//   2. while downloading (after an initial delay) raises or lowers cache peer rate
			//      limits towards the target speed, injecting a further cache peer if needed
			//   3. every DISCONNECT_CHECK_TICKS ticks disconnects idle cache peers
			//
			// tick_count	caller's tick counter, used for the periodic checks

		updateProgressiveStats( tick_count );
		
		if ( !platform_content ){
			
			return;
		}
		
		int	state = download_manager.getState();
		
		if ( state != DownloadManager.STATE_SEEDING && state != DownloadManager.STATE_DOWNLOADING ){
			
			return;
		}

		PEPeerManager	pm = download_manager.getPeerManager();
		
		if ( pm == null ){
			
			return;
		}

		long	now = SystemTime.getCurrentTime();
		
		long	target_speed = getTargetSpeed();
		
		PEPeerManagerStats stats = pm.getStats();
		
		long	download_speed = stats.getDataReceiveRate();
		
		download_speed_average.update( download_speed );
		
		long	time_downloading = getTimeRunning();
		
		int		secs_since_last_up =  pm.getStats().getTimeSinceLastDataSentInSeconds();
		
			// deal with -1 -> infinite
			// measure from the first time we observed this instead, remembered on the peer manager
		
		if ( secs_since_last_up == -1 ){
			
			Long seed_time = (Long)pm.getData( PM_SEED_TIME_KEY );
			
			if ( seed_time == null ){
				
				seed_time = new Long( now );
				
				pm.setData( PM_SEED_TIME_KEY, seed_time );
			}
			
			secs_since_last_up = (int)(( now - seed_time.longValue()) / 1000);
		}
		
			// peers are removed outside the synchronized sections
		
		List	peers_to_kick = new ArrayList();
		
		synchronized( this ){
			
			if ( new_peers != null ){
										
				Iterator it = new_peers.iterator();
				
				while( it.hasNext()){
					
					PEPeer	peer = (PEPeer)it.next();
					
					CachePeer	cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY );

					if ( cache_peer == null ){
						
						byte[]	peer_id = peer.getId();
						
							// a peer without an id yet stays queued for a later tick
						
						if ( peer_id != null ){
							
							try{
								cache_peer = CacheDiscovery.categorisePeer( 
												peer_id, 
												InetAddress.getByName( peer.getIp()),
												peer.getPort());
								
								peer.setData( PEER_CACHE_KEY, cache_peer );
								
								if ( cache_peer.getType() == CachePeer.PT_CACHE_LOGIC ){
									
									if ( state == DownloadManager.STATE_SEEDING ){
										
										if ( 	now - cache_peer.getCreateTime( now ) >= MIN_SEED_CONNECTION_TIME &&
												secs_since_last_up >= IDLE_SEED_DISCONNECT_SECS ){

											peers_to_kick.add( peer );
											
											addToDisconnectedCachePeers( cache_peer );
											
										}else{
											
											if ( cache_peers == null ){
												
												cache_peers = new LinkedList();
											}
											
											cache_peers.add( peer );
										}
									}else{
										
											// cache logic rely on timely have messages to control both
											// piece allocation and client-speed
										
										peer.setHaveAggregationEnabled( false );
										
										if ( target_speed <= 0 ){
										
												// no target speed - the cache peer isn't needed
										
											setPeerSpeed( peer, -1, now );
											
											peers_to_kick.add( peer );
											
											addToDisconnectedCachePeers( cache_peer );
	
										}else{
											
											long	current_speed = (long)download_speed_average.getAverage();
											
												// if we are already exceeding required speed, block
												// the cache peer download
											
											if ( current_speed + TARGET_SPEED_EXCESS_MARGIN > target_speed ){
												
												setPeerSpeed( peer, -1, now );
											}
											
											if ( cache_peers == null ){
												
												cache_peers = new LinkedList();
											}
											
											cache_peers.add( peer );
										}
									}
								}
							}catch( Throwable e ){
								
								Debug.printStackTrace(e);
							}
							
							it.remove();
						}
					}else{
						
							// already categorised
						
						it.remove();
					}
				}
				
				if ( new_peers.size() == 0 ){
					
					new_peers = null;
				}
			}
		}
		
		for (int i=0;i<peers_to_kick.size();i++){
			
			pm.removePeer((PEPeer)peers_to_kick.get(i), "Cache peer not required" );
		}
				
		if ( state == DownloadManager.STATE_DOWNLOADING ){
		
			if ( time_downloading > SPEED_CONTROL_INITIAL_DELAY ){
				
				long	current_average = (long)download_speed_average.getAverage();
					
				if ( current_average < target_speed ){
					
					long	current_speed = getCurrentSpeed();
					
						// increase cache peer contribution
						// due to latencies we need to give speed increases a time to take
						// effect to see if the limits can be reached
					
					long	difference = target_speed - current_speed;
					
						// "last > now" covers the clock having gone backwards
					
					if ( last_speed_increase > now || now - last_speed_increase > SPEED_INCREASE_GRACE_PERIOD ){
		
						synchronized( this ){
	
							if ( cache_peers != null ){
								
								Iterator	it = cache_peers.iterator();
								
								while( it.hasNext() && difference > 0 ){
							
									PEPeer	peer = (PEPeer)it.next();
																				
									PEPeerStats peer_stats = peer.getStats();
									
									long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond();
									
										// try simple approach - find first cache peer that is limited
										// to less than the target
									
									if ( peer_limit == 0 ){
										
											// a limit of 0 is left alone
										
									}else{
										
										if ( peer_limit < target_speed ){
											
											setPeerSpeed( peer, (int)target_speed, now );
											
											last_speed_increase = now;
											
											difference = 0;
										}
									}
								}
							}
						}
						
							// only inject a further cache peer when there is still a shortfall AND
							// the inject grace period has expired (or the clock went backwards).
							// The grace test must be parenthesised: '&&' binds tighter than '||',
							// so without the parentheses a peer was injected even when the
							// shortfall had just been handled by raising a limit above.
							//
							// NOTE(review): the peer lists below are read without holding the
							// monitor, unlike the rest of this method - confirm that is intended
										
						if ( 	difference > 0 &&
								( last_peer_inject > now || now - last_peer_inject > PEER_INJECT_GRACE_PERIOD )){
							
							Set	connected_peers = new HashSet();
							
							List	peers_to_try = new ArrayList();
	
							if ( cache_peers != null ){
								
								Iterator	it = cache_peers.iterator();
								
								while( it.hasNext() && difference > 0 ){
							
									PEPeer	peer = (PEPeer)it.next();
						
									connected_peers.add( peer.getIp() + ":" + peer.getPort());
								}
							}
							
								// if we explicitly disconnected peers in the past then reuse them first
							
							if ( disconnected_cache_peers != null ){
								
								while( disconnected_cache_peers.size() > 0 ){
									
									CachePeer	cp = (CachePeer)disconnected_cache_peers.remove(0);
									
									if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){
										
											// check that this peer isn't already available as a lookup result
										
										if ( lookup_peers != null ){
											
											for (int i=0;i<lookup_peers.length;i++){
												
												CachePeer	l_cp = lookup_peers[i];
												
												if ( l_cp.sameAs( cp )){
													
													cp = null;
													
													break;
												}
											}
										}
										
										if ( cp != null ){
										
											peers_to_try.add( cp );
											
											break;
										}
									}
								}
								
								if ( disconnected_cache_peers.size() == 0 ){
									
									disconnected_cache_peers = null;
								}
							}
							
							if ( peers_to_try.size() == 0 ){
								
									// can't do the job with existing cache peers, try to find some more
								
								if ( 	lookup_peers == null || 
										now < last_lookup_time ||
										now - last_lookup_time > CACHE_REQUERY_MIN_PERIOD ){
																		
									last_lookup_time = now;
	
									lookup_peers = CacheDiscovery.lookup( download_manager.getTorrent());
								}
								
								for (int i=0;i<lookup_peers.length;i++){
									
									CachePeer	cp = lookup_peers[i];
									
									if ( cp.getAutoReconnect() && now - cp.getInjectTime(now) > CACHE_RECONNECT_MIN_PERIOD ){
										
										if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){
										
											peers_to_try.add( cp );
										}
									}
								}
							}
							
							if ( peers_to_try.size() > 0 ){
								
									// pick one candidate at random
								
								CachePeer peer = (CachePeer)peers_to_try.get((int)( Math.random() * peers_to_try.size()));
								
								// System.out.println( "Injecting cache peer " + peer.getAddress() + ":" + peer.getPort());
								
								peer.setInjectTime( now );
								
								pm.addPeer( peer.getAddress().getHostAddress(), peer.getPort(), 0, false );
								
								last_peer_inject = now;
							}
						}
					}				
				}else if ( current_average > target_speed + TARGET_SPEED_EXCESS_MARGIN){
					
					long	current_speed = getCurrentSpeed();
	
						// decrease cache peer contribution
					
					long	difference = current_speed - ( target_speed + TARGET_SPEED_EXCESS_MARGIN );
					
					synchronized( this ){
	
						if ( cache_peers != null ){
							
							Iterator	it = cache_peers.iterator();
							
							while( it.hasNext() && difference > 0 ){
						
								PEPeer	peer = (PEPeer)it.next();
															
								PEPeerStats peer_stats = peer.getStats();
								
								long peer_rate = peer_stats.getDataReceiveRate();
								
								long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond();
	
								if ( peer_limit == -1 ){
									
										// blocked, take into account adjustment in progress
									
									difference -= peer_rate;
									
								}else if ( peer_limit != 0 && peer_rate > peer_limit ){
									
										// adjusting
									
									difference -= peer_rate - peer_limit;
									
								}else{
									
									if ( peer_rate > difference ){
										
											// this peer alone can absorb the remaining excess
																				
										setPeerSpeed( peer, (int)( peer_rate - difference ), now );
										
										difference = 0;
										
									}else{
									
											// block it completely and carry on with the next one
									
										setPeerSpeed( peer, -1, now );
																				
										difference -= peer_rate;
									}
								}
							}
						}
					}
				}
			}
		}
		
		if ( tick_count % DISCONNECT_CHECK_TICKS == 0 ){
			
			peers_to_kick.clear();
			
			synchronized( this ){
				
				if ( cache_peers != null ){
					
					Iterator	it = cache_peers.iterator();
					
					while( it.hasNext()){
				
						PEPeer	peer = (PEPeer)it.next();
							
						CachePeer	cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY );

						if ( state == DownloadManager.STATE_SEEDING ){
							
							if ( 	now - cache_peer.getCreateTime( now ) >= MIN_SEED_CONNECTION_TIME &&
									secs_since_last_up >= IDLE_SEED_DISCONNECT_SECS ){

								peers_to_kick.add( peer );
								
								addToDisconnectedCachePeers( cache_peer );
							}
						}else{
							
							PEPeerStats peer_stats = peer.getStats();
						
								// only peers we have blocked (limit -1) are idle candidates
						
							if ( peer_stats.getDownloadRateLimitBytesPerSecond() == -1 ){
							
								long	time = cache_peer.getSpeedChangeTime( now );
								
								if ( now - time > IDLE_PEER_DISCONNECT_PERIOD ){
									
									peers_to_kick.add( peer );
									
									addToDisconnectedCachePeers( cache_peer );
								}
							}
						}
					}
				}
			}
			
			for (int i=0;i<peers_to_kick.size();i++){
				
				pm.removePeer((PEPeer)peers_to_kick.get(i), "Cache peer disconnect-on-idle" );
			}
		}