FileDocCategorySizeDatePackage
OutgoingMessageQueueImpl.javaAPI DocAzureus 3.0.3.426256Wed Jul 18 15:16:30 BST 2007com.aelitis.azureus.core.networkmanager.impl

OutgoingMessageQueueImpl.java

/*
 * Created on May 8, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */


package com.aelitis.azureus.core.networkmanager.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.networkmanager.NetworkManager;
import com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue;
import com.aelitis.azureus.core.networkmanager.RawMessage;
import com.aelitis.azureus.core.networkmanager.Transport;
import com.aelitis.azureus.core.peermanager.messaging.*;



/**
 * Priority-based outbound peer message queue.
 */
public class 
OutgoingMessageQueueImpl
	implements OutgoingMessageQueue
{
  private final LinkedList 		queue		= new LinkedList();
  private final AEMonitor	queue_mon	= new AEMonitor( "OutgoingMessageQueue:queue" );

  private final ArrayList delayed_notifications = new ArrayList();
  private final AEMonitor delayed_notifications_mon = new AEMonitor( "OutgoingMessageQueue:DN" );

  private volatile ArrayList listeners 		= new ArrayList();  //copied-on-write
  private final AEMonitor listeners_mon		= new AEMonitor( "OutgoingMessageQueue:L");
  
  private int total_size = 0;
  private RawMessage urgent_message = null;
  private boolean destroyed = false;
  
  private MessageStreamEncoder stream_encoder;
  private Transport transport;
  
  private int percent_complete = -1;
    
  private static final boolean TRACE_HISTORY = true;  //TODO
  private static final int MAX_HISTORY_TRACES = 30;
  private final LinkedList prev_sent = new LinkedList();
  
  private boolean	trace;
  
  /**
   * Create a new outgoing message queue.
   * @param stream_encoder default message encoder
   */
  public OutgoingMessageQueueImpl( MessageStreamEncoder stream_encoder ) {
    this.stream_encoder = stream_encoder;
  }
  
  public void
  setTransport(
	 Transport		_transport )
  {
	transport 	= _transport;  
  }
  
  public int
  getMssSize()
  {
	  return( transport==null?NetworkManager.getMinMssSize():transport.getMssSize());
  }
  
  /**
   * Set the message stream encoder that will be used to encode outgoing messages.
   * @param stream_encoder to use
   */
  public void setEncoder( MessageStreamEncoder stream_encoder ) {
    this.stream_encoder = stream_encoder;
  }
  
  public MessageStreamEncoder
  getEncoder()
  {
	  return( stream_encoder );
  }
  
  /**
   * Get the percentage of the current message that has already been sent out.
   * @return percentage complete (0-99), or -1 if no message is currently being sent
   */
  public int getPercentDoneOfCurrentMessage() {
    return percent_complete;
  }
  

  /**
   * Destroy this queue; i.e. perform cleanup actions.
   */
  public void destroy() {
    destroyed = true;
    try{
      queue_mon.enter();
    
      while( !queue.isEmpty() ) {
      	((RawMessage)queue.remove( 0 )).destroy();
      }
    }finally{
      queue_mon.exit();
    }
    total_size = 0;
  }
  
  
  /**
   * Get the total number of bytes ready to be transported.
   * @return total bytes remaining
   */
  public int getTotalSize() {  return total_size;  }
  
  
  /**
   * Whether or not an urgent message (one that needs an immediate send, i.e. a no-delay message) is queued.
   * @return true if there's a message tagged for immediate write
   */
  public boolean hasUrgentMessage() {  return urgent_message == null ? false : true;  }
  
  
  /**
   * Add a message to the message queue.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message message to add
   * @param manual_listener_notify true for manual notification, false for automatic
   */
  public void addMessage( Message message, boolean manual_listener_notify ) {
    //do message add notifications
    boolean allowed = true;
    ArrayList list_ref = listeners;
    
    for( int i=0; i < list_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)list_ref.get( i );
      allowed = allowed && listener.messageAdded( message );
    }
    
    if( !allowed ) {  //message addition not allowed
      //LGLogger.log( "Message [" +message.getDescription()+ "] not allowed for queueing, message addition skipped." );
      //message.destroy();  //TODO destroy????
      return;
    }
    
    
    RawMessage[] rmesgs = stream_encoder.encodeMessage( message );
    
    if( destroyed ) {  //queue is shutdown, drop any added messages
      for (int i=0;i<rmesgs.length;i++){
    	  rmesgs[i].destroy();
      }
      return;
    }
    
    for (int i=0;i<rmesgs.length;i++){
    	
    	RawMessage rmesg = rmesgs[i];
    	
	    removeMessagesOfType( rmesg.messagesToRemove(), manual_listener_notify );
	    
	    try{
	      queue_mon.enter();
	    
	      int pos = 0;
	      for( Iterator it = queue.iterator(); it.hasNext(); ) {
	        RawMessage msg = (RawMessage)it.next();
	        if( rmesg.getPriority() > msg.getPriority() 
	          && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //but don't insert in front of a half-sent message
	          break;
	        }
	        pos++;
	      }
	      if( rmesg.isNoDelay() ) {
	        urgent_message = rmesg;
	      }
	      queue.add( pos, rmesg );
	      
	      DirectByteBuffer[] payload = rmesg.getRawData();
	      for( int j=0; j < payload.length; j++ ) {
	        total_size += payload[j].remaining(DirectByteBuffer.SS_NET);
	      }
	    }finally{
	      queue_mon.exit();
	    }
	    
	    if( manual_listener_notify ) {  //register listener event for later, manual notification
	      NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_ADDED );
	      item.message = rmesg;
	      try {
	        delayed_notifications_mon.enter();
	        
	        delayed_notifications.add( item );
	      }
	      finally {
	        delayed_notifications_mon.exit();
	      }
	    }
	    else { //do listener notification now
	      ArrayList listeners_ref = listeners;
	    
	      for( int j=0; j < listeners_ref.size(); j++ ) {
	        MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( j );
	        listener.messageQueued( rmesg.getBaseMessage() );
	      }
	    }
    }
  }
  

  
  /**
   * Remove all messages of the given types from the queue.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message_types type to remove
   * @param manual_listener_notify true for manual notification, false for automatic
   */
  public void removeMessagesOfType( Message[] message_types, boolean manual_listener_notify ) {
    if( message_types == null ) return;
    
    ArrayList messages_removed = null;
    
    try{
      queue_mon.enter();
    
      for( Iterator i = queue.iterator(); i.hasNext(); ) {
        RawMessage msg = (RawMessage)i.next();
        
        for( int t=0; t < message_types.length; t++ ) {
          boolean same_type = message_types[t].getID().equals( msg.getID() );
          
          if( same_type && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {   //dont remove a half-sent message
            if( msg == urgent_message ) urgent_message = null;
            
            DirectByteBuffer[] payload = msg.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }
            
            if( manual_listener_notify ) {
              NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
              item.message = msg;
              try {
                delayed_notifications_mon.enter();
                
                delayed_notifications.add( item );
              }
              finally {
                delayed_notifications_mon.exit();
              }
            }
            else {
              if ( messages_removed == null ){
              	messages_removed = new ArrayList();
              }
              messages_removed.add( msg );
            }
        		i.remove();
            break;
        	}
        }
      }
    }finally{
      queue_mon.exit();
    }

    if( !manual_listener_notify && messages_removed != null ) {
      //do listener notifications now
      ArrayList listeners_ref = listeners;
        
      for( int x=0; x < messages_removed.size(); x++ ) {
        RawMessage msg = (RawMessage)messages_removed.get( x );
        
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg.getBaseMessage() );
        }
        msg.destroy();
      }
    }
  }
  
  
  /**
   * Remove a particular message from the queue.
   * NOTE: Only the original message found in the queue will be destroyed upon removal,
   * which may not necessarily be the one passed as the method parameter,
   * as some messages override equals() (i.e. BTRequest messages) instead of using reference
   * equality, and could be a completely different object, and would need to be destroyed
   * manually.  If the message does not override equals, then any such method will likely
   * *not* be found and removed, as internal queued object was a new allocation on insertion.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message to remove
   * @param manual_listener_notify true for manual notification, false for automatic
   * @return true if the message was removed, false otherwise
   */
  public boolean removeMessage( Message message, boolean manual_listener_notify ) {
    RawMessage msg_removed = null;
    
    try{
      queue_mon.enter();

      for( Iterator it = queue.iterator(); it.hasNext(); ) {
        RawMessage raw = (RawMessage)it.next();
        
        if( message.equals( raw.getBaseMessage() ) ) {
          if( raw.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //dont remove a half-sent message
            if( raw == urgent_message ) urgent_message = null;  
            
            DirectByteBuffer[] payload = raw.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }

            queue.remove( raw );
            msg_removed = raw;
          }
          
          break;
        }
      }
    }finally{
      queue_mon.exit();
    }
    
    
    if( msg_removed != null ) {
      if( manual_listener_notify ) { //delayed manual notification
        NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
        item.message = msg_removed;
        try {
          delayed_notifications_mon.enter();
          
          delayed_notifications.add( item );
        }
        finally {
          delayed_notifications_mon.exit();
        }
      }
      else {   //do listener notification now
        ArrayList listeners_ref = listeners;
      
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg_removed.getBaseMessage() );
        }
        msg_removed.destroy();
      }
      return true;
    }
    
    return false;
  }
  
  
  /**
   * Deliver (write) message(s) data to the underlying transport.
   * 
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param max_bytes maximum number of bytes to deliver
   * @param manual_listener_notify true for manual notification, false for automatic
   * @return number of bytes delivered
   * @throws IOException on delivery error
   */
  
   public int deliverToTransport( int max_bytes, boolean manual_listener_notify ) throws IOException {    
	  if( max_bytes < 1 ) {
		  Debug.out( "max_bytes < 1: " +max_bytes );
		  return 0;
	  }

	  if ( transport == null ){
		  throw( new IOException( "not ready to deliver data" ));
	  }
	  int data_written = 0;
	  int protocol_written = 0;

	  ArrayList messages_sent = null;

	  try{
		  queue_mon.enter();

		  if( !queue.isEmpty() ){
			  
			  int buffer_limit 		= 256;
			  
			  ByteBuffer[] 	raw_buffers 	= new ByteBuffer[buffer_limit];
			  int[]		 	orig_positions	= new int[buffer_limit];
			  int			buffer_count	= 0;
			  
			  int total_sofar = 0;

outer:
			  for( Iterator i = queue.iterator(); i.hasNext(); ){
				  
				  DirectByteBuffer[] payloads = ((RawMessage)i.next()).getRawData();

				  for( int x=0; x < payloads.length; x++ ){
					  
					  ByteBuffer buff = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
					  
					  raw_buffers[buffer_count] = buff;
					  
					  orig_positions[buffer_count] = buff.position();
					  
					  total_sofar += buff.remaining();

					  buffer_count++;
					  
					  if ( total_sofar >= max_bytes ){
						
						  break outer;
					  }
					  
					  if ( buffer_count == buffer_limit ) {
							
						  int	new_buffer_limit	= buffer_limit * 2;
						  
						  ByteBuffer[] 	new_raw_buffers 	= new ByteBuffer[new_buffer_limit];
						  int[]		 	new_orig_positions	= new int[new_buffer_limit];
						  
						  System.arraycopy( raw_buffers, 0, new_raw_buffers, 0, buffer_limit );
						  System.arraycopy( orig_positions, 0, new_orig_positions, 0, buffer_limit );
						  
						  raw_buffers 		= new_raw_buffers;
						  orig_positions	= new_orig_positions;
						  
						  buffer_limit 		= new_buffer_limit;
					  }
				  }
			  }

			  ByteBuffer last_buff = (ByteBuffer)raw_buffers[buffer_count - 1 ];
			  
			  int orig_last_limit = last_buff.limit();
			  
			  if ( total_sofar > max_bytes ){
				  
				  last_buff.limit( orig_last_limit - (total_sofar - max_bytes) );
			  }

			  transport.write( raw_buffers, 0, buffer_count );

			  last_buff.limit( orig_last_limit );

			  int pos = 0;
			  boolean stop = false;

			  while( !queue.isEmpty() && !stop ) {
				  RawMessage msg = (RawMessage)queue.get( 0 );
				  DirectByteBuffer[] payloads = msg.getRawData();

				  for( int x=0; x < payloads.length; x++ ) {
					  ByteBuffer bb = payloads[x].getBuffer( DirectByteBuffer.SS_NET );

					  int bytes_written = (bb.limit() - bb.remaining()) - orig_positions[ pos ];
					  total_size -= bytes_written;

					  if( x > 0 && msg.getType() == Message.TYPE_DATA_PAYLOAD ) {  //assumes the first buffer is message header
						  data_written += bytes_written;
					  }
					  else {
						  protocol_written += bytes_written;
					  }

					  if( bb.hasRemaining() ) {  //still data left to send in this message
						  stop = true;  //so don't bother checking later messages for completion

						  //compute send percentage
						  int message_size = 0;
						  int written = 0;

						  for( int i=0; i < payloads.length; i++ ) {
							  ByteBuffer buff = payloads[i].getBuffer( DirectByteBuffer.SS_NET );

							  message_size += buff.limit();

							  if( i < x ) {  //if in front of non-empty buffer
								  written += buff.limit();
							  }
							  else if( i == x ) {  //is non-empty buffer
								  written += buff.position();
							  }
						  }

						  percent_complete = (written * 100) / message_size;

						  break;
					  }
					  else if( x == payloads.length - 1 ) {  //last payload buffer of message is empty
						  if( msg == urgent_message ) urgent_message = null;

						  queue.remove( 0 );


						  if( TRACE_HISTORY ) {
							  prev_sent.addLast( msg );
							  if( prev_sent.size() > MAX_HISTORY_TRACES )  prev_sent.removeFirst();
						  }


						  percent_complete = -1;  //reset send percentage

						  if( manual_listener_notify ) {
							  NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
							  item.message = msg;
							  try {  delayed_notifications_mon.enter();
							  delayed_notifications.add( item );
							  } finally {  delayed_notifications_mon.exit();  }
						  }
						  else {
							  if( messages_sent == null ) {
								  messages_sent = new ArrayList();
							  }
							  messages_sent.add( msg );
						  }
					  }

					  pos++;
					  if( pos >= buffer_count ) {
						  stop = true;
						  break;
					  }
				  }
			  }
		  }
	  }finally{
		  queue_mon.exit();
	  }

	  // we can have messages that end up getting serialised as 0 bytes (for http
	  // connections for example) - we still need to notify them of being sent...

	  if( data_written + protocol_written > 0 || messages_sent != null ) {

		  if ( trace ){
			  TimeFormatter.milliTrace( "omq:deliver: " + (data_written + protocol_written) + ", q=" + queue.size() + "/" + total_size );
		  }

		  if( manual_listener_notify ) {

			  if( data_written > 0 ) {  //data bytes notify
				  NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
				  item.byte_count = data_written;
				  try {
					  delayed_notifications_mon.enter();

					  delayed_notifications.add( item );
				  }
				  finally {
					  delayed_notifications_mon.exit();
				  }
			  }

			  if( protocol_written > 0 ) {  //protocol bytes notify
				  NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
				  item.byte_count = protocol_written;
				  try {
					  delayed_notifications_mon.enter();

					  delayed_notifications.add( item );
				  }
				  finally {
					  delayed_notifications_mon.exit();
				  }
			  }
		  }
		  else {  //do listener notification now
			  ArrayList listeners_ref = listeners;

			  int num_listeners = listeners_ref.size();
			  for( int i=0; i < num_listeners; i++ ) {
				  MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

				  if( data_written > 0 )  listener.dataBytesSent( data_written );
				  if( protocol_written > 0 )  listener.protocolBytesSent( protocol_written );

				  if ( messages_sent != null ){

					  for( int x=0; x < messages_sent.size(); x++ ) {
						  RawMessage msg = (RawMessage)messages_sent.get( x );

						  listener.messageSent( msg.getBaseMessage() );

						  if( i == num_listeners - 1 ) {  //the last listener notification, so destroy
							  msg.destroy();
						  }
					  }
				  }
			  }
		  }
	  }else{
		  if ( trace ){
			  TimeFormatter.milliTrace( "omq:deliver: 0, q=" + queue.size() + "/" + total_size );
		  }
	  }

	  return data_written + protocol_written;
  }
  
  public boolean
  isDestroyed()
  {
	  return( destroyed );
  }
  
  /**
   * Manually send any unsent listener notifications.
   */
  public void doListenerNotifications() {
    ArrayList notifications_copy;
    try {
      delayed_notifications_mon.enter();
      
      if( delayed_notifications.size() == 0 )  return;
      notifications_copy = new ArrayList( delayed_notifications );
      delayed_notifications.clear();
    }
    finally {
      delayed_notifications_mon.exit();
    }
    
    ArrayList listeners_ref = listeners;
    
    for( int j=0; j < notifications_copy.size(); j++ ) {  //for each notification
      NotificationItem item = (NotificationItem)notifications_copy.get( j );

      switch( item.type ) {
        case NotificationItem.MESSAGE_ADDED:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageQueued( item.message.getBaseMessage() );
          }
          break;
          
        case NotificationItem.MESSAGE_REMOVED:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageRemoved( item.message.getBaseMessage() );
          }
          item.message.destroy();
          break;
          
        case NotificationItem.MESSAGE_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.messageSent( item.message.getBaseMessage() );
          }
          item.message.destroy();
          break;
          
        case NotificationItem.PROTOCOL_BYTES_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.protocolBytesSent( item.byte_count );
          }
          break;
          
        case NotificationItem.DATA_BYTES_SENT:
          for( int i=0; i < listeners_ref.size(); i++ ) {  //for each listener
            MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
            listener.dataBytesSent( item.byte_count );
          }
          break;
          
        default:
          Debug.out( "NotificationItem.type unknown :" + item.type );
      }
    }
  }
  
  
  public void
  setTrace(
		boolean	on )
  {
	  trace	= on;
	  
	  transport.setTrace( on );
  }
  
  public String getQueueTrace() {
  	StringBuffer trace = new StringBuffer();
  	
  	trace.append( "**** OUTGOING QUEUE TRACE ****\n" );
  	
  	try{
      queue_mon.enter();
      
      
      int i=0;
    	
    	for( Iterator it = prev_sent.iterator(); it.hasNext(); ) {
    		RawMessage raw = (RawMessage)it.next();
        trace.append( "[#h" +i+ "]: ")
             .append(raw.getID())
             .append(" [")
             .append(raw.getDescription())
             .append("]")
             .append("\n" );
        i++;
    	}      
      
      

      int position = queue.size() - 1;

      for( Iterator it = queue.iterator(); it.hasNext(); ) {
        RawMessage raw = (RawMessage)it.next();
        
        int pos = raw.getRawData()[0].position(DirectByteBuffer.SS_NET);
        int length = raw.getRawData()[0].limit( DirectByteBuffer.SS_NET );
        
        trace.append( "[#")
             .append(position)
             .append(" ")
             .append(pos)
             .append(":")
             .append(length)
             .append("]: ")
             .append(raw.getID())
             .append(" [")
             .append(raw.getDescription())
             .append("]")
             .append("\n" );
        
        position--;
      }
    }
  	finally{
      queue_mon.exit();
    }
  	
  	return trace.toString();
  }
  
  
  /**
   * Add a listener to be notified of queue events.
   * @param listener
   */
  public void registerQueueListener( MessageQueueListener listener ) {
    try{  listeners_mon.enter();
      //copy-on-write
      ArrayList new_list = new ArrayList( listeners.size() + 1 );
      new_list.addAll( listeners );
      new_list.add( listener );
      listeners = new_list;
    }
    finally{  listeners_mon.exit();  }
  }
  
  
  /**
   * Cancel queue event notification listener.
   * @param listener
   */
  public void cancelQueueListener( MessageQueueListener listener ) {
    try{  listeners_mon.enter();
      //copy-on-write
      ArrayList new_list = new ArrayList( listeners );
      new_list.remove( listener );
      listeners = new_list;
    }
    finally{  listeners_mon.exit();  }
  }

  
  
  /**
   * Notifty the queue (and its listeners) of a message sent externally on the queue's behalf.
   * @param message sent externally
   */
  public void notifyOfExternallySentMessage( Message message ) {
    ArrayList listeners_ref = listeners;

    DirectByteBuffer[] buffs = message.getData();
    int size = 0;
    for( int i=0; i < buffs.length; i++ ) {
      size += buffs[i].remaining( DirectByteBuffer.SS_NET );
    }
    
    for( int i=0; i < listeners_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

      listener.messageSent( message );
      
      if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
        listener.dataBytesSent( size );
      }
      else {
        listener.protocolBytesSent( size );
      }
    }
    
    //System.out.println( "notifiedOfExternallySentMessage:: [" +message.getID()+ "] size=" +size );
    
  }
  
  
  
  
  
  private static class NotificationItem {
    private static final int MESSAGE_ADDED        = 0;
    private static final int MESSAGE_REMOVED      = 1;
    private static final int MESSAGE_SENT         = 2;
    private static final int DATA_BYTES_SENT      = 3;
    private static final int PROTOCOL_BYTES_SENT  = 4;
    private final int type;
    private RawMessage message;
    private int byte_count = 0;
    private NotificationItem( int notification_type ) {
      type = notification_type;
    }
  }
  
}