File | Doc | Category | Size | Date | Package
AEProxyImpl.java | API Doc | Azureus 3.0.3.4 | 10397 | Fri Nov 17 13:27:12 GMT 2006 | com.aelitis.azureus.core.proxy.impl

AEProxyImpl

public class AEProxyImpl extends Object implements AEProxy, VirtualChannelSelector.VirtualSelectorListener
author
parg

Fields Summary
private static final LogIDs
LOGID
private static final int
DEBUG_PERIOD
private long
last_debug
private int
port
private long
connect_timeout
private long
read_timeout
private AEProxyHandler
proxy_handler
private com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
read_selector
private com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
connect_selector
private com.aelitis.azureus.core.networkmanager.VirtualChannelSelector
write_selector
private List
processors
private final HashMap
write_select_regs
private boolean
allow_external_access
private org.gudy.azureus2.core3.util.AEMonitor
this_mon
Constructors Summary
/**
 * Creates the proxy and starts listening for connections.
 *
 * Stores the supplied configuration, creates the read, connect and write
 * selectors, binds a server socket to 127.0.0.1 on the requested port with a
 * backlog of 128, and starts four daemon threads: one select loop per selector
 * plus the accept loop.
 *
 * @param _port            port to listen on; when 0 the port actually bound is
 *                         read back from the socket and stored in its place
 * @param _connect_timeout stored as connect_timeout (used by checkTimeouts)
 * @param _read_timeout    stored as read_timeout (used by checkTimeouts)
 * @param _proxy_handler   handler passed to every connection created by the accept loop
 * @throws AEProxyException if anything in the setup throws; the failure is also
 *                         raised as a "Tracker.alert.listenfail" alert and logged
 */
public AEProxyImpl(int _port, long _connect_timeout, long _read_timeout, AEProxyHandler _proxy_handler)

		port					= _port;
		connect_timeout			= _connect_timeout;
		read_timeout			= _read_timeout;
		proxy_handler			= _proxy_handler;
		
		String	name = "Proxy:" + port;
		
			// NOTE(review): the meaning of the third (boolean) selector argument is not
			// visible from here - confirm against VirtualChannelSelector
		
		read_selector	 = new VirtualChannelSelector( name, VirtualChannelSelector.OP_READ, false );
		connect_selector = new VirtualChannelSelector( name, VirtualChannelSelector.OP_CONNECT, true );
		write_selector	 = new VirtualChannelSelector( name, VirtualChannelSelector.OP_WRITE, true );
		
		try{
			
			final ServerSocketChannel	ssc = ServerSocketChannel.open();
			
			try{
				ServerSocket ss	= ssc.socket();
				
				ss.setReuseAddress(true);

				ss.bind(  new InetSocketAddress( InetAddress.getByName("127.0.0.1"), port), 128 );
				
				if ( port == 0 ){
					
						// an ephemeral port was requested - record the one we were given
					
					port	= ss.getLocalPort();
				}
			}catch( Throwable e ){
				
					// no thread is using the channel yet, so release it here rather than
					// leaving it open after the constructor has failed
				
				try{
					ssc.close();
					
				}catch( Throwable ignored ){
					
						// best effort - the original failure is the one reported below
				}
				
					// picked up by the outer handler, which alerts, logs and wraps it
				
				throw( e );
			}
				
			Thread connect_thread = 
				new AEThread("AEProxy:connect.loop")
				{
					public void
					runSupport()
					{
						selectLoop( connect_selector );
					}
				};
	
			connect_thread.setDaemon( true );
	
			connect_thread.start();
	
			Thread read_thread = 
				new AEThread("AEProxy:read.loop")
				{
					public void
					runSupport()
					{
						selectLoop( read_selector );
					}
				};
	
			read_thread.setDaemon( true );
	
			read_thread.start();
			
			Thread write_thread = 
				new AEThread("AEProxy:write.loop")
				{
					public void
					runSupport()
					{
						selectLoop( write_selector );
					}
				};
	
			write_thread.setDaemon( true );
	
			write_thread.start();
			
			Thread accept_thread = 
					new AEThread("AEProxy:accept.loop")
					{
						public void
						runSupport()
						{
							acceptLoop( ssc );
						}
					};
		
			accept_thread.setDaemon( true );
		
			accept_thread.start();									
		
			if (Logger.isEnabled())
				Logger.log(new LogEvent(LOGID, "AEProxy: listener established on port "
						+ port)); 
			
		}catch( Throwable e){

			Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
					LogAlert.AT_ERROR, "Tracker.alert.listenfail"), new String[] { ""
					+ port });
	
			if (Logger.isEnabled())
				Logger.log(new LogEvent(LOGID, "AEProxy: listener failed on port "
						+ port, e)); 
						
			throw( new AEProxyException( "AEProxy: accept fails: " + e.toString()));
		}			
	
Methods Summary
/**
 * Accepts incoming connections on the given server channel, looping until the
 * listener looks unusable.
 *
 * Each accepted connection is closed immediately unless external access is
 * allowed or the peer address is a loopback address. Otherwise the channel is
 * switched to non-blocking mode and wrapped in an AEProxyConnectionImpl; if that
 * processor is not already closed it is added to the processors list (under
 * this_mon) and the channel is registered with the read selector.
 *
 * Any failure is logged and counted. The loop only gives up, after raising a
 * "Network.alert.acceptfail" alert, once more than 100 failures have occurred
 * without a single successful accept.
 *
 * @param ssc the bound server channel to accept connections from
 */
protected voidacceptLoop(java.nio.channels.ServerSocketChannel ssc)

		
		long	successfull_accepts = 0;
		long	failed_accepts		= 0;

		while(true){
			
				// declared outside the try so the failure path can release it
			
			SocketChannel socket_channel = null;
			
			try{				
				socket_channel = ssc.accept();
						
				successfull_accepts++;
				
				if ( !( allow_external_access || socket_channel.socket().getInetAddress().isLoopbackAddress())){
					
					if (Logger.isEnabled())
						Logger.log(new LogEvent(LOGID, LogEvent.LT_WARNING,
								"AEProxy: incoming connection from '"
										+ socket_channel.socket().getInetAddress()
										+ "' - closed as not local"));
				
					socket_channel.close();
					
				}else{
						
					socket_channel.configureBlocking(false);
	
					AEProxyConnectionImpl processor = new AEProxyConnectionImpl(this, socket_channel, proxy_handler);
					
					if ( !processor.isClosed()){
						
						try{
							this_mon.enter();
						
							processors.add( processor );
			
							if (Logger.isEnabled())
								Logger.log(new LogEvent(LOGID, "AEProxy: active processors = "
										+ processors.size()));
							
						}finally{
							
							this_mon.exit();
						}
						
						read_selector.register( socket_channel, this, processor );
					}
				}
			}catch( Throwable e ){
				
				failed_accepts++;

					// if the failure happened after the accept succeeded, nothing else
					// will service this connection - close it instead of leaking it
					// NOTE(review): a processor already added to 'processors' is not
					// removed here; confirm whether it should be failed explicitly
				
				if ( socket_channel != null ){
					
					try{
						socket_channel.close();
						
					}catch( Throwable ignored ){
						
							// best effort - the original failure is reported below
					}
				}
				
				if (Logger.isEnabled())
					Logger.log(new LogEvent(LOGID, "AEProxy: listener failed on port "
							+ port, e)); 
			
				if ( failed_accepts > 100 && successfull_accepts == 0 ){

						// looks like its not going to work...
						// some kind of socket problem
					Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
							LogAlert.AT_ERROR, "Network.alert.acceptfail"), new String[] {
							"" + port, "TCP" });
			
					break;
				}			
			}
		}
	
/**
 * Cancels the given channel on the connect selector.
 *
 * @param sc the channel to cancel
 */
protected voidcancelConnectSelect(java.nio.channels.SocketChannel sc)

		connect_selector.cancel( sc );
	
/**
 * Cancels the given channel on the read selector.
 *
 * @param sc the channel to cancel
 */
protected voidcancelReadSelect(java.nio.channels.SocketChannel sc)

		read_selector.cancel( sc );
	
/**
 * Cancels the given channel on the write selector and drops its entry from
 * write_select_regs, so that a later requestWriteSelect registers it afresh
 * instead of resuming it.
 *
 * @param sc the channel to cancel
 */
protected voidcancelWriteSelect(java.nio.channels.SocketChannel sc)

    write_select_regs.remove( sc );
		write_selector.cancel( sc );
	
/**
 * Periodic housekeeping over the active processors.
 *
 * When more than DEBUG_PERIOD has passed since the last dump, logs the state
 * string of every processor. Then, unless both timeouts are disabled (<= 0),
 * collects the processors whose age (now minus their time stamp) has reached
 * connect_timeout while not connected, or read_timeout while connected, and
 * fails each of them with a "timeout" Throwable once the monitor is released.
 */
protected voidcheckTimeouts()

		final long	current_time = SystemTime.getCurrentTime();
		
		if ( current_time - last_debug > DEBUG_PERIOD ){
			
			last_debug	= current_time;
			
			try{
				this_mon.enter();
				
				for ( Iterator dump_it = processors.iterator(); dump_it.hasNext(); ){
					
					AEProxyConnectionImpl	connection = (AEProxyConnectionImpl)dump_it.next();
					
					if ( Logger.isEnabled()){
						
						Logger.log(new LogEvent(LOGID, "AEProxy: active processor: "
								+ connection.getStateString()));
					}
				}
			}finally{
				
				this_mon.exit();
			}
		}
		
		boolean	any_timeout_enabled = connect_timeout > 0 || read_timeout > 0;
		
		if ( !any_timeout_enabled ){
			
			return;
		}
		
		List	timed_out = new ArrayList();
		
		try{
			this_mon.enter();
			
			for ( Iterator scan_it = processors.iterator(); scan_it.hasNext(); ){
				
				AEProxyConnectionImpl	connection = (AEProxyConnectionImpl)scan_it.next();
				
				long	age = current_time - connection.getTimeStamp();
				
				boolean	expired;
				
					// evaluation order matters: the connect test runs first and the read
					// test is only consulted when the connect test does not match
				
				if ( connect_timeout > 0 && age >= connect_timeout && !connection.isConnected()){
					
					expired	= true;
					
				}else{
					
					expired	= read_timeout > 0 && age >= read_timeout && connection.isConnected();
				}
				
				if ( expired ){
					
					timed_out.add( connection );
				}
			}
		}finally{
			
			this_mon.exit();
		}
		
			// fail the victims only after the monitor has been released
		
		for ( Iterator fail_it = timed_out.iterator(); fail_it.hasNext(); ){
			
			((AEProxyConnectionImpl)fail_it.next()).failed( new Throwable( "timeout" ));
		}
	
/**
 * Removes the given processor from the list of active processors while
 * holding this_mon.
 *
 * @param processor the connection to remove
 */
protected voidclose(AEProxyConnectionImpl processor)

		try{
			this_mon.enter();
			
			processors.remove( processor );
			
		}finally{
		
				// always release the monitor, even if the removal throws
			this_mon.exit();
		}
	
/**
 * Returns the port field. The constructor sets it to the requested port, or
 * to the socket's local port when port 0 was requested.
 *
 * @return the port
 */
public intgetPort()

		return( port );
	
/**
 * Registers the channel with the connect selector, using this proxy as the
 * listener and the processor as the attachment.
 *
 * @param processor the connection passed as the attachment
 * @param sc        the channel to register
 */
protected voidrequestConnectSelect(AEProxyConnectionImpl processor, java.nio.channels.SocketChannel sc)

		connect_selector.register( sc, this, processor );
	
/**
 * Registers the channel with the read selector, using this proxy as the
 * listener and the processor as the attachment.
 *
 * @param processor the connection passed as the attachment
 * @param sc        the channel to register
 */
protected voidrequestReadSelect(AEProxyConnectionImpl processor, java.nio.channels.SocketChannel sc)

		read_selector.register( sc, this, processor );
	
/**
 * Asks for write selection on the channel. The first request for a channel
 * records it in write_select_regs and registers it with the write selector;
 * later requests, while the record is still present, only resume selection.
 *
 * @param processor the connection passed as the attachment on first registration
 * @param sc        the channel to select on
 */
protected voidrequestWriteSelect(AEProxyConnectionImpl processor, java.nio.channels.SocketChannel sc)

    
    if( !write_select_regs.containsKey( sc ) ) {
    	
      	// first request for this channel: remember it, then register
    	
      write_select_regs.put( sc, null );
      write_selector.register( sc, this, processor );
      
      return;
    }
    
    	// seen before: the registration already exists, so just resume it
    
    write_selector.resumeSelects( sc );
	
public voidselectFailure(com.aelitis.azureus.core.networkmanager.VirtualChannelSelector selector, java.nio.channels.SocketChannel sc, java.lang.Object attachment, java.lang.Throwable msg)

    	AEProxyConnectionImpl	processor = (AEProxyConnectionImpl)attachment;
    	
    	processor.failed( msg );
    
protected voidselectLoop(com.aelitis.azureus.core.networkmanager.VirtualChannelSelector selector)

		long	last_time	= 0;
		
		while( true ){
			
			try{
				selector.select( 100 );
				
					// only use one selector to trigger the timeouts!
				
				if ( selector == read_selector ){
					
					long	now = SystemTime.getCurrentTime();
					
					if ( now < last_time ){
						
						last_time	= now;
						
					}else if ( now - last_time >= 5000 ){
						
						last_time	= now;
						
						checkTimeouts();
					}
				}
			}catch( Throwable e ){
				
				Debug.printStackTrace(e);
			}
		}
	
public booleanselectSuccess(com.aelitis.azureus.core.networkmanager.VirtualChannelSelector selector, java.nio.channels.SocketChannel sc, java.lang.Object attachment)

    	AEProxyConnectionImpl	processor = (AEProxyConnectionImpl)attachment;
    	   	
    	if ( selector == read_selector ){
    		
    		return( processor.read(sc));
    		
    	}else if ( selector == write_selector ){
    		
    		return( processor.write(sc));
    		
    	}else{
    		
    		return( processor.connect(sc));
    	}
    
/**
 * Sets the allow_external_access flag. The accept loop closes connections
 * from non-loopback addresses unless this flag is set.
 *
 * @param permit true to accept connections from non-loopback addresses
 */
public voidsetAllowExternalConnections(boolean permit)

		allow_external_access = permit;