/*
* Created on 22 Jun 2006
* Created by Paul Gardner
* Copyright (C) 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.NetworkManager;
import com.aelitis.azureus.core.networkmanager.Transport;
public class
IncomingConnectionManager
{
private static final LogIDs LOGID = LogIDs.NWMAN;
private static IncomingConnectionManager singleton = new IncomingConnectionManager();
public static IncomingConnectionManager
getSingleton()
{
return( singleton );
}
private volatile Map match_buffers_cow = new HashMap(); // copy-on-write
private final AEMonitor match_buffers_mon = new AEMonitor( "IncomingConnectionManager:match" );
private int max_match_buffer_size = 0;
private int max_min_match_buffer_size = 0;
private final ArrayList connections = new ArrayList();
private final AEMonitor connections_mon = new AEMonitor( "IncomingConnectionManager:conns" );
protected
IncomingConnectionManager()
{
SimpleTimer.addPeriodicEvent(
"IncomingConnectionManager:timeouts",
5000,
new TimerEventPerformer() {
public void perform( TimerEvent ev ) {
doTimeoutChecks();
}
}
);
}
public boolean
isEmpty()
{
return( match_buffers_cow.isEmpty());
}
// returns MatchListener,RoutingData if matched
public Object[]
checkForMatch(
TransportHelper transport,
int incoming_port,
ByteBuffer to_check,
boolean min_match )
{
//remember original values for later restore
int orig_position = to_check.position();
int orig_limit = to_check.limit();
//rewind
to_check.position( 0 );
MatchListener listener = null;
Object routing_data = null;
for( Iterator i = match_buffers_cow.entrySet().iterator(); i.hasNext(); ) {
Map.Entry entry = (Map.Entry)i.next();
NetworkManager.ByteMatcher bm = (NetworkManager.ByteMatcher)entry.getKey();
MatchListener this_listener = (MatchListener)entry.getValue();
int specific_port = bm.getSpecificPort();
if ( specific_port != -1 && specific_port != incoming_port ){
continue;
}
if ( min_match ){
if( orig_position < bm.minSize() ) { //not enough bytes yet to compare
continue;
}
routing_data = bm.minMatches( transport, to_check, incoming_port );
if ( routing_data != null ){
listener = this_listener;
break;
}
}else{
if( orig_position < bm.matchThisSizeOrBigger() ) { //not enough bytes yet to compare
continue;
}
routing_data = bm.matches( transport, to_check, incoming_port );
if ( routing_data != null ){
listener = this_listener;
break;
}
}
}
//restore original values in case the checks changed them
to_check.position( orig_position );
to_check.limit( orig_limit );
if ( listener == null ){
return( null );
}
return( new Object[]{ listener, routing_data });
}
/**
* Register the given byte sequence matcher to handle matching against new incoming connection
* initial data; i.e. the first bytes read from a connection must match in order for the given
* listener to be invoked.
* @param matcher byte filter sequence
* @param listener to call upon match
*/
public void
registerMatchBytes(
NetworkManager.ByteMatcher matcher,
MatchListener listener )
{
try { match_buffers_mon.enter();
if( matcher.maxSize() > max_match_buffer_size ) {
max_match_buffer_size = matcher.maxSize();
}
if ( matcher.minSize() > max_min_match_buffer_size ){
max_min_match_buffer_size = matcher.minSize();
}
Map new_match_buffers = new HashMap( match_buffers_cow );
new_match_buffers.put( matcher, listener );
match_buffers_cow = new_match_buffers;
addSharedSecrets( matcher.getSharedSecrets());
}finally {
match_buffers_mon.exit();
}
}
/**
* Remove the given byte sequence match from the registration list.
* @param to_remove byte sequence originally used to register
*/
public void
deregisterMatchBytes(
NetworkManager.ByteMatcher to_remove )
{
try { match_buffers_mon.enter();
Map new_match_buffers = new HashMap( match_buffers_cow );
new_match_buffers.remove( to_remove );
if( to_remove.maxSize() == max_match_buffer_size ) { //recalc longest buffer if necessary
max_match_buffer_size = 0;
for( Iterator i = new_match_buffers.keySet().iterator(); i.hasNext(); ) {
NetworkManager.ByteMatcher bm = (NetworkManager.ByteMatcher)i.next();
if( bm.maxSize() > max_match_buffer_size ) {
max_match_buffer_size = bm.maxSize();
}
}
}
match_buffers_cow = new_match_buffers;
removeSharedSecrets( to_remove.getSharedSecrets());
} finally { match_buffers_mon.exit(); }
}
public void
addSharedSecrets(
byte[][] secrets )
{
if ( secrets != null ){
ProtocolDecoder.addSecrets( secrets );
}
}
public void
removeSharedSecrets(
byte[][] secrets )
{
if ( secrets != null ){
ProtocolDecoder.removeSecrets( secrets );
}
}
public int
getMaxMatchBufferSize()
{
return( max_match_buffer_size );
}
public int
getMaxMinMatchBufferSize()
{
return( max_min_match_buffer_size );
}
public void
addConnection(
int local_port,
TransportHelperFilter filter,
Transport new_transport )
{
TransportHelper transport_helper = filter.getHelper();
if ( isEmpty()) { //no match registrations, just close
if ( Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "Incoming connection from [" + transport_helper.getAddress() +
"] dropped because zero routing handlers registered"));
}
transport_helper.close( "No routing handler" );
return;
}
// note that the filter may have some data internally queued in it after the crypto handshake decode
// (in particular the BT header). However, there should be some data right behind it that will trigger
// a read-select below, thus giving prompt access to the queued data
final IncomingConnection ic = new IncomingConnection( filter, getMaxMatchBufferSize());
TransportHelper.selectListener sel_listener = new SelectorListener( local_port, new_transport );
try{
connections_mon.enter();
connections.add( ic );
transport_helper.registerForReadSelects( sel_listener, ic );
}finally{
connections_mon.exit();
}
// might be stuff queued up in the filter - force one process cycle (NAT check in particular )
sel_listener.selectSuccess( transport_helper, ic );
}
protected void
removeConnection(
IncomingConnection connection,
boolean close_as_well )
{
try{
connections_mon.enter();
connection.filter.getHelper().cancelReadSelects();
connections.remove( connection ); //remove from connection list
}finally{
connections_mon.exit();
}
if( close_as_well ) {
connection.filter.getHelper().close( "Tidy close" );
}
}
protected void
doTimeoutChecks()
{
try{ connections_mon.enter();
ArrayList to_close = null;
long now = SystemTime.getCurrentTime();
for( int i=0; i < connections.size(); i++ ){
IncomingConnection ic = (IncomingConnection)connections.get( i );
TransportHelper transport_helper = ic.filter.getHelper();
if( ic.last_read_time > 0 ) { //at least one read op has occured
if( now < ic.last_read_time ) { //time went backwards!
ic.last_read_time = now;
}
else if( now - ic.last_read_time > transport_helper.getReadTimeout()) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Incoming connection ["
+ transport_helper.getAddress()
+ "] forcibly timed out due to socket read inactivity ["
+ ic.buffer.position() + " bytes read: "
+ new String(ic.buffer.array()) + "]"));
if( to_close == null ) to_close = new ArrayList();
to_close.add( ic );
}
}
else { //no bytes have been read yet
if( now < ic.initial_connect_time ) { //time went backwards!
ic.initial_connect_time = now;
}
else if( now - ic.initial_connect_time > transport_helper.getConnectTimeout()) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Incoming connection ["
+ transport_helper.getAddress() + "] forcibly timed out after "
+ "60sec due to socket inactivity"));
if( to_close == null ) to_close = new ArrayList();
to_close.add( ic );
}
}
}
if( to_close != null ) {
for( int i=0; i < to_close.size(); i++ ) {
IncomingConnection ic = (IncomingConnection)to_close.get( i );
removeConnection( ic, true );
}
}
} finally { connections_mon.exit(); }
}
protected static class
IncomingConnection
{
protected final TransportHelperFilter filter;
protected final ByteBuffer buffer;
protected long initial_connect_time;
protected long last_read_time = -1;
protected
IncomingConnection(
TransportHelperFilter filter, int buff_size )
{
this.filter = filter;
this.buffer = ByteBuffer.allocate( buff_size );
this.initial_connect_time = SystemTime.getCurrentTime();
}
}
protected class
SelectorListener
implements TransportHelper.selectListener
{
private int local_port;
private Transport transport;
protected
SelectorListener(
int _local_port,
Transport _transport )
{
local_port = _local_port;
transport = _transport;
}
public boolean
selectSuccess(
TransportHelper transport_helper, Object attachment )
{
IncomingConnection ic = (IncomingConnection)attachment;
try {
long bytes_read = ic.filter.read( new ByteBuffer[]{ ic.buffer }, 0, 1 );
if( bytes_read < 0 ) {
throw new IOException( "end of stream on socket read" );
}
if( bytes_read == 0 ) {
return false;
}
ic.last_read_time = SystemTime.getCurrentTime();
Object[] match_data = checkForMatch( transport_helper, local_port, ic.buffer, false );
if( match_data == null ) { //no match found
if( ic.buffer.position() >= getMaxMatchBufferSize()) { //we've already read in enough bytes to have compared against all potential match buffers
ic.buffer.flip();
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
LogEvent.LT_WARNING,
"Incoming stream from [" + transport_helper.getAddress()
+ "] does not match "
+ "any known byte pattern: "
+ ByteFormatter.nicePrint(ic.buffer.array(), 128)));
removeConnection( ic, true );
}
}
else { //match found!
ic.buffer.flip();
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"Incoming stream from [" + transport_helper.getAddress()
+ "] recognized as "
+ "known byte pattern: "
+ ByteFormatter.nicePrint(ic.buffer.array(), 64)));
removeConnection( ic, false );
transport.setAlreadyRead( ic.buffer );
transport.connectedInbound();
IncomingConnectionManager.MatchListener listener = (IncomingConnectionManager.MatchListener)match_data[0];
listener.connectionMatched( transport, match_data[1] );
}
return( true );
}
catch( Throwable t ) {
try {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
LogEvent.LT_ERROR,
"Incoming connection [" + transport_helper.getAddress()
+ "] socket read exception: "
+ t.getMessage()));
}
catch( Throwable x ) {
Debug.out( "Caught exception on incoming exception log:" );
x.printStackTrace();
System.out.println( "CAUSED BY:" );
t.printStackTrace();
}
removeConnection( ic, true );
return( false );
}
}
//FAILURE
public void
selectFailure(
TransportHelper transport_helper,
Object attachment,
Throwable msg )
{
IncomingConnection ic = (IncomingConnection)attachment;
if (Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, LogEvent.LT_ERROR,
"Incoming connection [" + transport_helper.getAddress()
+ "] socket select op failure: "
+ msg.getMessage()));
}
removeConnection( ic, true );
}
}
/**
* Listener for byte matches.
*/
public interface MatchListener {
/**
* Currently if message crypto is on and default fallback for incoming not
* enabled then we would bounce incoming messages from non-crypto transports
* For example, NAT check
* This method allows auto-fallback for such transports
* @return
*/
public boolean
autoCryptoFallback();
/**
* The given socket has been accepted as matching the byte filter.
* @param channel matching accepted connection
* @param read_so_far bytes already read
*/
public void
connectionMatched(
Transport transport,
Object routing_data );
}
}
|