FileDocCategorySizeDatePackage
OutgoingBTPieceMessageHandler.javaAPI DocAzureus 3.0.3.412108Sun Mar 04 21:08:16 GMT 2007com.aelitis.azureus.core.peermanager.utils

OutgoingBTPieceMessageHandler

public class OutgoingBTPieceMessageHandler extends Object
Front-end manager for handling requested outgoing bittorrent Piece messages. Peers often make piece requests in batch, with multiple requests always outstanding, all of which won't necessarily be honored (i.e. we choke them), so we don't want to waste time reading in the piece data from disk ahead of time for all the requests. Thus, we only want to perform read-aheads for a small subset of the requested data at any given time, which is what this handler does, before passing the messages onto the outgoing message queue for transmission.

Fields Summary
private final org.gudy.azureus2.core3.peer.PEPeer
peer
private final com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue
outgoing_message_queue
private byte
piece_version
private final LinkedList
requests
private final ArrayList
loading_messages
private final HashMap
queued_messages
private final AEMonitor
lock_mon
private boolean
destroyed
private int
request_read_ahead
private OutgoingBTPieceMessageHandlerAdapter
adapter
private final DiskManagerReadRequestListener
read_req_listener
private final OutgoingMessageQueue.MessageQueueListener
sent_message_listener
Constructors Summary
public OutgoingBTPieceMessageHandler(org.gudy.azureus2.core3.peer.PEPeer _peer, com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue _outgoing_message_q, OutgoingBTPieceMessageHandlerAdapter _adapter, byte _piece_version)
Create a new handler for outbound piece messages, reading piece data from the given disk manager and transmitting the messages out the given message queue.

param
disk_manager
param
outgoing_message_q


                                  
   
   
										
	 					
		
										  
  
	peer					= _peer;
    outgoing_message_queue 	= _outgoing_message_q;
    adapter					= _adapter;
    piece_version			= _piece_version;
    
    outgoing_message_queue.registerQueueListener( sent_message_listener );
  
Methods Summary
public voidaddPieceRequest(int piece_number, int piece_offset, int length)
Register a new piece data request.

param
piece_number
param
piece_offset
param
length

  
 
  
                 
            
    if( destroyed )  return;
         
    DiskManagerReadRequest dmr = peer.getManager().getDiskManager().createReadRequest( piece_number, piece_offset, length );

    try{
      lock_mon.enter();
    
      requests.addLast( dmr );
      
    }finally{
      lock_mon.exit();
    }
    
    doReadAheadLoads();
  
public voiddestroy()

    try{
      lock_mon.enter();
  
      removeAllPieceRequests();
      
      queued_messages.clear();
      
      destroyed = true;
    }
    finally{
      lock_mon.exit();
    }
  
private voiddoReadAheadLoads()

  	List	to_submit = null;
  	try{
  		lock_mon.enter();
		
  		while( loading_messages.size() + queued_messages.size() < request_read_ahead && !requests.isEmpty() && !destroyed ) {
  			DiskManagerReadRequest dmr = (DiskManagerReadRequest)requests.removeFirst();
  			loading_messages.add( dmr );  			
  			if( to_submit == null )  to_submit = new ArrayList();
  			to_submit.add( dmr );
  		}	
    }finally{
    	lock_mon.exit();
    }
    
    /*
	if ( peer.getIp().equals( "64.71.5.2")){

		TimeFormatter.milliTrace( "obt read_ahead: -> " + (to_submit==null?0:to_submit.size()) + 
				" [lo=" + loading_messages.size() + ",qm=" + queued_messages.size() + ",re=" + requests.size() + ",rl=" + request_read_ahead + "]");		
	}
	*/
    
    if ( to_submit != null ){
    	for (int i=0;i<to_submit.size();i++){
    		peer.getManager().getAdapter().enqueueReadRequest( peer, (DiskManagerReadRequest)to_submit.get(i), read_req_listener );
    	}
    }
  
public intgetRequestCount()

		return( queued_messages.size()	+ loading_messages.size() + requests.size());
	
public int[]getRequestedPieceNumbers()
Get a list of piece numbers being requested

return
list of Long values

		if( destroyed )  return new int[0];
		
		/** Cheap hack to reduce (but not remove all) the # of duplicate entries */
		int iLastNumber = -1;
		int pos = 0;		
		int[] pieceNumbers;
	
		try {
			lock_mon.enter();			

			// allocate max size needed (we'll shrink it later)
			pieceNumbers = new int[queued_messages.size()	+ loading_messages.size() + requests.size()];

			for (Iterator iter = queued_messages.keySet().iterator(); iter.hasNext();) {
				BTPiece msg = (BTPiece) iter.next();
				if (iLastNumber != msg.getPieceNumber()) {
					iLastNumber = msg.getPieceNumber();
					pieceNumbers[pos++] = iLastNumber;
				}
			}

			for (Iterator iter = loading_messages.iterator(); iter.hasNext();) {
				DiskManagerReadRequest dmr = (DiskManagerReadRequest) iter.next();
				if (iLastNumber != dmr.getPieceNumber()) {
					iLastNumber = dmr.getPieceNumber();
					pieceNumbers[pos++] = iLastNumber;
				}
			}

			for (Iterator iter = requests.iterator(); iter.hasNext();) {
				DiskManagerReadRequest dmr = (DiskManagerReadRequest) iter.next();
				if (iLastNumber != dmr.getPieceNumber()) {
					iLastNumber = dmr.getPieceNumber();
					pieceNumbers[pos++] = iLastNumber;
				}
			}
			
		} finally {
			lock_mon.exit();
		}

		int[] trimmed = new int[pos];
		System.arraycopy(pieceNumbers, 0, trimmed, 0, pos);

		return trimmed;		
	
public booleanisStalledPendingLoad()

		return( queued_messages.size() == 0 && loading_messages.size() > 0 );
	
public voidremoveAllPieceRequests()
Remove all outstanding piece data requests.

  	if( destroyed )  return;
  	
    try{
      lock_mon.enter();
      
      // removed this trace as Alon can't remember why the trace is here anyway and as far as I can
      // see there's nothing to stop a piece being delivered to transport and removed from
      // the message queue before we're notified of this and thus it is entirely possible that
      // our view of queued messages is lagging.
      // String before_trace = outgoing_message_queue.getQueueTrace();     
      /*
      int num_queued = queued_messages.size();
      int num_removed = 0;
      
      for( Iterator i = queued_messages.keySet().iterator(); i.hasNext(); ) {
        BTPiece msg = (BTPiece)i.next();
        if( outgoing_message_queue.removeMessage( msg, true ) ) {
          i.remove();
          num_removed++;
        }
      }
      
      if( num_removed < num_queued -2 ) {
        Debug.out( "num_removed[" +num_removed+ "] < num_queued[" +num_queued+ "]:\nBEFORE:\n" +before_trace+ "\nAFTER:\n" +outgoing_message_queue.getQueueTrace() );		
      }
      */
      
      for( Iterator i = queued_messages.keySet().iterator(); i.hasNext(); ) {
          BTPiece msg = (BTPiece)i.next();
          outgoing_message_queue.removeMessage( msg, true );
      }
      
      queued_messages.clear();	// this replaces stuff above 
      requests.clear();
      loading_messages.clear();
    }
    finally{
      lock_mon.exit();
    }
    
    outgoing_message_queue.doListenerNotifications();
  
public voidremovePieceRequest(int piece_number, int piece_offset, int length)
Remove an outstanding piece data request.

param
piece_number
param
piece_offset
param
length

  	if( destroyed )  return;
  	
    DiskManagerReadRequest dmr = peer.getManager().getDiskManager().createReadRequest( piece_number, piece_offset, length );
    
    try{
      lock_mon.enter();
    
      if( requests.contains( dmr ) ) {
        requests.remove( dmr );
        return;
      }
      
      if( loading_messages.contains( dmr ) ) {
        loading_messages.remove( dmr );
        return;
      }
      
      
      for( Iterator i = queued_messages.entrySet().iterator(); i.hasNext(); ) {
        Map.Entry entry = (Map.Entry)i.next();
        if( entry.getValue().equals( dmr ) ) {  //it's already been queued
          BTPiece msg = (BTPiece)entry.getKey();
          if( outgoing_message_queue.removeMessage( msg, true ) ) {
            i.remove();
          }
          break;  //do manual listener notify
        }
      }
    }
    finally{
      lock_mon.exit();
    }
    
    outgoing_message_queue.doListenerNotifications();
  
public voidsetPieceVersion(byte version)

	  piece_version = version;
  
public voidsetRequestReadAhead(int num_to_read_ahead)

    request_read_ahead = num_to_read_ahead;