FileDocCategorySizeDatePackage
IncomingMessageQueueImpl.javaAPI DocAzureus 3.0.3.48109Mon Jun 25 16:48:32 BST 2007com.aelitis.azureus.core.networkmanager.impl

IncomingMessageQueueImpl

public class IncomingMessageQueueImpl extends Object implements com.aelitis.azureus.core.networkmanager.IncomingMessageQueue
Inbound peer message queue.

Fields Summary
private volatile ArrayList
listeners
private final AEMonitor
listeners_mon
private MessageStreamDecoder
stream_decoder
private final com.aelitis.azureus.core.networkmanager.NetworkConnection
connection
Constructors Summary
public IncomingMessageQueueImpl(MessageStreamDecoder stream_decoder, com.aelitis.azureus.core.networkmanager.NetworkConnection connection)
Create a new incoming message queue.

param
stream_decoder default message stream decoder
param
connection owner to read from


  
                       
         
    // Keep references to the owning connection and the initial decoder supplied by the caller.
    this.connection = connection;
    this.stream_decoder = stream_decoder;
  
Methods Summary
public void cancelQueueListener(MessageQueueListener listener)
Cancel queue event notification listener.

param
listener

    try{
      listeners_mon.enter();

      // Copy-on-write: never touch the published list in place. Build a
      // private copy without the given listener, then swap the reference so
      // readers holding the old list are unaffected.
      ArrayList without_listener = new ArrayList( listeners );
      without_listener.remove( listener );
      listeners = without_listener;
    }
    finally{
      listeners_mon.exit();
    }
  
public void destroy()
Destroy this queue.

    // Teardown is delegated to the current decoder; whatever destroy() returns is not used here.
    stream_decoder.destroy();
  
public MessageStreamDecoder getDecoder()

	  // Expose the decoder currently installed on this queue (the field itself, not a copy).
	  return( stream_decoder );
  
public int getPercentDoneOfCurrentMessage()
Get the percentage of the current message that has already been received.

return
percentage complete (0-99), or -1 if no message is currently being received

    // The queue keeps no progress state of its own; it forwards the decoder's answer unchanged.
    return stream_decoder.getPercentDoneOfCurrentMessage();
  
public void notifyOfExternallyReceivedMessage(Message message)
Notify the queue (and its listeners) of a message received externally on the queue's behalf.

param
message received externally

    // Work against a snapshot of the listener list; register/cancel replace
    // the list reference rather than mutating it.
    ArrayList snapshot = listeners;
    int listener_count = snapshot.size();

    // Total the bytes remaining in the message's buffers up front, before
    // any listener gets a chance to touch the message.
    DirectByteBuffer[] payload = message.getData();
    int total_bytes = 0;
    int buf_idx = 0;
    while( buf_idx < payload.length ) {
      total_bytes += payload[ buf_idx ].remaining( DirectByteBuffer.SS_NET );
      buf_idx++;
    }

    boolean consumed = false;

    for( int idx = 0; idx < listener_count; idx++ ) {
      MessageQueueListener target = (MessageQueueListener)snapshot.get( idx );

      // Offer the message only until some listener reports it handled;
      // later listeners still get the byte-count notification below.
      if( !consumed ) {
        consumed = target.messageReceived( message );
      }

      if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
        target.dataBytesReceived( total_bytes );
      }
      else {
        target.protocolBytesReceived( total_bytes );
      }
    }

    if( consumed ) {
      return;
    }

    // Nobody handled the message: report it (if there was anyone to ask)
    // and hand the buffers back to the pool ourselves.
    if( listener_count > 0 ) {
      System.out.println( "no registered listeners [out of " +listener_count+ "] handled decoded message [" +message.getDescription()+ "]" );
    }

    DirectByteBuffer[] unclaimed = message.getData();
    for( int idx = 0; idx < unclaimed.length; idx++ ) {
      unclaimed[ idx ].returnToPool();
    }
  
public int receiveFromTransport(int max_bytes)
Receive (read) message(s) data from the underlying transport.

param
max_bytes to read
return
number of bytes received
throws
IOException on receive error

    // Reject nonsensical read sizes without touching the transport.
    if( max_bytes < 1 ) {
      Debug.out( "max_bytes < 1: " +max_bytes );
      return 0;
    }

    // Decoding with nobody to deliver to is treated as an error.
    if( listeners.isEmpty() ) {
      Debug.out( "no queue listeners registered!" );
      throw new IOException( "no queue listeners registered!" );
    }

    int bytes_read;

    try{
      // Let the decoder pull up to max_bytes from the connection's transport.
      bytes_read = stream_decoder.performStreamDecode( connection.getTransport(), max_bytes );
    }
    catch( RuntimeException failure ) {
      // Log unchecked failures with connection context, then propagate unchanged.
      Debug.out( "Stream decode for " + connection.getString() + " failed: " + Debug.getNestedExceptionMessageAndStack(failure));
      throw failure;
    }

    // Deliver whatever complete messages the decode step produced.
    Message[] decoded = stream_decoder.removeDecodedMessages();
    int decoded_count = ( decoded == null ) ? 0 : decoded.length;

    for( int m = 0; m < decoded_count; m++ ) {
      Message current = decoded[ m ];

      if( current == null ) {
        System.out.println( "received msg == null [messages.length=" +decoded.length+ ", #" +m+ "]: " +connection.getTransport().getDescription() );
        continue;
      }

      // Fresh snapshot per message; the listener list is replaced, not mutated.
      ArrayList snapshot = listeners;
      int listener_count = snapshot.size();
      boolean consumed = false;

      for( int idx = 0; idx < listener_count; idx++ ) {
        MessageQueueListener target = (MessageQueueListener)snapshot.get( idx );
        // Stop offering the message once a listener reports it handled.
        if( !consumed ) {
          consumed = target.messageReceived( current );
        }
      }

      if( consumed ) {
        continue;
      }

      // Unhandled: report it and return the message's buffers to the pool.
      if( listener_count > 0 ) {
        System.out.println( "no registered listeners [out of " +listener_count+ "] handled decoded message [" +current.getDescription()+ "]" );
      }

      DirectByteBuffer[] unclaimed = current.getData();
      for( int idx = 0; idx < unclaimed.length; idx++ ) {
        unclaimed[ idx ].returnToPool();
      }
    }

    // Report protocol byte count to every listener, if any were decoded.
    int protocol_bytes = stream_decoder.getProtocolBytesDecoded();
    if( protocol_bytes > 0 ) {
      ArrayList snapshot = listeners;
      int listener_count = snapshot.size();
      for( int idx = 0; idx < listener_count; idx++ ) {
        ((MessageQueueListener)snapshot.get( idx )).protocolBytesReceived( protocol_bytes );
      }
    }

    // Same for payload data bytes.
    int data_bytes = stream_decoder.getDataBytesDecoded();
    if( data_bytes > 0 ) {
      ArrayList snapshot = listeners;
      int listener_count = snapshot.size();
      for( int idx = 0; idx < listener_count; idx++ ) {
        ((MessageQueueListener)snapshot.get( idx )).dataBytesReceived( data_bytes );
      }
    }

    return bytes_read;
  
public void registerQueueListener(MessageQueueListener listener)
Add a listener to be notified of queue events.

param
listener

    try{
      listeners_mon.enter();

      // Copy-on-write: build a new list sized for one extra entry, carry
      // over the existing listeners, append the new one, then publish the
      // new list by swapping the reference.
      ArrayList current = listeners;
      ArrayList with_listener = new ArrayList( current.size() + 1 );
      with_listener.addAll( current );
      with_listener.add( listener );
      listeners = with_listener;
    }
    finally{
      listeners_mon.exit();
    }
  
public void resumeQueueProcessing()
Manually resume processing (reading) incoming messages. NOTE: Allows us to resume decoding externally, in case it was auto-paused internally.

    // Resuming is entirely the decoder's job; the queue just forwards the request.
    stream_decoder.resumeDecoding();
  
public void setDecoder(MessageStreamDecoder new_stream_decoder)
Set the message stream decoder that will be used to decode incoming messages.

param
new_stream_decoder to use

    // Tear down the outgoing decoder first; destroy() hands back a ByteBuffer
    // (presumably bytes it had read but not yet decoded -- TODO confirm against MessageStreamDecoder).
    ByteBuffer already_read = stream_decoder.destroy();
    // Give that buffer to the transport via setAlreadyRead() before switching decoders.
    connection.getTransport().setAlreadyRead( already_read );
    // Install the replacement decoder and start it decoding.
    stream_decoder = new_stream_decoder;
    stream_decoder.resumeDecoding();