FileDocCategorySizeDatePackage
NonBlockingReadWriteService.javaAPI DocAzureus 3.0.3.410112Sun Mar 04 21:08:18 GMT 2007com.aelitis.azureus.core.clientmessageservice.impl

NonBlockingReadWriteService

public class NonBlockingReadWriteService extends Object

Fields Summary
private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
read_selector
private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
write_selector
private final ArrayList
connections
private final AEMonitor
connections_mon
private final ServiceListener
listener
private final String
service_name
private volatile boolean
destroyed
private long
last_timeout_check_time
private static final int
TIMEOUT_CHECK_INTERVAL_MS
private final int
activity_timeout_period_ms
private final int
close_delay_period_ms
Constructors Summary
public NonBlockingReadWriteService(String _service_name, int timeout, ServiceListener _listener)

  
  
	         
		this( _service_name, timeout, 0, _listener );
	
public NonBlockingReadWriteService(String _service_name, int timeout, int close_delay, ServiceListener _listener)

		this.service_name = _service_name;
		this.listener = _listener;

		read_selector = new VirtualChannelSelector( service_name, VirtualChannelSelector.OP_READ, false );
		write_selector = new VirtualChannelSelector( service_name, VirtualChannelSelector.OP_WRITE, true );

		if( timeout < TIMEOUT_CHECK_INTERVAL_MS /1000 )  timeout = TIMEOUT_CHECK_INTERVAL_MS /1000;
		this.activity_timeout_period_ms = timeout *1000;
		close_delay_period_ms			= close_delay * 1000;
		
    AEThread select_thread = new AEThread( "[" +service_name+ "] Service Select" ) {
      public void runSupport() {
        while( true ) {
        	
          boolean	stop_after_select = destroyed;
          
  	      if ( stop_after_select ){
  	    	  read_selector.destroy();
  	    	  write_selector.destroy();
  	      }
  	      
          try{
            read_selector.select( 50 );
            write_selector.select( 50 );
          }
          catch( Throwable t ) {
            Debug.out( "[" +service_name+ "] SelectorLoop() EXCEPTION: ", t );
          }
          
          if (stop_after_select){
        	  break;
          }
          
          doConnectionTimeoutChecks();
          
          	// check this at the end so we have one last run through  the selectors to do cancels
          	// before exiting
        }
      }
    };
    select_thread.setDaemon( true );
    select_thread.start();
	
Methods Summary
public voidaddClientConnection(ClientConnection connection)

		//add to active list
		
    try {  connections_mon.enter();
    
    	if ( destroyed ){
    		
    		Debug.out( "connection added after destroy" );
    	}
    	
    	connections.add( connection );
    }finally {  
    	connections_mon.exit(); 
    }
    
    registerForSelection( connection );
	
public voiddestroy()

	    try {  
	    	connections_mon.enter();
		    	    
	    	connections.clear();
	    	
	    	destroyed	= true;
	    	
	    }finally{
	    	connections_mon.exit();
	    }
	
private voiddoConnectionTimeoutChecks()

    //check timeouts
    long time = System.currentTimeMillis();
    if( time < last_timeout_check_time || time - last_timeout_check_time > TIMEOUT_CHECK_INTERVAL_MS ) {
      ArrayList timed_out = new ArrayList();
      
      try {  connections_mon.enter();
        long current_time = System.currentTimeMillis();
    
        for( int i=0; i < connections.size(); i++ ) {
          ClientConnection vconn = (ClientConnection)connections.get( i );
        
          if( current_time < vconn.getLastActivityTime() ) {  //time went backwards!
            vconn.resetLastActivityTime();
          }
          else{
        	  if( current_time - vconn.getLastActivityTime() > activity_timeout_period_ms ||
        			 ( close_delay_period_ms > 0 && 
        			   current_time - vconn.getLastActivityTime() > close_delay_period_ms )){
        		  
        		  timed_out.add( vconn );   //do actual removal outside the check loop
        	  }
          }
        }
      }
      finally {  connections_mon.exit();  }
      
      for( int i=0; i < timed_out.size(); i++ ) {  
        ClientConnection vconn = (ClientConnection)timed_out.get( i );
        // don't change the exception text - it is used elsewhere
        listener.connectionError( vconn, new Exception( "Timeout" ));
      }
      
      last_timeout_check_time = System.currentTimeMillis();
    }
  
private voidregisterForSelection(ClientConnection client)

		
		//READS
		VirtualChannelSelector.VirtualSelectorListener read_listener = new VirtualChannelSelector.VirtualSelectorListener() {
			//SUCCESS
      public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {     	
      	try{
      		Message[] messages = client.readMessages();
      	
      		if( messages != null ) {    		
      			for( int i=0; i < messages.length; i++ ) {
      				AZGenericMapPayload msg = (AZGenericMapPayload)messages[i];
      				ClientMessage client_msg = new ClientMessage( msg.getID(), client, msg.getMapPayload(), null );  //note no handler. we let the listener attach it		
      				listener.messageReceived( client_msg );	
      			}
      		}	
      		
      		return( client.getLastReadMadeProgress());
      	}
      	catch( Throwable t ) {
      		if ( !client.isClosePending()){
      			
      			System.out.println( "[" +new Date()+ "] Connection read error [" +sc.socket().getInetAddress()+ "] [" +client.getDebugString()+ "]: " +t.getMessage() );
      		}
      		
      		listener.connectionError( client, t );
      		return( false );
      	}
      }

      //FAILURE
      public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
    	  if ( !destroyed ){
    		  msg.printStackTrace();
    	  }
        listener.connectionError( client, msg );
      }
    };
    
    
    //WRITES
    final VirtualChannelSelector.VirtualSelectorListener write_listener = new VirtualChannelSelector.VirtualSelectorListener() {
      public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {   	
      	try{
      		boolean more_writes_needed = client.writeMessages();
      	
      		if( more_writes_needed ) {
      			write_selector.resumeSelects( client.getSocketChannel() );  //we need to resume since write selects are auto-paused after select op
      		}
      		
      		return( client.getLastWriteMadeProgress());
      	}
      	catch( Throwable t ) {
          System.out.println( "[" +new Date()+ "] Connection write error [" +sc.socket().getInetAddress()+ "] [" +client.getDebugString()+ "]: " +t.getMessage() );
          listener.connectionError( client, t );
          return( false );
      	}
      }

      public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
        if ( !destroyed ){
        	msg.printStackTrace();
        }
        listener.connectionError( client, msg );
      }
    };

    write_selector.register( client.getSocketChannel(), write_listener, null );  //start writing back to the connection
    write_selector.pauseSelects( client.getSocketChannel() );   //wait until we've got something to send before selecting
    
    read_selector.register( client.getSocketChannel(), read_listener, null );  //start reading from the connection
	
public voidremoveClientConnection(ClientConnection connection)

    read_selector.cancel( connection.getSocketChannel() );
    write_selector.cancel( connection.getSocketChannel() );
    
    //remove from active list
    try {  connections_mon.enter();
      connections.remove( connection );
    }
    finally {  connections_mon.exit();  }
	
public voidsendMessage(ClientMessage message)

		ClientConnection vconn = message.getClient();
		
		boolean still_connected;
		
		try {  connections_mon.enter();
			still_connected = connections.contains( vconn );
		}
		finally {  connections_mon.exit();  }
		
		if( !still_connected ) {
			System.out.println( "[" +new Date()+ "] Connection message send error [connection no longer connected]: " +vconn.getDebugString()+ "]" );
			message.reportFailed( new Exception("No longer connected" ));
			//listener.connectionError( vconn ); //no need to call this, as there is no connection to remove
      return;
		}
		
		Message reply = new AZGenericMapPayload( message.getMessageID(), message.getPayload(), (byte)1 );

		vconn.sendMessage( message, reply );
		
		write_selector.resumeSelects( vconn.getSocketChannel() );  //start write selecting now that there's something to send