TCPConnectionManager: public class TCPConnectionManager extends Object. Manages new connection establishment and ended connection termination. |
Fields Summary |
---|
private static final LogIDs | LOGID | private static int | MIN_SIMULTANIOUS_CONNECT_ATTEMPTS | public static int | MAX_SIMULTANIOUS_CONNECT_ATTEMPTS | private static int | max_outbound_connections | private static final int | CONNECT_ATTEMPT_TIMEOUT | private static final int | CONNECT_ATTEMPT_STALL_TIME | private static final boolean | SHOW_CONNECT_STATS | private final com.aelitis.azureus.core.networkmanager.VirtualChannelSelector | connect_selector | private final LinkedList | new_requests | private final ArrayList | canceled_requests | private final AEMonitor | new_canceled_mon | private final HashMap | pending_attempts | private final LinkedList | pending_closes | private final Map | delayed_closes | private final AEMonitor | pending_closes_mon | private final Random | random | private boolean | max_conn_exceeded_logged |
Constructors Summary |
---|
public TCPConnectionManager()
// Statistic keys whose values this manager can supply.
Set supported_stats = new HashSet();
supported_stats.add( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH );
supported_stats.add( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH );
supported_stats.add( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH );
supported_stats.add( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH );

// Provider that fills in the current size of each internal queue on request.
AzureusCoreStatsProvider queue_length_provider = new AzureusCoreStatsProvider() {
  public void updateStats( Set requested, Map results ) {
    if( requested.contains( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH ) ) {
      results.put( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH, new Long( new_requests.size() ) );
    }
    if( requested.contains( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH ) ) {
      results.put( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH, new Long( canceled_requests.size() ) );
    }
    if( requested.contains( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH ) ) {
      results.put( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH, new Long( pending_closes.size() ) );
    }
    if( requested.contains( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH ) ) {
      results.put( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH, new Long( pending_attempts.size() ) );
    }
  }
};
AzureusCoreStats.registerProvider( supported_stats, queue_length_provider );

// Daemon thread that runs mainLoop() for the lifetime of this manager.
AEThread worker = new AEThread( "ConnectDisconnectManager" ) {
  public void runSupport() {
    mainLoop();
  }
};
worker.setDaemon( true );
worker.start();
|
Methods Summary |
---|
private void | addNewOutboundRequests()
// Starts queued connection requests until MIN_SIMULTANIOUS_CONNECT_ATTEMPTS attempts
// are pending or the queue of new requests is empty.
while( true ) {
  if( pending_attempts.size() >= MIN_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
    return;
  }

  // Take the head of the queue while holding the monitor; start it after releasing.
  ConnectionRequest next = null;
  try {
    new_canceled_mon.enter();
    if( new_requests.isEmpty() ) {
      return;
    }
    next = (ConnectionRequest)new_requests.removeFirst();
  }
  finally {
    new_canceled_mon.exit();
  }

  if( next != null ) {
    addNewRequest( next );
  }
}
| private void | addNewRequest(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectionRequest request)
// Begins an outbound connect for the given request.
// Steps: tell the listener the attempt has started, open a SocketChannel, apply the optional
// configured socket options (best effort), switch the channel to non-blocking mode and
// initiate the connect. If the connect completes immediately finishConnect() is called,
// otherwise the request is recorded in pending_attempts and the channel is registered with
// connect_selector. Any Throwable raised along the way is logged with diagnostics, the
// channel (if one was opened) is queued for closing and the listener gets connectFailure().
request.listener.connectAttemptStarted();
try {
request.channel = SocketChannel.open();
try { //advanced socket options
// Receive buffer size: only applied when the configured value is positive.
int rcv_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
if( rcv_size > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket receive buffer size"
+ " for outgoing connection [" + request.address + "] to: "
+ rcv_size));
request.channel.socket().setReceiveBufferSize( rcv_size );
}
// Send buffer size: only applied when the configured value is positive.
int snd_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );
if( snd_size > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket send buffer size "
+ "for outgoing connection [" + request.address + "] to: "
+ snd_size));
request.channel.socket().setSendBufferSize( snd_size );
}
// IP type-of-service: only applied when the configured string is non-empty; the string is
// parsed with Integer.decode, and a parse failure is handled by the catch below.
String ip_tos = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );
if( ip_tos.length() > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket TOS field "
+ "for outgoing connection [" + request.address + "] to: "
+ ip_tos));
request.channel.socket().setTrafficClass( Integer.decode( ip_tos ).intValue() );
}
// A positive configured local port enables SO_REUSEADDR before any bind below.
int local_bind_port = COConfigurationManager.getIntParameter( "network.bind.local.port" );
if( local_bind_port > 0 ) {
request.channel.socket().setReuseAddress( true );
}
// Bind to the default bind address when one is set (together with local_bind_port),
// otherwise bind to the configured local port alone when it is positive.
// NOTE(review): in the first branch local_bind_port is passed through without the > 0
// check; a negative configured value would presumably make InetSocketAddress throw,
// which the catch below would absorb - confirm the valid range of the parameter.
InetAddress bindIP = NetworkAdmin.getSingleton().getDefaultBindAddress();
if ( bindIP != null ) {
if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local IP address: " + bindIP));
request.channel.socket().bind( new InetSocketAddress( bindIP, local_bind_port ) );
}
else if( local_bind_port > 0 ) {
if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local port #: " +local_bind_port));
request.channel.socket().bind( new InetSocketAddress( local_bind_port ) );
}
}
catch( Throwable t ) {
// The first failing option aborts the remaining ones in this inner try; the failure is
// reported through Debug.out and a LogAlert, then the connect proceeds regardless.
String msg = "Error while processing advanced socket options.";
Debug.out( msg, t );
Logger.log(new LogAlert(LogAlert.UNREPEATABLE, msg, t));
//dont pass the exception outwards, so we will continue processing connection without advanced options set
}
request.channel.configureBlocking( false );
// Timestamp used by finishConnect() statistics and by the timeout checks in runSelect().
request.connect_start_time = SystemTime.getCurrentTime();
if( request.channel.connect( request.address ) ) { //already connected
finishConnect( request );
}
else { //not yet connected, so register for connect selection
// pending_attempts is used as a set of in-flight requests; the map value is unused (null).
pending_attempts.put( request, null );
connect_selector.register( request.channel, new VirtualChannelSelector.VirtualSelectorListener() {
// Selector reported success: the attempt is no longer pending, finish the connect.
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
pending_attempts.remove( request );
finishConnect( request );
return true;
}
// Selector reported failure: drop the attempt, queue the channel for closing, notify the listener.
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
pending_attempts.remove( request );
closeConnection( request.channel );
request.listener.connectFailure( msg );
}
}, null );
}
}
catch( Throwable t ) {
// Build a detailed diagnostic message describing the target address...
String full = request.address.toString();
String hostname = request.address.getHostName();
int port = request.address.getPort();
boolean unresolved = request.address.isUnresolved();
InetAddress inet_address = request.address.getAddress();
String full_sub = inet_address==null?request.address.toString():inet_address.toString();
String host_address = inet_address==null?request.address.toString():inet_address.getHostAddress();
String msg = "ConnectDisconnectManager::address exception: full="+full+ ", hostname="+hostname+ ", port="+port+ ", unresolved="+unresolved+ ", full_sub="+full_sub+ ", host_address="+host_address;
// ...and, when a channel was opened, the state of the channel and its socket.
if( request.channel != null ) {
String channel = request.channel.toString();
String socket = request.channel.socket().toString();
// NOTE(review): getLocalAddress() is dereferenced without a null check, unlike the remote
// address below; if it could ever return null this handler would itself throw before the
// channel is closed and the listener notified - confirm against the JDK in use.
String local_address = request.channel.socket().getLocalAddress().toString();
int local_port = request.channel.socket().getLocalPort();
SocketAddress ra = request.channel.socket().getRemoteSocketAddress();
String remote_address;
if( ra != null ) remote_address = ra.toString();
else remote_address = "<null>";
int remote_port = request.channel.socket().getPort();
msg += "\n channel="+channel+ ", socket="+socket+ ", local_address="+local_address+ ", local_port="+local_port+ ", remote_address="+remote_address+ ", remote_port="+remote_port;
}
else {
msg += "\n channel=<null>";
}
// Unresolved addresses are reported through Debug.outNoStack; everything else includes the Throwable.
if ( t instanceof UnresolvedAddressException ){
Debug.outNoStack( msg );
}else{
Debug.out( msg, t );
}
// Queue the channel for closing first, then report the failure to the listener.
if( request.channel != null ) {
closeConnection( request.channel );
}
request.listener.connectFailure( t );
}
| public void | cancelRequest(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener_key) Cancel a pending new connection request.
try{
new_canceled_mon.enter();
//check if we can cancel it right away
for( Iterator i = new_requests.iterator(); i.hasNext(); ) {
ConnectionRequest request = (ConnectionRequest)i.next();
if( request.listener == listener_key ) {
i.remove();
return;
}
}
canceled_requests.add( listener_key ); //else add for later removal during select
}
finally{
new_canceled_mon.exit();
}
| public void | closeConnection(java.nio.channels.SocketChannel channel) Close the given connection.
// Delegates to closeConnection(channel, delay) with a delay of 0, which queues the
// channel on pending_closes rather than scheduling it in delayed_closes.
closeConnection( channel, 0 );
| public void | closeConnection(java.nio.channels.SocketChannel channel, int delay)
// Schedules the channel for closing. A non-zero delay records (or overwrites) a close
// deadline in delayed_closes; a zero delay appends the channel to pending_closes unless
// it is already present in either collection.
try {
  pending_closes_mon.enter();

  if( delay != 0 ) {
    long close_at = SystemTime.getCurrentTime() + delay;
    delayed_closes.put( channel, new Long( close_at ) );
  }
  else {
    boolean already_scheduled =
        delayed_closes.containsKey( channel ) || pending_closes.contains( channel );
    if( !already_scheduled ) {
      pending_closes.addLast( channel );
    }
  }
}
finally {
  pending_closes_mon.exit();
}
| private void | doClosings()
// Promotes due delayed closes onto pending_closes, then closes every pending channel.
try {
  pending_closes_mon.enter();

  final long current_time = SystemTime.getCurrentTime();

  if( !delayed_closes.isEmpty() ) {
    for( Iterator delayed_it = delayed_closes.entrySet().iterator(); delayed_it.hasNext(); ) {
      Map.Entry scheduled = (Map.Entry)delayed_it.next();
      long remaining = ((Long)scheduled.getValue()).longValue() - current_time;

      // Keep waiting only while the deadline lies between now and 60 seconds ahead;
      // a passed deadline or one further away than that is closed right away.
      boolean keep_waiting = remaining >= 0 && remaining <= 60*1000;
      if( keep_waiting ) {
        continue;
      }

      pending_closes.addLast( scheduled.getKey() );
      delayed_it.remove();
    }
  }

  while( !pending_closes.isEmpty() ) {
    SocketChannel to_close = (SocketChannel)pending_closes.removeFirst();
    if( to_close == null ) {
      continue;
    }

    connect_selector.cancel( to_close );
    try {
      to_close.close();
    }
    catch( Throwable ignored ) {
      // close failures are deliberately not reported
    }
  }
}
finally {
  pending_closes_mon.exit();
}
| private void | finishConnect(com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectionRequest request)
// Completes the connect attempt of the given request and reports the outcome.
//
// Success: unless the request's listener is found in canceled_requests, the channel is
// deregistered from connect_selector and handed to the listener via connectSuccess().
// A canceled request has its channel queued for closing and the listener is not called.
//
// Failure (finishConnect() returned false or anything threw): the channel is queued for
// closing FIRST and only then is the listener told via connectFailure(). Queuing the
// close before invoking the callback guarantees the channel is released even when the
// listener itself throws, and matches the order used by the other failure paths of this
// class. closeConnection() only queues the channel, so the listener observes no difference.
try {
  if( request.channel.finishConnect() ) {
    if( SHOW_CONNECT_STATS ) {
      long queue_wait_time = request.connect_start_time - request.request_start_time;
      long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
      int num_queued = new_requests.size();
      int num_connecting = pending_attempts.size();
      System.out.println("S: queue_wait_time="+queue_wait_time+
          ", connect_time="+connect_time+
          ", num_queued="+num_queued+
          ", num_connecting="+num_connecting);
    }

    //ensure the request hasn't been canceled during the select op
    boolean canceled = false;
    try{ new_canceled_mon.enter();
      canceled = canceled_requests.contains( request.listener );
    }
    finally{ new_canceled_mon.exit(); }

    if( canceled ) {
      closeConnection( request.channel );
    }
    else {
      connect_selector.cancel( request.channel );
      request.listener.connectSuccess( request.channel );
    }
  }
  else { //should never happen
    Debug.out( "finishConnect() failed" );
    closeConnection( request.channel );  //queue the close before calling out to the listener
    request.listener.connectFailure( new Throwable( "finishConnect() failed" ) );
  }
}
catch( Throwable t ) {
  if( SHOW_CONNECT_STATS ) {
    long queue_wait_time = request.connect_start_time - request.request_start_time;
    long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
    int num_queued = new_requests.size();
    int num_connecting = pending_attempts.size();
    System.out.println("F: queue_wait_time="+queue_wait_time+
        ", connect_time="+connect_time+
        ", num_queued="+num_queued+
        ", num_connecting="+num_connecting);
  }

  closeConnection( request.channel );  //queue the close before calling out to the listener
  request.listener.connectFailure( t );
}
| public int | getMaxOutboundPermitted()
return( Math.max( max_outbound_connections - new_requests.size(), 0 ));
| private void | mainLoop()
// Endless processing loop: start queued connection requests, run the connect selector
// (including cancellation and timeout handling), then perform queued channel closes.
//
// Each iteration is guarded so that an unexpected Throwable (for example one thrown by a
// listener callback invoked from runSelect()) is logged instead of terminating the thread;
// if the loop ended, no further outbound connects or closes would ever be processed.
while( true ) {
  try {
    addNewOutboundRequests();
    runSelect();
    doClosings();
  }
  catch( Throwable t ) {
    Debug.out( "ConnectDisconnectManager::mainLoop() EXCEPTION: ", t );
  }
}
| public void | requestNewConnection(java.net.InetSocketAddress address, com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener, long connect_timeout)
// Queues a new outbound connection request with the given connect timeout.
ConnectionRequest new_request = new ConnectionRequest( address, listener, connect_timeout );

try {
  new_canceled_mon.enter();

  // New connections tend to arrive in 50-peer batches (e.g. from a tracker announce
  // reply); inserting at a random queue position spreads the connect attempts evenly
  // when several torrents are running.
  int queue_length = new_requests.size();
  new_requests.add( random.nextInt( queue_length + 1 ), new_request );

  // Report once, the first time the queue reaches the outbound limit.
  boolean limit_reached = new_requests.size() >= max_outbound_connections;
  if( limit_reached && !max_conn_exceeded_logged ) {
    max_conn_exceeded_logged = true;
    Debug.out( "TCPConnectionManager: max outbound connection limit reached (" + max_outbound_connections + ")" );
  }
}
finally {
  new_canceled_mon.exit();
}
| public void | requestNewConnection(java.net.InetSocketAddress address, com.aelitis.azureus.core.networkmanager.impl.tcp.TCPConnectionManager$ConnectListener listener) Request that a new connection be made out to the given address.
// Delegates to the three-argument overload, using CONNECT_ATTEMPT_TIMEOUT as the connect timeout.
requestNewConnection( address, listener, CONNECT_ATTEMPT_TIMEOUT );
| private void | runSelect()
// One pass of connect processing: apply recorded cancellations, run the connect selector,
// time out attempts that exceeded their limit, and start one extra attempt when every
// pending attempt is stalled.

// --- cancellations ---
try {
  new_canceled_mon.enter();

  Iterator cancel_it = canceled_requests.iterator();
  while( cancel_it.hasNext() ) {
    ConnectListener canceled_listener = (ConnectListener)cancel_it.next();

    // find the pending attempt (if any) that belongs to this listener
    ConnectionRequest match = null;
    Iterator pending_it = pending_attempts.keySet().iterator();
    while( match == null && pending_it.hasNext() ) {
      ConnectionRequest candidate = (ConnectionRequest)pending_it.next();
      if( candidate.listener == canceled_listener ) {
        match = candidate;
      }
    }

    if( match != null ) {
      connect_selector.cancel( match.channel );
      closeConnection( match.channel );
      pending_attempts.remove( match );
    }
  }

  canceled_requests.clear();
}
finally {
  new_canceled_mon.exit();
}

// --- select ---
try {
  connect_selector.select(100);
}
catch( Throwable t ) {
  Debug.out("connnectSelectLoop() EXCEPTION: ", t);
}

// --- connect attempt timeout checks ---
int stalled_count = 0;
final long now = SystemTime.getCurrentTime();

Iterator attempt_it = pending_attempts.keySet().iterator();
while( attempt_it.hasNext() ) {
  final ConnectionRequest attempt = (ConnectionRequest)attempt_it.next();
  final long elapsed = now - attempt.connect_start_time;

  if( elapsed > attempt.connect_timeout ) {
    // timed out: drop the attempt, queue the channel for closing, notify the listener
    attempt_it.remove();

    SocketChannel timed_out_channel = attempt.channel;
    connect_selector.cancel( timed_out_channel );
    closeConnection( timed_out_channel );

    InetSocketAddress remote = attempt.address;
    InetAddress resolved = remote.getAddress();
    String target = resolved == null
        ? remote.toString()
        : resolved.getHostAddress() + ":" + remote.getPort();

    attempt.listener.connectFailure( new SocketTimeoutException( "Connection attempt to " + target + " aborted: timed out after " + attempt.connect_timeout/1000+ "sec" ) );
    continue;
  }

  if( elapsed >= CONNECT_ATTEMPT_STALL_TIME ) {
    stalled_count++;
  }
  else if( elapsed < 0 ) {
    // time went backwards: restart the clock for this attempt
    attempt.connect_start_time = now;
  }
}

// --- expand the connect queue when every pending attempt is stalled ---
boolean all_stalled = stalled_count == pending_attempts.size();
if( all_stalled && pending_attempts.size() < MAX_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
  ConnectionRequest extra = null;
  try {
    new_canceled_mon.enter();
    if( !new_requests.isEmpty() ) {
      extra = (ConnectionRequest)new_requests.removeFirst();
    }
  }
  finally {
    new_canceled_mon.exit();
  }

  if( extra != null ) {
    addNewRequest( extra );
  }
}
|
|