File | Doc | Category | Size | Date | Package
UDPConnectionManager.java | API Doc | Azureus 3.0.3.4 | 20625 | Tue Apr 10 13:30:42 BST 2007 | com.aelitis.azureus.core.networkmanager.impl.udp

UDPConnectionManager

public class UDPConnectionManager extends Object implements NetworkGlueListener

Fields Summary
private static final org.gudy.azureus2.core3.logging.LogIDs
LOGID
private static final boolean
LOOPBACK
private static final boolean
FORCE_LOG
private static boolean
LOG
private static int
max_outbound_connections
public static final int
TIMER_TICK_MILLIS
public static final int
THREAD_LINGER_ON_IDLE_PERIOD
public static final int
DEAD_KEY_RETENTION_PERIOD
public static final int
STATS_TIME
public static final int
STATS_TICKS
private final Map
connection_sets
private final Map
recently_dead_keys
private int
next_connection_id
private com.aelitis.azureus.core.networkmanager.impl.IncomingConnectionManager
incoming_manager
private NetworkGlue
network_glue
private UDPSelector
selector
private ProtocolTimer
protocol_timer
private long
idle_start
private static final int
BLOOM_RECREATE
private static final int
BLOOM_INCREASE
private com.aelitis.azureus.core.util.bloom.BloomFilter
incoming_bloom
private long
incoming_bloom_create_time
private long
last_incoming
private int
rate_limit_discard_packets
private int
rate_limit_discard_bytes
private int
setup_discard_packets
private int
setup_discard_bytes
private int
outbound_connection_count
private boolean
max_conn_exceeded_logged
Constructors Summary
protected UDPConnectionManager()

			// Selects the network glue implementation for this manager: the UDP
			// based one by default, or the loop-back one when LOOPBACK is set.

		if ( !LOOPBACK ){

			network_glue = new NetworkGlueUDP( this );

		}else{

			network_glue = new NetworkGlueLoopBack( this );
		}
	
Methods Summary
protected voidaccept(int local_port, java.net.InetSocketAddress remote_address, UDPConnection connection)

			// Accepts an incoming UDP connection: wraps it in a transport helper,
			// installs the helper on the connection and starts a crypto handshake
			// for it. On handshake success a UDPTransport is built and handed to
			// the incoming connection manager; on failure the connection is closed.
			// Any exception raised while starting the handshake closes the helper.

		final UDPTransportHelper	helper = new UDPTransportHelper( this, remote_address, connection );

		try{
			connection.setTransport( helper );

			TransportCryptoManager.getSingleton().manageCrypto(
				helper,
				null,
				true,
				null,
				new TransportCryptoManager.HandshakeListener()
				{
					public void
					handshakeSuccess(
						ProtocolDecoder	decoder,
						ByteBuffer		remaining_initial_data )
					{
						TransportHelperFilter	negotiated_filter = decoder.getFilter();

						ProtocolEndpointUDP	endpoint =
							new ProtocolEndpointUDP( new ConnectionEndpoint( remote_address ), remote_address );

						UDPTransport	new_transport = new UDPTransport( endpoint, negotiated_filter );

						helper.setTransport( new_transport );

						incoming_manager.addConnection( local_port, negotiated_filter, new_transport );
					}

					public void
					handshakeFailure(
						Throwable failure_msg )
					{
						String	reason = Debug.getNestedExceptionMessage( failure_msg );

						if ( Logger.isEnabled()){

							Logger.log(new LogEvent(LOGID, "incoming crypto handshake failure: " + reason ));
						}

						connection.close( "handshake failure: " + reason );
					}

					public void
					gotSecret(
						byte[]				session_secret )
					{
							// record the negotiated secret on the helper's connection

						helper.getConnection().setSecret( session_secret );
					}

					public int
					getMaximumPlainHeaderLength()
					{
						return( incoming_manager.getMaxMinMatchBufferSize());
					}

					public int
					matchPlainHeader(
							ByteBuffer			buffer )
					{
						Object[]	matched = incoming_manager.checkForMatch( helper, local_port, buffer, true );

						if ( matched != null ){

								// no fallback for UDP

							return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_NO_AUTO_FALLBACK );
						}

						return( TransportCryptoManager.HandshakeListener.MATCH_NONE );
					}
				});

		}catch( Throwable failure ){

			Debug.printStackTrace( failure );

			helper.close( Debug.getNestedExceptionMessage( failure ));
		}
	
protected synchronized intallocationConnectionID()

			// Hands out the next connection id. Once the counter has gone
			// negative (int overflow) numbering restarts: 0 is returned and
			// the next id to be handed out becomes 1.

		if ( next_connection_id < 0 ){

			next_connection_id = 1;

			return( 0 );
		}

		return( next_connection_id++ );
	
protected UDPSelectorcheckThreadCreation()

			// called while holding the "connections" monitor
			//
			// Returns the current selector, creating it (together with the
			// protocol timer) if there isn't one yet.

		if ( selector != null ){

			return( selector );
		}

		if ( Logger.isEnabled()){

			Logger.log(new LogEvent(LOGID, "UDPConnectionManager: activating" ));
		}

		selector = new UDPSelector(this );

		protocol_timer = new ProtocolTimer();

		return( selector );
	
protected voidcheckThreadDeath(boolean connections_running)

			// called while holding the "connections" monitor
			//
			// Tracks how long there have been no running connections and, once
			// that exceeds THREAD_LINGER_ON_IDLE_PERIOD, destroys the selector
			// and the protocol timer.

		if ( connections_running ){

				// not idle - forget any idle period in progress

			idle_start = 0;

			return;
		}

		long	now = SystemTime.getCurrentTime();

		if ( idle_start == 0 || idle_start > now ){

				// idle period just started, or the recorded start is later
				// than the current time: (re)start timing from now

			idle_start = now;

			return;
		}

		if ( now - idle_start > THREAD_LINGER_ON_IDLE_PERIOD ){

			if ( Logger.isEnabled()){

				Logger.log(new LogEvent(LOGID, "UDPConnectionManager: deactivating" ));
			}

			selector.destroy();

			selector = null;

			protocol_timer.destroy();

			protocol_timer = null;
		}
	
public voidconnectOutbound(UDPTransport udp_transport, java.net.InetSocketAddress address, byte[][] shared_secrets, java.nio.ByteBuffer initial_data, com.aelitis.azureus.core.networkmanager.Transport.ConnectListener listener)

			// Starts an outbound UDP connection attempt to 'address'.
			//
			// The listener is told the attempt has started, a transport helper is
			// created for the transport and an outgoing crypto handshake is begun.
			// The listener later receives connectSuccess (with any remaining
			// initial data) or connectFailure.
			//
			// outbound_connection_count is incremented for the duration of the
			// handshake and decremented exactly once when it succeeds, fails or
			// cannot be started. Reaching max_outbound_connections is only logged
			// (once) here; the attempt is not refused by this method.
			//
			// All updates to outbound_connection_count are made while holding this
			// manager's monitor. Inside the anonymous handshake listener that
			// requires "UDPConnectionManager.this": a plain "this" there refers to
			// the listener instance and would not exclude the increment below.

		UDPTransportHelper	helper = null;

		try{
			listener.connectAttemptStarted();

			helper = new UDPTransportHelper( this, address, udp_transport );

				// final alias so the anonymous listener can refer to the helper

			final UDPTransportHelper f_helper = helper;

			synchronized( this ){

				outbound_connection_count++;

				if ( outbound_connection_count >= max_outbound_connections ){

					if ( !max_conn_exceeded_logged ){

						max_conn_exceeded_logged = true;

			    		Debug.out( "UDPConnectionManager: max outbound connection limit reached (" + max_outbound_connections + ")" );
					}
				}
			}

			try{
		    	TransportCryptoManager.getSingleton().manageCrypto(
	    			helper,
	    			shared_secrets,
	    			false,
	    			initial_data,
	    			new TransportCryptoManager.HandshakeListener()
	    			{
	    				public void
	    				handshakeSuccess(
	    					ProtocolDecoder	decoder,
	    					ByteBuffer		remaining_initial_data )
	    				{
	    						// handshake over - release the outbound slot under the manager's monitor

	    					synchronized( UDPConnectionManager.this ){

	    						if ( outbound_connection_count > 0 ){

	    							outbound_connection_count--;
	    						}
	    					}

	    					TransportHelperFilter	filter = decoder.getFilter();

	    					try{
	    						udp_transport.setFilter( filter );

		    					if ( udp_transport.isClosed()){

		    							// transport was closed while the handshake was in progress

		    						udp_transport.close( "Already closed" );

		    						listener.connectFailure( new Exception( "Connection already closed" ));

		    					}else{

			    		   			if ( Logger.isEnabled()){

			    		   				Logger.log(new LogEvent(LOGID, "Outgoing UDP stream to " + address + " established, type = " + filter.getName()));
			    		    		}

			    		   			udp_transport.connectedOutbound();

			    		   			listener.connectSuccess( udp_transport, remaining_initial_data );
		    					}
	    					}catch( Throwable e ){

	    						Debug.printStackTrace(e);

	    						udp_transport.close( Debug.getNestedExceptionMessageAndStack(e));

	    						listener.connectFailure( e );
	    					}
	    				}

	    				public void
	    				handshakeFailure(
	    					Throwable failure_msg )
	    				{
	    						// handshake over - release the outbound slot under the manager's monitor

	    					synchronized( UDPConnectionManager.this ){

	    						if ( outbound_connection_count > 0 ){

	    							outbound_connection_count--;
	    						}
	    					}

	    					f_helper.close( Debug.getNestedExceptionMessageAndStack(failure_msg));

	    					listener.connectFailure( failure_msg );
	    				}

	    				public void
	    				gotSecret(
							byte[]				session_secret )
	    				{
	    					f_helper.getConnection().setSecret( session_secret );
	    				}

	    				public int
	    				getMaximumPlainHeaderLength()
	    				{
	    		   			throw( new RuntimeException());	// this is outgoing
	    				}

	    				public int
	    				matchPlainHeader(
	    					ByteBuffer buffer )
	    				{
	    					throw( new RuntimeException());	// this is outgoing
	    				}
	    			});

			}catch( Throwable e ){

					// handshake could not be started - release the slot and let
					// the outer handler report the failure

				synchronized( this ){

					if ( outbound_connection_count > 0 ){

						outbound_connection_count--;
					}
				}

				throw( e );
			}

		}catch( Throwable e ){

			Debug.printStackTrace(e);

			if ( helper != null ){

				helper.close( Debug.getNestedExceptionMessage( e ));
			}

			listener.connectFailure( e );
		}
	
public voidfailed(UDPConnectionSet set)

			// Drops a failed connection set from the active map (if it is still
			// registered), notifies the set and records its key, with the
			// current time, in the recently-dead key map.

		synchronized( connection_sets ){

			String	dead_key = set.getKey();

			if ( connection_sets.remove( dead_key ) == null ){

					// not registered (any more) - nothing to do

				return;
			}

			set.removed();

			recently_dead_keys.put( dead_key, new Long( SystemTime.getCurrentTime()));

			if ( Logger.isEnabled()){

				Logger.log(new LogEvent(LOGID, "Connection set " + dead_key + " failed"));
			}
		}
	
public intgetMaxOutboundPermitted()

			// Number of further outbound connections allowed before the
			// configured maximum is reached; never negative.

		int	remaining = max_outbound_connections - outbound_connection_count;

		return( remaining > 0 ? remaining : 0 );
	
protected voidlogStats()

			// Logs one line with the network glue's four counters (reported as
			// sent and received pairs) followed by this manager's setup-discard
			// and rate-limit-discard packet/byte counts. Does nothing when
			// logging is disabled.

		if ( !Logger.isEnabled()){

			return;
		}

		long[]	counters = network_glue.getStats();

		String	summary =
			"UDPConnection stats: sent=" + counters[0] + "/" + counters[1] +
			",received=" + counters[2] + "/" + counters[3] +
			", setup discards=" + setup_discard_packets + "/" + setup_discard_bytes +
			", rate discards=" + rate_limit_discard_packets + "/" + rate_limit_discard_bytes;

		Logger.log(new LogEvent(LOGID, summary ));
	
protected voidpoll()

			// Polls every registered connection set while holding the
			// connection_sets monitor.

		synchronized( connection_sets ){

			for ( Iterator sets = connection_sets.values().iterator(); sets.hasNext(); ){

				UDPConnectionSet	current = (UDPConnectionSet)sets.next();

				current.poll();
			}
		}
	
protected synchronized booleanrateLimitIncoming(java.net.InetSocketAddress s_address)

			// Decides whether a new incoming connection attempt from s_address
			// may proceed.
			//
			// Returns false when the bloom filter reports 15 or more hits for
			// the address. Otherwise, if the previous accepted attempt was less
			// than 100ms ago, sleeps for the remainder of that interval and
			// then returns true.
			//
			// The bloom filter is replaced by a larger one when it gets too full
			// and by a fresh one of the same size when older than BLOOM_RECREATE.

		byte[]	address = s_address.getAddress().getAddress();

		int	hit_count = incoming_bloom.add( address );

		long	now = SystemTime.getCurrentTime();

			// allow up to 10% bloom filter utilisation. Written as a
			// multiplication (size < 10 * entries) so that an entry count of
			// zero cannot cause a divide-by-zero; for positive counts this is
			// equivalent to size / entries < 10

		if ( (long)incoming_bloom.getEntryCount() * 10 > incoming_bloom.getSize()){

			incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize() + BLOOM_INCREASE );

			incoming_bloom_create_time	= now;

     		Logger.log(	new LogEvent(LOGID, "UDP connnection bloom: size increased to " + incoming_bloom.getSize()));

		}else if ( now < incoming_bloom_create_time || now - incoming_bloom_create_time > BLOOM_RECREATE ){

				// filter is too old (or its creation time is later than now): start afresh

			incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize());

			incoming_bloom_create_time	= now;
		}

		if ( hit_count >= 15 ){

     		Logger.log(	new LogEvent(LOGID, "UDP incoming: too many recent connection attempts from " + s_address ));

			return( false );
		}

		long	since_last = now - last_incoming;

		long	delay = 100 - since_last;

			// limit to 10 a second

		if ( delay > 0 && delay < 100 ){

			try{
				Thread.sleep( delay );

			}catch( InterruptedException e ){

					// don't lose the interrupt: restore the flag so the owner
					// of this thread can still observe it

				Thread.currentThread().interrupt();

			}catch( Throwable e ){

					// best effort only - a failed delay must not reject the attempt
			}
		}

		last_incoming = now;

		return( true );
	
public voidreceive(int local_port, java.net.InetSocketAddress remote_address, byte[] data, int data_length)

			// Dispatches a received packet to the connection set identified by
			// local port + remote host address + remote port.
			//
			// If no set exists for that key, one is created only when the packet
			// size lies within the initial-packet bounds and the rate limiter
			// admits the sender; otherwise the packet is counted as a discard
			// and dropped. A failure while the set processes the packet is
			// reported to the set.

		String	set_key = local_port + ":" + remote_address.getAddress().getHostAddress() + ":" + remote_address.getPort();

		UDPConnectionSet	target_set;

		synchronized( connection_sets ){

			UDPSelector	active_selector	= checkThreadCreation();

			target_set = (UDPConnectionSet)connection_sets.get( set_key );

			if ( target_set == null ){

				timeoutDeadKeys();

					// check that this at least looks like an initial crypto packet

				boolean	plausible_initial_packet =
					data_length >= UDPNetworkManager.MIN_INCOMING_INITIAL_PACKET_SIZE &&
					data_length <= UDPNetworkManager.MAX_INCOMING_INITIAL_PACKET_SIZE;

				if ( !plausible_initial_packet ){

					if ( recently_dead_keys.get( set_key ) == null ){

							// we can get quite a lot of these if things get out of sync,
							// so the mismatch is deliberately not reported
					}

					setup_discard_packets++;
					setup_discard_bytes	+= data_length;

					return;
				}

				if ( !rateLimitIncoming( remote_address )){

					rate_limit_discard_packets++;
					rate_limit_discard_bytes	+= data_length;

					return;
				}

				target_set = new UDPConnectionSet( this, set_key, active_selector, local_port, remote_address );

				if ( Logger.isEnabled()){

					Logger.log(new LogEvent(LOGID, "Created new set - " + target_set.getName() + ", incoming"));
				}

				connection_sets.put( set_key, target_set );
			}
		}

			// processing happens outside the connection_sets monitor

		try{
			target_set.receive( data, data_length );

		}catch( IOException e ){

			target_set.failed( e );

		}catch( Throwable e ){

				// unexpected failure type: record the stack trace as well

			Debug.printStackTrace( e );

			target_set.failed( e );
		}
	
protected UDPConnectionregisterOutgoing(UDPTransportHelper helper)

			// Creates a new outgoing connection for the helper's address and adds
			// it to the connection set for (listening port, remote host, remote
			// port), creating and registering that set first if it doesn't exist.
			// Returns the new connection.

		int	listen_port = UDPNetworkManager.getSingleton().getUDPListeningPortNumber();

		InetSocketAddress	target = helper.getAddress();

		String	set_key = listen_port + ":" + target.getAddress().getHostAddress() + ":" + target.getPort();

		synchronized( connection_sets ){

			UDPSelector	active_selector	= checkThreadCreation();

			UDPConnectionSet	owning_set = (UDPConnectionSet)connection_sets.get( set_key );

			if ( owning_set == null ){

				timeoutDeadKeys();

				owning_set = new UDPConnectionSet( this, set_key, active_selector, listen_port, target );

				if ( Logger.isEnabled()){

					Logger.log(new LogEvent(LOGID, "Created new set - " + owning_set.getName() + ", outgoing"));
				}

				connection_sets.put( set_key, owning_set );
			}

			UDPConnection	created = new UDPConnection( owning_set, allocationConnectionID(), helper );

			owning_set.add( created );

			return( created );
		}
	
public voidremove(UDPConnectionSet set, UDPConnection connection)

			// Removes a connection from its set. If the set reports the removal
			// as effective and has failed, the set itself is unregistered,
			// notified, and its key recorded (with the current time) in the
			// recently-dead key map.

		synchronized( connection_sets ){

			if ( !set.remove( connection )){

				return;
			}

			String	set_key = set.getKey();

			if ( !set.hasFailed()){

				return;
			}

			if ( connection_sets.remove( set_key ) == null ){

					// set was not registered (any more)

				return;
			}

			set.removed();

			recently_dead_keys.put( set_key, new Long( SystemTime.getCurrentTime()));

			if ( Logger.isEnabled()){

				Logger.log(new LogEvent(LOGID, "Connection set " + set_key + " failed"));
			}
		}
	
public intsend(int local_port, java.net.InetSocketAddress remote_address, byte[] data)

			// Passes the data to the network glue for sending and returns
			// whatever the glue reports.

		int	glue_result = network_glue.send( local_port, remote_address, data );

		return( glue_result );
	
protected voidtimeoutDeadKeys()

			// Purges entries from the recently-dead key map whose recorded time
			// is more than DEAD_KEY_RETENTION_PERIOD in the past, or later than
			// the current time.

		long	now = SystemTime.getCurrentTime();

		for ( Iterator times = recently_dead_keys.values().iterator(); times.hasNext(); ){

			long	died_at = ((Long)times.next()).longValue();

			boolean	still_retained = died_at <= now && now - died_at <= DEAD_KEY_RETENTION_PERIOD;

			if ( !still_retained ){

				times.remove();
			}
		}
	
protected booleantrace()

			// Reports whether tracing via trace(String) is currently enabled.

		boolean	tracing_enabled = LOG;

		return( tracing_enabled );
	
protected voidtrace(java.lang.String str)

			// Emits a trace message when LOG is set: to standard output if
			// FORCE_LOG is set, and to the logger if logging is enabled.

		if ( !LOG ){

			return;
		}

		if ( FORCE_LOG ){

			System.out.println( str );
		}

		if ( Logger.isEnabled()){

			Logger.log(new LogEvent(LOGID, str ));
		}