FileDocCategorySizeDatePackage
OutgoingMessageQueueImpl.javaAPI DocAzureus 3.0.3.426256Wed Jul 18 15:16:30 BST 2007com.aelitis.azureus.core.networkmanager.impl

OutgoingMessageQueueImpl

public class OutgoingMessageQueueImpl extends Object implements com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue
Priority-based outbound peer message queue.

Fields Summary
private final LinkedList
queue
private final AEMonitor
queue_mon
private final ArrayList
delayed_notifications
private final AEMonitor
delayed_notifications_mon
private volatile ArrayList
listeners
private final AEMonitor
listeners_mon
private int
total_size
private com.aelitis.azureus.core.networkmanager.RawMessage
urgent_message
private boolean
destroyed
private MessageStreamEncoder
stream_encoder
private com.aelitis.azureus.core.networkmanager.Transport
transport
private int
percent_complete
private static final boolean
TRACE_HISTORY
private static final int
MAX_HISTORY_TRACES
private final LinkedList
prev_sent
private boolean
trace
Constructors Summary
public OutgoingMessageQueueImpl(MessageStreamEncoder stream_encoder)
Create a new outgoing message queue.

param
stream_encoder default message encoder

  
                
       
    this.stream_encoder = stream_encoder;
  
Methods Summary
public voidaddMessage(Message message, boolean manual_listener_notify)
Add a message to the message queue. NOTE: Allows for manual listener notification at some later time, using doListenerNotifications(), instead of notifying immediately from within this method. This is useful if you want to invoke listeners outside of some greater synchronised block to avoid deadlock.

param
message message to add
param
manual_listener_notify true for manual notification, false for automatic

    //do message add notifications
    boolean allowed = true;
    ArrayList list_ref = listeners;
    
    for( int i=0; i < list_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)list_ref.get( i );
      allowed = allowed && listener.messageAdded( message );
    }
    
    if( !allowed ) {  //message addition not allowed
      //LGLogger.log( "Message [" +message.getDescription()+ "] not allowed for queueing, message addition skipped." );
      //message.destroy();  //TODO destroy????
      return;
    }
    
    
    RawMessage[] rmesgs = stream_encoder.encodeMessage( message );
    
    if( destroyed ) {  //queue is shutdown, drop any added messages
      for (int i=0;i<rmesgs.length;i++){
    	  rmesgs[i].destroy();
      }
      return;
    }
    
    for (int i=0;i<rmesgs.length;i++){
    	
    	RawMessage rmesg = rmesgs[i];
    	
	    removeMessagesOfType( rmesg.messagesToRemove(), manual_listener_notify );
	    
	    try{
	      queue_mon.enter();
	    
	      int pos = 0;
	      for( Iterator it = queue.iterator(); it.hasNext(); ) {
	        RawMessage msg = (RawMessage)it.next();
	        if( rmesg.getPriority() > msg.getPriority() 
	          && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //but don't insert in front of a half-sent message
	          break;
	        }
	        pos++;
	      }
	      if( rmesg.isNoDelay() ) {
	        urgent_message = rmesg;
	      }
	      queue.add( pos, rmesg );
	      
	      DirectByteBuffer[] payload = rmesg.getRawData();
	      for( int j=0; j < payload.length; j++ ) {
	        total_size += payload[j].remaining(DirectByteBuffer.SS_NET);
	      }
	    }finally{
	      queue_mon.exit();
	    }
	    
	    if( manual_listener_notify ) {  //register listener event for later, manual notification
	      NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_ADDED );
	      item.message = rmesg;
	      try {
	        delayed_notifications_mon.enter();
	        
	        delayed_notifications.add( item );
	      }
	      finally {
	        delayed_notifications_mon.exit();
	      }
	    }
	    else { //do listener notification now
	      ArrayList listeners_ref = listeners;
	    
	      for( int j=0; j < listeners_ref.size(); j++ ) {
	        MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( j );
	        listener.messageQueued( rmesg.getBaseMessage() );
	      }
	    }
    }
  
public voidcancelQueueListener(MessageQueueListener listener)
Cancel queue event notification listener.

param
listener

    try{  listeners_mon.enter();
      //copy-on-write
      ArrayList new_list = new ArrayList( listeners );
      new_list.remove( listener );
      listeners = new_list;
    }
    finally{  listeners_mon.exit();  }
  
public intdeliverToTransport(int max_bytes, boolean manual_listener_notify)
Deliver (write) message(s) data to the underlying transport. NOTE: Allows for manual listener notification at some later time, using doListenerNotifications(), instead of notifying immediately from within this method. This is useful if you want to invoke listeners outside of some greater synchronised block to avoid deadlock.

param
max_bytes maximum number of bytes to deliver
param
manual_listener_notify true for manual notification, false for automatic
return
number of bytes delivered
throws
IOException on delivery error

    
	  if( max_bytes < 1 ) {
		  Debug.out( "max_bytes < 1: " +max_bytes );
		  return 0;
	  }

	  if ( transport == null ){
		  throw( new IOException( "not ready to deliver data" ));
	  }
	  int data_written = 0;
	  int protocol_written = 0;

	  ArrayList messages_sent = null;

	  try{
		  queue_mon.enter();

		  if( !queue.isEmpty() ){
			  
			  int buffer_limit 		= 256;
			  
			  ByteBuffer[] 	raw_buffers 	= new ByteBuffer[buffer_limit];
			  int[]		 	orig_positions	= new int[buffer_limit];
			  int			buffer_count	= 0;
			  
			  int total_sofar = 0;

outer:
			  for( Iterator i = queue.iterator(); i.hasNext(); ){
				  
				  DirectByteBuffer[] payloads = ((RawMessage)i.next()).getRawData();

				  for( int x=0; x < payloads.length; x++ ){
					  
					  ByteBuffer buff = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
					  
					  raw_buffers[buffer_count] = buff;
					  
					  orig_positions[buffer_count] = buff.position();
					  
					  total_sofar += buff.remaining();

					  buffer_count++;
					  
					  if ( total_sofar >= max_bytes ){
						
						  break outer;
					  }
					  
					  if ( buffer_count == buffer_limit ) {
							
						  int	new_buffer_limit	= buffer_limit * 2;
						  
						  ByteBuffer[] 	new_raw_buffers 	= new ByteBuffer[new_buffer_limit];
						  int[]		 	new_orig_positions	= new int[new_buffer_limit];
						  
						  System.arraycopy( raw_buffers, 0, new_raw_buffers, 0, buffer_limit );
						  System.arraycopy( orig_positions, 0, new_orig_positions, 0, buffer_limit );
						  
						  raw_buffers 		= new_raw_buffers;
						  orig_positions	= new_orig_positions;
						  
						  buffer_limit 		= new_buffer_limit;
					  }
				  }
			  }

			  ByteBuffer last_buff = (ByteBuffer)raw_buffers[buffer_count - 1 ];
			  
			  int orig_last_limit = last_buff.limit();
			  
			  if ( total_sofar > max_bytes ){
				  
				  last_buff.limit( orig_last_limit - (total_sofar - max_bytes) );
			  }

			  transport.write( raw_buffers, 0, buffer_count );

			  last_buff.limit( orig_last_limit );

			  int pos = 0;
			  boolean stop = false;

			  while( !queue.isEmpty() && !stop ) {
				  RawMessage msg = (RawMessage)queue.get( 0 );
				  DirectByteBuffer[] payloads = msg.getRawData();

				  for( int x=0; x < payloads.length; x++ ) {
					  ByteBuffer bb = payloads[x].getBuffer( DirectByteBuffer.SS_NET );

					  int bytes_written = (bb.limit() - bb.remaining()) - orig_positions[ pos ];
					  total_size -= bytes_written;

					  if( x > 0 && msg.getType() == Message.TYPE_DATA_PAYLOAD ) {  //assumes the first buffer is message header
						  data_written += bytes_written;
					  }
					  else {
						  protocol_written += bytes_written;
					  }

					  if( bb.hasRemaining() ) {  //still data left to send in this message
						  stop = true;  //so don't bother checking later messages for completion

						  //compute send percentage
						  int message_size = 0;
						  int written = 0;

						  for( int i=0; i < payloads.length; i++ ) {
							  ByteBuffer buff = payloads[i].getBuffer( DirectByteBuffer.SS_NET );

							  message_size += buff.limit();

							  if( i < x ) {  //if in front of non-empty buffer
								  written += buff.limit();
							  }
							  else if( i == x ) {  //is non-empty buffer
								  written += buff.position();
							  }
						  }

						  percent_complete = (written * 100) / message_size;

						  break;
					  }
					  else if( x == payloads.length - 1 ) {  //last payload buffer of message is empty
						  if( msg == urgent_message ) urgent_message = null;

						  queue.remove( 0 );


						  if( TRACE_HISTORY ) {
							  prev_sent.addLast( msg );
							  if( prev_sent.size() > MAX_HISTORY_TRACES )  prev_sent.removeFirst();
						  }


						  percent_complete = -1;  //reset send percentage

						  if( manual_listener_notify ) {
							  NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
							  item.message = msg;
							  try {  delayed_notifications_mon.enter();
							  delayed_notifications.add( item );
							  } finally {  delayed_notifications_mon.exit();  }
						  }
						  else {
							  if( messages_sent == null ) {
								  messages_sent = new ArrayList();
							  }
							  messages_sent.add( msg );
						  }
					  }

					  pos++;
					  if( pos >= buffer_count ) {
						  stop = true;
						  break;
					  }
				  }
			  }
		  }
	  }finally{
		  queue_mon.exit();
	  }

	  // we can have messages that end up getting serialised as 0 bytes (for http
	  // connections for example) - we still need to notify them of being sent...

	  if( data_written + protocol_written > 0 || messages_sent != null ) {

		  if ( trace ){
			  TimeFormatter.milliTrace( "omq:deliver: " + (data_written + protocol_written) + ", q=" + queue.size() + "/" + total_size );
		  }

		  if( manual_listener_notify ) {

			  if( data_written > 0 ) {  //data bytes notify
				  NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
				  item.byte_count = data_written;
				  try {
					  delayed_notifications_mon.enter();

					  delayed_notifications.add( item );
				  }
				  finally {
					  delayed_notifications_mon.exit();
				  }
			  }

			  if( protocol_written > 0 ) {  //protocol bytes notify
				  NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
				  item.byte_count = protocol_written;
				  try {
					  delayed_notifications_mon.enter();

					  delayed_notifications.add( item );
				  }
				  finally {
					  delayed_notifications_mon.exit();
				  }
			  }
		  }
		  else {  //do listener notification now
			  ArrayList listeners_ref = listeners;

			  int num_listeners = listeners_ref.size();
			  for( int i=0; i < num_listeners; i++ ) {
				  MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

				  if( data_written > 0 )  listener.dataBytesSent( data_written );
				  if( protocol_written > 0 )  listener.protocolBytesSent( protocol_written );

				  if ( messages_sent != null ){

					  for( int x=0; x < messages_sent.size(); x++ ) {
						  RawMessage msg = (RawMessage)messages_sent.get( x );

						  listener.messageSent( msg.getBaseMessage() );

						  if( i == num_listeners - 1 ) {  //the last listener notification, so destroy
							  msg.destroy();
						  }
					  }
				  }
			  }
		  }
	  }else{
		  if ( trace ){
			  TimeFormatter.milliTrace( "omq:deliver: 0, q=" + queue.size() + "/" + total_size );
		  }
	  }

	  return data_written + protocol_written;
  
public voiddestroy()
Destroy this queue; i.e. perform cleanup actions.

    destroyed = true;
    try{
      queue_mon.enter();
    
      while( !queue.isEmpty() ) {
      	((RawMessage)queue.remove( 0 )).destroy();
      }
    }finally{
      queue_mon.exit();
    }
    total_size = 0;
  
public voiddoListenerNotifications()
Manually send any unsent listener notifications.

    ArrayList notifications_copy;
    try {
      delayed_notifications_mon.enter();
      
      if( delayed_notifications.size() == 0 )  return;
      notifications_copy = new ArrayList( delayed_notifications );
      delayed_notifications.clear();
    }
    finally {
      delayed_notifications_mon.exit();
    }
    
    ArrayList listeners_ref = listeners;
    
    for( int j=0; j < notifications_copy.size(); j++ ) {  //for each notification
      NotificationItem item = (NotificationItem)notifications_copy.get( j );

      switch( item.type ) {
        case NotificationItem.MESSAGE_ADDED:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageQueued( item.message.getBaseMessage() );
          }
          break;
          
        case NotificationItem.MESSAGE_REMOVED:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageRemoved( item.message.getBaseMessage() );
          }
          item.message.destroy();
          break;
          
        case NotificationItem.MESSAGE_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageSent( item.message.getBaseMessage() );
          }
          item.message.destroy();
          break;
          
        case NotificationItem.PROTOCOL_BYTES_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.protocolBytesSent( item.byte_count );
          }
          break;
          
        case NotificationItem.DATA_BYTES_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.dataBytesSent( item.byte_count );
          }
          break;
          
        default:
          Debug.out( "NotificationItem.type unknown :" + item.type );
      }
    }
  
public MessageStreamEncodergetEncoder()

	  return( stream_encoder );
  
public intgetMssSize()

	  return( transport==null?NetworkManager.getMinMssSize():transport.getMssSize());
  
public intgetPercentDoneOfCurrentMessage()
Get the percentage of the current message that has already been sent out.

return
percentage complete (0-99), or -1 if no message is currently being sent

    return percent_complete;
  
public java.lang.StringgetQueueTrace()

  	StringBuffer trace = new StringBuffer();
  	
  	trace.append( "**** OUTGOING QUEUE TRACE ****\n" );
  	
  	try{
      queue_mon.enter();
      
      
      int i=0;
    	
    	for( Iterator it = prev_sent.iterator(); it.hasNext(); ) {
    		RawMessage raw = (RawMessage)it.next();
        trace.append( "[#h" +i+ "]: ")
             .append(raw.getID())
             .append(" [")
             .append(raw.getDescription())
             .append("]")
             .append("\n" );
        i++;
    	}      
      
      

      int position = queue.size() - 1;

      for( Iterator it = queue.iterator(); it.hasNext(); ) {
        RawMessage raw = (RawMessage)it.next();
        
        int pos = raw.getRawData()[0].position(DirectByteBuffer.SS_NET);
        int length = raw.getRawData()[0].limit( DirectByteBuffer.SS_NET );
        
        trace.append( "[#")
             .append(position)
             .append(" ")
             .append(pos)
             .append(":")
             .append(length)
             .append("]: ")
             .append(raw.getID())
             .append(" [")
             .append(raw.getDescription())
             .append("]")
             .append("\n" );
        
        position--;
      }
    }
  	finally{
      queue_mon.exit();
    }
  	
  	return trace.toString();
  
public intgetTotalSize()
Get the total number of bytes ready to be transported.

return
total bytes remaining

  return total_size;  
public booleanhasUrgentMessage()
Whether or not an urgent message (one that needs an immediate send, i.e. a no-delay message) is queued.

return
true if there's a message tagged for immediate write

  return urgent_message == null ? false : true;  
public booleanisDestroyed()

	  return( destroyed );
  
public voidnotifyOfExternallySentMessage(Message message)
Notifty the queue (and its listeners) of a message sent externally on the queue's behalf.

param
message sent externally

    ArrayList listeners_ref = listeners;

    DirectByteBuffer[] buffs = message.getData();
    int size = 0;
    for( int i=0; i < buffs.length; i++ ) {
      size += buffs[i].remaining( DirectByteBuffer.SS_NET );
    }
    
    for( int i=0; i < listeners_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

      listener.messageSent( message );
      
      if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
        listener.dataBytesSent( size );
      }
      else {
        listener.protocolBytesSent( size );
      }
    }
    
    //System.out.println( "notifiedOfExternallySentMessage:: [" +message.getID()+ "] size=" +size );
    
  
public voidregisterQueueListener(MessageQueueListener listener)
Add a listener to be notified of queue events.

param
listener

    try{  listeners_mon.enter();
      //copy-on-write
      ArrayList new_list = new ArrayList( listeners.size() + 1 );
      new_list.addAll( listeners );
      new_list.add( listener );
      listeners = new_list;
    }
    finally{  listeners_mon.exit();  }
  
public booleanremoveMessage(Message message, boolean manual_listener_notify)
Remove a particular message from the queue. NOTE: Only the original message found in the queue will be destroyed upon removal, which may not necessarily be the one passed as the method parameter, as some messages override equals() (i.e. BTRequest messages) instead of using reference equality, and could be a completely different object, and would need to be destroyed manually. If the message does not override equals, then any such method will likely *not* be found and removed, as internal queued object was a new allocation on insertion. NOTE: Allows for manual listener notification at some later time, using doListenerNotifications(), instead of notifying immediately from within this method. This is useful if you want to invoke listeners outside of some greater synchronised block to avoid deadlock.

param
message to remove
param
manual_listener_notify true for manual notification, false for automatic
return
true if the message was removed, false otherwise

    RawMessage msg_removed = null;
    
    try{
      queue_mon.enter();

      for( Iterator it = queue.iterator(); it.hasNext(); ) {
        RawMessage raw = (RawMessage)it.next();
        
        if( message.equals( raw.getBaseMessage() ) ) {
          if( raw.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //dont remove a half-sent message
            if( raw == urgent_message ) urgent_message = null;  
            
            DirectByteBuffer[] payload = raw.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }

            queue.remove( raw );
            msg_removed = raw;
          }
          
          break;
        }
      }
    }finally{
      queue_mon.exit();
    }
    
    
    if( msg_removed != null ) {
      if( manual_listener_notify ) { //delayed manual notification
        NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
        item.message = msg_removed;
        try {
          delayed_notifications_mon.enter();
          
          delayed_notifications.add( item );
        }
        finally {
          delayed_notifications_mon.exit();
        }
      }
      else {   //do listener notification now
        ArrayList listeners_ref = listeners;
      
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg_removed.getBaseMessage() );
        }
        msg_removed.destroy();
      }
      return true;
    }
    
    return false;
  
public voidremoveMessagesOfType(Message[] message_types, boolean manual_listener_notify)
Remove all messages of the given types from the queue. NOTE: Allows for manual listener notification at some later time, using doListenerNotifications(), instead of notifying immediately from within this method. This is useful if you want to invoke listeners outside of some greater synchronised block to avoid deadlock.

param
message_types type to remove
param
manual_listener_notify true for manual notification, false for automatic

    if( message_types == null ) return;
    
    ArrayList messages_removed = null;
    
    try{
      queue_mon.enter();
    
      for( Iterator i = queue.iterator(); i.hasNext(); ) {
        RawMessage msg = (RawMessage)i.next();
        
        for( int t=0; t < message_types.length; t++ ) {
          boolean same_type = message_types[t].getID().equals( msg.getID() );
          
          if( same_type && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {   //dont remove a half-sent message
            if( msg == urgent_message ) urgent_message = null;
            
            DirectByteBuffer[] payload = msg.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }
            
            if( manual_listener_notify ) {
              NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
              item.message = msg;
              try {
                delayed_notifications_mon.enter();
                
                delayed_notifications.add( item );
              }
              finally {
                delayed_notifications_mon.exit();
              }
            }
            else {
              if ( messages_removed == null ){
              	messages_removed = new ArrayList();
              }
              messages_removed.add( msg );
            }
        		i.remove();
            break;
        	}
        }
      }
    }finally{
      queue_mon.exit();
    }

    if( !manual_listener_notify && messages_removed != null ) {
      //do listener notifications now
      ArrayList listeners_ref = listeners;
        
      for( int x=0; x < messages_removed.size(); x++ ) {
        RawMessage msg = (RawMessage)messages_removed.get( x );
        
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg.getBaseMessage() );
        }
        msg.destroy();
      }
    }
  
public voidsetEncoder(MessageStreamEncoder stream_encoder)
Set the message stream encoder that will be used to encode outgoing messages.

param
stream_encoder to use

    this.stream_encoder = stream_encoder;
  
public voidsetTrace(boolean on)

	  trace	= on;
	  
	  transport.setTrace( on );
  
public voidsetTransport(com.aelitis.azureus.core.networkmanager.Transport _transport)

	transport 	= _transport;