Methods Summary |
---|
public void | addMessage(Message message, boolean manual_listener_notify)Add a message to the message queue.
NOTE: Allows for manual listener notification at some later time,
using doListenerNotifications(), instead of notifying immediately
from within this method. This is useful if you want to invoke
listeners outside of some greater synchronised block to avoid
deadlock.
//Ask the listeners whether this message may be queued at all.
//The && short-circuits: once one listener vetoes, the remaining listeners are not consulted.
boolean allowed = true;
ArrayList list_ref = listeners;  //local reference so the loop works against one list instance
for( int i=0; i < list_ref.size(); i++ ) {
  MessageQueueListener listener = (MessageQueueListener)list_ref.get( i );
  allowed = allowed && listener.messageAdded( message );
}
if( !allowed ) {  //message addition not allowed
  //TODO should the rejected message be destroyed here? It is currently left untouched.
  return;
}
//One logical message may be encoded into several raw messages.
RawMessage[] rmesgs = stream_encoder.encodeMessage( message );
if( destroyed ) {  //queue is shutdown, drop any added messages
  for( int i=0; i < rmesgs.length; i++ ) {
    rmesgs[i].destroy();
  }
  return;
}
for( int i=0; i < rmesgs.length; i++ ) {
  RawMessage rmesg = rmesgs[i];
  //the new message may supersede queued messages of certain types
  removeMessagesOfType( rmesg.messagesToRemove(), manual_listener_notify );
  try{
    queue_mon.enter();
    //Re-check under the queue lock: destroy() drains the queue while holding queue_mon,
    //so anything inserted after that drain would never be destroyed.
    if( destroyed ) {
      for( int k=i; k < rmesgs.length; k++ ) {
        rmesgs[k].destroy();
      }
      return;
    }
    //Find the insert position: ahead of the first lower-priority message...
    int pos = 0;
    for( Iterator it = queue.iterator(); it.hasNext(); ) {
      RawMessage msg = (RawMessage)it.next();
      if( rmesg.getPriority() > msg.getPriority()
          && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //but don't insert in front of a half-sent message
        break;
      }
      pos++;
    }
    if( rmesg.isNoDelay() ) {
      urgent_message = rmesg;
    }
    queue.add( pos, rmesg );
    //account for the bytes this message adds to the queue
    DirectByteBuffer[] payload = rmesg.getRawData();
    for( int j=0; j < payload.length; j++ ) {
      total_size += payload[j].remaining(DirectByteBuffer.SS_NET);
    }
  }finally{
    queue_mon.exit();
  }
  if( manual_listener_notify ) {  //register listener event for later, manual notification
    NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_ADDED );
    item.message = rmesg;
    try {
      delayed_notifications_mon.enter();
      delayed_notifications.add( item );
    }
    finally {
      delayed_notifications_mon.exit();
    }
  }
  else {  //do listener notification now
    ArrayList listeners_ref = listeners;
    for( int j=0; j < listeners_ref.size(); j++ ) {
      MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( j );
      listener.messageQueued( rmesg.getBaseMessage() );
    }
  }
}
|
public void | cancelQueueListener(MessageQueueListener listener)Cancel queue event notification listener.
//Build a new list without the listener and publish it in place of the old one;
//the list instance previously referenced by 'listeners' is left unmodified.
try{
  listeners_mon.enter();
  ArrayList remaining = new ArrayList( listeners.size() );
  remaining.addAll( listeners );
  remaining.remove( listener );
  listeners = remaining;
}
finally{
  listeners_mon.exit();
}
|
public int | deliverToTransport(int max_bytes, boolean manual_listener_notify)Deliver (write) message(s) data to the underlying transport.
NOTE: Allows for manual listener notification at some later time,
using doListenerNotifications(), instead of notifying immediately
from within this method. This is useful if you want to invoke
listeners outside of some greater synchronised block to avoid
deadlock.
//Writes up to max_bytes of queued message data to the transport and returns the
//number of bytes (data + protocol) actually written. Throws IOException when no
//transport has been set yet, or when the transport write itself fails.
if( max_bytes < 1 ) {
  Debug.out( "max_bytes < 1: " +max_bytes );
  return 0;
}
if( transport == null ) {
  throw( new IOException( "not ready to deliver data" ));
}
int data_written = 0;      //payload bytes of TYPE_DATA_PAYLOAD messages
int protocol_written = 0;  //everything else, including message headers
ArrayList messages_sent = null;  //fully-sent messages awaiting immediate notification (non-manual mode only)
try{
  queue_mon.enter();
  if( !queue.isEmpty() ) {
    //Phase 1: gather buffers from the head of the queue until max_bytes is covered.
    int buffer_limit = 256;
    ByteBuffer[] raw_buffers = new ByteBuffer[buffer_limit];
    int[] orig_positions = new int[buffer_limit];  //position of each buffer before the write
    int buffer_count = 0;
    int total_sofar = 0;
    outer:
    for( Iterator i = queue.iterator(); i.hasNext(); ) {
      DirectByteBuffer[] payloads = ((RawMessage)i.next()).getRawData();
      for( int x=0; x < payloads.length; x++ ) {
        ByteBuffer buff = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
        raw_buffers[buffer_count] = buff;
        orig_positions[buffer_count] = buff.position();
        total_sofar += buff.remaining();
        buffer_count++;
        if( total_sofar >= max_bytes ) {
          break outer;
        }
        if( buffer_count == buffer_limit ) {  //arrays full, double their capacity
          int new_buffer_limit = buffer_limit * 2;
          ByteBuffer[] new_raw_buffers = new ByteBuffer[new_buffer_limit];
          int[] new_orig_positions = new int[new_buffer_limit];
          System.arraycopy( raw_buffers, 0, new_raw_buffers, 0, buffer_limit );
          System.arraycopy( orig_positions, 0, new_orig_positions, 0, buffer_limit );
          raw_buffers = new_raw_buffers;
          orig_positions = new_orig_positions;
          buffer_limit = new_buffer_limit;
        }
      }
    }
    //Phase 2: temporarily shrink the last buffer so no more than max_bytes can be written.
    ByteBuffer last_buff = raw_buffers[ buffer_count - 1 ];
    int orig_last_limit = last_buff.limit();
    if( total_sofar > max_bytes ) {
      last_buff.limit( orig_last_limit - (total_sofar - max_bytes) );
    }
    try {
      transport.write( raw_buffers, 0, buffer_count );
    }
    finally {
      //always restore the real limit, even when the write throws, otherwise the
      //still-queued message would stay truncated
      last_buff.limit( orig_last_limit );
    }
    //Phase 3: walk the gathered buffers again to account for what was written and
    //to dequeue every message that is now complete.
    int pos = 0;  //index into orig_positions, parallel to the gather order above
    boolean stop = false;
    while( !queue.isEmpty() && !stop ) {
      RawMessage msg = (RawMessage)queue.get( 0 );
      DirectByteBuffer[] payloads = msg.getRawData();
      for( int x=0; x < payloads.length; x++ ) {
        ByteBuffer bb = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
        //current position minus the position recorded before the write
        int bytes_written = (bb.limit() - bb.remaining()) - orig_positions[ pos ];
        total_size -= bytes_written;
        if( x > 0 && msg.getType() == Message.TYPE_DATA_PAYLOAD ) {  //assumes the first buffer is message header
          data_written += bytes_written;
        }
        else {
          protocol_written += bytes_written;
        }
        if( bb.hasRemaining() ) {  //still data left to send in this message
          stop = true;  //so don't bother checking later messages for completion
          //compute send percentage
          int message_size = 0;
          int written = 0;
          for( int i=0; i < payloads.length; i++ ) {
            ByteBuffer buff = payloads[i].getBuffer( DirectByteBuffer.SS_NET );
            message_size += buff.limit();
            if( i < x ) {  //if in front of non-empty buffer
              written += buff.limit();
            }
            else if( i == x ) {  //is non-empty buffer
              written += buff.position();
            }
          }
          percent_complete = (written * 100) / message_size;
          break;
        }
        else if( x == payloads.length - 1 ) {  //last payload buffer of message is empty
          if( msg == urgent_message ) urgent_message = null;
          queue.remove( 0 );
          if( TRACE_HISTORY ) {
            prev_sent.addLast( msg );
            if( prev_sent.size() > MAX_HISTORY_TRACES ) prev_sent.removeFirst();
          }
          percent_complete = -1;  //reset send percentage
          if( manual_listener_notify ) {
            NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
            item.message = msg;
            try {
              delayed_notifications_mon.enter();
              delayed_notifications.add( item );
            }
            finally {
              delayed_notifications_mon.exit();
            }
          }
          else {
            if( messages_sent == null ) {
              messages_sent = new ArrayList();
            }
            messages_sent.add( msg );
          }
        }
        pos++;
        if( pos >= buffer_count ) {  //ran out of buffers that took part in this write
          stop = true;
          break;
        }
      }
    }
  }
}finally{
  queue_mon.exit();
}
// we can have messages that end up getting serialised as 0 bytes (for http
// connections for example) - we still need to notify them of being sent...
if( data_written + protocol_written > 0 || messages_sent != null ) {
  if( trace ) {
    TimeFormatter.milliTrace( "omq:deliver: " + (data_written + protocol_written) + ", q=" + queue.size() + "/" + total_size );
  }
  if( manual_listener_notify ) {
    if( data_written > 0 ) {  //data bytes notify
      NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
      item.byte_count = data_written;
      try {
        delayed_notifications_mon.enter();
        delayed_notifications.add( item );
      }
      finally {
        delayed_notifications_mon.exit();
      }
    }
    if( protocol_written > 0 ) {  //protocol bytes notify
      NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
      item.byte_count = protocol_written;
      try {
        delayed_notifications_mon.enter();
        delayed_notifications.add( item );
      }
      finally {
        delayed_notifications_mon.exit();
      }
    }
  }
  else {  //do listener notification now
    ArrayList listeners_ref = listeners;
    int num_listeners = listeners_ref.size();
    for( int i=0; i < num_listeners; i++ ) {
      MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
      if( data_written > 0 ) listener.dataBytesSent( data_written );
      if( protocol_written > 0 ) listener.protocolBytesSent( protocol_written );
      if( messages_sent != null ) {
        for( int x=0; x < messages_sent.size(); x++ ) {
          RawMessage msg = (RawMessage)messages_sent.get( x );
          listener.messageSent( msg.getBaseMessage() );
        }
      }
    }
    //Destroy the sent messages once every listener has seen them. Doing this after
    //the loop (rather than on the last listener pass) also covers the case where no
    //listener is registered, which previously left the messages undestroyed.
    if( messages_sent != null ) {
      for( int x=0; x < messages_sent.size(); x++ ) {
        ((RawMessage)messages_sent.get( x )).destroy();
      }
    }
  }
}else{
  if( trace ) {
    TimeFormatter.milliTrace( "omq:deliver: 0, q=" + queue.size() + "/" + total_size );
  }
}
return data_written + protocol_written;
|
public void | destroy()Destroy this queue; i.e. perform cleanup actions.
//Raise the shutdown flag first; addMessage() checks it to drop newly added messages.
destroyed = true;
try{
  queue_mon.enter();
  //release every message that is still queued
  while( !queue.isEmpty() ) {
    ((RawMessage)queue.remove( 0 )).destroy();
  }
  //Reset the bookkeeping under the same lock that guards its other writers, and
  //drop the urgent-message reference so hasUrgentMessage() does not keep
  //reporting a message that has just been destroyed.
  total_size = 0;
  urgent_message = null;
}finally{
  queue_mon.exit();
}
|
public void | doListenerNotifications()Manually send any unsent listener notifications.
//Step 1: take a private copy of the pending notifications and empty the shared
//list, so that listeners are invoked without delayed_notifications_mon held.
ArrayList pending;
try {
  delayed_notifications_mon.enter();
  if( delayed_notifications.isEmpty() ) {
    return;  //nothing waiting to be delivered
  }
  pending = new ArrayList( delayed_notifications );
  delayed_notifications.clear();
}
finally {
  delayed_notifications_mon.exit();
}
//Step 2: replay each notification, in order, to every listener.
ArrayList targets = listeners;
for( int n=0; n < pending.size(); n++ ) {
  NotificationItem item = (NotificationItem)pending.get( n );
  if( item.type == NotificationItem.MESSAGE_ADDED ) {
    int idx = 0;
    while( idx < targets.size() ) {
      MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
      listener.messageQueued( item.message.getBaseMessage() );
      idx++;
    }
  }
  else if( item.type == NotificationItem.MESSAGE_REMOVED ) {
    int idx = 0;
    while( idx < targets.size() ) {
      MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
      listener.messageRemoved( item.message.getBaseMessage() );
      idx++;
    }
    item.message.destroy();  //the raw message is released once all listeners were told
  }
  else if( item.type == NotificationItem.MESSAGE_SENT ) {
    int idx = 0;
    while( idx < targets.size() ) {
      MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
      listener.messageSent( item.message.getBaseMessage() );
      idx++;
    }
    item.message.destroy();  //the raw message is released once all listeners were told
  }
  else if( item.type == NotificationItem.PROTOCOL_BYTES_SENT ) {
    int idx = 0;
    while( idx < targets.size() ) {
      MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
      listener.protocolBytesSent( item.byte_count );
      idx++;
    }
  }
  else if( item.type == NotificationItem.DATA_BYTES_SENT ) {
    int idx = 0;
    while( idx < targets.size() ) {
      MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
      listener.dataBytesSent( item.byte_count );
      idx++;
    }
  }
  else {
    Debug.out( "NotificationItem.type unknown :" + item.type );
  }
}
|
public MessageStreamEncoder | getEncoder()
//the encoder addMessage() uses to turn a Message into raw messages
return stream_encoder;
|
public int | getMssSize()
//No transport set yet: fall back to the network manager's minimum MSS.
if( transport == null ) {
  return NetworkManager.getMinMssSize();
}
return transport.getMssSize();
|
public int | getPercentDoneOfCurrentMessage()Get the percentage of the current message that has already been sent out.
//percent_complete is recomputed by deliverToTransport() whenever the head message is
//left partially written, and is reset to -1 each time a message has been fully sent.
return percent_complete;
|
public java.lang.String | getQueueTrace()
//Builds a human-readable dump: first the history of previously sent messages
//(prev_sent), then the messages currently queued, all under queue_mon.
StringBuffer out = new StringBuffer();
out.append( "**** OUTGOING QUEUE TRACE ****\n" );
try{
  queue_mon.enter();
  //history entries are labelled #h0, #h1, ... in iteration order
  int hist_index = 0;
  Iterator history = prev_sent.iterator();
  while( history.hasNext() ) {
    RawMessage sent = (RawMessage)history.next();
    out.append( "[#h" +hist_index+ "]: ");
    out.append( sent.getID() );
    out.append( " [" );
    out.append( sent.getDescription() );
    out.append( "]" );
    out.append( "\n" );
    hist_index++;
  }
  //queued entries are labelled counting down from size-1 to 0, each with the
  //position and limit of its first buffer
  int label = queue.size() - 1;
  Iterator queued = queue.iterator();
  while( queued.hasNext() ) {
    RawMessage raw = (RawMessage)queued.next();
    int head_pos = raw.getRawData()[0].position(DirectByteBuffer.SS_NET);
    int head_limit = raw.getRawData()[0].limit( DirectByteBuffer.SS_NET );
    out.append( "[#" );
    out.append( label );
    out.append( " " );
    out.append( head_pos );
    out.append( ":" );
    out.append( head_limit );
    out.append( "]: " );
    out.append( raw.getID() );
    out.append( " [" );
    out.append( raw.getDescription() );
    out.append( "]" );
    out.append( "\n" );
    label--;
  }
}
finally{
  queue_mon.exit();
}
return out.toString();
|
public int | getTotalSize()Get the total number of bytes ready to be transported. return total_size; //running count: grows when messages are queued, shrinks as bytes are written or messages are removed
|
public boolean | hasUrgentMessage()Whether or not an urgent message (one that needs an immediate send, i.e. a no-delay message) is queued. return urgent_message != null; //urgent_message is set by addMessage() for no-delay messages and cleared again when that message is sent or removed
|
public boolean | isDestroyed()
//true once destroy() has been called on this queue
return destroyed;
|
public void | notifyOfExternallySentMessage(Message message)Notify the queue (and its listeners) of a message sent externally on the queue's behalf.
ArrayList targets = listeners;
//Sum the bytes remaining across all of the message's data buffers; this total
//is what gets reported to the listeners as sent.
DirectByteBuffer[] data = message.getData();
int byte_total = 0;
int b = 0;
while( b < data.length ) {
  byte_total += data[b].remaining( DirectByteBuffer.SS_NET );
  b++;
}
//Tell each listener about the message itself, then about its bytes, counted as
//data for TYPE_DATA_PAYLOAD messages and as protocol overhead for everything else.
int idx = 0;
while( idx < targets.size() ) {
  MessageQueueListener listener = (MessageQueueListener)targets.get( idx );
  listener.messageSent( message );
  if( message.getType() != Message.TYPE_DATA_PAYLOAD ) {
    listener.protocolBytesSent( byte_total );
  }
  else {
    listener.dataBytesSent( byte_total );
  }
  idx++;
}
|
public void | registerQueueListener(MessageQueueListener listener)Add a listener to be notified of queue events.
//Build a new list containing the existing listeners plus the new one and publish
//it in place of the old list; the old list instance is left unmodified.
try{
  listeners_mon.enter();
  ArrayList extended = new ArrayList( listeners );
  extended.add( listener );
  listeners = extended;
}
finally{
  listeners_mon.exit();
}
|
public boolean | removeMessage(Message message, boolean manual_listener_notify)Remove a particular message from the queue.
NOTE: Only the original message found in the queue will be destroyed upon removal,
which may not necessarily be the one passed as the method parameter,
as some messages override equals() (i.e. BTRequest messages) instead of using reference
equality, and could be a completely different object, and would need to be destroyed
manually. If the message does not override equals, then any such message will likely
*not* be found and removed, as the internally queued object was a new allocation on insertion.
NOTE: Allows for manual listener notification at some later time,
using doListenerNotifications(), instead of notifying immediately
from within this method. This is useful if you want to invoke
listeners outside of some greater synchronised block to avoid
deadlock.
//Returns true if a matching, not-yet-started message was found and removed;
//false if nothing matched or the match was already partially sent.
RawMessage msg_removed = null;
try{
  queue_mon.enter();
  for( Iterator it = queue.iterator(); it.hasNext(); ) {
    RawMessage raw = (RawMessage)it.next();
    if( message.equals( raw.getBaseMessage() ) ) {
      if( raw.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //dont remove a half-sent message
        if( raw == urgent_message ) urgent_message = null;
        //give back the bytes this message contributed to the queue total
        DirectByteBuffer[] payload = raw.getRawData();
        for( int x=0; x < payload.length; x++ ) {
          total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
        }
        //Remove through the iterator so that exactly the element just inspected is
        //taken out, instead of re-searching the queue by equals().
        it.remove();
        msg_removed = raw;
      }
      break;  //only the first match is considered, even when it turned out to be half-sent
    }
  }
}finally{
  queue_mon.exit();
}
if( msg_removed != null ) {
  if( manual_listener_notify ) {  //delayed manual notification
    NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
    item.message = msg_removed;
    try {
      delayed_notifications_mon.enter();
      delayed_notifications.add( item );
    }
    finally {
      delayed_notifications_mon.exit();
    }
  }
  else {  //do listener notification now
    ArrayList listeners_ref = listeners;
    for( int i=0; i < listeners_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
      listener.messageRemoved( msg_removed.getBaseMessage() );
    }
    msg_removed.destroy();  //released only after every listener has been told
  }
  return true;
}
return false;
|
public void | removeMessagesOfType(Message[] message_types, boolean manual_listener_notify)Remove all messages of the given types from the queue.
NOTE: Allows for manual listener notification at some later time,
using doListenerNotifications(), instead of notifying immediately
from within this method. This is useful if you want to invoke
listeners outside of some greater synchronised block to avoid
deadlock.
if( message_types == null ) {
  return;  //no types given, nothing to remove
}
ArrayList removed_now = null;  //filled only when listeners are to be notified immediately
try{
  queue_mon.enter();
  Iterator queued = queue.iterator();
  while( queued.hasNext() ) {
    RawMessage candidate = (RawMessage)queued.next();
    for( int t=0; t < message_types.length; t++ ) {
      if( !message_types[t].getID().equals( candidate.getID() ) ) {
        continue;  //not this type, try the next one
      }
      if( candidate.getRawData()[0].position(DirectByteBuffer.SS_NET) != 0 ) {
        continue;  //dont remove a half-sent message
      }
      if( candidate == urgent_message ) {
        urgent_message = null;
      }
      //give back the bytes this message contributed to the queue total
      DirectByteBuffer[] buffers = candidate.getRawData();
      for( int b=0; b < buffers.length; b++ ) {
        total_size -= buffers[b].remaining(DirectByteBuffer.SS_NET);
      }
      if( !manual_listener_notify ) {
        //remember it so listeners can be called after queue_mon is released
        if( removed_now == null ) {
          removed_now = new ArrayList();
        }
        removed_now.add( candidate );
      }
      else {
        //record the removal for a later doListenerNotifications() call
        NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
        item.message = candidate;
        try {
          delayed_notifications_mon.enter();
          delayed_notifications.add( item );
        }
        finally {
          delayed_notifications_mon.exit();
        }
      }
      queued.remove();
      break;  //message is gone, no need to test the remaining types
    }
  }
}finally{
  queue_mon.exit();
}
if( manual_listener_notify || removed_now == null ) {
  return;  //either deferred, or nothing was removed
}
//do listener notifications now
ArrayList targets = listeners;
for( int m=0; m < removed_now.size(); m++ ) {
  RawMessage gone = (RawMessage)removed_now.get( m );
  for( int l=0; l < targets.size(); l++ ) {
    MessageQueueListener listener = (MessageQueueListener)targets.get( l );
    listener.messageRemoved( gone.getBaseMessage() );
  }
  gone.destroy();  //released only after every listener has been told
}
|
public void | setEncoder(MessageStreamEncoder stream_encoder)Set the message stream encoder that will be used to encode outgoing messages.
//Replaces the encoder that addMessage() uses to encode each Message into raw
//messages; this assignment does not touch messages that are already queued.
this.stream_encoder = stream_encoder;
|
public void | setTrace(boolean on)
//Switch this queue's own delivery tracing on or off.
trace = on;
//The transport may not have been set yet (deliverToTransport() and getMssSize()
//both allow for a null transport), so only forward the flag when one is present.
//NOTE(review): a transport installed later via setTransport() does not pick up this flag.
if( transport != null ) {
  transport.setTrace( on );
}
|
public void | setTransport(com.aelitis.azureus.core.networkmanager.Transport _transport)
//Installs the transport that deliverToTransport() writes to; until a non-null
//transport is set, deliverToTransport() throws an IOException.
this.transport = _transport;
|