DMWriterImplpublic class DMWriterImpl extends Object implements DMWriter
Fields Summary |
---|
private static final LogIDs | LOGID | private static final int | MIN_ZERO_BLOCK | private org.gudy.azureus2.core3.disk.impl.DiskManagerHelper | disk_manager | private com.aelitis.azureus.core.diskmanager.access.DiskAccessController | disk_access | private int | async_writes | private Set | write_requests | private AESemaphore | async_write_sem | private boolean | started | private volatile boolean | stopped | private int | pieceLength | private long | totalLength | private boolean | complete_recheck_in_progress | private AEMonitor | this_mon |
Constructors Summary |
---|
public DMWriterImpl(org.gudy.azureus2.core3.disk.impl.DiskManagerHelper _disk_manager)
disk_manager = _disk_manager;
disk_access = disk_manager.getDiskAccessController();
pieceLength = disk_manager.getPieceLength();
totalLength = disk_manager.getTotalLength();
|
Methods Summary |
---|
public DiskManagerWriteRequest | createWriteRequest(int pieceNumber, int offset, DirectByteBuffer buffer, java.lang.Object user_data)
return( new DiskManagerWriteRequestImpl( pieceNumber, offset, buffer, user_data ));
| public boolean | hasOutstandingWriteRequestForPiece(int piece_number)
try{
this_mon.enter();
Iterator it = write_requests.iterator();
while( it.hasNext()){
DiskManagerWriteRequest request = (DiskManagerWriteRequest)it.next();
if ( request.getPieceNumber() == piece_number ){
return( true );
}
}
return( false );
}finally{
this_mon.exit();
}
| public boolean | isChecking()
return( complete_recheck_in_progress );
| public void | start()
try{
this_mon.enter();
if ( started ){
throw( new RuntimeException( "DMWWriter: start while started"));
}
if ( stopped ){
throw( new RuntimeException( "DMWWriter: start after stopped"));
}
started = true;
}finally{
this_mon.exit();
}
| public void | stop()
int write_wait;
try{
this_mon.enter();
if ( stopped || !started ){
return;
}
stopped = true;
write_wait = async_writes;
}finally{
this_mon.exit();
}
// wait for writes
long log_time = SystemTime.getCurrentTime();
for (int i=0;i<write_wait;i++){
long now = SystemTime.getCurrentTime();
if ( now < log_time ){
log_time = now;
}else{
if ( now - log_time > 1000 ){
log_time = now;
if ( Logger.isEnabled()){
Logger.log(new LogEvent(disk_manager, LOGID, "Waiting for writes to complete - " + (write_wait-i) + " remaining" ));
}
}
}
async_write_sem.reserve();
}
| public void | writeBlock(DiskManagerWriteRequest request, DiskManagerWriteRequestListener _listener)
request.requestStarts();
final DiskManagerWriteRequestListener listener =
new DiskManagerWriteRequestListener()
{
public void
writeCompleted(
DiskManagerWriteRequest request )
{
request.requestEnds( true );
_listener.writeCompleted( request );
}
public void
writeFailed(
DiskManagerWriteRequest request,
Throwable cause )
{
request.requestEnds( false );
_listener.writeFailed( request, cause );
}
};
try{
int pieceNumber = request.getPieceNumber();
DirectByteBuffer buffer = request.getBuffer();
int offset = request.getOffset();
//Do not allow to write in a piece marked as done. we can get here if
final DiskManagerPiece dmPiece = disk_manager.getPieces()[pieceNumber];
if ( dmPiece.isDone()){
// Debug.out( "write: piece already done (" + request.getPieceNumber() + "/" + request.getOffset());
buffer.returnToPool();
listener.writeCompleted( request ); //XXX: no writing was done; is this neccesary for complete()?
}else{
int buffer_position = buffer.position(DirectByteBuffer.SS_DW);
int buffer_limit = buffer.limit(DirectByteBuffer.SS_DW);
// final long write_length = buffer_limit - buffer_position;
int previousFilesLength = 0;
int currentFile = 0;
DMPieceList pieceList = disk_manager.getPieceList(pieceNumber);
DMPieceMapEntry current_piece = pieceList.get(currentFile);
long fileOffset = current_piece.getOffset();
while ((previousFilesLength + current_piece.getLength()) < offset) {
previousFilesLength += current_piece.getLength();
currentFile++;
fileOffset = 0;
current_piece = pieceList.get(currentFile);
}
List chunks = new ArrayList();
// Now current_piece points to the first file that contains data for this block
while ( buffer_position < buffer_limit ){
current_piece = pieceList.get(currentFile);
long file_limit = buffer_position +
((current_piece.getFile().getLength() - current_piece.getOffset()) -
(offset - previousFilesLength));
if ( file_limit > buffer_limit ){
file_limit = buffer_limit;
}
// could be a zero-length file
if ( file_limit > buffer_position ){
long file_pos = fileOffset + (offset - previousFilesLength);
chunks.add(
new Object[]{ current_piece.getFile(),
new Long( file_pos ),
new Integer((int)file_limit )});
buffer_position = (int)file_limit;
}
currentFile++;
fileOffset = 0;
previousFilesLength = offset;
}
DiskManagerWriteRequestListener l =
new DiskManagerWriteRequestListener()
{
public void
writeCompleted(
DiskManagerWriteRequest request )
{
complete();
listener.writeCompleted( request );
}
public void
writeFailed(
DiskManagerWriteRequest request,
Throwable cause )
{
complete();
if ( dmPiece.isDone()){
// There's a small chance of us ending up writing the same block twice around
// the time that a file completes and gets toggled to read-only which then
// fails with a non-writeable-channel exception
// Debug.out( "writeFailed: piece already done (" + request.getPieceNumber() + "/" + request.getOffset() + "/" + write_length );
if ( Logger.isEnabled()){
Logger.log(new LogEvent(disk_manager, LOGID, "Piece " + dmPiece.getPieceNumber() + " write failed but already marked as done" ));
}
listener.writeCompleted( request );
}else{
disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(cause));
Debug.printStackTrace( cause );
listener.writeFailed( request, cause );
}
}
protected void
complete()
{
try{
this_mon.enter();
async_writes--;
if ( !write_requests.remove( request )){
Debug.out( "request not found" );
}
if ( stopped ){
async_write_sem.release();
}
}finally{
this_mon.exit();
}
}
};
try{
this_mon.enter();
if ( stopped ){
buffer.returnToPool();
listener.writeFailed( request, new Exception( "Disk writer has been stopped" ));
return;
}else{
async_writes++;
write_requests.add( request );
}
}finally{
this_mon.exit();
}
new requestDispatcher( request, l, buffer, chunks );
}
}catch( Throwable e ){
request.getBuffer().returnToPool();
disk_manager.setFailed( "Disk write error - " + Debug.getNestedExceptionMessage(e));
Debug.printStackTrace( e );
listener.writeFailed( request, e );
}
| public boolean | zeroFile(org.gudy.azureus2.core3.disk.impl.DiskManagerFileInfoImpl file, long length)
CacheFile cache_file = file.getCacheFile();
try{
if( length == 0 ){ //create a zero-length file if it is listed in the torrent
cache_file.setLength( 0 );
}else{
int buffer_size = pieceLength < MIN_ZERO_BLOCK?MIN_ZERO_BLOCK:pieceLength;
buffer_size = ((buffer_size+1023)/1024)*1024;
DirectByteBuffer buffer = DirectByteBufferPool.getBuffer(DirectByteBuffer.AL_DM_ZERO,buffer_size);
long remainder = length;
long written = 0;
try{
final byte[] blanks = new byte[1024];
for (int i = 0; i < buffer_size/1024; i++ ){
buffer.put(DirectByteBuffer.SS_DW, blanks );
}
buffer.position(DirectByteBuffer.SS_DW, 0);
while ( remainder > 0 && !stopped ){
int write_size = buffer_size;
if ( remainder < write_size ){
write_size = (int)remainder;
buffer.limit(DirectByteBuffer.SS_DW, write_size);
}
final AESemaphore sem = new AESemaphore( "DMW&C:zeroFile" );
final Throwable[] op_failed = {null};
disk_access.queueWriteRequest(
cache_file,
written,
buffer,
false,
new DiskAccessRequestListener()
{
public void
requestComplete(
DiskAccessRequest request )
{
sem.release();
}
public void
requestCancelled(
DiskAccessRequest request )
{
op_failed[0] = new Throwable( "Request cancelled" );
sem.release();
}
public void
requestFailed(
DiskAccessRequest request,
Throwable cause )
{
op_failed[0] = cause;
sem.release();
}
public int
getPriority()
{
return( -1 );
}
public void
requestExecuted(long bytes)
{
}
});
sem.reserve();
if ( op_failed[0] != null ){
throw( op_failed[0] );
}
buffer.position(DirectByteBuffer.SS_DW, 0);
written += write_size;
remainder -= write_size;
disk_manager.setAllocated( disk_manager.getAllocated() + write_size );
disk_manager.setPercentDone((int) ((disk_manager.getAllocated() * 1000) / totalLength));
}
}finally{
buffer.returnToPool();
}
cache_file.flushCache();
}
if ( stopped ){
return false;
}
}catch ( Throwable e){
Debug.printStackTrace( e );
return( false );
}
return true;
|
|