AEClientServicepublic class AEClientService extends Object implements ClientMessageService
Fields Summary |
---|
private final String | address | private final int | port | private final String | msg_type_id | private final int | timeout_secs | private int | max_message_bytes | private ClientConnection | conn | private final AESemaphore | read_block | private final AESemaphore | write_block | private final ArrayList | received_messages | private final NonBlockingReadWriteService | rw_service | private volatile Throwable | error |
Constructors Summary |
---|
public AEClientService(String server_address, int server_port, String _msg_type_id)
this( server_address, server_port, 30, _msg_type_id );
| public AEClientService(String server_address, int server_port, int timeout, String _msg_type_id)
this.address = server_address;
this.port = server_port;
this.timeout_secs = timeout;
this.msg_type_id = _msg_type_id;
try {
AZMessageFactory.registerGenericMapPayloadMessageType( msg_type_id ); //register for incoming type decoding
}
catch( MessageException me ) { /*ignore, since message type probably already registered*/ }
rw_service = new NonBlockingReadWriteService( msg_type_id, timeout, 0, new NonBlockingReadWriteService.ServiceListener() {
public void messageReceived( ClientMessage message ) {
received_messages.add( message.getPayload() );
read_block.release();
}
public void connectionError( ClientConnection connection, Throwable msg ) {
error = msg;
read_block.releaseForever();
write_block.releaseForever();
}
});
|
Methods Summary |
---|
public void | close()
if( conn != null ) {
rw_service.removeClientConnection( conn );
conn.close( new Exception( "Connection closed" ));
}
rw_service.destroy();
| private void | connect()
InetSocketAddress tcp_target = new InetSocketAddress( address, port );
ConnectionEndpoint ce = new ConnectionEndpoint( tcp_target );
new ProtocolEndpointTCP( ce, tcp_target );
final AESemaphore connect_block = new AESemaphore( "AEClientService:C" );
ce.connectOutbound( false, false, null, null, new Transport.ConnectListener() { //NOTE: async operation!
public void connectAttemptStarted() { /*nothing*/ }
public void connectSuccess(Transport transport, ByteBuffer remaining_initial_data ){
conn = new ClientConnection((TCPTransportImpl)transport );
if ( max_message_bytes != -1 ){
conn.setMaximumMessageSize( max_message_bytes );
}
connect_block.release();
}
public void connectFailure( Throwable failure_msg ) {
error = failure_msg;
connect_block.release();
}
});
if ( !connect_block.reserve( timeout_secs*1000 )){
throw new IOException( "connect op failed: timeout" );
}
//connect op finished
if( error != null ) { //connect failure
close();
throw new IOException( "connect op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
}
rw_service.addClientConnection( conn ); //register for read/write handling
| public java.util.Map | receiveMessage()
if( conn == null ) { //not yet connected
connect();
}
read_block.reserve(); //block until receive completes
if( !received_messages.isEmpty() ) { //there were still read messages left from the previous read call
Map recv_msg = (Map)received_messages.remove( 0 );
return recv_msg;
}
//receive op finished
if (error == null ){
error = new IOException( "receive op inconsistent" );
}
close();
throw new IOException( "receive op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
| public void | sendMessage(java.util.Map message)
if( conn == null ) { //not yet connected
connect();
}
if( error != null ) {
close();
throw new IOException( "send op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
}
ClientMessage client_msg = new ClientMessage( msg_type_id, conn, message, new ClientMessageHandler() {
public String getMessageTypeID(){ return msg_type_id; }
public void processMessage( ClientMessage message ) {
Debug.out( "ERROR: should never be called" );
}
public void sendAttemptCompleted( ClientMessage message ){
write_block.release();
}
public void sendAttemptFailed( ClientMessage message, Throwable cause) {
error = cause;
write_block.release();
}
});
rw_service.sendMessage( client_msg ); //queue message for sending
write_block.reserve(); //block until send completes
//send op finished
if( error != null ) { //connect failure
close();
throw new IOException( "send op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
}
| public void | setMaximumMessageSize(int max_bytes)
max_message_bytes = max_bytes;
|
|