FileDocCategorySizeDatePackage
DMWriterImpl.javaAPI DocAzureus 3.0.3.417599Thu Jan 25 14:37:22 GMT 2007org.gudy.azureus2.core3.disk.impl.access.impl

DMWriterImpl

public class DMWriterImpl extends Object implements DMWriter
author
parg

Fields Summary
private static final LogIDs
LOGID
private static final int
MIN_ZERO_BLOCK
private org.gudy.azureus2.core3.disk.impl.DiskManagerHelper
disk_manager
private com.aelitis.azureus.core.diskmanager.access.DiskAccessController
disk_access
private int
async_writes
private Set
write_requests
private AESemaphore
async_write_sem
private boolean
started
private volatile boolean
stopped
private int
pieceLength
private long
totalLength
private boolean
complete_recheck_in_progress
private AEMonitor
this_mon
Constructors Summary
public DMWriterImpl(org.gudy.azureus2.core3.disk.impl.DiskManagerHelper _disk_manager)

		
	
	
			 
	
		disk_manager	= _disk_manager;
		disk_access		= disk_manager.getDiskAccessController();
		
		pieceLength		= disk_manager.getPieceLength();
		totalLength		= disk_manager.getTotalLength();	
	
Methods Summary
public DiskManagerWriteRequestcreateWriteRequest(int pieceNumber, int offset, DirectByteBuffer buffer, java.lang.Object user_data)

		return( new DiskManagerWriteRequestImpl( pieceNumber, offset, buffer, user_data ));
	
public booleanhasOutstandingWriteRequestForPiece(int piece_number)

		try{
			this_mon.enter();

			Iterator	it = write_requests.iterator();
			
			while( it.hasNext()){
				
				DiskManagerWriteRequest	request = (DiskManagerWriteRequest)it.next();
				
				if ( request.getPieceNumber() == piece_number ){
					
					return( true );
				}
			}
			
			return( false );
			
		}finally{
			
			this_mon.exit();
		}
	
public booleanisChecking()

	   return( complete_recheck_in_progress );
	
public voidstart()

		try{
			this_mon.enter();

			if ( started ){
				
				throw( new RuntimeException( "DMWWriter: start while started"));
			}
			
			if ( stopped ){
				
				throw( new RuntimeException( "DMWWriter: start after stopped"));
			}

			started	= true;
 			
		}finally{
			
			this_mon.exit();
		}
	
public voidstop()

		int write_wait;
		
		try{
			this_mon.enter();

			if ( stopped || !started ){
			
				return;
			}
								
			stopped	= true;
         			
			write_wait	= async_writes;
			
		}finally{
			
			this_mon.exit();
		}		
		
			// wait for writes
		
		long	log_time 		= SystemTime.getCurrentTime();
		
		for (int i=0;i<write_wait;i++){
			
			long	now = SystemTime.getCurrentTime();

			if ( now < log_time ){
				
				log_time = now;
				
			}else{
				
				if ( now - log_time > 1000 ){
					
					log_time	= now;
					
					if ( Logger.isEnabled()){
						
						Logger.log(new LogEvent(disk_manager, LOGID, "Waiting for writes to complete - " + (write_wait-i) + " remaining" ));
					}
				}
			}
					
			async_write_sem.reserve();
		}	
	
public voidwriteBlock(DiskManagerWriteRequest request, DiskManagerWriteRequestListener _listener)

		request.requestStarts();
		
		final DiskManagerWriteRequestListener	listener = 
			new DiskManagerWriteRequestListener()
			{
				public void 
				writeCompleted( 
					DiskManagerWriteRequest 	request )
				{					
					request.requestEnds( true );

					_listener.writeCompleted( request );
				}
				  
				public void 
				writeFailed( 
					DiskManagerWriteRequest 	request, 
					Throwable		 			cause )
				{					
					request.requestEnds( false );

					_listener.writeFailed( request, cause );
				}
			};
		
		try{
			int					pieceNumber	= request.getPieceNumber();
			DirectByteBuffer	buffer		= request.getBuffer();
			int					offset		= request.getOffset();
			
				//Do not allow to write in a piece marked as done. we can get here if
				
			final DiskManagerPiece	dmPiece = disk_manager.getPieces()[pieceNumber];
			
			if ( dmPiece.isDone()){
				
				// Debug.out( "write: piece already done (" + request.getPieceNumber() + "/" + request.getOffset());

				buffer.returnToPool();

				listener.writeCompleted( request ); //XXX: no writing was done; is this neccesary for complete()?
				
			}else{
				
				int	buffer_position = buffer.position(DirectByteBuffer.SS_DW);
				int buffer_limit	= buffer.limit(DirectByteBuffer.SS_DW);
				
				// final long	write_length = buffer_limit - buffer_position;
				
				int previousFilesLength = 0;
				
				int currentFile = 0;
				
				DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
				
				DMPieceMapEntry current_piece = pieceList.get(currentFile);
				
				long fileOffset = current_piece.getOffset();
				
				while ((previousFilesLength + current_piece.getLength()) < offset) {
					
					previousFilesLength += current_piece.getLength();
					
					currentFile++;
					
					fileOffset = 0;
					
					current_piece = pieceList.get(currentFile);
				}
		
				List	chunks = new ArrayList();
				
					// Now current_piece points to the first file that contains data for this block
				
				while ( buffer_position < buffer_limit ){
						
					current_piece = pieceList.get(currentFile);
	
					long file_limit = buffer_position + 
										((current_piece.getFile().getLength() - current_piece.getOffset()) - 
											(offset - previousFilesLength));
		       
					if ( file_limit > buffer_limit ){
						
						file_limit	= buffer_limit;
					}
						
						// could be a zero-length file
					
					if ( file_limit > buffer_position ){
	
						long	file_pos = fileOffset + (offset - previousFilesLength);
						
						chunks.add( 
								new Object[]{ current_piece.getFile(),
								new Long( file_pos ),
								new Integer((int)file_limit )});
											
						buffer_position = (int)file_limit;
					}
					
					currentFile++;
					
					fileOffset = 0;
					
					previousFilesLength = offset;
				}
				
				
				DiskManagerWriteRequestListener	l = 
					new DiskManagerWriteRequestListener()
					{
						public void 
						writeCompleted( 
							DiskManagerWriteRequest 	request ) 
						{
							complete();
							 
							listener.writeCompleted( request );
						}
						  
						public void 
						writeFailed( 
							DiskManagerWriteRequest 	request, 
							Throwable		 			cause )
						{
							complete();
							  
							if ( dmPiece.isDone()){

									// There's a small chance of us ending up writing the same block twice around
									// the time that a file completes and gets toggled to read-only which then
									// fails with a non-writeable-channel exception
								
								// Debug.out( "writeFailed: piece already done (" + request.getPieceNumber() + "/" + request.getOffset() + "/" + write_length );
								
								if ( Logger.isEnabled()){
									
									Logger.log(new LogEvent(disk_manager, LOGID, "Piece " + dmPiece.getPieceNumber() + " write failed but already marked as done" ));
								}
								
								listener.writeCompleted( request );
								
							}else{
								
								disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(cause));
								
								Debug.printStackTrace( cause );
								
								listener.writeFailed( request, cause );
							}
						}
						  
						protected void
						complete()
						{
							try{
								this_mon.enter();
								
								async_writes--;
								  
								if ( !write_requests.remove( request )){
									
									Debug.out( "request not found" );
								}
								
								if ( stopped ){
									  
									async_write_sem.release();
								}
							}finally{
								  
								this_mon.exit();
							}
						}
					};
	
				try{
					this_mon.enter();
					
					if ( stopped ){
						
						buffer.returnToPool();
						
						listener.writeFailed( request, new Exception( "Disk writer has been stopped" ));
						
						return;
						
					}else{
					
						async_writes++;
						
						write_requests.add( request );
					}
					
				}finally{
					
					this_mon.exit();
				}
									
				new requestDispatcher( request, l, buffer, chunks );
			}
		}catch( Throwable e ){
						
			request.getBuffer().returnToPool();
			
			disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(e));
			
			Debug.printStackTrace( e );
			
			listener.writeFailed( request, e );
		}
	
public booleanzeroFile(org.gudy.azureus2.core3.disk.impl.DiskManagerFileInfoImpl file, long length)

		CacheFile	cache_file = file.getCacheFile();
		
		try{
			if( length == 0 ){ //create a zero-length file if it is listed in the torrent
				
				cache_file.setLength( 0 );
				
			}else{
				int	buffer_size = pieceLength < MIN_ZERO_BLOCK?MIN_ZERO_BLOCK:pieceLength;
				
				buffer_size	= ((buffer_size+1023)/1024)*1024;
				
		        DirectByteBuffer	buffer = DirectByteBufferPool.getBuffer(DirectByteBuffer.AL_DM_ZERO,buffer_size);
		    
		        long remainder	= length;
				long written 	= 0;
				
		        try{	
		        	final byte[]	blanks = new byte[1024];
		        	
					for (int i = 0; i < buffer_size/1024; i++ ){
						
						buffer.put(DirectByteBuffer.SS_DW, blanks );
					}
					
					buffer.position(DirectByteBuffer.SS_DW, 0);

					while ( remainder > 0 && !stopped ){
						
						int	write_size = buffer_size;
						
						if ( remainder < write_size ){
            	
							write_size = (int)remainder;
           
							buffer.limit(DirectByteBuffer.SS_DW, write_size);
						}
						
						final AESemaphore	sem = new AESemaphore( "DMW&C:zeroFile" );
						final Throwable[]	op_failed = {null};
						
						disk_access.queueWriteRequest(
								cache_file,
								written,
								buffer,
								false,
								new DiskAccessRequestListener()
								{
									public void
									requestComplete(
										DiskAccessRequest	request )
									{
										sem.release();
									}
									
									public void
									requestCancelled(
										DiskAccessRequest	request )
									{
										op_failed[0] = new Throwable( "Request cancelled" );
										
										sem.release();
									}
									
									public void
									requestFailed(
										DiskAccessRequest	request,
										Throwable			cause )
									{
										op_failed[0]	= cause;
										
										sem.release();
									}
									
									public int
									getPriority()
									{
										return( -1 );
									}
									
									public void 
									requestExecuted(long bytes) 
									{							
									}
								});
           
						sem.reserve();
						
						if ( op_failed[0] != null ){
							
							throw( op_failed[0] );
						}
						
						buffer.position(DirectByteBuffer.SS_DW, 0);
            
						written 	+= write_size;
						remainder 	-= write_size;
						
						disk_manager.setAllocated( disk_manager.getAllocated() + write_size );
            
						disk_manager.setPercentDone((int) ((disk_manager.getAllocated() * 1000) / totalLength));
					}
		        }finally{
		        	
		        	buffer.returnToPool();
		        }
		        
		        cache_file.flushCache();
			}
			
			if ( stopped ){
										   
				return false;
			}
		}catch ( Throwable e){ 
			
			Debug.printStackTrace( e );
			
			return( false );
		}
			
		return true;