UDPConnectionManagerpublic class UDPConnectionManager extends Object implements NetworkGlueListener
Fields Summary |
---|
private static final org.gudy.azureus2.core3.logging.LogIDs | LOGID | private static final boolean | LOOPBACK | private static final boolean | FORCE_LOG | private static boolean | LOG | private static int | max_outbound_connections | public static final int | TIMER_TICK_MILLIS | public static final int | THREAD_LINGER_ON_IDLE_PERIOD | public static final int | DEAD_KEY_RETENTION_PERIOD | public static final int | STATS_TIME | public static final int | STATS_TICKS | private final Map | connection_sets | private final Map | recently_dead_keys | private int | next_connection_id | private com.aelitis.azureus.core.networkmanager.impl.IncomingConnectionManager | incoming_manager | private NetworkGlue | network_glue | private UDPSelector | selector | private ProtocolTimer | protocol_timer | private long | idle_start | private static final int | BLOOM_RECREATE | private static final int | BLOOM_INCREASE | private com.aelitis.azureus.core.util.bloom.BloomFilter | incoming_bloom | private long | incoming_bloom_create_time | private long | last_incoming | private int | rate_limit_discard_packets | private int | rate_limit_discard_bytes | private int | setup_discard_packets | private int | setup_discard_bytes | private int | outbound_connection_count | private boolean | max_conn_exceeded_logged |
Constructors Summary |
---|
protected UDPConnectionManager()
if ( LOOPBACK ){
network_glue = new NetworkGlueLoopBack( this );
}else{
network_glue = new NetworkGlueUDP( this );
}
|
Methods Summary |
---|
protected void | accept(int local_port, java.net.InetSocketAddress remote_address, UDPConnection connection)
final UDPTransportHelper helper = new UDPTransportHelper( this, remote_address, connection );
try{
connection.setTransport( helper );
TransportCryptoManager.getSingleton().manageCrypto(
helper,
null,
true,
null,
new TransportCryptoManager.HandshakeListener()
{
public void
handshakeSuccess(
ProtocolDecoder decoder,
ByteBuffer remaining_initial_data )
{
TransportHelperFilter filter = decoder.getFilter();
ConnectionEndpoint co_ep = new ConnectionEndpoint( remote_address);
ProtocolEndpointUDP pe_udp = new ProtocolEndpointUDP( co_ep, remote_address );
UDPTransport transport = new UDPTransport( pe_udp, filter );
helper.setTransport( transport );
incoming_manager.addConnection( local_port, filter, transport );
}
public void
handshakeFailure(
Throwable failure_msg )
{
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "incoming crypto handshake failure: " + Debug.getNestedExceptionMessage( failure_msg )));
}
connection.close( "handshake failure: " + Debug.getNestedExceptionMessage(failure_msg));
}
public void
gotSecret(
byte[] session_secret )
{
helper.getConnection().setSecret( session_secret );
}
public int
getMaximumPlainHeaderLength()
{
return( incoming_manager.getMaxMinMatchBufferSize());
}
public int
matchPlainHeader(
ByteBuffer buffer )
{
Object[] match_data = incoming_manager.checkForMatch( helper, local_port, buffer, true );
if ( match_data == null ){
return( TransportCryptoManager.HandshakeListener.MATCH_NONE );
}else{
// no fallback for UDP
return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_NO_AUTO_FALLBACK );
}
}
});
}catch( Throwable e ){
Debug.printStackTrace( e );
helper.close( Debug.getNestedExceptionMessage(e));
}
| protected synchronized int | allocationConnectionID()
int id = next_connection_id++;
if ( id < 0 ){
id = 0;
next_connection_id = 1;
}
return( id );
| protected UDPSelector | checkThreadCreation()
// called while holding the "connections" monitor
if ( selector == null ){
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "UDPConnectionManager: activating" ));
}
selector = new UDPSelector(this );
protocol_timer = new ProtocolTimer();
}
return( selector );
| protected void | checkThreadDeath(boolean connections_running)
// called while holding the "connections" monitor
if ( connections_running ){
idle_start = 0;
}else{
long now = SystemTime.getCurrentTime();
if ( idle_start == 0 ){
idle_start = now;
}else if ( idle_start > now ){
idle_start = now;
}else if ( now - idle_start > THREAD_LINGER_ON_IDLE_PERIOD ){
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "UDPConnectionManager: deactivating" ));
}
selector.destroy();
selector = null;
protocol_timer.destroy();
protocol_timer = null;
}
}
| public void | connectOutbound(UDPTransport udp_transport, java.net.InetSocketAddress address, byte[][] shared_secrets, java.nio.ByteBuffer initial_data, com.aelitis.azureus.core.networkmanager.Transport.ConnectListener listener)
UDPTransportHelper helper = null;
try{
listener.connectAttemptStarted();
helper = new UDPTransportHelper( this, address, udp_transport );
final UDPTransportHelper f_helper = helper;
synchronized( this ){
outbound_connection_count++;
if ( outbound_connection_count >= max_outbound_connections ){
if ( !max_conn_exceeded_logged ){
max_conn_exceeded_logged = true;
Debug.out( "UDPConnectionManager: max outbound connection limit reached (" + max_outbound_connections + ")" );
}
}
}
try{
TransportCryptoManager.getSingleton().manageCrypto(
helper,
shared_secrets,
false,
initial_data,
new TransportCryptoManager.HandshakeListener()
{
public void
handshakeSuccess(
ProtocolDecoder decoder,
ByteBuffer remaining_initial_data )
{
synchronized( this ){
if ( outbound_connection_count > 0 ){
outbound_connection_count--;
}
}
TransportHelperFilter filter = decoder.getFilter();
try{
udp_transport.setFilter( filter );
if ( udp_transport.isClosed()){
udp_transport.close( "Already closed" );
listener.connectFailure( new Exception( "Connection already closed" ));
}else{
if ( Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Outgoing UDP stream to " + address + " established, type = " + filter.getName()));
}
udp_transport.connectedOutbound();
listener.connectSuccess( udp_transport, remaining_initial_data );
}
}catch( Throwable e ){
Debug.printStackTrace(e);
udp_transport.close( Debug.getNestedExceptionMessageAndStack(e));
listener.connectFailure( e );
}
}
public void
handshakeFailure(
Throwable failure_msg )
{
synchronized( this ){
if ( outbound_connection_count > 0 ){
outbound_connection_count--;
}
}
f_helper.close( Debug.getNestedExceptionMessageAndStack(failure_msg));
listener.connectFailure( failure_msg );
}
public void
gotSecret(
byte[] session_secret )
{
f_helper.getConnection().setSecret( session_secret );
}
public int
getMaximumPlainHeaderLength()
{
throw( new RuntimeException()); // this is outgoing
}
public int
matchPlainHeader(
ByteBuffer buffer )
{
throw( new RuntimeException()); // this is outgoing
}
});
}catch( Throwable e ){
synchronized( this ){
if ( outbound_connection_count > 0 ){
outbound_connection_count--;
}
}
throw( e );
}
}catch( Throwable e ){
Debug.printStackTrace(e);
if ( helper != null ){
helper.close( Debug.getNestedExceptionMessage( e ));
}
listener.connectFailure( e );
}
| public void | failed(UDPConnectionSet set)
synchronized( connection_sets ){
String key = set.getKey();
if ( connection_sets.remove( key ) != null ){
set.removed();
recently_dead_keys.put( key, new Long( SystemTime.getCurrentTime()));
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Connection set " + key + " failed"));
}
}
}
| public int | getMaxOutboundPermitted()
return( Math.max( max_outbound_connections - outbound_connection_count, 0 ));
| protected void | logStats()
if (Logger.isEnabled()){
long[] nw_stats = network_glue.getStats();
String str = "UDPConnection stats: sent=" + nw_stats[0] + "/" + nw_stats[1] + ",received=" + nw_stats[2] + "/" + nw_stats[3];
str += ", setup discards=" + setup_discard_packets + "/" + setup_discard_bytes;
str += ", rate discards=" + rate_limit_discard_packets + "/" + rate_limit_discard_bytes;
Logger.log(new LogEvent(LOGID, str ));
}
| protected void | poll()
synchronized( connection_sets ){
Iterator it = connection_sets.values().iterator();
while( it.hasNext()){
((UDPConnectionSet)it.next()).poll();
}
}
| protected synchronized boolean | rateLimitIncoming(java.net.InetSocketAddress s_address)
byte[] address = s_address.getAddress().getAddress();
int hit_count = incoming_bloom.add( address );
long now = SystemTime.getCurrentTime();
// allow up to 10% bloom filter utilisation
if ( incoming_bloom.getSize() / incoming_bloom.getEntryCount() < 10 ){
incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize() + BLOOM_INCREASE );
incoming_bloom_create_time = now;
Logger.log( new LogEvent(LOGID, "UDP connnection bloom: size increased to " + incoming_bloom.getSize()));
}else if ( now < incoming_bloom_create_time || now - incoming_bloom_create_time > BLOOM_RECREATE ){
incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize());
incoming_bloom_create_time = now;
}
if ( hit_count >= 15 ){
Logger.log( new LogEvent(LOGID, "UDP incoming: too many recent connection attempts from " + s_address ));
return( false );
}
long since_last = now - last_incoming;
long delay = 100 - since_last;
// limit to 10 a second
if ( delay > 0 && delay < 100 ){
try{
Thread.sleep( delay );
}catch( Throwable e ){
}
}
last_incoming = now;
return( true );
| public void | receive(int local_port, java.net.InetSocketAddress remote_address, byte[] data, int data_length)
String key = local_port + ":" + remote_address.getAddress().getHostAddress() + ":" + remote_address.getPort();
UDPConnectionSet connection_set;
synchronized( connection_sets ){
UDPSelector current_selector = checkThreadCreation();
connection_set = (UDPConnectionSet)connection_sets.get( key );
if ( connection_set == null ){
timeoutDeadKeys();
// check that this at least looks like an initial crypto packet
if ( data_length >= UDPNetworkManager.MIN_INCOMING_INITIAL_PACKET_SIZE &&
data_length <= UDPNetworkManager.MAX_INCOMING_INITIAL_PACKET_SIZE ){
if ( !rateLimitIncoming( remote_address )){
rate_limit_discard_packets++;
rate_limit_discard_bytes += data_length;
return;
}
connection_set = new UDPConnectionSet( this, key, current_selector, local_port, remote_address );
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Created new set - " + connection_set.getName() + ", incoming"));
}
connection_sets.put( key, connection_set );
}else{
if ( recently_dead_keys.get( key ) == null ){
// we can get quite a lot of these if things get out of sync
// Debug.out( "Incoming UDP packet mismatch for connection establishment: " + key );
}
setup_discard_packets++;
setup_discard_bytes += data_length;
return;
}
}
}
try{
//System.out.println( "recv:" + ByteFormatter.encodeString( data, 0, data_length>64?64:data_length ) + (data_length>64?"...":""));
connection_set.receive( data, data_length );
}catch( IOException e ){
connection_set.failed( e );
}catch( Throwable e ){
Debug.printStackTrace( e );
connection_set.failed( e );
}
| protected UDPConnection | registerOutgoing(UDPTransportHelper helper)
int local_port = UDPNetworkManager.getSingleton().getUDPListeningPortNumber();
InetSocketAddress address = helper.getAddress();
String key = local_port + ":" + address.getAddress().getHostAddress() + ":" + address.getPort();
synchronized( connection_sets ){
UDPSelector current_selector = checkThreadCreation();
UDPConnectionSet connection_set = (UDPConnectionSet)connection_sets.get( key );
if ( connection_set == null ){
timeoutDeadKeys();
connection_set = new UDPConnectionSet( this, key, current_selector, local_port, address );
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Created new set - " + connection_set.getName() + ", outgoing"));
}
connection_sets.put( key, connection_set );
}
UDPConnection connection = new UDPConnection( connection_set, allocationConnectionID(), helper );
connection_set.add( connection );
return( connection );
}
| public void | remove(UDPConnectionSet set, UDPConnection connection)
synchronized( connection_sets ){
if ( set.remove( connection )){
String key = set.getKey();
if ( set.hasFailed()){
if ( connection_sets.remove( key ) != null ){
set.removed();
recently_dead_keys.put( key, new Long( SystemTime.getCurrentTime()));
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Connection set " + key + " failed"));
}
}
}
}
}
| public int | send(int local_port, java.net.InetSocketAddress remote_address, byte[] data)
return( network_glue.send( local_port, remote_address, data ));
| protected void | timeoutDeadKeys()
Iterator it = recently_dead_keys.values().iterator();
long now = SystemTime.getCurrentTime();
while( it.hasNext()){
long dead_time = ((Long)it.next()).longValue();
if ( dead_time > now || now - dead_time > DEAD_KEY_RETENTION_PERIOD ){
it.remove();
}
}
| protected boolean | trace()
return( LOG );
| protected void | trace(java.lang.String str)
if ( LOG ){
if ( FORCE_LOG ){
System.out.println( str );
}
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, str ));
}
}
|
|