FileDocCategorySizeDatePackage
TCPConnectionManager.javaAPI DocAzureus 3.0.3.422429Wed May 09 14:45:12 BST 2007com.aelitis.azureus.core.networkmanager.impl.tcp

TCPConnectionManager

public class TCPConnectionManager extends Object
Manages new connection establishment and ended connection termination.

Fields Summary
private static final LogIDs
LOGID
private static int
MIN_SIMULTANIOUS_CONNECT_ATTEMPTS
public static int
MAX_SIMULTANIOUS_CONNECT_ATTEMPTS
private static int
max_outbound_connections
private static final int
CONNECT_ATTEMPT_TIMEOUT
private static final int
CONNECT_ATTEMPT_STALL_TIME
private static final boolean
SHOW_CONNECT_STATS
private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
connect_selector
private final LinkedList
new_requests
private final ArrayList
canceled_requests
private final AEMonitor
new_canceled_mon
private final HashMap
pending_attempts
private final LinkedList
pending_closes
private final Map
delayed_closes
private final AEMonitor
pending_closes_mon
private final Random
random
private boolean
max_conn_exceeded_logged
Constructors Summary
public TCPConnectionManager()

  
  
   
   
  
	  Set	types = new HashSet();
	  
	  types.add( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH );
	  types.add( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH );
	  types.add( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH );
	  types.add( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH );
		
	  AzureusCoreStats.registerProvider( 
			  types,
			  new AzureusCoreStatsProvider()
			  {
					public void
					updateStats(
						Set		types,
						Map		values )
					{
						if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH )){
							
							values.put( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH, new Long( new_requests.size()));
						}	
						
						if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH )){
							
							values.put( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH, new Long( canceled_requests.size()));
						}					

						if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH )){
							
							values.put( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH, new Long( pending_closes.size()));
						}					

						if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH )){
							
							values.put( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH, new Long( pending_attempts.size()));
						}					

					}
			  });
	  
    AEThread loop = new AEThread( "ConnectDisconnectManager" ) {
      public void runSupport() {
        mainLoop();
      }
    };
    loop.setDaemon( true );
    loop.start();    
  
Methods Summary
private voidaddNewOutboundRequests()

    
    while( pending_attempts.size() < MIN_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
      ConnectionRequest cr = null;
      
      try{
        new_canceled_mon.enter();
      
        if( new_requests.isEmpty() )  break;
        cr = (ConnectionRequest)new_requests.removeFirst();
      }
      finally{
        new_canceled_mon.exit();
      }
      
      if( cr != null ) {
        addNewRequest( cr ); 
      }
    }
  
private voidaddNewRequest(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectionRequest request)

    request.listener.connectAttemptStarted();
    
    try {
      request.channel = SocketChannel.open();
      
      try {  //advanced socket options
        int rcv_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
        if( rcv_size > 0 ) {
          if (Logger.isEnabled())
						Logger.log(new LogEvent(LOGID, "Setting socket receive buffer size"
								+ " for outgoing connection [" + request.address + "] to: "
								+ rcv_size));
          request.channel.socket().setReceiveBufferSize( rcv_size );
        }
      
        int snd_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );
        if( snd_size > 0 ) {
        	if (Logger.isEnabled())
        		Logger.log(new LogEvent(LOGID, "Setting socket send buffer size "
        				+ "for outgoing connection [" + request.address + "] to: "
        				+ snd_size));
          request.channel.socket().setSendBufferSize( snd_size );
        }

        String ip_tos = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );
        if( ip_tos.length() > 0 ) {
        	if (Logger.isEnabled())
        		Logger.log(new LogEvent(LOGID, "Setting socket TOS field "
        				+ "for outgoing connection [" + request.address + "] to: "
        				+ ip_tos));
          request.channel.socket().setTrafficClass( Integer.decode( ip_tos ).intValue() );
        }

      
        int local_bind_port = COConfigurationManager.getIntParameter( "network.bind.local.port" );
        
        if( local_bind_port > 0 ) {
        	request.channel.socket().setReuseAddress( true );
        }
        
        InetAddress bindIP = NetworkAdmin.getSingleton().getDefaultBindAddress();
        if ( bindIP != null ) {
        	if (Logger.isEnabled()) 	Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local IP address: " + bindIP));
          request.channel.socket().bind( new InetSocketAddress( bindIP, local_bind_port ) );
        }
        else if( local_bind_port > 0 ) {       
        	if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local port #: " +local_bind_port));
        	request.channel.socket().bind( new InetSocketAddress( local_bind_port ) );     
        }

      }
      catch( Throwable t ) {
        String msg = "Error while processing advanced socket options.";
        Debug.out( msg, t );
        Logger.log(new LogAlert(LogAlert.UNREPEATABLE, msg, t));
        //dont pass the exception outwards, so we will continue processing connection without advanced options set
      }
      
      request.channel.configureBlocking( false );
      request.connect_start_time = SystemTime.getCurrentTime();
      
      if( request.channel.connect( request.address ) ) {  //already connected
        finishConnect( request );
      }
      else {  //not yet connected, so register for connect selection
        pending_attempts.put( request, null );
        
        connect_selector.register( request.channel, new VirtualChannelSelector.VirtualSelectorListener() {
          public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {         
            pending_attempts.remove( request );
            finishConnect( request );
            return true;
          }
          
          public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
            pending_attempts.remove( request );
            
            closeConnection( request.channel );
           
            request.listener.connectFailure( msg );
          }
        }, null );
      }
    }
    catch( Throwable t ) {
      
      String full = request.address.toString();
      String hostname = request.address.getHostName();
      int port = request.address.getPort();
      boolean unresolved = request.address.isUnresolved();
      InetAddress	inet_address = request.address.getAddress();
      String full_sub = inet_address==null?request.address.toString():inet_address.toString();
      String host_address = inet_address==null?request.address.toString():inet_address.getHostAddress();
      
      String msg = "ConnectDisconnectManager::address exception: full="+full+ ", hostname="+hostname+ ", port="+port+ ", unresolved="+unresolved+ ", full_sub="+full_sub+ ", host_address="+host_address;
      if( request.channel != null ) {
        String channel = request.channel.toString();
        String socket = request.channel.socket().toString();
        String local_address = request.channel.socket().getLocalAddress().toString();
        int local_port = request.channel.socket().getLocalPort();
           SocketAddress ra = request.channel.socket().getRemoteSocketAddress();
        String remote_address;
           if( ra != null )  remote_address = ra.toString();
           else remote_address = "<null>";
        int remote_port = request.channel.socket().getPort();

        msg += "\n channel="+channel+ ", socket="+socket+ ", local_address="+local_address+ ", local_port="+local_port+ ", remote_address="+remote_address+ ", remote_port="+remote_port;
      }
      else {
        msg += "\n channel=<null>";
      }
      
      if ( t instanceof UnresolvedAddressException ){
    	  Debug.outNoStack( msg );
      }else{
    	  Debug.out( msg, t );
      }
      
      
      if( request.channel != null ) {
    	  closeConnection( request.channel );
      }
      request.listener.connectFailure( t );
    }
  
public voidcancelRequest(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener_key)
Cancel a pending new connection request.

param
listener_key used in the initial connect request

    try{
      new_canceled_mon.enter();
    
      //check if we can cancel it right away
      for( Iterator i = new_requests.iterator(); i.hasNext(); ) {
        ConnectionRequest request = (ConnectionRequest)i.next();
        if( request.listener == listener_key ) {
          i.remove();
          return;
        }
      }
      
      canceled_requests.add( listener_key ); //else add for later removal during select
    }
    finally{
      new_canceled_mon.exit();
    }
  
public voidcloseConnection(java.nio.channels.SocketChannel channel)
Close the given connection.

param
channel to close

	  closeConnection( channel, 0 );
  
public voidcloseConnection(java.nio.channels.SocketChannel channel, int delay)

    try{
    	pending_closes_mon.enter();
    
    	if ( delay == 0 ){
    		
    		if ( !delayed_closes.containsKey( channel )){
    		
	    		if ( !pending_closes.contains( channel )){
	    			
	    			pending_closes.addLast( channel );
	    		}
    		}
    	}else{
    		
    		delayed_closes.put( channel, new Long( SystemTime.getCurrentTime() + delay ));
    	}
    }finally{
    	
    	pending_closes_mon.exit();
    }
  
private voiddoClosings()

    try{
    	pending_closes_mon.enter();
    
    	long	now = SystemTime.getCurrentTime();
    	
    	if ( delayed_closes.size() > 0 ){
    		   		
    		Iterator	it = delayed_closes.entrySet().iterator();
    		
    		while( it.hasNext()){
    			
    			Map.Entry	entry = (Map.Entry)it.next();
    			
    			long	wait = ((Long)entry.getValue()).longValue() - now;
    			
    			if ( wait < 0 || wait > 60*1000 ){
    				
    				pending_closes.addLast( entry.getKey());
    				
    				it.remove();    				
    			}
    		}
    	}
    	
    	while( !pending_closes.isEmpty() ) {
    		
    		SocketChannel channel = (SocketChannel)pending_closes.removeFirst();
    		if( channel != null ) {
        	
    			connect_selector.cancel( channel );
        	
    			try{ 
    				channel.close();
    			}
    			catch( Throwable t ) {
    				/*Debug.printStackTrace(t);*/
    			}
    		}
    	}
    }finally{
    	
    	pending_closes_mon.exit();
    }
  
private voidfinishConnect(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectionRequest request)

    try {
      if( request.channel.finishConnect() ) {
            
        if( SHOW_CONNECT_STATS ) {
          long queue_wait_time = request.connect_start_time - request.request_start_time;
          long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
          int num_queued = new_requests.size();
          int num_connecting = pending_attempts.size();
          System.out.println("S: queue_wait_time="+queue_wait_time+
                              ", connect_time="+connect_time+
                              ", num_queued="+num_queued+
                              ", num_connecting="+num_connecting);
        }
        
        //ensure the request hasn't been canceled during the select op
        boolean canceled = false;
        try{  new_canceled_mon.enter();
          canceled = canceled_requests.contains( request.listener );
        }
        finally{ new_canceled_mon.exit(); }
        
        if( canceled ) {
        	closeConnection( request.channel );
        }
        else {
        	connect_selector.cancel( request.channel );
          request.listener.connectSuccess( request.channel );
        }
      }
      else { //should never happen
        Debug.out( "finishConnect() failed" );
        request.listener.connectFailure( new Throwable( "finishConnect() failed" ) );
        
        closeConnection( request.channel );
      }
    }
    catch( Throwable t ) {
          
      if( SHOW_CONNECT_STATS ) {
        long queue_wait_time = request.connect_start_time - request.request_start_time;
        long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
        int num_queued = new_requests.size();
        int num_connecting = pending_attempts.size();
        System.out.println("F: queue_wait_time="+queue_wait_time+
                            ", connect_time="+connect_time+
                            ", num_queued="+num_queued+
                            ", num_connecting="+num_connecting);
      }
          
      request.listener.connectFailure( t );
      
      closeConnection( request.channel );
    }
  
public intgetMaxOutboundPermitted()

		return( Math.max( max_outbound_connections - new_requests.size(), 0 ));
	
private voidmainLoop()

      
    while( true ) {
      addNewOutboundRequests();
      runSelect();
      doClosings();
    }
  
public voidrequestNewConnection(java.net.InetSocketAddress address, com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener, long connect_timeout)

    
	   ConnectionRequest cr = new ConnectionRequest( address, listener, connect_timeout );
	    try{
	      new_canceled_mon.enter();
	    
	      //insert at a random position because new connections are usually added in 50-peer
	      //chunks, i.e. from a tracker announce reply, and we want to evenly distribute the
	      //connect attempts if there are multiple torrents running
	      int insert_pos = random.nextInt( new_requests.size() + 1 );
	      new_requests.add( insert_pos, cr );
	      
	      if ( new_requests.size() >= max_outbound_connections ){
			
	    	if ( !max_conn_exceeded_logged ){
	    		
	    		max_conn_exceeded_logged = true;
	    	
	    		Debug.out( "TCPConnectionManager: max outbound connection limit reached (" + max_outbound_connections + ")" );
	    	}
	      }
	    }finally{
	    	
	      new_canceled_mon.exit();
	    }
  
public voidrequestNewConnection(java.net.InetSocketAddress address, com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener)
Request that a new connection be made out to the given address.

param
address remote ip+port to connect to
param
listener to receive notification of connect attempt success/failure

   
	  requestNewConnection( address, listener, CONNECT_ATTEMPT_TIMEOUT );
  
private voidrunSelect()

    //do cancellations
    try{
      new_canceled_mon.enter();

      for (Iterator can_it =canceled_requests.iterator(); can_it.hasNext();) {
        ConnectListener key =(ConnectListener) can_it.next();

        ConnectionRequest to_remove =null;

        for (Iterator pen_it =pending_attempts.keySet().iterator(); pen_it.hasNext();) {
          ConnectionRequest request =(ConnectionRequest) pen_it.next();
          if (request.listener ==key) {
            connect_selector.cancel(request.channel);

            closeConnection(request.channel);

            to_remove =request;
            break;
          }
        }

        if( to_remove != null ) {
          pending_attempts.remove( to_remove );
        }
      }

      canceled_requests.clear();
    }
    finally{
      new_canceled_mon.exit();
    }

    //run select
    try{
      connect_selector.select(100);
    }
    catch( Throwable t ) {
      Debug.out("connnectSelectLoop() EXCEPTION: ", t);
    }

    //do connect attempt timeout checks
    int num_stalled_requests =0;
    final long now =SystemTime.getCurrentTime();
    for (Iterator i =pending_attempts.keySet().iterator(); i.hasNext();) {
      final ConnectionRequest request =(ConnectionRequest) i.next();
      final long waiting_time =now -request.connect_start_time;
      if( waiting_time > request.connect_timeout ) {
        i.remove();

        SocketChannel channel = request.channel;
        
        connect_selector.cancel( channel );

        closeConnection( channel );
              
        InetSocketAddress	sock_address = request.address;
        
       	InetAddress a = sock_address.getAddress();
        	
       	String	target;
       	
       	if ( a != null ){
        		
        	target = a.getHostAddress() + ":" + sock_address.getPort();
        		
        }else{
        		
        	target = sock_address.toString();
        }
               
        request.listener.connectFailure( new SocketTimeoutException( "Connection attempt to " + target + " aborted: timed out after " + request.connect_timeout/1000+ "sec" ) );
      }
      else if( waiting_time >= CONNECT_ATTEMPT_STALL_TIME ) {
        num_stalled_requests++;
      }
      else if( waiting_time < 0 ) {  //time went backwards
        request.connect_start_time =now;
      }
    }

    //check if our connect queue is stalled, and expand if so
    if (num_stalled_requests ==pending_attempts.size() &&pending_attempts.size() <MAX_SIMULTANIOUS_CONNECT_ATTEMPTS) {
      ConnectionRequest cr =null;

      try{
        new_canceled_mon.enter();

        if( !new_requests.isEmpty() ) {
          cr = (ConnectionRequest)new_requests.removeFirst();
        }
      }
      finally{
        new_canceled_mon.exit();
      }

      if( cr != null ) {
        addNewRequest( cr );
      }
    }