FileDocCategorySizeDatePackage
AEClientService.javaAPI DocAzureus 3.0.3.46690Wed Nov 15 15:17:10 GMT 2006com.aelitis.azureus.core.clientmessageservice.impl

AEClientService

public class AEClientService extends Object implements ClientMessageService

Fields Summary
private final String
address
private final int
port
private final String
msg_type_id
private final int
timeout_secs
private int
max_message_bytes
private ClientConnection
conn
private final AESemaphore
read_block
private final AESemaphore
write_block
private final ArrayList
received_messages
private final NonBlockingReadWriteService
rw_service
private volatile Throwable
error
Constructors Summary
public AEClientService(String server_address, int server_port, String _msg_type_id)

	
  
	         

		this( server_address, server_port, 30, _msg_type_id );
	
public AEClientService(String server_address, int server_port, int timeout, String _msg_type_id)

		this.address = server_address;
		this.port = server_port;
		this.timeout_secs = timeout;
		this.msg_type_id = _msg_type_id;
		
		try {
			AZMessageFactory.registerGenericMapPayloadMessageType( msg_type_id );  //register for incoming type decoding
		}
		catch( MessageException me ) {  /*ignore, since message type probably already registered*/ }
		
		rw_service = new NonBlockingReadWriteService( msg_type_id, timeout, 0, new NonBlockingReadWriteService.ServiceListener() {			
			public void messageReceived( ClientMessage message ) {
				received_messages.add( message.getPayload() );
				read_block.release();
			}
			
			public void connectionError( ClientConnection connection, Throwable msg ) {
				error = msg;
				read_block.releaseForever();
				write_block.releaseForever();
			}
		});
	
Methods Summary
public voidclose()

		if( conn != null ) {
			rw_service.removeClientConnection( conn );
			conn.close( new Exception( "Connection closed" ));
		}
		rw_service.destroy();
	
private voidconnect()

		
	InetSocketAddress	tcp_target = new InetSocketAddress( address, port );
	
	ConnectionEndpoint	ce = new ConnectionEndpoint( tcp_target );
	
	new ProtocolEndpointTCP( ce, tcp_target );
	   
    final AESemaphore connect_block = new AESemaphore( "AEClientService:C" );
    
    ce.connectOutbound( false, false, null, null, new Transport.ConnectListener() {  //NOTE: async operation!
    	public void connectAttemptStarted() {  /*nothing*/ }
      
    	public void connectSuccess(Transport transport, ByteBuffer remaining_initial_data ){
    		conn = new ClientConnection((TCPTransportImpl)transport );
    		if ( max_message_bytes != -1 ){
    			conn.setMaximumMessageSize( max_message_bytes );
    		}
    		connect_block.release();       
    	}
     
    	public void connectFailure( Throwable failure_msg ) {
    		error = failure_msg;
    		connect_block.release();  
    	}
    });
    
    if ( !connect_block.reserve( timeout_secs*1000 )){
        throw new IOException( "connect op failed: timeout" );
    }
    
    //connect op finished   
    
    if( error != null ) {  //connect failure
      close();
      throw new IOException( "connect op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
    }
        
    rw_service.addClientConnection( conn );  //register for read/write handling
	
public java.util.MapreceiveMessage()

		if( conn == null ) {  //not yet connected
			connect();
		}	
		
		read_block.reserve();  //block until receive completes

		if( !received_messages.isEmpty() ) {  //there were still read messages left from the previous read call
			Map recv_msg = (Map)received_messages.remove( 0 );
			return recv_msg;
		}
		
		//receive op finished	
    
		if (error == null ){
			error = new IOException( "receive op inconsistent" );
		}
		
		close();
		throw new IOException( "receive op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
	
public voidsendMessage(java.util.Map message)

		if( conn == null ) {  //not yet connected
			connect();
		}
		
		if( error != null ) {
		    close();
		    throw new IOException( "send op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
		}
		
		ClientMessage client_msg = new ClientMessage( msg_type_id, conn, message, new ClientMessageHandler() {
			public String getMessageTypeID(){  return msg_type_id;  }

			public void processMessage( ClientMessage message ) {
				Debug.out( "ERROR: should never be called" );
			}

			public void sendAttemptCompleted( ClientMessage message ){
				write_block.release();
			}
			public void sendAttemptFailed( ClientMessage message, Throwable cause) {
				error = cause;
				write_block.release();
			}
		});
		
		rw_service.sendMessage( client_msg );  //queue message for sending	

		write_block.reserve();  //block until send completes
		
		//send op finished
    
    if( error != null ) {  //connect failure
      close();
      throw new IOException( "send op failed: " + error.getMessage() == null ? "[]" : error.getMessage() );
    }
	
public voidsetMaximumMessageSize(int max_bytes)

		max_message_bytes	= max_bytes;