DMReaderImpl

public class DMReaderImpl extends Object implements DMReader
Fields Summary
---
private static final LogIDs | LOGID
private org.gudy.azureus2.core3.disk.impl.DiskManagerHelper | disk_manager
private com.aelitis.azureus.core.diskmanager.access.DiskAccessController | disk_access
private int | async_reads
private AESemaphore | async_read_sem
private boolean | started
private boolean | stopped
protected AEMonitor | this_mon
Methods Summary
---
public DiskManagerReadRequest | createRequest(int pieceNumber, int offset, int length)
return( new DiskManagerReadRequestImpl( pieceNumber, offset, length ));
public DirectByteBuffer | readBlock(int pieceNumber, int offset, int length)
DiskManagerReadRequest request = createRequest( pieceNumber, offset, length );
final AESemaphore sem = new AESemaphore( "DMReader:readBlock" );
final DirectByteBuffer[] result = {null};
readBlock(
    request,
    new DiskManagerReadRequestListener()
    {
        public void
        readCompleted(
            DiskManagerReadRequest  request,
            DirectByteBuffer        data )
        {
            result[0] = data;
            sem.release();
        }

        public void
        readFailed(
            DiskManagerReadRequest  request,
            Throwable               cause )
        {
            sem.release();
        }

        public int
        getPriority()
        {
            return( -1 );
        }

        public void
        requestExecuted( long bytes )
        {
        }
    });

sem.reserve();

return( result[0] );
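The blocking variant above adapts the callback API by parking the caller on a semaphore until readCompleted/readFailed fires, then handing back whatever the callback delivered. The following is a minimal, self-contained sketch of that same sync-over-async pattern, using java.util.concurrent.CountDownLatch in place of Azureus's AESemaphore; the AsyncReader and ReadListener types are hypothetical stand-ins for DMReader and DiskManagerReadRequestListener.

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

interface ReadListener {
    void readCompleted( ByteBuffer data );
    void readFailed( Throwable cause );
}

interface AsyncReader {
    void readBlock( int pieceNumber, int offset, int length, ReadListener listener );
}

class BlockingReadAdapter {

    // Blocks the caller until the asynchronous read reports success or failure,
    // mirroring DMReaderImpl.readBlock(int,int,int); returns null on failure.
    static ByteBuffer readBlocking( AsyncReader reader, int pieceNumber, int offset, int length )
        throws InterruptedException
    {
        final CountDownLatch done   = new CountDownLatch( 1 );
        final ByteBuffer[]   result = { null };

        reader.readBlock( pieceNumber, offset, length, new ReadListener()
        {
            public void readCompleted( ByteBuffer data ){ result[0] = data; done.countDown(); }
            public void readFailed( Throwable cause )   { done.countDown(); }
        });

        done.await();       // the equivalent of sem.reserve() above

        return result[0];
    }
}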
public void | readBlock(DiskManagerReadRequest request, DiskManagerReadRequestListener _listener)
request.requestStarts();
final DiskManagerReadRequestListener listener =
    new DiskManagerReadRequestListener()
    {
        public void
        readCompleted(
            DiskManagerReadRequest  request,
            DirectByteBuffer        data )
        {
            request.requestEnds( true );
            _listener.readCompleted( request, data );
        }

        public void
        readFailed(
            DiskManagerReadRequest  request,
            Throwable               cause )
        {
            request.requestEnds( false );
            _listener.readFailed( request, cause );
        }

        public int
        getPriority()
        {
            return( _listener.getPriority());
        }

        public void
        requestExecuted( long bytes )
        {
            _listener.requestExecuted( bytes );
        }
    };
DirectByteBuffer buffer = null;
try{
int length = request.getLength();
buffer = DirectByteBufferPool.getBuffer( DirectByteBuffer.AL_DM_READ,length );
if ( buffer == null ){   // Fix for bug #804874
    Debug.out( "DiskManager::readBlock:: ByteBufferPool returned null buffer" );
    listener.readFailed( request, new Exception( "Out of memory" ));
    return;
}

int pieceNumber = request.getPieceNumber();
int offset      = request.getOffset();

DMPieceList pieceList = disk_manager.getPieceList( pieceNumber );

    // temporary fix for bug 784306

if ( pieceList.size() == 0 ){
    Debug.out( "no pieceList entries for " + pieceNumber );
    listener.readCompleted( request, buffer );
    return;
}

    // walk the piece's file entries until we reach the one containing 'offset'

long previousFilesLength = 0;
int  currentFile         = 0;
long fileOffset          = pieceList.get(0).getOffset();

while ( currentFile < pieceList.size() && pieceList.getCumulativeLengthToPiece( currentFile ) < offset ){
    previousFilesLength = pieceList.getCumulativeLengthToPiece( currentFile );
    currentFile++;
    fileOffset = 0;
}

    // update the offset (we're in the middle of a file)

fileOffset += offset - previousFilesLength;

    // build one { cache_file, file_offset, buffer_limit } chunk per file spanned by the read

List chunks = new ArrayList();

int buffer_position = 0;

while ( buffer_position < length && currentFile < pieceList.size()){

    DMPieceMapEntry map_entry = pieceList.get( currentFile );

    int length_available = map_entry.getLength() - (int)( fileOffset - map_entry.getOffset());

        // explicitly limit the read size to the proper length, rather than relying on the
        // underlying file being correctly-sized - see the DMWriterAndCheckerImpl::checkPiece note

    int entry_read_limit = buffer_position + length_available;

        // now bring down to the required read length if this is shorter than this
        // chunk of data

    entry_read_limit = Math.min( length, entry_read_limit );

        // this chunk denotes a read up to buffer offset "entry_read_limit"

    chunks.add( new Object[]{ map_entry.getFile().getCacheFile(), new Long( fileOffset ), new Integer( entry_read_limit )});

    buffer_position = entry_read_limit;
    currentFile++;
    fileOffset = 0;
}

if ( chunks.size() == 0 ){
    Debug.out( "no chunk reads for " + pieceNumber );
    listener.readCompleted( request, buffer );
    return;
}
// this is where we go async and need to start counting requests for the sake
// of shutting down tidily
DiskManagerReadRequestListener l =
    new DiskManagerReadRequestListener()
    {
        public void
        readCompleted(
            DiskManagerReadRequest  request,
            DirectByteBuffer        data )
        {
            complete();
            listener.readCompleted( request, data );
        }

        public void
        readFailed(
            DiskManagerReadRequest  request,
            Throwable               cause )
        {
            complete();
            listener.readFailed( request, cause );
        }

        public int
        getPriority()
        {
            return( _listener.getPriority());
        }

        public void
        requestExecuted( long bytes )
        {
            _listener.requestExecuted( bytes );
        }

        protected void
        complete()
        {
            try{
                this_mon.enter();

                async_reads--;

                if ( stopped ){
                    async_read_sem.release();
                }
            }finally{
                this_mon.exit();
            }
        }
    };
try{
    this_mon.enter();

    if ( stopped ){
        buffer.returnToPool();
        listener.readFailed( request, new Exception( "Disk reader has been stopped" ));
        return;
    }

    async_reads++;

}finally{
    this_mon.exit();
}

new requestDispatcher( request, l, buffer, chunks );

}catch( Throwable e ){

    if ( buffer != null ){
        buffer.returnToPool();
    }

    disk_manager.setFailed( "Disk read error - " + Debug.getNestedExceptionMessage(e));

    Debug.printStackTrace( e );

    listener.readFailed( request, e );
}
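The core of the asynchronous readBlock above is the mapping of a piece-relative (offset, length) request onto per-file reads: it walks the piece's file entries until it finds the one containing the start offset, then emits one chunk per file until the requested length is covered. Below is a hypothetical, simplified sketch of that planning step, with plain entry lengths and the first entry's in-file offset standing in for DMPieceList/DMPieceMapEntry data; note that this sketch records each chunk's size where the real code records the buffer limit (entry_read_limit).

import java.util.ArrayList;
import java.util.List;

class ChunkPlanSketch {

    static class Chunk {
        final int  fileIndex;     // which file entry to read from
        final long fileOffset;    // where in that file the read starts
        final int  size;          // bytes to read from that file

        Chunk( int fileIndex, long fileOffset, int size ){
            this.fileIndex  = fileIndex;
            this.fileOffset = fileOffset;
            this.size       = size;
        }

        public String toString(){ return "file " + fileIndex + " @" + fileOffset + " x" + size; }
    }

    // Plan the per-file reads needed to satisfy 'length' bytes starting at the
    // piece-relative 'offset'. entryLengths[i] is how many bytes of the piece live
    // in file i; firstFileStart is where the piece's data begins inside file 0.
    static List<Chunk> plan( int[] entryLengths, long firstFileStart, int offset, int length ){

        List<Chunk> chunks = new ArrayList<Chunk>();

        int  current    = 0;
        long skipped    = 0;                // piece bytes contained in entries already skipped
        long fileOffset = firstFileStart;

            // skip entries that lie entirely before the requested offset

        while ( current < entryLengths.length && skipped + entryLengths[current] <= offset ){
            skipped += entryLengths[current];
            current++;
            fileOffset = 0;                 // later files are read from their start
        }

            // start partway into the current file if needed

        long intoEntry = offset - skipped;
        fileOffset    += intoEntry;

        int remaining = length;

        while ( remaining > 0 && current < entryLengths.length ){
            int available = (int)( entryLengths[current] - intoEntry );
            int size      = Math.min( available, remaining );

            chunks.add( new Chunk( current, fileOffset, size ));

            remaining -= size;
            current++;
            fileOffset = 0;
            intoEntry  = 0;
        }

        return chunks;
    }

    public static void main( String[] args ){
            // a 48 byte read starting 20 bytes into a piece that spans three files
            // holding 16, 32 and 64 of its bytes: expect "file 1 @4 x28" then "file 2 @0 x20"
        for ( Chunk c: plan( new int[]{ 16, 32, 64 }, 0, 20, 48 )){
            System.out.println( c );
        }
    }
}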
public void | start()
try{
    this_mon.enter();

    if ( started ){
        throw( new RuntimeException( "can't start twice" ));
    }

    if ( stopped ){
        throw( new RuntimeException( "already been stopped" ));
    }

    started = true;

}finally{
    this_mon.exit();
}
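A minimal sketch of the start-once guard enforced above, assuming a plain synchronized block in place of AEMonitor; the class and field names are illustrative only.

class StartOnceGuard {

    private final Object lock = new Object();

    private boolean started;
    private boolean stopped;

    // may be called exactly once, and never after stop()
    void start(){
        synchronized( lock ){
            if ( started ){
                throw new RuntimeException( "can't start twice" );
            }
            if ( stopped ){
                throw new RuntimeException( "already been stopped" );
            }
            started = true;
        }
    }

    void stop(){
        synchronized( lock ){
            stopped = true;
        }
    }
}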
public void | stop()
int read_wait;

try{
    this_mon.enter();

    if ( stopped || !started ){
        return;
    }

    stopped = true;

    read_wait = async_reads;

}finally{
    this_mon.exit();
}

    // wait for any outstanding async reads to complete

long log_time = SystemTime.getCurrentTime();

for (int i=0;i<read_wait;i++){

    long now = SystemTime.getCurrentTime();

    if ( now < log_time ){      // clock went backwards
        log_time = now;
    }else{
        if ( now - log_time > 1000 ){
            log_time = now;

            if ( Logger.isEnabled()){
                Logger.log( new LogEvent( disk_manager, LOGID, "Waiting for reads to complete - " + (read_wait-i) + " remaining" ));
            }
        }
    }

    async_read_sem.reserve();
}
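stop() relies on the bookkeeping done in readBlock: each dispatched read increments async_reads under the monitor, and its completion callback releases async_read_sem once stopped is set, so stop() simply waits for as many permits as there were reads in flight. The following is a minimal, self-contained sketch of that drain-on-stop pattern using java.util.concurrent.Semaphore and a synchronized block instead of AEMonitor/AESemaphore; class and method names are illustrative.

import java.util.concurrent.Semaphore;

class AsyncDrainOnStop {

    private final Object    lock     = new Object();
    private final Semaphore drainSem = new Semaphore( 0 );

    private int     inFlight;
    private boolean stopped;

    // call before dispatching an async read; refuses new work once stopping
    boolean tryBegin(){
        synchronized( lock ){
            if ( stopped ){
                return false;
            }
            inFlight++;
            return true;
        }
    }

    // call from the read's completion/failure callback
    void end(){
        synchronized( lock ){
            inFlight--;
            if ( stopped ){
                drainSem.release();     // wake stop() for each late completion
            }
        }
    }

    // blocks until every read that was in flight when stop() ran has completed
    void stop() throws InterruptedException {
        int toWaitFor;
        synchronized( lock ){
            if ( stopped ){
                return;
            }
            stopped = true;
            toWaitFor = inFlight;
        }
        drainSem.acquire( toWaitFor );  // one permit per outstanding read
    }
}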