OutgoingBTPieceMessageHandlerpublic class OutgoingBTPieceMessageHandler extends Object Front-end manager for handling requested outgoing bittorrent Piece messages.
Peers often make piece requests in batch, with multiple requests always
outstanding, all of which won't necessarily be honored (i.e. we choke them),
so we don't want to waste time reading in the piece data from disk ahead
of time for all the requests. Thus, we only want to perform read-aheads for a
small subset of the requested data at any given time, which is what this handler
does, before passing the messages onto the outgoing message queue for transmission. |
Fields Summary |
---|
private final org.gudy.azureus2.core3.peer.PEPeer | peer | private final com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue | outgoing_message_queue | private byte | piece_version | private final LinkedList | requests | private final ArrayList | loading_messages | private final HashMap | queued_messages | private final AEMonitor | lock_mon | private boolean | destroyed | private int | request_read_ahead | private OutgoingBTPieceMessageHandlerAdapter | adapter | private final DiskManagerReadRequestListener | read_req_listener | private final OutgoingMessageQueue.MessageQueueListener | sent_message_listener |
Methods Summary |
---|
public void | addPieceRequest(int piece_number, int piece_offset, int length)Register a new piece data request.
if( destroyed ) return;
DiskManagerReadRequest dmr = peer.getManager().getDiskManager().createReadRequest( piece_number, piece_offset, length );
try{
lock_mon.enter();
requests.addLast( dmr );
}finally{
lock_mon.exit();
}
doReadAheadLoads();
| public void | destroy()
try{
lock_mon.enter();
removeAllPieceRequests();
queued_messages.clear();
destroyed = true;
}
finally{
lock_mon.exit();
}
| private void | doReadAheadLoads()
List to_submit = null;
try{
lock_mon.enter();
while( loading_messages.size() + queued_messages.size() < request_read_ahead && !requests.isEmpty() && !destroyed ) {
DiskManagerReadRequest dmr = (DiskManagerReadRequest)requests.removeFirst();
loading_messages.add( dmr );
if( to_submit == null ) to_submit = new ArrayList();
to_submit.add( dmr );
}
}finally{
lock_mon.exit();
}
/*
if ( peer.getIp().equals( "64.71.5.2")){
TimeFormatter.milliTrace( "obt read_ahead: -> " + (to_submit==null?0:to_submit.size()) +
" [lo=" + loading_messages.size() + ",qm=" + queued_messages.size() + ",re=" + requests.size() + ",rl=" + request_read_ahead + "]");
}
*/
if ( to_submit != null ){
for (int i=0;i<to_submit.size();i++){
peer.getManager().getAdapter().enqueueReadRequest( peer, (DiskManagerReadRequest)to_submit.get(i), read_req_listener );
}
}
| public int | getRequestCount()
return( queued_messages.size() + loading_messages.size() + requests.size());
| public int[] | getRequestedPieceNumbers()Get a list of piece numbers being requested
if( destroyed ) return new int[0];
/** Cheap hack to reduce (but not remove all) the # of duplicate entries */
int iLastNumber = -1;
int pos = 0;
int[] pieceNumbers;
try {
lock_mon.enter();
// allocate max size needed (we'll shrink it later)
pieceNumbers = new int[queued_messages.size() + loading_messages.size() + requests.size()];
for (Iterator iter = queued_messages.keySet().iterator(); iter.hasNext();) {
BTPiece msg = (BTPiece) iter.next();
if (iLastNumber != msg.getPieceNumber()) {
iLastNumber = msg.getPieceNumber();
pieceNumbers[pos++] = iLastNumber;
}
}
for (Iterator iter = loading_messages.iterator(); iter.hasNext();) {
DiskManagerReadRequest dmr = (DiskManagerReadRequest) iter.next();
if (iLastNumber != dmr.getPieceNumber()) {
iLastNumber = dmr.getPieceNumber();
pieceNumbers[pos++] = iLastNumber;
}
}
for (Iterator iter = requests.iterator(); iter.hasNext();) {
DiskManagerReadRequest dmr = (DiskManagerReadRequest) iter.next();
if (iLastNumber != dmr.getPieceNumber()) {
iLastNumber = dmr.getPieceNumber();
pieceNumbers[pos++] = iLastNumber;
}
}
} finally {
lock_mon.exit();
}
int[] trimmed = new int[pos];
System.arraycopy(pieceNumbers, 0, trimmed, 0, pos);
return trimmed;
| public boolean | isStalledPendingLoad()
return( queued_messages.size() == 0 && loading_messages.size() > 0 );
| public void | removeAllPieceRequests()Remove all outstanding piece data requests.
if( destroyed ) return;
try{
lock_mon.enter();
// removed this trace as Alon can't remember why the trace is here anyway and as far as I can
// see there's nothing to stop a piece being delivered to transport and removed from
// the message queue before we're notified of this and thus it is entirely possible that
// our view of queued messages is lagging.
// String before_trace = outgoing_message_queue.getQueueTrace();
/*
int num_queued = queued_messages.size();
int num_removed = 0;
for( Iterator i = queued_messages.keySet().iterator(); i.hasNext(); ) {
BTPiece msg = (BTPiece)i.next();
if( outgoing_message_queue.removeMessage( msg, true ) ) {
i.remove();
num_removed++;
}
}
if( num_removed < num_queued -2 ) {
Debug.out( "num_removed[" +num_removed+ "] < num_queued[" +num_queued+ "]:\nBEFORE:\n" +before_trace+ "\nAFTER:\n" +outgoing_message_queue.getQueueTrace() );
}
*/
for( Iterator i = queued_messages.keySet().iterator(); i.hasNext(); ) {
BTPiece msg = (BTPiece)i.next();
outgoing_message_queue.removeMessage( msg, true );
}
queued_messages.clear(); // this replaces stuff above
requests.clear();
loading_messages.clear();
}
finally{
lock_mon.exit();
}
outgoing_message_queue.doListenerNotifications();
| public void | removePieceRequest(int piece_number, int piece_offset, int length)Remove an outstanding piece data request.
if( destroyed ) return;
DiskManagerReadRequest dmr = peer.getManager().getDiskManager().createReadRequest( piece_number, piece_offset, length );
try{
lock_mon.enter();
if( requests.contains( dmr ) ) {
requests.remove( dmr );
return;
}
if( loading_messages.contains( dmr ) ) {
loading_messages.remove( dmr );
return;
}
for( Iterator i = queued_messages.entrySet().iterator(); i.hasNext(); ) {
Map.Entry entry = (Map.Entry)i.next();
if( entry.getValue().equals( dmr ) ) { //it's already been queued
BTPiece msg = (BTPiece)entry.getKey();
if( outgoing_message_queue.removeMessage( msg, true ) ) {
i.remove();
}
break; //do manual listener notify
}
}
}
finally{
lock_mon.exit();
}
outgoing_message_queue.doListenerNotifications();
| public void | setPieceVersion(byte version)
piece_version = version;
| public void | setRequestReadAhead(int num_to_read_ahead)
request_read_ahead = num_to_read_ahead;
|
|