File | Doc | Category | Size | Date | Package
DiskAccessControllerInstance.java | API Doc | Azureus 3.0.3.4 | 20623 | Wed Apr 18 10:23:28 BST 2007 | com.aelitis.azureus.core.diskmanager.access.impl

DiskAccessControllerInstance

public class DiskAccessControllerInstance extends Object

Fields Summary
private final int
aggregation_request_limit
private final int
aggregation_byte_limit
private String
name
private boolean
enable_aggregation
private boolean
invert_threads
private int
max_threads
private int
max_mb_queued
private groupSemaphore
max_mb_sem
private long
request_bytes_queued
private long
requests_queued
private long
total_requests
private long
total_single_requests_made
private long
total_aggregated_requests_made
private long
total_bytes
private long
total_single_bytes
private long
total_aggregated_bytes
private long
io_time
private requestDispatcher[]
dispatchers
private long
last_check
private Map
torrent_dispatcher_map
private static final int
REQUEST_NUM_LOG_CHUNK
private static final int
REQUEST_BYTE_LOG_CHUNK
private int
next_request_num_log
private long
next_request_byte_log
private static ThreadLocal
tls
Constructors Summary
public DiskAccessControllerInstance(String _name, boolean _enable_aggregation, int _aggregation_request_limit, int _aggregation_byte_limit, int _max_threads, int _max_mb)

		
	
	
			
			
				
				
				
				 
			
		name				= _name;
		
		enable_aggregation			= _enable_aggregation;
		aggregation_request_limit	= _aggregation_request_limit;
		aggregation_byte_limit		= _aggregation_byte_limit;
		
		max_mb_queued		= _max_mb;
				
		max_mb_sem 			= new groupSemaphore( max_mb_queued );
		max_threads			= _max_threads;
		
		dispatchers	= new requestDispatcher[invert_threads?1:max_threads];
		
		for (int i=0;i<dispatchers.length;i++){
			dispatchers[i]	= new requestDispatcher(i);
		}
	
Methods Summary
protected longgetBlockCount()

		return( max_mb_sem.getBlockCount());
	
public longgetIOTime()

		return( io_time );
	
protected longgetQueueSize()

		return( requests_queued );
	
protected longgetQueuedBytes()

		return( request_bytes_queued );
	
protected voidgetSpaceAllowance(DiskAccessRequestImpl request)

		int	mb_diff;
				
		synchronized( torrent_dispatcher_map ){
			
			int	old_mb = (int)(request_bytes_queued/(1024*1024));
			
			request_bytes_queued += request.getSize();
							
			int	new_mb = (int)(request_bytes_queued/(1024*1024));
		
			mb_diff = new_mb - old_mb;
		
			if ( mb_diff > max_mb_queued ){
				
					// if this request is bigger than the max allowed queueable then easiest
					// approach is to bump up the limit
									
				max_mb_sem.releaseGroup( mb_diff - max_mb_queued );
				
				max_mb_queued	= mb_diff;
			}
			
			requests_queued++;
			
			if ( requests_queued >= next_request_num_log ){
				
				//System.out.println( "DAC:" + name + ": requests = " + requests_queued );
				
				next_request_num_log += REQUEST_NUM_LOG_CHUNK;
			}
			
			if ( request_bytes_queued >= next_request_byte_log ){
				
				//System.out.println( "DAC:" + name + ": bytes = " + request_bytes_queued );
				
				next_request_byte_log += REQUEST_BYTE_LOG_CHUNK;
			}
		}
		
		if ( mb_diff > 0 ){
			
			max_mb_sem.reserveGroup( mb_diff );
		}
	
public longgetTotalAggregatedBytes()

		return( total_aggregated_bytes );
	
protected longgetTotalAggregatedRequests()

		return( total_aggregated_requests_made );
	
public longgetTotalBytes()

		return( total_bytes );
	
protected longgetTotalRequests()

		return( total_requests );
	
public longgetTotalSingleBytes()

		return( total_single_bytes );
	
protected longgetTotalSingleRequests()

		return( total_single_requests_made );
	
public static voidmain(java.lang.String[] args)

		final groupSemaphore	sem = new groupSemaphore( 9 );
		
		for (int i=0;i<10;i++){
			
			new Thread()
			{
				public void
				run()
				{
					int	count = 0;
					
					while( true ){
						
						int	group =RandomUtils.generateRandomIntUpto( 10 );
						
						System.out.println( Thread.currentThread().getName() + " reserving " + group );
						
						sem.reserveGroup( group );
						
						try{
							Thread.sleep(5 + RandomUtils.generateRandomIntUpto(5));
							
						}catch( Throwable e ){
						}
												
						sem.releaseGroup( group );
					
						count++;
						
						if ( count %100 == 0 ){
							
							System.out.println( Thread.currentThread().getName() + ": " + count + " ops" );
						}
					}
				}
			}.start();
		}
	
protected voidqueueRequest(DiskAccessRequestImpl request)

		requestDispatcher	dispatcher;
		
		if ( dispatchers.length == 1 ){

			dispatcher = dispatchers[0];
			
		}else{
			
			synchronized( torrent_dispatcher_map ){
				
				long	now = System.currentTimeMillis();
				
				boolean	check = false;
				
				if ( now - last_check > 60000 || now < last_check ){
					
					check		= true;
					last_check	= now;
				}
				
				if ( check ){
					
					Iterator	it = torrent_dispatcher_map.values().iterator();
					
					while( it.hasNext()){
						
						requestDispatcher	d = (requestDispatcher)it.next();
						
						long	last_active = d.getLastRequestTime();
						
						if ( now - last_active > 60000 ){
													
							it.remove();
							
						}else if ( now < last_active ){
							
							d.setLastRequestTime( now );
						}
					}
				}
				
				TOTorrent	torrent = request.getFile().getTorrentFile().getTorrent();
						
				dispatcher = (requestDispatcher)torrent_dispatcher_map.get(torrent);			
	
				if ( dispatcher == null ){
					
					int	min_index 	= 0;
					int	min_size	= Integer.MAX_VALUE;
					
					for (int i=0;i<dispatchers.length;i++){
						
						int	size = dispatchers[i].size();
						
						if ( size == 0 ){
							
							min_index = i;
							
							break;
						}
						
						if ( size < min_size ){
							
							min_size 	= size;
							min_index	= i;
						}
					}
					
					dispatcher = dispatchers[min_index];
					
					torrent_dispatcher_map.put( torrent, dispatcher );
				}
				
				dispatcher.setLastRequestTime( now );
			}
		}
		
		dispatcher.queue( request );
	
protected voidreleaseSpaceAllowance(DiskAccessRequestImpl request)

		int	mb_diff;
		
		synchronized( torrent_dispatcher_map ){
			
			int	old_mb = (int)(request_bytes_queued/(1024*1024));
			
			request_bytes_queued -= request.getSize();
							
			int	new_mb = (int)(request_bytes_queued/(1024*1024));
		
			mb_diff = old_mb - new_mb;
			
			requests_queued--;
		}
		
		if ( mb_diff > 0 ){
			
			max_mb_sem.releaseGroup( mb_diff );
		}