File | Doc | Category | Size | Date | Package
GenericMessageConnectionIndirect.java | API Doc | Azureus 3.0.3.4 | 22003 | Sat Aug 19 07:13:58 BST 2006 | org.gudy.azureus2.pluginsimpl.local.messaging

GenericMessageConnectionIndirect

public class GenericMessageConnectionIndirect extends Object implements GenericMessageConnectionAdapter

Fields Summary
private static final org.gudy.azureus2.core3.logging.LogIDs
LOGID
private static final boolean
TRACE
public static final int
MAX_MESSAGE_SIZE
private static final int
MESSAGE_TYPE_CONNECT
private static final int
MESSAGE_TYPE_ERROR
private static final int
MESSAGE_TYPE_DATA
private static final int
MESSAGE_TYPE_DISCONNECT
private static final int
TICK_PERIOD
private static final int
KEEP_ALIVE_CHECK_PERIOD
private static final int
KEEP_ALIVE_MIN
private static final int
STATS_PERIOD
private static final int
KEEP_ALIVE_CHECK_TICKS
private static final int
STATS_TICKS
private static final int
MAX_REMOTE_CONNECTIONS
private static final int
MAX_REMOTE_CONNECTIONS_PER_IP
private static long
connection_id_next
private static Map
local_connections
private static Map
remote_connections
private static org.gudy.azureus2.core3.util.ThreadPool
keep_alive_pool
private MessageManagerImpl
message_manager
private String
msg_id
private String
msg_desc
private org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpoint
endpoint
private com.aelitis.azureus.core.nat.NATTraverser
nat_traverser
private GenericMessageConnectionImpl
owner
private InetSocketAddress
rendezvous
private InetSocketAddress
target
private long
connection_id
private boolean
incoming
private boolean
closed
private LinkedList
send_queue
private org.gudy.azureus2.core3.util.AESemaphore
send_queue_sem
private volatile long
last_message_sent
private volatile long
last_message_received
private volatile boolean
keep_alive_in_progress
Constructors Summary
/**
 * Creates the outgoing (locally initiated) end of an indirect connection.
 *
 * Stores the supplied collaborators, fetches the NAT traverser from the message
 * manager and logs the notional address of the endpoint being connected to.
 *
 * @param _message_manager	manager used to obtain the NAT traverser and passed along when sending
 * @param _msg_id			message id placed in the connect request
 * @param _msg_desc			message description placed in the connect request
 * @param _endpoint			endpoint this connection talks to
 * @param _rendezvous		rendezvous address handed to the NAT traverser when sending
 * @param _target			target address handed to the NAT traverser when sending
 */
protected GenericMessageConnectionIndirect(MessageManagerImpl _message_manager, String _msg_id, String _msg_desc, org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpoint _endpoint, InetSocketAddress _rendezvous, InetSocketAddress _target)

			// outgoing
		
		this.message_manager	= _message_manager;
		this.msg_id				= _msg_id;
		this.msg_desc			= _msg_desc;
		this.endpoint			= _endpoint;
		this.rendezvous			= _rendezvous;
		this.target				= _target;
		
			// all later sends for this connection go through the manager's traverser
		
		this.nat_traverser		= _message_manager.getNATTraverser();
		
		log( "outgoing connection to " + _endpoint.getNotionalAddress());
	
protected GenericMessageConnectionIndirect(MessageManagerImpl _message_manager, String _msg_id, String _msg_desc, org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpoint _endpoint, long _connection_id)

			// incoming
		
		message_manager	= _message_manager;
		msg_id			= _msg_id;
		msg_desc		= _msg_desc;
		endpoint		= _endpoint;
		connection_id	= _connection_id;
		
		incoming		= true;
		
		last_message_received	= SystemTime.getCurrentTime();
		
		if ( TRACE ){
			trace( "inbound connect from " + endpoint.getNotionalAddress());
		}
		
		log( "incoming connection from " + endpoint.getNotionalAddress());
	
Methods Summary
/**
 * No-op in this implementation: the body is empty.
 */
public voidaccepted()

		// intentionally empty
	
/**
 * Closes the connection as a locally requested close, i.e. with no failure cause.
 * Delegates to {@code close( Throwable )} passing {@code null}.
 */
public voidclose()

			// null cause == local close: no failure is reported to the owner
		
		close( null );
	
protected voidclose(java.lang.Throwable close_cause)

		if ( closed ){
			
			return;
		}
		
		if ( TRACE ){
			if ( close_cause == null ){
				trace( "close[local]" );
			}else{
				trace( "close[" + close_cause.getMessage() + "]" );
			}
		}
		
		log( "connection to " + endpoint.getNotionalAddress() + " closed" + (close_cause==null?"":(" (" + close_cause + ")")));
		
		try{
			closed	= true;
			
			if ( incoming ){
				
				synchronized( remote_connections ){
					
					remote_connections.remove( new Long( connection_id ));
				}
			}else{
				
				
				synchronized( local_connections ){
					
					local_connections.remove( new Long( connection_id ));
				}
				
				Map	message = new HashMap();
				
				message.put( "con_id", new Long( connection_id ));
				message.put( "type", new Long( MESSAGE_TYPE_DISCONNECT ));
				
				try{
					nat_traverser.sendMessage( message_manager, rendezvous, target, message );
					
					last_message_sent = SystemTime.getCurrentTime();

				}catch( Throwable e ){
					
					throw( new MessageException( "Close operation failed", e ));
				}
			}
		}finally{
			
			if ( close_cause != null ){
				
				owner.reportFailed( close_cause );
			}
		}
	
/**
 * Performs the outgoing connect handshake via the NAT traverser.
 *
 * The remaining bytes of {@code initial_data} are sent as the single initial message
 * of a connect request. The outcome is reported through {@code listener}: on a connect
 * reply the returned connection id is recorded, the connection is registered in the
 * local table, success is signalled and any messages carried in the reply are handed
 * to the owner. Every other outcome, including any exception raised along the way,
 * is reported via {@code connectFailure}.
 *
 * @param initial_data	buffer whose remaining bytes form the first message
 * @param listener		receives the success/failure notification
 */
public voidconnect(java.nio.ByteBuffer initial_data, ConnectionListener listener)

		if ( TRACE ){
			trace( "outbound connect to " + endpoint.getNotionalAddress());
		}
		
		try{
			byte[]	payload = new byte[ initial_data.remaining()];
			
			initial_data.get( payload );
			
			List	payloads = new ArrayList();
			
			payloads.add( payload );
			
			Map	request = new HashMap();
			
			request.put( "type", new Long( MESSAGE_TYPE_CONNECT ));
			request.put( "msg_id", msg_id );
			request.put( "msg_desc", msg_desc );
			request.put( "data", payloads );
			
			Map response = nat_traverser.sendMessage( message_manager, rendezvous, target, request );
	
			last_message_sent = SystemTime.getCurrentTime();

			if ( response == null || !response.containsKey( "type") ){
				
				listener.connectFailure( new Throwable( "Indirect connect failed (response=" + response + ")" ));
				
				return;
			}
			
			int	response_type = ((Long)response.get( "type" )).intValue();
			
			if ( response_type == MESSAGE_TYPE_ERROR ){
				
				listener.connectFailure( new Throwable( new String((byte[])response.get( "error" ))));
				
				return;
			}
			
			if ( response_type == MESSAGE_TYPE_DISCONNECT ){
				
				listener.connectFailure( new Throwable( "Disconnected" ));
				
				return;
			}
			
			if ( response_type != MESSAGE_TYPE_CONNECT ){
				
				Debug.out( "Unexpected reply type - " + response_type );
				
				listener.connectFailure( new Throwable( "Unexpected reply type - " + response_type ));
				
				return;
			}
			
				// connected: remember the id the remote side allocated and start tracking
			
			connection_id = ((Long)response.get( "con_id" )).longValue();
			
			synchronized( local_connections ){
				
				local_connections.put( new Long( connection_id ), this );
			}
			
			listener.connectSuccess();

				// the connect reply may already carry messages from the remote side
			
			Iterator	reply_it = ((List)response.get( "data" )).iterator();
			
			while( reply_it.hasNext()){
				
				byte[]	reply_bytes = (byte[])reply_it.next();
				
				owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap( reply_bytes )), false ));
			}
		}catch( Throwable e ){
			
			listener.connectFailure( e );
		}
	
/**
 * Summarises a connection table as a comma separated list of
 * {@code address:count} pairs, one per distinct notional address.
 *
 * The table is only touched while holding its monitor; the string is assembled
 * afterwards from a private tally so the lock is not held during formatting.
 *
 * @param connections	map whose values are GenericMessageConnectionIndirect instances
 * @return the summary, or an empty string when the table holds no connections
 */
protected static java.lang.StringgetConnectionStatus(java.util.Map connections)

		Map totals = new HashMap();	
	
		synchronized( connections ){
			
			Iterator	it = connections.values().iterator();
			
			while( it.hasNext()){
				
				GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
	
				InetAddress	originator = con.getEndpoint().getNotionalAddress().getAddress();
				
				Integer	count = (Integer)totals.get( originator );
				
				totals.put( originator, new Integer( count == null ? 1 : count.intValue() + 1 ));
			}
		}
		
			// accumulate in a buffer rather than re-copying a String for every entry
		
		StringBuffer	str = new StringBuffer();
		
		Iterator it = totals.entrySet().iterator();
		
		while( it.hasNext()){
			
			Map.Entry entry = (Map.Entry)it.next();
			
			if ( str.length() > 0 ){
				
				str.append( ',' );
			}
			
			str.append( entry.getKey()).append( ':' ).append( entry.getValue());
		}
		
		return( str.toString());
	
/**
 * @return the endpoint supplied when this connection was constructed
 */
public org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpointgetEndpoint()

		return( endpoint );
	
public longgetLastMessageReceivedTime()

		long	now = SystemTime.getCurrentTime();
		
		if ( now < last_message_received ){
			
			last_message_received = now;
		}
		
		return( last_message_received );
	
/**
 * @return the getConnectionStatus summary for the table of local connections
 */
protected static java.lang.StringgetLocalConnectionStatus()

		return( getConnectionStatus( local_connections ));
	
/**
 * @return the value of MAX_MESSAGE_SIZE
 */
public intgetMaximumMessageSize()

		return( MAX_MESSAGE_SIZE );
	
/**
 * @return the getConnectionStatus summary for the table of remote connections
 */
protected static java.lang.StringgetRemoteConnectionStatus()

		return( getConnectionStatus( remote_connections ));
	
/**
 * @return true once the closed flag has been set on this connection
 */
protected booleanisClosed()

		return( closed );
	
/**
 * Performs a keep-alive by sending an empty list of messages, then clears the
 * keep_alive_in_progress flag regardless of how the send ends.
 */
protected voidkeepAlive()

		if (TRACE ){
			trace( "keepAlive" );
		}
		
		try{
			
				// an empty message list still results in a data message being sent
			
			send( new ArrayList());
			
		}finally{
			
				// always allow the next keep-alive to be scheduled
			
			keep_alive_in_progress	= false;
		}
	
protected static voidlog(java.lang.String str)

		if ( Logger.isEnabled()){

			Logger.log(new LogEvent(LOGID, "GenericMessaging (indirect):" + str ));
		}
	
protected booleanprepareForKeepAlive(boolean force)

		if ( keep_alive_in_progress ){
			
			return( false );
		}
		
		long	now = SystemTime.getCurrentTime();
		
		if ( force || now < last_message_sent || now - last_message_sent > KEEP_ALIVE_MIN ){
			
			keep_alive_in_progress = true;
		
			return( true );
		}
		
		return( false );
	
/**
 * Handles a message map received from a remote originator and produces the reply map.
 *
 * Behaviour by message "type":
 * - no "type" entry: treated as a pure NAT traversal request, null is returned
 * - connect: looks up the handler for the message id, enforces the total and per-address
 *   connection limits, creates the incoming connection and, if the handler accepts it,
 *   registers it and returns a connect reply carrying the new connection id and any replies
 * - data: delivers the messages to the identified connection and returns a data reply
 *   (with "more_data" set when further replies remain queued), or a disconnect reply
 *   when that connection is already closed
 * - anything else (error/disconnect): closes the identified connection, returns null
 *
 * The message comes from the network, so entries of the wrong type or missing entries
 * are rejected with a null reply rather than surfacing as runtime exceptions.
 *
 * NOTE(review): in this listing the periodic timer registration precedes the message
 * handling inside this method; it looks like class initialisation code (run per call it
 * would register another periodic event for every message) - confirm against the
 * original source file.
 *
 * @param message_manager	manager used to look up handlers and create connections
 * @param originator		address the message came from
 * @param message			decoded message map
 * @return the reply map, or null when there is nothing to reply with
 */
protected static java.util.Mapreceive(MessageManagerImpl message_manager, java.net.InetSocketAddress originator, java.util.Map message)

			// there are two reasons for timers
			//     1) to check for dead connections (send keepalive/check timeouts)
			//     2) the connection is one-sided so if the responder sends an unsolicited message it 
			//        is queued and only picked up on a periodic ping by the initiator
				
		SimpleTimer.addPeriodicEvent(
			"DDBTorrent:timeout2",
			TICK_PERIOD,
			new TimerEventPerformer()
			{		
				private int	tick_count = 0;
				
				public void
				perform(
					TimerEvent	event )
				{
					tick_count++;
					
					if ( tick_count % STATS_TICKS == 0 ){
					
						int	local_total;
						int remote_total;
						
						if ( Logger.isEnabled()){
	
							synchronized( local_connections ){
	
								local_total = local_connections.size();
							}
								
							synchronized( remote_connections ){
	
								remote_total = remote_connections.size();
							}
							
							if  ( local_total + remote_total > 0 ){
								
								log( "local=" + local_total + " [" + getLocalConnectionStatus() + "], remote=" + remote_total + " [" + getRemoteConnectionStatus() + "]" );
							}
						}
					}

					if ( tick_count % KEEP_ALIVE_CHECK_TICKS == 0 ){
												
						synchronized( local_connections ){
						
							Iterator	it = local_connections.values().iterator();
							
							while( it.hasNext()){
								
								final GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
								
								if ( con.prepareForKeepAlive( false )){
									
									keep_alive_pool.run(
										new AERunnable()
										{
											public void
											runSupport()
											{
												con.keepAlive();
											}
										});
								}
							}
						}
						
						long	now = SystemTime.getCurrentTime();
						
							// close() removes the connection from remote_connections, so closing
							// while iterating the map would invalidate the iterator; collect the
							// timed-out connections first and close them once iteration is over
						
						List	timed_out = new ArrayList();
						
						synchronized( remote_connections ){
							
							Iterator	it = remote_connections.values().iterator();
							
							while( it.hasNext()){
								
								GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
						
								long	last_receive = con.getLastMessageReceivedTime();
								
								if ( now - last_receive > KEEP_ALIVE_MIN * 3 ){
									
									timed_out.add( con );
								}
							}
						}
						
						for (int i=0;i<timed_out.size();i++){
							
							try{
								((GenericMessageConnectionIndirect)timed_out.get(i)).close( new Throwable( "Timeout" ));
								
							}catch( Throwable e ){
								
								Debug.printStackTrace(e);
							}
						}
					}
				}
			});
	
		if (TRACE ){
			System.out.println( "receive:" + originator + "/" + message );
		}
			// if this purely a NAT traversal request then bail out 
		
		if ( !message.containsKey( "type" )){
			
			return( null );
		}
		
			// everything in the map was supplied by the remote peer - validate before casting
		
		Object	type_obj = message.get( "type" );
		
		if ( !( type_obj instanceof Long )){
			
			Debug.out( "Malformed message from " + originator + " - invalid type" );
			
			return( null );
		}
		
		int	type = ((Long)type_obj).intValue();
		
		if ( type == MESSAGE_TYPE_CONNECT ){
		
			Object	msg_id_obj		= message.get( "msg_id" );
			Object	msg_desc_obj	= message.get( "msg_desc" );
			Object	data_obj		= message.get( "data" );
			
			if ( !( msg_id_obj instanceof byte[] && msg_desc_obj instanceof byte[] && data_obj instanceof List )){
				
				Debug.out( "Malformed connect request from " + originator );
				
				return( null );
			}
			
			String	msg_id 		= new String((byte[])msg_id_obj );
			String	msg_desc 	= new String((byte[])msg_desc_obj );

			GenericMessageEndpointImpl	endpoint = new GenericMessageEndpointImpl( originator );
			
			endpoint.addUDP( originator );
					
			GenericMessageHandler	handler = message_manager.getHandler( msg_id );
			
			if ( handler == null ){
				
				Debug.out( "No message handler registered for '" + msg_id + "'" );
				
				return( null );
			}
			
			try{
				Long	con_id;

				synchronized( remote_connections ){
					
					if ( remote_connections.size() >= MAX_REMOTE_CONNECTIONS ){
						
						Debug.out( "Maximum remote connections exceeded - request from " + originator + " denied [" + getRemoteConnectionStatus() + "]" );
						
						return( null );
					}
					
					int	num_from_this_ip = 0;
									
					Iterator	it = remote_connections.values().iterator();
					
					while( it.hasNext()){
						
						GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
						
						if ( con.getEndpoint().getNotionalAddress().getAddress().equals( originator.getAddress())){
							
							num_from_this_ip++;
						}
					}
					
					if ( num_from_this_ip >= MAX_REMOTE_CONNECTIONS_PER_IP ){
						
						Debug.out( "Maximum remote connections per-ip exceeded - request from " + originator + " denied [" + getRemoteConnectionStatus() + "]" );
						
						return( null );

					}
					con_id = new Long( connection_id_next++ );
				}
				
				GenericMessageConnectionIndirect indirect_connection = 
					new GenericMessageConnectionIndirect( 
							message_manager, msg_id, msg_desc, endpoint, con_id.longValue());

				GenericMessageConnectionImpl new_connection = new GenericMessageConnectionImpl( message_manager, indirect_connection );

				if ( handler.accept( new_connection )){
					
					new_connection.accepted();
	
					synchronized( remote_connections ){
												
						remote_connections.put( con_id, indirect_connection );
					}
					
					List	replies = indirect_connection.receive((List)data_obj );
				
					Map	reply = new HashMap();
					
					reply.put( "type", new Long( MESSAGE_TYPE_CONNECT ));
					reply.put( "con_id", con_id );
					reply.put( "data", replies );
					
					return( reply );
					
				}else{
					
					return( null );
				}	
			
			}catch( MessageException e ){
				
				Debug.out( "Error accepting message", e);
				
				return( null );
			}

		}else if ( type == MESSAGE_TYPE_DATA ){
			
			Object	con_id = message.get( "con_id" );
			
			if ( !( con_id instanceof Long )){
				
					// missing or malformed id can't identify any connection
				
				return( null );
			}
			
			GenericMessageConnectionIndirect indirect_connection;
			
			synchronized( remote_connections ){
				
				indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get( con_id );
			}
			
			if ( indirect_connection == null ){
				
				return( null );
			}
			
			Map	reply = new HashMap();

			if ( indirect_connection.isClosed()){
				
				reply.put( "type", new Long( MESSAGE_TYPE_DISCONNECT ));

			}else{
				
				Object	data_obj = message.get( "data" );
				
				if ( !( data_obj instanceof List )){
					
					Debug.out( "Malformed data message from " + originator );
					
					return( null );
				}
				
				List replies = indirect_connection.receive((List)data_obj );
				
				reply.put( "type", new Long( MESSAGE_TYPE_DATA ));
				reply.put( "data", replies );	
								
				if ( indirect_connection.receiveIncomplete()){
					
					reply.put( "more_data", new Long(1));
				}
			}
			
			return( reply );
			
		}else{
			
				// error or disconnect		
			
			Object	con_id = message.get( "con_id" );
			
			if ( !( con_id instanceof Long )){
				
				return( null );
			}
			
			GenericMessageConnectionIndirect indirect_connection;
			
			synchronized( remote_connections ){
				
				indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get( con_id );
			}
			
			if ( indirect_connection != null ){
				
				try{
					indirect_connection.close( new Throwable( "Remote closed connection" ) );
					
				}catch( Throwable e ){
					
					Debug.printStackTrace(e);
				}
			}
			
			return( null );
		}
	
/**
 * Delivers a batch of received messages to the owner and collects any queued
 * outbound messages so they can be returned in the reply.
 *
 * @param messages	list of byte[] payloads; each is wrapped in a GenericMessage for the owner
 * @return the queued outbound payloads picked up for the reply (possibly empty)
 */
protected java.util.Listreceive(java.util.List messages)

		if ( TRACE ){	
			trace( "receive: " + messages );
		}
		
			// any inbound batch (even an empty one) refreshes the last-received time
		
		last_message_received	= SystemTime.getCurrentTime();
		
		for (int i=0;i<messages.size();i++){
			
			owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap((byte[])messages.get(i))), false ));
		}
		
		List	reply = new ArrayList();
		
			// hang around a bit to see if we can piggyback a reply
			// (the argument to reserve is presumably a timeout in milliseconds - TODO confirm)
		
		if ( send_queue_sem.reserve( 2500 )){
			
				// give a little more time in case async > 1 message is being queued
			
			try{
				Thread.sleep(250);
				
			}catch( Throwable e ){
			}
			
			int	max 	= getMaximumMessageSize();
			int	total 	= 0;
			
			synchronized( send_queue ){
				
				while( send_queue.size() > 0 ){
						
					byte[]	data = (byte[])send_queue.getFirst();
					
						// the first entry is always taken, even if it alone exceeds max;
						// later entries are only taken while the running total stays within max
					
					if ( total > 0 && total + data.length > max ){
												
						break;
					}
					
					reply.add( send_queue.removeFirst());
					
					total += data.length;
				}
				
				
				if ( TRACE ){
					trace( "    messages returned = " + reply.size() + " (" + total + "), more=" + (send_queue.size() > 0 ));
				}
			}
			
				// grab sems for any entries other than the initial one grabbed above
				// (send releases the semaphore once per queued entry)
			
			for (int i=1;i<reply.size();i++){
				
				send_queue_sem.reserve();
			}
		}
		
		return( reply );
	
protected booleanreceiveIncomplete()

		synchronized( send_queue ){

			return( send_queue.size() > 0 );
		}
	
public voidsend(org.gudy.azureus2.plugins.utils.PooledByteBuffer pbb)

		byte[]	bytes = pbb.toByteArray();
		
		if ( TRACE ){
			trace( "send " +  bytes.length );
		}
		
		if ( incoming ){
			
			synchronized( send_queue ){
				
				if ( send_queue.size() > 64 ){
					
					throw( new MessageException( "Send queue limit exceeded" ));
				}
				
				send_queue.add( bytes );
			}
			
			send_queue_sem.release();
			
		}else{
						
			List	messages = new ArrayList();
			
			messages.add( bytes );
			
			send( messages );
		}
	
/**
 * Sends a batch of messages as one data message via the NAT traverser and processes the reply.
 *
 * Messages contained in a data reply are handed to the owner; when the reply carries
 * "more_data" a forced keep-alive is scheduled after a short delay to collect the rest.
 * A missing or untyped reply, an error reply, a disconnect reply or any exception is
 * reported to the owner as a failure. Other reply types are ignored.
 *
 * @param messages	list of payloads to send (may be empty, as used for keep-alives)
 */
protected voidsend(java.util.List messages)

		if ( TRACE ){
			trace( "    send " + messages );
		}
		
		try{
			Map	request = new HashMap();
			
			request.put( "con_id", new Long( connection_id ));
			request.put( "type", new Long( MESSAGE_TYPE_DATA ));
			request.put( "data", messages );
			
			Map response = nat_traverser.sendMessage( message_manager, rendezvous, target, request );
	
			last_message_sent = SystemTime.getCurrentTime();

			if ( response == null || !response.containsKey( "type")){
				
				owner.reportFailed( new Throwable( "Indirect message send failed (response=" + response + ")" ));
				
				return;
			}
			
			int	response_type = ((Long)response.get( "type" )).intValue();
			
			if ( response_type == MESSAGE_TYPE_ERROR ){
				
				owner.reportFailed( new Throwable( new String((byte[])response.get( "error" ))));
				
				return;
			}
			
			if ( response_type == MESSAGE_TYPE_DATA ){
				
				Iterator	reply_it = ((List)response.get( "data" )).iterator();
				
				while( reply_it.hasNext()){
					
					byte[]	reply_bytes = (byte[])reply_it.next();
					
					owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap( reply_bytes )), false ));
				}
				
				if ( response.get( "more_data" ) == null ){
					
					return;
				}
				
					// if there's more data queued force a keep alive to pick it up but delay 
					// a little to give the rendezvous a breather
				
				if ( TRACE ){
					trace( "    received 'more to come'" );
				}
				
				new DelayedEvent(
					"GenMsg:kap",
					500,
					new AERunnable()
					{
						public void
						runSupport()
						{
							if ( !prepareForKeepAlive( true )){
								
								return;
							}
							
							keep_alive_pool.run(
								new AERunnable()
								{
									public void
									runSupport()
									{
										GenericMessageConnectionIndirect.this.keepAlive();
									}
								});
						}
					});
				
				return;
			}
			
			if ( response_type == MESSAGE_TYPE_DISCONNECT ){
				
				owner.reportFailed( new Throwable( "Disconnected" ));
			}
		}catch( Throwable e ){
			
			owner.reportFailed( e );
		}
	
/**
 * Sets the owning connection, which is the object that received messages and
 * failures are reported to by the other methods of this class.
 *
 * @param _owner	the owning connection
 */
public voidsetOwner(GenericMessageConnectionImpl _owner)

		owner	= _owner;
	
protected voidtrace(java.lang.String str)

		if ( TRACE ){
			System.out.println( "GMCI[" +(incoming?"R":"L") + "/" + connection_id + "] " + str );
		}