Methods Summary |
---|
public void | cancelQueueListener(MessageQueueListener listener)Cancel queue event notification listener.
try{
    listeners_mon.enter();
    // Copy-on-write: edit a private copy and then swap the reference, so any
    // code that already picked up the old list keeps seeing it unchanged.
    ArrayList remaining = new ArrayList( listeners );
    // If the listener is not present the copy is simply left as it is.
    remaining.remove( listener );
    listeners = remaining;
}
finally{
    listeners_mon.exit();
}
|
public void | destroy()Destroy this queue.
// Tears down the current stream decoder; whatever the decoder's destroy()
// call returns is deliberately ignored here.
stream_decoder.destroy();
|
public MessageStreamDecoder | getDecoder()
// Hands back the decoder currently installed on this queue (the same
// reference that is held internally, not a copy).
return stream_decoder;
|
public int | getPercentDoneOfCurrentMessage()Get the percentage of the current message that has already been received.
// Delegates directly to the current stream decoder; the value is passed
// through without any adjustment.
return stream_decoder.getPercentDoneOfCurrentMessage();
|
public void | notifyOfExternallyReceivedMessage(Message message)Notify the queue (and its listeners) of a message received externally on the queue's behalf.
ArrayList snapshot = listeners; //copy-on-write: work against one fixed list reference

// Sum the bytes remaining in every buffer of the message up front; this
// total is what gets reported to the listeners as received bytes.
DirectByteBuffer[] payload = message.getData();
int total_bytes = 0;
for( int b=0; b < payload.length; b++ ) {
    total_bytes += payload[ b ].remaining( DirectByteBuffer.SS_NET );
}

boolean handled = false;
for( int idx=0; idx < snapshot.size(); idx++ ) {
    MessageQueueListener mql = (MessageQueueListener)snapshot.get( idx );

    // The message is offered only until some listener reports it handled;
    // later listeners are not offered it again.
    if( !handled ) {
        handled = mql.messageReceived( message );
    }

    // Byte accounting, in contrast, goes to every listener.
    if( message.getType() != Message.TYPE_DATA_PAYLOAD ) {
        mql.protocolBytesReceived( total_bytes );
    }
    else {
        mql.dataBytesReceived( total_bytes );
    }
}

if( handled ) {
    return;
}

// Nobody accepted the message: report it (only when there were listeners
// to ask) and hand its buffers back to the pool.
if( !snapshot.isEmpty() ) {
    System.out.println( "no registered listeners [out of " +snapshot.size()+ "] handled decoded message [" +message.getDescription()+ "]" );
}
DirectByteBuffer[] unclaimed = message.getData();
for( int u=0; u < unclaimed.length; u++ ) {
    unclaimed[ u ].returnToPool();
}
|
public int | receiveFromTransport(int max_bytes)Receive (read) message(s) data from the underlying transport.
// Returns the byte count reported by the decoder's performStreamDecode call,
// or 0 when max_bytes is not positive. Throws IOException when no listener
// is registered; a RuntimeException from the decode is logged and rethrown.
if( max_bytes < 1 ) {
    Debug.out( "max_bytes < 1: " +max_bytes );
    return 0;
}

if( listeners.isEmpty() ) {
    Debug.out( "no queue listeners registered!" );
    throw new IOException( "no queue listeners registered!" );
}

int bytes_read;
try{
    //perform decode op
    bytes_read = stream_decoder.performStreamDecode( connection.getTransport(), max_bytes );
}catch( RuntimeException e ){
    // Log with the connection identity for diagnosis, then let it propagate unchanged.
    Debug.out( "Stream decode for " + connection.getString() + " failed: " + Debug.getNestedExceptionMessageAndStack(e));
    throw( e );
}

// Dispatch whatever complete messages the decode step produced.
Message[] decoded = stream_decoder.removeDecodedMessages();
int decoded_count = ( decoded == null ) ? 0 : decoded.length;
for( int m=0; m < decoded_count; m++ ) {
    Message msg = decoded[ m ];
    if( msg == null ) {
        System.out.println( "received msg == null [messages.length=" +decoded.length+ ", #" +m+ "]: " +connection.getTransport().getDescription() );
        continue;
    }

    ArrayList snapshot = listeners; //copy-on-write
    boolean handled = false;
    for( int idx=0; idx < snapshot.size(); idx++ ) {
        MessageQueueListener mql = (MessageQueueListener)snapshot.get( idx );
        // Offer the message only until one listener reports it handled.
        if( !handled ) {
            handled = mql.messageReceived( msg );
        }
    }
    if( handled ) {
        continue;
    }

    // No listener took the message: report it (when there were listeners
    // to ask) and give its buffers back to the pool.
    if( !snapshot.isEmpty() ) {
        System.out.println( "no registered listeners [out of " +snapshot.size()+ "] handled decoded message [" +msg.getDescription()+ "]" );
    }
    DirectByteBuffer[] unclaimed = msg.getData();
    for( int u=0; u < unclaimed.length; u++ ) {
        unclaimed[ u ].returnToPool();
    }
}

// Report protocol-byte progress to every listener, if there was any.
int protocol_read = stream_decoder.getProtocolBytesDecoded();
if( protocol_read > 0 ) {
    ArrayList protocol_targets = listeners; //copy-on-write
    for( int p=0; p < protocol_targets.size(); p++ ) {
        ( (MessageQueueListener)protocol_targets.get( p ) ).protocolBytesReceived( protocol_read );
    }
}

// Likewise for data (payload) bytes.
int data_read = stream_decoder.getDataBytesDecoded();
if( data_read > 0 ) {
    ArrayList data_targets = listeners; //copy-on-write
    for( int d=0; d < data_targets.size(); d++ ) {
        ( (MessageQueueListener)data_targets.get( d ) ).dataBytesReceived( data_read );
    }
}

return bytes_read;
|
public void | registerQueueListener(MessageQueueListener listener)Add a listener to be notified of queue events.
try{
    listeners_mon.enter();
    // Copy-on-write: build a new list (presized for one extra entry) holding
    // the existing listeners plus the new one, then swap the reference. The
    // list that was previously published is never modified.
    ArrayList current = listeners;
    ArrayList expanded = new ArrayList( current.size() + 1 );
    expanded.addAll( current );
    expanded.add( listener );
    listeners = expanded;
}
finally{
    listeners_mon.exit();
}
|
public void | resumeQueueProcessing()Manually resume processing (reading) incoming messages.
NOTE: Allows us to resume decoding externally, in case it was auto-paused internally.
// Simply forwards the request to the current stream decoder.
stream_decoder.resumeDecoding();
|
public void | setDecoder(MessageStreamDecoder new_stream_decoder)Set the message stream decoder that will be used to decode incoming messages.
// Shut the outgoing decoder down first. Its destroy() hands back a ByteBuffer
// (presumably bytes it had read but not yet consumed - confirm against the
// decoder implementation), which is passed to the transport.
ByteBuffer leftover = stream_decoder.destroy();
connection.getTransport().setAlreadyRead( leftover );

// Install the replacement and start it decoding.
stream_decoder = new_stream_decoder;
stream_decoder.resumeDecoding();
|