FileDocCategorySizeDatePackage
DMReaderImpl.javaAPI DocAzureus 3.0.3.413971Thu Jan 25 14:37:22 GMT 2007org.gudy.azureus2.core3.disk.impl.access.impl

DMReaderImpl

public class DMReaderImpl extends Object implements DMReader
author
parg

Fields Summary
private static final LogIDs
LOGID
private org.gudy.azureus2.core3.disk.impl.DiskManagerHelper
disk_manager
private com.aelitis.azureus.core.diskmanager.access.DiskAccessController
disk_access
private int
async_reads
private AESemaphore
async_read_sem
private boolean
started
private boolean
stopped
protected AEMonitor
this_mon
Constructors Summary
public DMReaderImpl(org.gudy.azureus2.core3.disk.impl.DiskManagerHelper _disk_manager)

	
	
	
				 
	
		disk_manager	= _disk_manager;
		
		disk_access		= disk_manager.getDiskAccessController();
	
Methods Summary
public DiskManagerReadRequestcreateRequest(int pieceNumber, int offset, int length)

		return( new DiskManagerReadRequestImpl( pieceNumber, offset, length ));
	
public DirectByteBufferreadBlock(int pieceNumber, int offset, int length)

		DiskManagerReadRequest	request = createRequest( pieceNumber, offset, length );
		
		final AESemaphore	sem = new AESemaphore( "DMReader:readBlock" );
		
		final DirectByteBuffer[]	result = {null};
		
		readBlock( 
				request,
				new DiskManagerReadRequestListener()
				{
					  public void 
					  readCompleted( 
					  		DiskManagerReadRequest 	request, 
							DirectByteBuffer 		data )
					  {
						  result[0]	= data;
						  
						  sem.release();
					  }
					  
					  public void 
					  readFailed( 
					  		DiskManagerReadRequest 	request, 
							Throwable		 		cause )
					  {
						  sem.release();
					  }
					  
					  public int
					  getPriority()
					  {
						  return( -1 );
					  }
					  
					  public void 
					  requestExecuted(long bytes) 
					  {								
					  }
				});
		
		sem.reserve();
		
		return( result[0] );
	
public voidreadBlock(DiskManagerReadRequest request, DiskManagerReadRequestListener _listener)

		request.requestStarts();
		
		final DiskManagerReadRequestListener	listener = 
			new DiskManagerReadRequestListener()
			{
				public void 
				readCompleted( 
						DiskManagerReadRequest 	request, 
						DirectByteBuffer 		data )
				{				  
					request.requestEnds( true );
	
					_listener.readCompleted( request, data );
				}
	
				public void 
				readFailed( 
						DiskManagerReadRequest 	request, 
						Throwable		 		cause )
				{ 
					request.requestEnds( false );
	
					_listener.readFailed( request, cause );
				}
	
				public int
				getPriority()
				{
					return( _listener.getPriority());
				}
				public void 
				requestExecuted(long bytes) 
				{
					_listener.requestExecuted( bytes );									
				}
			};
			
		DirectByteBuffer buffer	= null;
		
		try{
			int	length		= request.getLength();
	
			buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_DM_READ,length );
	
			if ( buffer == null ) { // Fix for bug #804874
				
				Debug.out("DiskManager::readBlock:: ByteBufferPool returned null buffer");
				
				listener.readFailed( request, new Exception( "Out of memory" ));
				
				return;
			}
	
			int	pieceNumber	= request.getPieceNumber();
			int	offset		= request.getOffset();
			
			DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
	
				// temporary fix for bug 784306
			
			if ( pieceList.size() == 0 ){
				
				Debug.out("no pieceList entries for " + pieceNumber);
				
				listener.readCompleted( request, buffer );
				
				return;
			}
	
			long previousFilesLength = 0;
			
			int currentFile = 0;
			
			long fileOffset = pieceList.get(0).getOffset();
			
			while (currentFile < pieceList.size() && pieceList.getCumulativeLengthToPiece(currentFile) < offset) {
				
				previousFilesLength = pieceList.getCumulativeLengthToPiece(currentFile);
				
				currentFile++;
				
				fileOffset = 0;
			}
	
				// update the offset (we're in the middle of a file)
			
			fileOffset += offset - previousFilesLength;
			
			List	chunks = new ArrayList();
			
			int	buffer_position = 0;
			
			while ( buffer_position < length && currentFile < pieceList.size()) {
	     
				DMPieceMapEntry map_entry = pieceList.get( currentFile );
	      			
				int	length_available = map_entry.getLength() - (int)( fileOffset - map_entry.getOffset());
				
					//explicitly limit the read size to the proper length, rather than relying on the underlying file being correctly-sized
					//see long DMWriterAndCheckerImpl::checkPiece note
				
				int entry_read_limit = buffer_position + length_available;
				
					// now bring down to the required read length if this is shorter than this
					// chunk of data
				
				entry_read_limit = Math.min( length, entry_read_limit );
				
					// this chunk denotes a read up to buffer offset "entry_read_limit"
				
				chunks.add( new Object[]{ map_entry.getFile().getCacheFile(), new Long(fileOffset), new Integer( entry_read_limit )});
				
				buffer_position = entry_read_limit;
	      
				currentFile++;
				
				fileOffset = 0;
			}
	
			if ( chunks.size() == 0 ){
				
				Debug.out("no chunk reads for " + pieceNumber);
					
				listener.readCompleted( request, buffer );
				
				return;
			}
			
				// this is where we go async and need to start counting requests for the sake
				// of shutting down tidily
			
			DiskManagerReadRequestListener	l = 
				new DiskManagerReadRequestListener()
				{
					 public void 
					  readCompleted( 
					  		DiskManagerReadRequest 	request, 
							DirectByteBuffer 		data )
					 {
						 complete();
						 
						 listener.readCompleted( request, data );
					 }
					  
					  public void 
					  readFailed( 
					  		DiskManagerReadRequest 	request, 
							Throwable		 		cause )
					  {
						  complete();
						  
						  listener.readFailed( request, cause );
					  }
					  
					  public int
					  getPriority()
					  {
						  return( _listener.getPriority());
					  }
					  
					  public void 
					  requestExecuted(long bytes) 
					  {
						  _listener.requestExecuted( bytes );									
					  }
					  
					  protected void
					  complete()
					  {
						  try{
							  this_mon.enter();
							
							  async_reads--;
							  
							  if ( stopped ){
								  
								  async_read_sem.release();
							  }
						  }finally{
							  
							  this_mon.exit();
						  }
					  }
				};
			
			try{
				this_mon.enter();
				
				if ( stopped ){
				
					buffer.returnToPool();
					
					listener.readFailed( request, new Exception( "Disk reader has been stopped" ));
					
					return;
				}
				
				async_reads++;
				
			}finally{
				
				this_mon.exit();
			}
			
			new requestDispatcher( request, l, buffer, chunks );

		}catch( Throwable e ){
			
			if ( buffer != null ){
				
				buffer.returnToPool();
			}
			
			disk_manager.setFailed( "Disk read error - " + Debug.getNestedExceptionMessage(e));
			
			Debug.printStackTrace( e );
			
			listener.readFailed( request, e );
		}
	
public voidstart()

		try{
			this_mon.enter();
		
			if ( started ){
				
				throw( new RuntimeException("can't start twice" ));
			}
			
			if ( stopped ){
				
				throw( new RuntimeException("already been stopped" ));
			}
		
			started	= true;
			
		}finally{
			
			this_mon.exit();
		}
	
public voidstop()

		int	read_wait;
		
		try{
			this_mon.enter();
		
			if ( stopped || !started ){
				
				return;
			}
			
			stopped	= true;
			
			read_wait	= async_reads;
			
		}finally{
			
			this_mon.exit();
		}
		
		long	log_time 		= SystemTime.getCurrentTime();

		for (int i=0;i<read_wait;i++){
			
			long	now = SystemTime.getCurrentTime();

			if ( now < log_time ){
				
				log_time = now;
				
			}else{
				
				if ( now - log_time > 1000 ){
					
					log_time	= now;
					
					if ( Logger.isEnabled()){
						
						Logger.log(new LogEvent(disk_manager, LOGID, "Waiting for reads to complete - " + (read_wait-i) + " remaining" ));
					}
				}
			}
			
			async_read_sem.reserve();
		}