File | Doc | Category | Size | Date | Package
SESTSConnectionImpl.java | API Doc | Azureus 3.0.3.4 | 18947 | Thu Aug 10 11:23:00 BST 2006 | org.gudy.azureus2.pluginsimpl.local.utils.security

SESTSConnectionImpl

public class SESTSConnectionImpl extends Object implements org.gudy.azureus2.plugins.messaging.generic.GenericMessageConnection

Fields Summary
private static final org.gudy.azureus2.core3.logging.LogIDs
LOGID
private static final byte[]
AES_IV1
private static final byte[]
AES_IV2
private final int
AES_KEY_SIZE_BYTES
private static long
last_incoming_sts_create
private static final int
BLOOM_RECREATE
private static final int
BLOOM_INCREASE
private static com.aelitis.azureus.core.util.bloom.BloomFilter
generate_bloom
private static long
generate_bloom_create_time
private com.aelitis.azureus.core.AzureusCore
core
private org.gudy.azureus2.pluginsimpl.local.messaging.GenericMessageConnectionImpl
connection
private org.gudy.azureus2.plugins.utils.security.SEPublicKey
my_public_key
private org.gudy.azureus2.plugins.utils.security.SEPublicKeyLocator
key_locator
private String
reason
private int
block_crypto
private com.aelitis.azureus.core.security.CryptoSTSEngine
sts_engine
private List
listeners
private boolean
sent_keys
private boolean
sent_auth
private org.gudy.azureus2.plugins.utils.PooledByteBuffer
pending_message
private org.gudy.azureus2.core3.util.AESemaphore
crypto_complete
private Cipher
outgoing_cipher
private Cipher
incoming_cipher
private volatile boolean
failed
Constructors Summary
protected SESTSConnectionImpl(com.aelitis.azureus.core.AzureusCore _core, org.gudy.azureus2.pluginsimpl.local.messaging.GenericMessageConnectionImpl _connection, org.gudy.azureus2.plugins.utils.security.SEPublicKey _my_public_key, org.gudy.azureus2.plugins.utils.security.SEPublicKeyLocator _key_locator, String _reason, int _block_crypto)

	
	
	
	
								
			
								
						
									
										 
	
		 
	
			// Wraps '_connection' with an STS handshake: records the supplied collaborators, applies
			// the incoming-connection rate limit, obtains an STS engine for '_reason' and finally
			// registers a listener on the wrapped connection that relays its events to this object.
	
		core			= _core;
		connection		= _connection;
		my_public_key	= _my_public_key;
		key_locator		= _key_locator;
		reason			= _reason;
		block_crypto	= _block_crypto;

			// only connections initiated by the remote side are throttled; rateLimit may sleep
			// or throw before any STS engine is created
		
		if ( connection.isIncoming()){
			
			rateLimit( connection.getEndpoint().getNotionalAddress());
		}
		
		sts_engine	= core.getCryptoManager().getECCHandler().getSTSEngine( reason );
		
			// adapter: every callback from the wrapped connection is routed to the corresponding
			// method on this instance (the 'connection' argument of the callbacks is not used)
		
		connection.addListener(
			new GenericMessageConnectionListener()
			{
				public void
				connected(
					GenericMessageConnection	connection )
				{
					reportConnected();
				}
				
				public void
				receive(
					GenericMessageConnection	connection,
					PooledByteBuffer			message )
				
					throws MessageException
				{
					SESTSConnectionImpl.this.receive( message );
				}
				
				public void
				failed(
					GenericMessageConnection	connection,
					Throwable 					error )
				
					throws MessageException
				{
					reportFailed( error );
				}
			});
	
Methods Summary
public voidaddListener(org.gudy.azureus2.plugins.messaging.generic.GenericMessageConnectionListener listener)

			// appends 'listener' to the list consulted when connected/receive/failed events are reported

		listeners.add(listener);
	
public voidclose()

			// closing is delegated entirely to the wrapped connection

		connection.close();
	
public voidconnect()

			// Incoming connections are simply connected. Outgoing connections write this side's STS keys
			// into a buffer and hand it to the wrapped connection's connect call, marking the keys as sent.

		if ( connection.isIncoming()){

			connection.connect();

			return;
		}

		try{
			ByteBuffer	initial_keys = ByteBuffer.allocate( 32*1024 );

			sts_engine.getKeys( initial_keys );

				// switch the buffer from writing to reading before it is handed over

			initial_keys.flip();

			sent_keys = true;

			connection.connect( initial_keys );

		}catch( CryptoManagerException	e ){

			throw( new MessageException( "Failed to get initial keys", e ));
		}
	
protected voidcryptoComplete()

			// permanently releases the crypto_complete semaphore that send() reserves and
			// receive() tests with isReleasedForever()

		crypto_complete.releaseForever();
	
public org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpointgetEndpoint()

			// the endpoint is whatever the wrapped connection reports

		return connection.getEndpoint();
	
public intgetMaximumMessageSize()

		int	max = connection.getMaximumMessageSize();
		
		if ( outgoing_cipher != null ){
			
			max -= outgoing_cipher.getBlockSize();
		}
		
		return( max );
	
protected static voidrateLimit(java.net.InetSocketAddress originator)

			// Throttles creation of incoming STS connections. Under the class lock it:
			//   1) records the originator's raw address in the shared bloom filter, growing the filter
			//      when it gets too full and recreating it when it is older than BLOOM_RECREATE
			//   2) throws an IOException when the originator has 15 or more recorded hits
			//   3) sleeps so that successive attempts are spaced out (see below)
			// The sleep happens while holding the class lock, so concurrent callers queue behind it.

		synchronized( SESTSConnectionImpl.class ){

			int	hit_count = generate_bloom.add( originator.getAddress().getAddress());

			long	now = SystemTime.getCurrentTime();

				// allow up to 10% bloom filter utilisation
				// (size/entries < 10 written as a multiplication so an entry count of zero cannot
				// cause a division by zero; long arithmetic avoids overflow)

			int	bloom_size = generate_bloom.getSize();

			if ( bloom_size < 10L * generate_bloom.getEntryCount()){

				generate_bloom = BloomFilterFactory.createAddRemove4Bit( bloom_size + BLOOM_INCREASE );

				generate_bloom_create_time	= now;

				Logger.log(	new LogEvent(LOGID, "STS bloom: size increased to " + generate_bloom.getSize()));

			}else if ( now < generate_bloom_create_time || now - generate_bloom_create_time > BLOOM_RECREATE ){

					// filter is too old (or the clock went backwards): start again with an empty one

				generate_bloom = BloomFilterFactory.createAddRemove4Bit( bloom_size );

				generate_bloom_create_time	= now;
			}

				// hit_count was obtained before any resize/recreate above

			if ( hit_count >= 15 ){

				Logger.log(	new LogEvent(LOGID, "STS bloom: too many recent connection attempts from " + originator ));

				Debug.out( "STS: too many recent connection attempts from " + originator );

				throw( new IOException( "Too many recent connection attempts (sts)"));
			}

			long	since_last = now - last_incoming_sts_create;

			long	delay = 100 - since_last;

				// limit key gen operations to 10 a second
				// delay == 100 means two attempts carried the same clock reading; that case must be
				// throttled as well. delay > 100 means the clock went backwards, so no sleep is attempted.

			if ( delay > 0 && delay <= 100 ){

				try{
					Logger.log(	new LogEvent(LOGID, "STS: too many recent connection attempts, delaying " + delay ));

					Thread.sleep( delay );

				}catch( InterruptedException e ){

						// keep the interruption visible to the caller rather than swallowing it

					Thread.currentThread().interrupt();

				}catch( Throwable e ){

						// best effort: a failure to log or sleep must not prevent the connection
				}
			}

				// NOTE(review): this records the pre-sleep time, so an attempt that slept is accounted
				// for earlier than it actually proceeded - confirm whether that is intended

			last_incoming_sts_create = now;
		}
	
public voidreceive(org.gudy.azureus2.plugins.utils.PooledByteBuffer message)

			// Handles a message arriving from the wrapped connection.
			// While the handshake is incomplete the payload is consumed as STS protocol data (keys
			// and/or auth) and any reply is sent back on the wrapped connection. Once crypto_complete
			// has been released forever, messages are handed to receiveContent().
			// Any Throwable raised here is reported through reportFailed() and then rethrown, wrapped
			// in a MessageException unless it already is one.

		try{
			boolean	forward				= false;	// true -> pass 'message' on to receiveContent
			boolean	crypto_completed	= false;	// true -> handshake finished during this call
			
			ByteBuffer	out_buffer = null;			// protocol reply to send, if any

				// protocol state (sent_keys, sent_auth, pending_message) is examined and updated under
				// this object's monitor; sending and forwarding happen after the monitor is released

			synchronized( this ){
		
				if ( crypto_complete.isReleasedForever()){
					
					forward	= true;
					
				}else{
					
						// basic sts flow:
						//   a -> puba -> b
						//   a <- pubb <- b
						//   a -> auta -> b
						//	 a <- autb <- b
						//   a -> data -> b
					
						// optimised
					
						//  a -> puba 		 -> b
						//  a <- pubb + auta <- b
						//  a -> autb + data -> b
					
						// therefore can be one or two messages in the payload
						// 	  1 crypto
						//    2 crypto (pub + auth)
						//	  crypto + data
					
						// initial a ->puba -> is done on first data send so data is ready for phase 3
					
						// the pooled buffer is released as soon as its bytes have been extracted
						// (presumably toByteArray() returns an independent copy - TODO confirm)
					
					ByteBuffer	in_buffer = ByteBuffer.wrap( message.toByteArray());
					
					message.returnToPool();
						
						// piggyback pub key send
					
					if ( !sent_keys ){
						
							// we've received 
							//		a -> puba -> b
							// reply with
							//		a <- pubb + auta <- b
						
						out_buffer = ByteBuffer.allocate( 64*1024 );

							// write our keys
						
						sts_engine.getKeys( out_buffer );
					
						sent_keys	= true;
						
							// read their keys
						
						sts_engine.putKeys( in_buffer );
					
							// write our auth
						
						sts_engine.getAuth( out_buffer );
						
						sent_auth 	= true;
						
					}else if ( !sent_auth ){
						
						out_buffer = ByteBuffer.allocate( 64*1024 );

						// we've received 
						//		a <- pubb + auta <- b
						// reply with
						//		a -> autb + data -> b

							// read their keys
						
						sts_engine.putKeys( in_buffer );

							// write our auth
						
						sts_engine.getAuth( out_buffer );

						sent_auth = true;
					
							// read their auth
						
						sts_engine.putAuth( in_buffer );

							// check we wanna talk to this person
						
						byte[]	rem_key = sts_engine.getRemotePublicKey();
						
						if ( !key_locator.accept( new SEPublicKeyImpl( my_public_key.getType(), rem_key ))){
							
							throw( new MessageException( "remote public key not accepted" ));
						}
							
						setupBlockCrypto();
						
							// a message stashed by send() is appended to the reply if it fits
						
						if ( pending_message != null ){
							
							byte[]	pending_bytes = pending_message.toByteArray();
							
							int	pending_size = pending_bytes.length;
							
							if ( outgoing_cipher != null ){
								
									// round the size up to a multiple of AES_KEY_SIZE_BYTES, with a minimum
									// of one unit; appears to approximate the encrypted length
									// NOTE(review): with PKCS5 padding an input that is already a whole
									// number of blocks grows by one full block, so this estimate may be
									// one block short of the doFinal output - confirm
								
								pending_size =  (( pending_size + AES_KEY_SIZE_BYTES -1 )/AES_KEY_SIZE_BYTES)*AES_KEY_SIZE_BYTES;
								
								if ( pending_size == 0 ){
									
									pending_size = AES_KEY_SIZE_BYTES;
								}
							}
							
							if ( out_buffer.remaining() >= pending_size ){
								
								if ( outgoing_cipher != null ){
									
									
									out_buffer.put( outgoing_cipher.doFinal( pending_bytes ));
									
								}else{
								
									out_buffer.put( pending_bytes );
								}
								
									// don't deallocate the pending message, the original caller does this
									// clearing the field is how send() learns the message was piggybacked
																
								pending_message	= null;
							}
						}
						
						crypto_completed	= true;
						
					}else{
							// we've received
							//		a -> autb + data -> b
						
							// read their auth
						
						sts_engine.putAuth( in_buffer );

							// check we wanna talk to this person
						
						byte[]	rem_key = sts_engine.getRemotePublicKey();
						
						if ( !key_locator.accept( new SEPublicKeyImpl( my_public_key.getType(), rem_key ))){
							
							throw( new MessageException( "remote public key not accepted" ));
						}
						
						setupBlockCrypto();

						crypto_completed	= true;
						
							// pick up any remaining data for delivery
							// 'message' is rebound to a new buffer wrapping the unread remainder
						
						if ( in_buffer.hasRemaining()){
							
							message = new PooledByteBufferImpl( new DirectByteBuffer( in_buffer.slice()));
							
							forward	= true;
						}
					}
				}
			}
				
				// send the protocol reply (plus any piggybacked data) before signalling completion
				
			if ( out_buffer != null ){
				
				out_buffer.flip();
				
				connection.send( new PooledByteBufferImpl( new DirectByteBuffer( out_buffer )));
			}
			
			if ( crypto_completed ){
				
				cryptoComplete();
			}
			if ( forward ){
				
				receiveContent( message );
			}
		}catch( Throwable e ){
			
				// listeners are told about the failure before the exception propagates
			
			reportFailed( e );
			
			if ( e instanceof MessageException ){
				
				throw((MessageException)e);
				
			}else{
				
				throw( new MessageException( "Receive failed", e ));
			}
		}
	
protected voidreceiveContent(org.gudy.azureus2.plugins.utils.PooledByteBuffer message)

			// Delivers application data to the registered listeners.
			// When an incoming cipher is installed the payload is decrypted first; when none is installed
			// but block encryption was requested a MessageException is thrown. The first listener
			// receives the message buffer itself, any further listeners receive a copy of its bytes.
			// 'buffer_handled' tracks whether the buffer currently referenced by 'message' has been
			// passed on to a listener or returned to the pool; if not, the finally clause returns it.

		boolean	buffer_handled = false;
		
		try{
			if ( incoming_cipher != null ){
			
				try{
					byte[]	enc 	= message.toByteArray();
					byte[]	plain 	= incoming_cipher.doFinal( enc );
	
					PooledByteBuffer	temp = new PooledByteBufferImpl( plain );
	
						// the encrypted buffer is finished with
					
					message.returnToPool();
					
						// 'message' now refers to the decrypted buffer, which has not been handed to
						// anyone yet, so buffer_handled stays false. Marking it handled here left the
						// decrypted buffer unreturned when there were no listeners.
					
					message	= temp;
					
				}catch( Throwable e ){
					
					throw( new MessageException( "Failed to decrypt data", e ));
				}
				
			}else if ( block_crypto != SESecurityManager.BLOCK_ENCRYPTION_NONE ){
				
				throw( new MessageException( "Crypto isn't setup" ));
			}
			
			for (int i=0;i<listeners.size();i++){
				
				PooledByteBuffer	message_to_deliver;
				
				if ( i == 0 ){
					
					message_to_deliver	= message;
					
				}else{
				
						// unlikely we'll ever have > 1 receiver....
						// NOTE(review): this reads 'message' after the first listener has been given it;
						// confirm the buffer is still readable at that point
					
					message_to_deliver = new PooledByteBufferImpl( message.toByteArray());
				}
				
				try{
					((GenericMessageConnectionListener)listeners.get(i)).receive( this, message_to_deliver );
					
					if ( message_to_deliver == message ){
						
						buffer_handled	= true;
					}
				}catch( Throwable e ){
					
						// a listener that fails does not get to keep the buffer; remaining listeners
						// are still notified
					
					message_to_deliver.returnToPool();
					
					buffer_handled	= true;
					
					Debug.printStackTrace( e );
				}
			}
		}finally{
			
			if ( !buffer_handled ){
				
				message.returnToPool();
			}
		}
	
public voidremoveListener(org.gudy.azureus2.plugins.messaging.generic.GenericMessageConnectionListener listener)

			// drops 'listener' from the list consulted when events are reported

		listeners.remove(listener);
	
protected voidreportConnected()

			// The notification is deliberately run on a separate thread: a listener reacting to
			// "connected" may immediately submit a message, and that send blocks until the crypto
			// handshake completes. The calling thread is the selector thread, so blocking it would
			// stall the handshake itself.

		new AEThread( "SESTSConnection:connected", true )
		{
			public void
			runSupport()
			{
				int	index = 0;

				while( index < listeners.size()){

					try{
						GenericMessageConnectionListener	target = (GenericMessageConnectionListener)listeners.get( index );

						target.connected( SESTSConnectionImpl.this );

					}catch( Throwable e ){

							// one misbehaving listener must not stop the others being told

						Debug.printStackTrace( e );
					}

					index++;
				}
			}
		}.start();
		
	
protected voidreportFailed(java.lang.Throwable error)

		setFailed();
		
		for (int i=0;i<listeners.size();i++){
			
			try{
				((GenericMessageConnectionListener)listeners.get(i)).failed( this, error );
				
			}catch( Throwable e ){
				
				Debug.printStackTrace( e );
			}
		}
	
public voidsend(org.gudy.azureus2.plugins.utils.PooledByteBuffer message)

			// Sends 'message' over the secured connection.
			// If the handshake has completed the message is sent straight away. Otherwise the first
			// message submitted is stashed in pending_message so receive() can piggyback it on the
			// protocol exchange, and the caller blocks until crypto_complete is released.
			// After the wait the message is sent here unless receive() already piggybacked it.
			// Throws MessageException if the connection has failed or sending fails; any failure
			// raised here also marks the connection as failed.
			// NOTE(review): a piggybacked message is not returned to the pool here, whereas
			// sendContent releases the buffer on success - confirm who releases it.

		if ( failed ){
			
			throw( new MessageException( "Connection failed" ));
		}
		
		try{
			if ( crypto_complete.isReleasedForever()){
				
				sendContent( message );
				
				return;
			}
				
				// not complete, stash the message so it has a chance of being piggybacked on
				// the crypto protocol exchange. Only one message can be stashed; remember whether
				// this one was, so a later arrival is still sent once the wait is over instead of
				// being silently dropped.
			
			boolean	stashed = false;
			
			synchronized( this ){
				
				if ( pending_message == null ){
					
					pending_message = message;
					
					stashed	= true;
				}
			}
			
				// blocks until the handshake completes or the connection fails
			
			crypto_complete.reserve();
						
			boolean	send_it;
				
			synchronized( this ){

				if ( pending_message == message ){
					
						// if the pending message couldn't be piggy backed it'll still be allocated
					
					pending_message	= null;
					
					send_it	= true;
					
				}else{
					
						// stashed and no longer pending -> receive() piggybacked it, nothing left to do
						// never stashed -> it still has to go out now
					
					send_it	= !stashed;
				}
			}
			
			if ( send_it ){
				
					// setFailed() also releases crypto_complete, so waking up does not imply that
					// the handshake succeeded - never send on a failed connection
				
				if ( failed ){
					
					throw( new MessageException( "Connection failed" ));
				}
				
				sendContent( message );
			}

		}catch( Throwable e ){
			
			setFailed();
			
			if ( e instanceof MessageException ){
				
				throw((MessageException)e);
				
			}else{
				
				throw( new MessageException( "Send failed", e ));
			}
		}
	
protected voidsendContent(org.gudy.azureus2.plugins.utils.PooledByteBuffer message)

			// Sends application data on the wrapped connection, encrypting it first when an outgoing
			// cipher is installed.
			// Buffer ownership: on a successful encrypted send the caller's buffer is returned to the
			// pool; on failure it is left untouched and only the temporary encrypted buffer is released.
			// Throws MessageException when encryption fails, when the send fails, or when block
			// encryption was requested but no cipher is installed (the connection is closed first).

		if ( outgoing_cipher == null ){
			
				// sanity check - never allow unencrypted outbound if block enc selected
			
			if ( block_crypto != SESecurityManager.BLOCK_ENCRYPTION_NONE ){
				
				connection.close();
				
				throw( new MessageException( "Crypto isn't setup" ));
			}
		
			connection.send( message );
			
			return;
		}
		
			// encryption and transmission are handled separately so that a transport failure is not
			// reported as "Failed to encrypt data"
		
		PooledByteBuffer	encrypted;
		
		try{
			byte[]	plain	= message.toByteArray();
			byte[]	enc		= outgoing_cipher.doFinal( plain );
		
			encrypted = new PooledByteBufferImpl( enc );
			
		}catch( Throwable e ){
			
			throw( new MessageException( "Failed to encrypt data", e ));
		}
		
		try{
			connection.send( encrypted );
			
		}catch( Throwable e ){
			
				// failed semantics are to not release the caller's buffer
			
			encrypted.returnToPool();
			
			if ( e instanceof MessageException ){
				
				throw((MessageException)e);
			}
			
			throw( new MessageException( "Failed to send encrypted data", e ));
		}
		
			// successful send -> release caller's buffer
		
		message.returnToPool();
	
protected voidsetFailed()

			// Flags the connection as failed, then releases the crypto-completion semaphore through
			// cryptoComplete(). Anything thrown while releasing is logged and not propagated.

		failed = true;

		try{

			cryptoComplete();

		}catch( Throwable release_error ){

			Debug.printStackTrace( release_error );
		}
	
protected voidsetupBlockCrypto()

			// Installs the incoming/outgoing AES/CBC ciphers derived from the STS shared secret.
			// Does nothing when the connection has failed or block encryption is not requested.
			// Two keys are taken from the shared secret (16 bytes from offset 0 and 16 bytes from
			// offset 8), each paired with its own fixed IV. The side that accepted the connection
			// encrypts with the first key and decrypts with the second; the initiating side does the
			// reverse, so each direction uses one key. Any failure is rethrown as a MessageException.

		if ( failed || block_crypto == SESecurityManager.BLOCK_ENCRYPTION_NONE ){

			return;
		}

		try{
			final byte[]	secret = sts_engine.getSharedSecret();

			final SecretKeySpec	first_key	= new SecretKeySpec( secret, 0, 16, "AES" );
			final SecretKeySpec	second_key	= new SecretKeySpec( secret, 8, 16, "AES" );

			final IvParameterSpec	first_iv	= new IvParameterSpec( AES_IV1 );
			final IvParameterSpec	second_iv	= new IvParameterSpec( AES_IV2 );

			final Cipher	first_cipher	= Cipher.getInstance( "AES/CBC/PKCS5Padding" );
			final Cipher	second_cipher	= Cipher.getInstance( "AES/CBC/PKCS5Padding" );

			final boolean	accepted_side = connection.isIncoming();

			first_cipher.init( accepted_side?Cipher.ENCRYPT_MODE:Cipher.DECRYPT_MODE, first_key, first_iv );
			second_cipher.init( accepted_side?Cipher.DECRYPT_MODE:Cipher.ENCRYPT_MODE, second_key, second_iv );

				// fields are only assigned once both ciphers have initialised successfully

			if ( accepted_side ){

				incoming_cipher	= second_cipher;
				outgoing_cipher	= first_cipher;

			}else{

				incoming_cipher	= first_cipher;
				outgoing_cipher	= second_cipher;
			}

		}catch( Throwable e ){

			throw( new MessageException( "Failed to setup block encryption", e ));
		}