FileDocCategorySizeDatePackage
HTTPNetworkConnection.javaAPI DocAzureus 3.0.3.426626Tue Aug 14 23:26:42 BST 2007com.aelitis.azureus.core.networkmanager.impl.http

HTTPNetworkConnection

public abstract class HTTPNetworkConnection extends Object

Fields Summary
protected static final org.gudy.azureus2.core3.logging.LogIDs
LOGID
private static final int
MAX_OUTSTANDING_BT_REQUESTS
protected static final String
NL
private static final String
HDR_SERVER
private static final String
HDR_KEEP_ALIVE_TIMEOUT
private static final String
HDR_CACHE_CONTROL
private static final String
DEFAULT_CONTENT_TYPE
private static int
max_read_block_size
private static final int
TIMEOUT_CHECK_PERIOD
private static final int
DEAD_CONNECTION_TIMEOUT_PERIOD
private static final int
MAX_CON_PER_ENDPOINT
private static Map
http_connection_map
private HTTPNetworkManager
manager
private com.aelitis.azureus.core.networkmanager.NetworkConnection
connection
private org.gudy.azureus2.core3.peer.impl.PEPeerTransport
peer
private HTTPMessageDecoder
decoder
private HTTPMessageEncoder
encoder
private boolean
sent_handshake
private byte[]
peer_id
private boolean
choked
private List
http_requests
private List
choked_requests
private List
outstanding_requests
private BitSet
piece_map
private long
last_http_activity_time
private networkConnectionKey
network_connection_key
private boolean
closing
private boolean
destroyed
private String
last_modified_date
private String
content_type
private com.aelitis.azureus.core.util.CopyOnWriteList
request_listeners
Constructors Summary
/**
 * Wires a new HTTP-facing connection to its manager, network connection and peer transport.
 *
 * The constructor records the last-modified time of the first disk-manager file as an
 * HTTP date, stamps the current time as the last HTTP activity, registers this instance
 * in http_connection_map under a freshly created networkConnectionKey and finally
 * attaches itself to the connection's message encoder and decoder.
 *
 * @param _manager		stored as this connection's manager
 * @param _connection	network connection whose message queues supply the HTTP encoder and decoder
 * @param _peer			peer transport; its manager's disk manager is queried for the file timestamp
 */
protected HTTPNetworkConnection(HTTPNetworkManager _manager, com.aelitis.azureus.core.networkmanager.NetworkConnection _connection, org.gudy.azureus2.core3.peer.impl.PEPeerTransport _peer)

	
	
	
				
				
					 
	
		manager			= _manager;
		connection		= _connection;
		peer			= _peer;
		
		DiskManager dm = peer.getManager().getDiskManager();
		
			// 0 is used when the file timestamp cannot be obtained
		
		long	last_modified = 0;
		
		try{
			last_modified = dm.getFiles()[0].getFile(true).lastModified();
						
		}catch( Throwable e ){
			
				// best effort: any failure simply leaves last_modified at 0
		}
		
		last_modified_date = TimeFormatter.getHTTPDate( last_modified );
		
			// NOTE(review): networkConnectionKey is declared outside this listing; presumably it
			// identifies the remote endpoint so connections can be grouped - confirm
		
		network_connection_key = new networkConnectionKey();
			
		last_http_activity_time	= SystemTime.getCurrentTime();
		
		decoder	= (HTTPMessageDecoder)connection.getIncomingMessageQueue().getDecoder();
		encoder = (HTTPMessageEncoder)connection.getOutgoingMessageQueue().getEncoder();

			// register under the shared map's lock; once the list for this key grows beyond
			// MAX_CON_PER_ENDPOINT the list is checked for connections to close
		
		synchronized( http_connection_map ){
						
			List	connections = (List)http_connection_map.get( network_connection_key );
			
			if ( connections == null ){
				
				connections = new ArrayList();
				
				http_connection_map.put( network_connection_key, connections );
			}
			
			connections.add( this );
			
			if ( connections.size() > MAX_CON_PER_ENDPOINT ){
				
				checkConnections( connections );
			}
		}
		
			// note that the decoder can synchronously call back here if it is preloaded with a
			// header, which is why these calls come last
		
		encoder.setConnection( this );
		decoder.setConnection( this );
	
Methods Summary
/**
 * Records a BT request as outstanding and either passes it to the decoder or,
 * while choked, parks it in choked_requests.
 *
 * An IOException with the message "HTTP connection destroyed" is thrown when the
 * connection has already been destroyed. While choked, a request arriving when more
 * than 1024 requests are already parked is reported via Debug.out and not parked; it
 * is still recorded as outstanding.
 *
 * @param request		the BT request to issue
 * @param http_request	the HTTP request this BT request was generated for
 */
protected voidaddBTRequest(com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTRequest request, com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest http_request)

		synchronized( outstanding_requests ){

			if ( destroyed ){

				throw( new IOException( "HTTP connection destroyed" ));
			}

			outstanding_requests.add( new pendingRequest( request, http_request ));

			if ( !choked ){

					// not choked: the request goes straight to the decoder

				decoder.addMessage( request );

			}else if ( choked_requests.size() <= 1024 ){

					// choked: hold the request until encodeUnchoke releases it

				choked_requests.add( request );

			}else{

				Debug.out( "pending request limit exceeded" );
			}
		}
	
/**
 * Queues an HTTP request and tries to turn queued requests into BT requests.
 *
 * On the first call a BT handshake followed by an all-zero bitfield (one bit per
 * piece, rounded up to whole bytes) is fed to the decoder before the request is queued.
 *
 * @param request	the HTTP request to queue
 */
protected voidaddRequest(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest request)

		last_http_activity_time	= SystemTime.getCurrentTime();

		PEPeerControl	peer_control = getPeerControl();

		if ( !sent_handshake ){

			sent_handshake	= true;

			BTHandshake	handshake = new BTHandshake( peer_control.getHash(), peer_id, false, (byte)1 );

			decoder.addMessage( handshake );

				// freshly allocated array, so every bit of the bitfield is clear

			int	bitfield_bytes = ( peer_control.getPieces().length + 7 ) / 8;

			DirectByteBuffer	empty_bitfield = new DirectByteBuffer( ByteBuffer.wrap( new byte[ bitfield_bytes ] ));

			decoder.addMessage( new BTBitfield( empty_bitfield, (byte)1 ));
		}

		synchronized( outstanding_requests ){

			http_requests.add( request );
		}

		submitBTRequests();
	
/**
 * Closes idle and surplus connections in the supplied list.
 *
 * A connection is closed with reason "Timeout" when it has been inactive for longer
 * than DEAD_CONNECTION_TIMEOUT_PERIOD and has no queued HTTP requests. If, after
 * discounting those, the list still holds more than MAX_CON_PER_ENDPOINT entries, the
 * longest-inactive connection that is not already closing is closed as well. When no
 * such connection exists nothing further is closed.
 *
 * Both visible callers invoke this while holding the http_connection_map lock.
 *
 * @param connections	list of HTTPNetworkConnection instances to inspect
 * @return true if at least one connection was asked to close
 */
protected static booleancheckConnections(java.util.List connections)

	
	
			// NOTE(review): this periodic-timer registration looks like a static initializer that
			// the listing has run together with this method - confirm against the original source.
			// Every TIMEOUT_CHECK_PERIOD it re-checks each connection list in http_connection_map.
	
		SimpleTimer.addPeriodicEvent(
			"HTTPNetworkConnection:timer",
			TIMEOUT_CHECK_PERIOD,
			new TimerEventPerformer()
			{
				public void 
				perform(
					TimerEvent event ) 
				{
					synchronized( http_connection_map ){
						
						boolean	 check = true;
						
						while( check ){
							
							check = false;
	
							Iterator	it = http_connection_map.entrySet().iterator();
							
							while( it.hasNext()){
								
								Map.Entry	entry = (Map.Entry)it.next();
								
								networkConnectionKey	key = (networkConnectionKey)entry.getKey();
								
								List	connections = (List)entry.getValue();
								
								/*
								String	times = "";
								
								for (int i=0;i<connections.size();i++){
									
									HTTPNetworkConnection	connection = (HTTPNetworkConnection)connections.get(i);
								
									times += (i==0?"":",") + connection.getTimeSinceLastActivity();
								}
								
								System.out.println( "HTTPNC: " + key.getName() + " -> " + connections.size() + " - " + times );
								*/
								
								if ( checkConnections( connections )){
									
										// might have a concurrent mod to the iterator, so restart
										// the scan with a fresh iterator when the key has gone
									
									if ( !http_connection_map.containsKey( key )){
										
										check	= true;
										
										break;
									}
								}
							}
						}
					}
				}
			});
	
		boolean	some_closed = false;
				
		HTTPNetworkConnection	oldest 			= null;
		long					oldest_time		= -1;
		
		Iterator	it = connections.iterator();
			
			// victims are collected first and closed after the scan so that close() is never
			// invoked while the list is being iterated
		
		List	timed_out = new ArrayList();
		
		while( it.hasNext()){
			
			HTTPNetworkConnection	connection = (HTTPNetworkConnection)it.next();
		
			long	time = connection.getTimeSinceLastActivity();
		
			if ( time > DEAD_CONNECTION_TIMEOUT_PERIOD ){
				
				if ( connection.getRequestCount() == 0 ){
																
					timed_out.add( connection );
					
					continue;
				}
			}
						
			if ( time > oldest_time && !connection.isClosing()){
					
				oldest_time		= time;
					
				oldest	= connection;
			}
		}
		
		for (int i=0;i<timed_out.size();i++){
			
			((HTTPNetworkConnection)timed_out.get(i)).close( "Timeout" );
			
			some_closed	= true;
		}
		
			// 'oldest' remains null when every surviving connection is already closing; there is
			// then nothing more to close, and dereferencing it would throw NullPointerException
		
		if ( oldest != null && connections.size() - timed_out.size() > MAX_CON_PER_ENDPOINT ){
			
			oldest.close( "Too many connections from initiator");
				
			some_closed	= true;
		}
		
		return( some_closed );
	
protected voidclose(java.lang.String reason)

		closing	= true;
		
		peer.getControl().removePeer( peer );
	
/**
 * Subclass hook that is handed an HTTP header string together with a decoder.
 *
 * NOTE(review): presumably invoked by the decoder once a complete request header has
 * been read - confirm against HTTPMessageDecoder, which is not part of this listing.
 *
 * @param decoder	the HTTP message decoder associated with the header
 * @param header	the header text to interpret
 */
protected abstract voiddecodeHeader(HTTPMessageDecoder decoder, java.lang.String header)

/**
 * Unregisters this connection from http_connection_map and releases queued work.
 *
 * The map entry for this connection's key is dropped once its list becomes empty.
 * The connection is then marked destroyed, every piece already attached to an
 * outstanding request is destroyed, every parked (choked) request is destroyed and
 * both lists are emptied.
 */
protected voiddestroy()

		synchronized( http_connection_map ){

			List	siblings = (List)http_connection_map.get( network_connection_key );

			if ( siblings != null ){

				siblings.remove( this );

				if ( siblings.isEmpty()){

					http_connection_map.remove( network_connection_key );
				}
			}
		}

		synchronized( outstanding_requests ){

			destroyed	= true;

				// only requests whose piece data has already arrived hold anything to release

			Iterator	pending_it = outstanding_requests.iterator();

			while( pending_it.hasNext()){

				BTPiece	received = ((pendingRequest)pending_it.next()).getBTPiece();

				if ( received != null ){

					received.destroy();
				}
			}

			outstanding_requests.clear();

			Iterator	parked_it = choked_requests.iterator();

			while( parked_it.hasNext()){

				((BTRequest)parked_it.next()).destroy();
			}

			choked_requests.clear();
		}
	
/**
 * Handles an outgoing bitfield by feeding a BTInterested message to the decoder.
 *
 * @return always null; no raw message is produced
 */
protected com.aelitis.azureus.core.networkmanager.RawMessageencodeBitField()

		BTInterested	interested = new BTInterested((byte)1);

		decoder.addMessage( interested );

		return null;
	
/**
 * Handles an outgoing choke by setting the choked flag under the outstanding_requests lock.
 *
 * @return always null; no raw message is produced
 */
protected com.aelitis.azureus.core.networkmanager.RawMessageencodeChoke()

		synchronized( outstanding_requests ){

				// addBTRequest parks new requests for as long as this flag is set

			choked	= true;
		}

		return null;
	
/**
 * Handles an outgoing handshake; nothing is produced for it.
 *
 * @param message	the handshake message; not used by this implementation
 * @return always null
 */
protected com.aelitis.azureus.core.networkmanager.RawMessageencodeHandShake(com.aelitis.azureus.core.peermanager.messaging.Message message)

		return null;
	
/**
 * Builds the HTTP/1.1 response header block for a request.
 *
 * The status line is "206 Partial Content" or "200 OK" depending on
 * request.isPartialContent(). It is followed by Content-Type, Date, Last-Modified,
 * the cache-control and server header constants, Connection, the keep-alive timeout
 * constant (only when request.keepAlive()), Content-Length and a terminating empty line.
 *
 * @param request	the HTTP request being answered
 * @return the complete header text, including the blank line that ends it
 */
protected java.lang.StringencodeHeader(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest request)

		String	now_as_http_date = TimeFormatter.getHTTPDate( SystemTime.getCurrentTime());

		StringBuffer	header = new StringBuffer(256);

		String	status;

		if ( request.isPartialContent()){

			status = "206 Partial Content";

		}else{

			status = "200 OK";
		}

		header.append( "HTTP/1.1 " ).append( status ).append( NL );

		header.append( "Content-Type: " ).append( content_type ).append( NL );

		header.append( "Date: " ).append( now_as_http_date ).append( NL );

		header.append( "Last-Modified: " ).append( last_modified_date ).append( NL );

			// no ETag is emitted: it is unclear how ETags should relate to webseed piece URLs and
			// range requests, so the header is left out rather than guessed at

		header.append( HDR_CACHE_CONTROL ).append( HDR_SERVER );

		String	connection_mode;

		if ( request.keepAlive()){

			connection_mode = "Keep-Alive";

		}else{

			connection_mode = "Close";
		}

		header.append( "Connection: " ).append( connection_mode ).append( NL );

		if ( request.keepAlive()){

			header.append( HDR_KEEP_ALIVE_TIMEOUT );
		}

		header.append( "Content-Length: " ).append( request.getTotalLength()).append( NL );

			// empty line terminates the header block

		header.append( NL );

		return header.toString();
	
/**
 * Converts an outgoing BT piece into the raw HTTP payload message(s) to send.
 *
 * The piece is attached to the first outstanding request that matches its piece number,
 * offset and length and has no piece yet. Data is only released in request order: when
 * the piece completes the request at the head of the outstanding list, the leading run
 * of completed requests is removed and each becomes one raw message. Otherwise the piece
 * stays attached to its request and a single empty raw message is returned. An empty raw
 * message is also returned when the connection is destroyed or no request matches.
 *
 * The first raw message produced for a given HTTP request carries that request's response
 * header in buffer 0; later ones carry an empty buffer 0. The header decision is made per
 * released entry, so a run that spans several HTTP requests gives each its own header.
 *
 * @param message	the outgoing message; cast to BTPiece
 * @return one raw message per released request, or a single empty raw message
 */
protected com.aelitis.azureus.core.networkmanager.RawMessage[]encodePiece(com.aelitis.azureus.core.peermanager.messaging.Message message)

		last_http_activity_time	= SystemTime.getCurrentTime();
		
		BTPiece	piece = (BTPiece)message;
		
		List	ready_requests = new ArrayList();
		
		boolean	found = false;
		
		synchronized( outstanding_requests ){

			if ( destroyed ){
				
				return( new RawMessage[]{ getEmptyRawMessage( message )});
			}
		
			for (int i=0;i<outstanding_requests.size();i++){
				
				pendingRequest	req = (pendingRequest)outstanding_requests.get(i);
				
				if ( 	req.getPieceNumber() == piece.getPieceNumber() &&
						req.getStart() 	== piece.getPieceOffset() &&
						req.getLength() == piece.getPieceData().remaining( DirectByteBuffer.SS_NET )){
		
					if ( req.getBTPiece() == null ){
					
						req.setBTPiece( piece );
						
						found	= true;
						
							// only a piece completing the head of the queue can release data;
							// drain the leading run of requests that now have their piece
						
						if ( i == 0 ){
							
							Iterator	it = outstanding_requests.iterator();
							
							while( it.hasNext()){
								
								pendingRequest r = (pendingRequest)it.next();
								
								BTPiece	btp = r.getBTPiece();
								
								if ( btp == null ){
									
									break;
								}
								
								it.remove();
								
								ready_requests.add( r );
							}
						}
					
						break;
					}
				}
			}
		}
		
		if ( !found ){
			
			Debug.out( "request not matched" );
			
			return( new RawMessage[]{ getEmptyRawMessage( message )});
		}
		
		if ( ready_requests.size() == 0 ){
			
				// piece arrived out of order: it stays attached to its request until the
				// requests ahead of it complete
			
			return( new RawMessage[]{ getEmptyRawMessage( message )});
		}
		
		try{
				// slots have been freed, so top up the outstanding BT requests
			
			submitBTRequests();
			
		}catch( IOException e ){
			
				// deliberately ignored: the pieces released above are still delivered.
				// NOTE(review): addBTRequest throws this when the connection is destroyed - confirm
				// nothing else needs to react here
		}
		
		RawMessage[]	raw_messages = new RawMessage[ ready_requests.size()];
		
		for (int i=0;i<raw_messages.length;i++){
			
			pendingRequest	req	= (pendingRequest)ready_requests.get(i);
			
				// resolve the owning HTTP request per entry: the released run can span more than
				// one HTTP request and each of them needs its own response header
			
			httpRequest	http_request = req.getHTTPRequest();
			
			DirectByteBuffer[]	buffers = new DirectByteBuffer[ 2 ];

			if ( !http_request.hasSentFirstReply()){
		
				http_request.setSentFirstReply();
						
				String	header = encodeHeader( http_request );
			
					// NOTE(review): getBytes() uses the platform default charset - confirm acceptable
			
				buffers[0] = new DirectByteBuffer( ByteBuffer.wrap( header.getBytes()));
			
			}else{
			
					// we have to do this as core code assumes buffer entry 0 is protocol
			
				buffers[0] = new DirectByteBuffer( ByteBuffer.allocate(0));
			}

			BTPiece	this_piece = req.getBTPiece();
			
			int	piece_number = this_piece.getPieceNumber();
			
			if ( !piece_map.get( piece_number )){
				
					// imprecise, as this triggers on the first block of a piece rather than on
					// the whole piece, but better than nothing
				
				piece_map.set( piece_number );
				
				decoder.addMessage( new BTHave( piece_number, (byte)1 ));
			}
			
			buffers[1] = this_piece.getPieceData();
			
			req.logQueued();
			
			if ( request_listeners != null ){
				
				Iterator it = request_listeners.iterator();
			
				while( it.hasNext()){
					
					((requestListener)it.next()).requestComplete( req );
				}
			}
			
			raw_messages[i] = 
				new RawMessageImpl( 
						this_piece, 
						buffers,
						RawMessage.PRIORITY_HIGH, 
						true, 
						new Message[0] );
		}
		
		return( raw_messages );
	
protected com.aelitis.azureus.core.networkmanager.RawMessageencodeUnchoke()

		
		synchronized( outstanding_requests ){
			
			choked	= false;
			
			for (int i=0;i<choked_requests.size();i++){
								
				decoder.addMessage((BTRequest)choked_requests.get(i));
			}
			
			choked_requests.clear();
		}
		
		return( null );
	
/**
 * Arranges for the flush listener to be driven once the currently outstanding
 * requests have completed.
 *
 * With nothing outstanding, flushRequestsSupport is invoked immediately, after the
 * lock has been released. Otherwise a request listener is registered that counts down
 * one per completed request from the number outstanding at the time of this call,
 * removes itself on reaching zero and then invokes flushRequestsSupport.
 *
 * @param l	listener to notify once the flush has gone through
 */
protected voidflushRequests(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$flushListener l)

		boolean	fire_now;

		synchronized( outstanding_requests ){

			final int pending_count = outstanding_requests.size();

			fire_now = pending_count == 0;

			if ( !fire_now ){

				if ( request_listeners == null ){

					request_listeners = new CopyOnWriteList();
				}

				request_listeners.add(
					new requestListener()
					{
						int	remaining = pending_count;

						public void
						requestComplete(
							pendingRequest r )
						{
							if ( --remaining == 0 ){

								request_listeners.remove( this );

								flushRequestsSupport( l );
							}
						}
					});
			}
		}

		if ( fire_now ){

			flushRequestsSupport( l );
		}
	
/**
 * Queues an empty HTTP message as a marker on the outgoing queue and notifies the
 * listener when that marker is reported as sent.
 *
 * If the queue's total size is zero straight after the marker was added, the listener
 * is notified immediately because an empty queue will not be processed.
 *
 * @param l	listener whose flushed() method is invoked
 */
protected voidflushRequestsSupport(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$flushListener l)

		OutgoingMessageQueue omq = getConnection().getOutgoingMessageQueue();
		
			// zero-length marker message, recognised by identity in messageSent below
		
		final Message	http_message = new HTTPMessage( new byte[0] );
		
			// NOTE(review): the listener registered here is never deregistered within this
			// method - confirm whether it is removed elsewhere or accumulates per flush
		
		omq.registerQueueListener(
			new OutgoingMessageQueue.MessageQueueListener()
			{
				public boolean 
				messageAdded( 
					Message message )
				{	
					return( true );
				}
				   
				public void 
				messageQueued( 
					Message message )
				{			
				}
				    
				public void 
				messageRemoved( 
					Message message )
				{
				}
				    
				public void 
				messageSent( 
					Message message )
				{
						// only the marker queued by this call counts as "flushed"
					
					if ( message == http_message ){
						
						l.flushed();
					}
				}
				    
			    public void 
			    protocolBytesSent( 
			    	int byte_count )
			    {	
			    }
				 
			    public void 
			    dataBytesSent( 
			    	int byte_count )
			    {	
			    }
			});
		
			// the listener has to be in place before the marker is queued
		
		omq.addMessage( http_message, false );
		
			// if after adding the message there's no bytes on the queue then we need to trigger an
			// immediate flushed event as the queue won't get processed (0 bytes on it...)
			// NOTE(review): if messageSent has already fired for the marker by this point,
			// flushed() would be invoked a second time - confirm this cannot happen
		
		if ( omq.getTotalSize() == 0 ){
			
			l.flushed();
		}
	
/**
 * Returns the network connection supplied at construction time.
 *
 * @return the underlying network connection
 */
protected com.aelitis.azureus.core.networkmanager.NetworkConnectiongetConnection()

		return connection;
	
protected com.aelitis.azureus.core.networkmanager.RawMessagegetEmptyRawMessage(com.aelitis.azureus.core.peermanager.messaging.Message message)

		
		return( 
			new RawMessageImpl( 
					message, 
					new DirectByteBuffer[]{ new DirectByteBuffer( ByteBuffer.allocate(0))},
					RawMessage.PRIORITY_HIGH, 
					true, 
					new Message[0] ));
	
/**
 * Returns the HTTP network manager supplied at construction time.
 *
 * @return the owning manager
 */
protected HTTPNetworkManagergetManager()

		return manager;
	
/**
 * Returns the peer transport supplied at construction time.
 *
 * @return the peer transport behind this connection
 */
protected org.gudy.azureus2.core3.peer.impl.PEPeerTransportgetPeer()

		return peer;
	
/**
 * Returns the controller reported by the peer transport.
 *
 * @return the result of peer.getControl()
 */
protected org.gudy.azureus2.core3.peer.impl.PEPeerControlgetPeerControl()

		return peer.getControl();
	
/**
 * Returns the number of queued HTTP requests, read under the outstanding_requests lock.
 *
 * @return current size of http_requests
 */
protected intgetRequestCount()

		int	queued;

		synchronized( outstanding_requests ){

			queued = http_requests.size();
		}

		return queued;
	
/**
 * Returns the time elapsed since the last recorded HTTP activity.
 *
 * If the current time is earlier than the recorded activity time, the recorded time
 * is reset to the current time, so the result is then 0.
 *
 * @return elapsed time in the units of SystemTime.getCurrentTime(); never negative
 */
protected longgetTimeSinceLastActivity()

		long	current = SystemTime.getCurrentTime();

		if ( last_http_activity_time > current ){

				// clock went backwards relative to the last activity stamp

			last_http_activity_time = current;
		}

		long	elapsed = current - last_http_activity_time;

		return elapsed;
	
/**
 * Reports whether close has been called on this connection.
 *
 * @return the closing flag
 */
protected booleanisClosing()

		return closing;
	
/**
 * Checks that the peer's controller reports the download as seeding.
 *
 * When it does not, the fact is logged (if logging is enabled), the manager's
 * "not found" response is sent and the connection is closed once it has gone out.
 *
 * @return true when seeding; false after the "not found" response has been queued
 */
protected booleanisSeed()

		if ( peer.getControl().isSeeding()){

			return true;
		}

		if ( Logger.isEnabled()){

			Logger.log( new LogEvent( peer, LOGID, "Download is not seeding" ));
		}

		sendAndClose( manager.getNotFound());

		return false;
	
/**
 * Logs a message against this connection's peer when logging is enabled.
 *
 * @param str	the text to log
 */
protected voidlog(java.lang.String str)

		if ( !Logger.isEnabled()){

			return;
		}

		Logger.log( new LogEvent( getPeer(), LOGID, str ));
	
/**
 * Marks the connection's transport as ready for read.
 */
protected voidreadWakeup()

		this.connection.getTransport().setReadyForRead();
	
/**
 * Queues the given text as an HTTP message and closes the connection once the
 * outgoing queue reports that exact message as sent.
 *
 * @param data	the content of the HTTP message to send before closing
 */
protected voidsendAndClose(java.lang.String data)

		final Message	farewell = new HTTPMessage( data );

			// the listener is registered before the message is queued

		getConnection().getOutgoingMessageQueue().registerQueueListener(
			new OutgoingMessageQueue.MessageQueueListener()
			{
				public boolean messageAdded( Message message )
				{
					return true;
				}

				public void messageQueued( Message message )
				{
				}

				public void messageRemoved( Message message )
				{
				}

				public void messageSent( Message message )
				{
						// identity comparison: only the message queued below triggers the close

					if ( message == farewell ){

						close( "Close after message send complete" );
					}
				}

				public void protocolBytesSent( int byte_count )
				{
				}

				public void dataBytesSent( int byte_count )
				{
				}
			});

		getConnection().getOutgoingMessageQueue().addMessage( farewell, false );
	
/**
 * Sets the value written to the Content-Type header of subsequently encoded responses.
 *
 * @param ct	the content type text
 */
protected voidsetContentType(java.lang.String ct)

		this.content_type	= ct;
	
/**
 * Turns queued HTTP range requests into BT block requests while fewer than
 * MAX_OUTSTANDING_BT_REQUESTS are outstanding.
 *
 * Each pass takes the current range of the HTTP request at the head of the queue and
 * issues one BT request for it. The request is limited to the end of the range, the
 * end of the piece containing the range start and max_read_block_size. A fully covered
 * range advances the HTTP request to its next range, or removes the HTTP request from
 * the queue when that was its last one; otherwise the range is shortened in place by
 * the amount requested. The length of piece 0 is used as the nominal piece size when
 * mapping offsets to pieces.
 */
protected voidsubmitBTRequests()

		PEPeerControl	peer_control = getPeerControl();

		long	nominal_piece_size = peer_control.getPieceLength(0);

		synchronized( outstanding_requests ){

			while( outstanding_requests.size() < MAX_OUTSTANDING_BT_REQUESTS && !http_requests.isEmpty()){

				httpRequest	head = (httpRequest)http_requests.get(0);

				long[]	range_offsets	= head.getOffsets();
				long[]	range_lengths	= head.getLengths();

				int	range_index	= head.getIndex();

				long	range_offset 	= range_offsets[range_index];
				long	range_length	= range_lengths[range_index];

				int		piece_number 	= (int)( range_offset / nominal_piece_size );
				int		piece_length	= peer_control.getPieceLength( piece_number );

				int		piece_offset 	= (int)( range_offset - ( piece_number * nominal_piece_size ));

					// never read past the end of the piece, the range or the block size limit

				int		block_size = (int)Math.min( range_length, piece_length - piece_offset );

				block_size = Math.min( block_size, max_read_block_size );

				BTRequest	bt_request = new BTRequest( piece_number, piece_offset, block_size, (byte)1 );

				addBTRequest( bt_request, head );

				if ( block_size != range_length ){

						// range only partly covered: shrink it by what was just requested

					range_offsets[range_index] += block_size;
					range_lengths[range_index] -= block_size;

				}else if ( range_index != range_offsets.length - 1 ){

					head.setIndex( range_index+1 );

				}else{

						// last range of this HTTP request has been fully requested

					http_requests.remove(0);
				}
			}
		}