NonBlockingReadWriteServicepublic class NonBlockingReadWriteService extends Object
Fields Summary |
---|
private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector | read_selector | private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector | write_selector | private final ArrayList | connections | private final AEMonitor | connections_mon | private final ServiceListener | listener | private final String | service_name | private volatile boolean | destroyed | private long | last_timeout_check_time | private static final int | TIMEOUT_CHECK_INTERVAL_MS | private final int | activity_timeout_period_ms | private final int | close_delay_period_ms |
Constructors Summary |
---|
public NonBlockingReadWriteService(String _service_name, int timeout, ServiceListener _listener)
this( _service_name, timeout, 0, _listener );
| public NonBlockingReadWriteService(String _service_name, int timeout, int close_delay, ServiceListener _listener)
this.service_name = _service_name;
this.listener = _listener;
read_selector = new VirtualChannelSelector( service_name, VirtualChannelSelector.OP_READ, false );
write_selector = new VirtualChannelSelector( service_name, VirtualChannelSelector.OP_WRITE, true );
if( timeout < TIMEOUT_CHECK_INTERVAL_MS /1000 ) timeout = TIMEOUT_CHECK_INTERVAL_MS /1000;
this.activity_timeout_period_ms = timeout *1000;
close_delay_period_ms = close_delay * 1000;
AEThread select_thread = new AEThread( "[" +service_name+ "] Service Select" ) {
public void runSupport() {
while( true ) {
boolean stop_after_select = destroyed;
if ( stop_after_select ){
read_selector.destroy();
write_selector.destroy();
}
try{
read_selector.select( 50 );
write_selector.select( 50 );
}
catch( Throwable t ) {
Debug.out( "[" +service_name+ "] SelectorLoop() EXCEPTION: ", t );
}
if (stop_after_select){
break;
}
doConnectionTimeoutChecks();
// check this at the end so we have one last run through the selectors to do cancels
// before exiting
}
}
};
select_thread.setDaemon( true );
select_thread.start();
|
Methods Summary |
---|
public void | addClientConnection(ClientConnection connection)
//add to active list
try { connections_mon.enter();
if ( destroyed ){
Debug.out( "connection added after destroy" );
}
connections.add( connection );
}finally {
connections_mon.exit();
}
registerForSelection( connection );
| public void | destroy()
try {
connections_mon.enter();
connections.clear();
destroyed = true;
}finally{
connections_mon.exit();
}
| private void | doConnectionTimeoutChecks()
//check timeouts
long time = System.currentTimeMillis();
if( time < last_timeout_check_time || time - last_timeout_check_time > TIMEOUT_CHECK_INTERVAL_MS ) {
ArrayList timed_out = new ArrayList();
try { connections_mon.enter();
long current_time = System.currentTimeMillis();
for( int i=0; i < connections.size(); i++ ) {
ClientConnection vconn = (ClientConnection)connections.get( i );
if( current_time < vconn.getLastActivityTime() ) { //time went backwards!
vconn.resetLastActivityTime();
}
else{
if( current_time - vconn.getLastActivityTime() > activity_timeout_period_ms ||
( close_delay_period_ms > 0 &&
current_time - vconn.getLastActivityTime() > close_delay_period_ms )){
timed_out.add( vconn ); //do actual removal outside the check loop
}
}
}
}
finally { connections_mon.exit(); }
for( int i=0; i < timed_out.size(); i++ ) {
ClientConnection vconn = (ClientConnection)timed_out.get( i );
// don't change the exception text - it is used elsewhere
listener.connectionError( vconn, new Exception( "Timeout" ));
}
last_timeout_check_time = System.currentTimeMillis();
}
| private void | registerForSelection(ClientConnection client)
//READS
VirtualChannelSelector.VirtualSelectorListener read_listener = new VirtualChannelSelector.VirtualSelectorListener() {
//SUCCESS
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
try{
Message[] messages = client.readMessages();
if( messages != null ) {
for( int i=0; i < messages.length; i++ ) {
AZGenericMapPayload msg = (AZGenericMapPayload)messages[i];
ClientMessage client_msg = new ClientMessage( msg.getID(), client, msg.getMapPayload(), null ); //note no handler. we let the listener attach it
listener.messageReceived( client_msg );
}
}
return( client.getLastReadMadeProgress());
}
catch( Throwable t ) {
if ( !client.isClosePending()){
System.out.println( "[" +new Date()+ "] Connection read error [" +sc.socket().getInetAddress()+ "] [" +client.getDebugString()+ "]: " +t.getMessage() );
}
listener.connectionError( client, t );
return( false );
}
}
//FAILURE
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
if ( !destroyed ){
msg.printStackTrace();
}
listener.connectionError( client, msg );
}
};
//WRITES
final VirtualChannelSelector.VirtualSelectorListener write_listener = new VirtualChannelSelector.VirtualSelectorListener() {
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
try{
boolean more_writes_needed = client.writeMessages();
if( more_writes_needed ) {
write_selector.resumeSelects( client.getSocketChannel() ); //we need to resume since write selects are auto-paused after select op
}
return( client.getLastWriteMadeProgress());
}
catch( Throwable t ) {
System.out.println( "[" +new Date()+ "] Connection write error [" +sc.socket().getInetAddress()+ "] [" +client.getDebugString()+ "]: " +t.getMessage() );
listener.connectionError( client, t );
return( false );
}
}
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
if ( !destroyed ){
msg.printStackTrace();
}
listener.connectionError( client, msg );
}
};
write_selector.register( client.getSocketChannel(), write_listener, null ); //start writing back to the connection
write_selector.pauseSelects( client.getSocketChannel() ); //wait until we've got something to send before selecting
read_selector.register( client.getSocketChannel(), read_listener, null ); //start reading from the connection
| public void | removeClientConnection(ClientConnection connection)
read_selector.cancel( connection.getSocketChannel() );
write_selector.cancel( connection.getSocketChannel() );
//remove from active list
try { connections_mon.enter();
connections.remove( connection );
}
finally { connections_mon.exit(); }
| public void | sendMessage(ClientMessage message)
ClientConnection vconn = message.getClient();
boolean still_connected;
try { connections_mon.enter();
still_connected = connections.contains( vconn );
}
finally { connections_mon.exit(); }
if( !still_connected ) {
System.out.println( "[" +new Date()+ "] Connection message send error [connection no longer connected]: " +vconn.getDebugString()+ "]" );
message.reportFailed( new Exception("No longer connected" ));
//listener.connectionError( vconn ); //no need to call this, as there is no connection to remove
return;
}
Message reply = new AZGenericMapPayload( message.getMessageID(), message.getPayload(), (byte)1 );
vconn.sendMessage( message, reply );
write_selector.resumeSelects( vconn.getSocketChannel() ); //start write selecting now that there's something to send
|
|