Fields Summary |
---|
protected static final org.gudy.azureus2.core3.logging.LogIDs | LOGID |
private static final int | MAX_OUTSTANDING_BT_REQUESTS |
protected static final String | NL |
private static final String | HDR_SERVER |
private static final String | HDR_KEEP_ALIVE_TIMEOUT |
private static final String | HDR_CACHE_CONTROL |
private static final String | DEFAULT_CONTENT_TYPE |
private static int | max_read_block_size |
private static final int | TIMEOUT_CHECK_PERIOD |
private static final int | DEAD_CONNECTION_TIMEOUT_PERIOD |
private static final int | MAX_CON_PER_ENDPOINT |
private static Map | http_connection_map |
private HTTPNetworkManager | manager |
private com.aelitis.azureus.core.networkmanager.NetworkConnection | connection |
private org.gudy.azureus2.core3.peer.impl.PEPeerTransport | peer |
private HTTPMessageDecoder | decoder |
private HTTPMessageEncoder | encoder |
private boolean | sent_handshake |
private byte[] | peer_id |
private boolean | choked |
private List | http_requests |
private List | choked_requests |
private List | outstanding_requests |
private BitSet | piece_map |
private long | last_http_activity_time |
private networkConnectionKey | network_connection_key |
private boolean | closing |
private boolean | destroyed |
private String | last_modified_date |
private String | content_type |
private com.aelitis.azureus.core.util.CopyOnWriteList | request_listeners |
Methods Summary |
---|
protected void | addBTRequest(com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTRequest request, com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest http_request)
    // Records a BT block request issued on behalf of an HTTP request and either hands it to
    // the decoder straight away or parks it while the connection is choked.
    // Throws IOException when this connection has already been destroyed.
    synchronized( outstanding_requests ){
        if ( destroyed ){
            throw new IOException( "HTTP connection destroyed" );
        }
        pendingRequest pending = new pendingRequest( request, http_request );
        outstanding_requests.add( pending );
        if ( !choked ){
            decoder.addMessage( request );
        }else if ( choked_requests.size() <= 1024 ){
            // held back until encodeUnchoke() replays the parked requests
            choked_requests.add( request );
        }else{
            // the request stays in outstanding_requests but is not parked
            Debug.out( "pending request limit exceeded" );
        }
    }
|
protected void | addRequest(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest request)
    // Accepts a new HTTP request: refreshes the activity timestamp, injects a handshake and
    // a bitfield message into the decoder the first time through, queues the request and
    // then tries to turn queued HTTP requests into BT requests.
    last_http_activity_time = SystemTime.getCurrentTime();
    PEPeerControl peer_control = getPeerControl();
    if ( !sent_handshake ){
        sent_handshake = true;
        BTHandshake handshake = new BTHandshake( peer_control.getHash(), peer_id, false, (byte)1 );
        decoder.addMessage( handshake );
        // one bit per piece, rounded up to whole bytes; the array is left zero-filled
        int bitfield_bytes = ( peer_control.getPieces().length + 7 ) / 8;
        byte[] zero_bits = new byte[ bitfield_bytes ];
        DirectByteBuffer bitfield_buffer = new DirectByteBuffer( ByteBuffer.wrap( zero_bits ));
        decoder.addMessage( new BTBitfield( bitfield_buffer, (byte)1 ));
    }
    // http_requests is guarded by the outstanding_requests monitor throughout this class
    synchronized( outstanding_requests ){
        http_requests.add( request );
    }
    submitBTRequests();
|
protected static boolean | checkConnections(java.util.List connections)
    // NOTE(review): in this listing a periodic-timer registration precedes the method's own
    // logic. It looks like class-initialisation code that has been merged into this method's
    // body by the listing - confirm against the original source before relying on it.
    SimpleTimer.addPeriodicEvent(
        "HTTPNetworkConnection:timer",
        TIMEOUT_CHECK_PERIOD,
        new TimerEventPerformer()
        {
            public void
            perform(
                TimerEvent event )
            {
                synchronized( http_connection_map ){
                    boolean check = true;
                    // Rescan from the start whenever a pass closed something and the key being
                    // visited has vanished from the map, as the iterator may no longer be usable.
                    while( check ){
                        check = false;
                        Iterator it = http_connection_map.entrySet().iterator();
                        while( it.hasNext()){
                            Map.Entry entry = (Map.Entry)it.next();
                            networkConnectionKey key = (networkConnectionKey)entry.getKey();
                            List connections = (List)entry.getValue();
                            /*
                            String times = "";
                            for (int i=0;i<connections.size();i++){
                            HTTPNetworkConnection connection = (HTTPNetworkConnection)connections.get(i);
                            times += (i==0?"":",") + connection.getTimeSinceLastActivity();
                            }
                            System.out.println( "HTTPNC: " + key.getName() + " -> " + connections.size() + " - " + times );
                            */
                            if ( checkConnections( connections )){
                                // might have a concurrent mod to the iterator
                                if ( !http_connection_map.containsKey( key )){
                                    check = true;
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        });
    // Examines one list of connections: closes those that have been idle longer than
    // DEAD_CONNECTION_TIMEOUT_PERIOD with no queued HTTP requests, and, if more than
    // MAX_CON_PER_ENDPOINT remain, closes the longest-idle one that is not already closing.
    // Returns true when at least one connection was closed.
    boolean some_closed = false;
    HTTPNetworkConnection oldest = null;
    long oldest_time = -1;
    Iterator it = connections.iterator();
    List timed_out = new ArrayList();
    while( it.hasNext()){
        HTTPNetworkConnection connection = (HTTPNetworkConnection)it.next();
        long time = connection.getTimeSinceLastActivity();
        if ( time > DEAD_CONNECTION_TIMEOUT_PERIOD ){
            if ( connection.getRequestCount() == 0 ){
                timed_out.add( connection );
                continue;
            }
        }
        if ( time > oldest_time && !connection.isClosing()){
            oldest_time = time;
            oldest = connection;
        }
    }
    // closing is deferred until after iteration so the list is not touched while iterating
    for (int i=0;i<timed_out.size();i++){
        ((HTTPNetworkConnection)timed_out.get(i)).close( "Timeout" );
        some_closed = true;
    }
    // 'oldest' stays null when every surviving connection is already closing; in that case
    // there is nothing further to close, so skip rather than dereference null.
    if ( oldest != null && connections.size() - timed_out.size() > MAX_CON_PER_ENDPOINT ){
        oldest.close( "Too many connections from initiator");
        some_closed = true;
    }
    return( some_closed );
|
protected void | close(java.lang.String reason)
    // Marks this connection as closing and asks the peer's control object to remove the peer.
    // The 'reason' text is not used by this implementation.
    closing = true;
    peer.getControl().removePeer( peer );
|
protected abstract void | decodeHeader(HTTPMessageDecoder decoder, java.lang.String header)
// Abstract: no implementation in this class; concrete subclasses supply it.
|
protected void | destroy()
    // Tears this connection down: unregisters it from the shared connection map (dropping the
    // map entry once its list is empty), flags it destroyed, and releases any pieces attached
    // to outstanding requests as well as any requests parked while choked.
    synchronized( http_connection_map ){
        List siblings = (List)http_connection_map.get( network_connection_key );
        if ( siblings != null ){
            siblings.remove( this );
            if ( siblings.isEmpty()){
                http_connection_map.remove( network_connection_key );
            }
        }
    }
    synchronized( outstanding_requests ){
        // set under the lock so addBTRequest/encodePiece observe it consistently
        destroyed = true;
        for ( int idx = 0; idx < outstanding_requests.size(); idx++ ){
            pendingRequest pending = (pendingRequest)outstanding_requests.get( idx );
            BTPiece attached = pending.getBTPiece();
            if ( attached != null ){
                attached.destroy();
            }
        }
        outstanding_requests.clear();
        for ( int idx = 0; idx < choked_requests.size(); idx++ ){
            ((BTRequest)choked_requests.get( idx )).destroy();
        }
        choked_requests.clear();
    }
|
protected com.aelitis.azureus.core.networkmanager.RawMessage | encodeBitField()
    // Produces no raw message (always returns null); instead a BTInterested message is fed
    // into the decoder.
    BTInterested interested = new BTInterested( (byte)1 );
    decoder.addMessage( interested );
    return null;
|
protected com.aelitis.azureus.core.networkmanager.RawMessage | encodeChoke()
    // Switches the connection into the choked state; nothing is encoded (returns null).
    // The flag is changed under the outstanding_requests monitor, the same lock
    // addBTRequest holds when it reads it.
    synchronized( outstanding_requests ){
        choked = true;
    }
    return null;
|
protected com.aelitis.azureus.core.networkmanager.RawMessage | encodeHandShake(com.aelitis.azureus.core.peermanager.messaging.Message message)
    // Always returns null; the supplied message is ignored.
    return null;
|
protected java.lang.String | encodeHeader(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$httpRequest request)
    // Assembles the HTTP/1.1 response header block for the given request and returns it as a
    // string terminated by an empty line. Status is 206 for partial-content requests and 200
    // otherwise; the keep-alive timeout header is only included for keep-alive requests.
    String current_date = TimeFormatter.getHTTPDate( SystemTime.getCurrentTime());
    StringBuffer header = new StringBuffer( 256 );
    String status = request.isPartialContent() ? "206 Partial Content" : "200 OK";
    header.append( "HTTP/1.1 " ).append( status ).append( NL );
    header.append( "Content-Type: " ).append( content_type ).append( NL );
    header.append( "Date: " ).append( current_date ).append( NL );
    header.append( "Last-Modified: " ).append( last_modified_date ).append( NL );
    header.append( HDR_CACHE_CONTROL );
    // No ETag is emitted. The original author was unsure how ETags relate to the URL and to
    // range requests: whether each webseed piece request URL needs its own ETag, or whether
    // the torrent hash would do given that the URL changes, and whether range requests are
    // irrelevant to ETags at all.
    header.append( HDR_SERVER );
    header.append( "Connection: " );
    if ( request.keepAlive()){
        header.append( "Keep-Alive" );
    }else{
        header.append( "Close" );
    }
    header.append( NL );
    if ( request.keepAlive()){
        header.append( HDR_KEEP_ALIVE_TIMEOUT );
    }
    header.append( "Content-Length: " ).append( request.getTotalLength()).append( NL );
    // blank line ends the header section
    header.append( NL );
    return header.toString();
|
protected com.aelitis.azureus.core.networkmanager.RawMessage[] | encodePiece(com.aelitis.azureus.core.peermanager.messaging.Message message)
    // Turns an outgoing BTPiece into the raw HTTP bytes to send.
    //
    // The piece is matched against outstanding_requests by piece number, offset and length
    // and attached to the first matching request that has no piece yet. Only when the match
    // is the head of the queue is anything released: the leading run of requests that already
    // hold their piece is removed and returned, one RawMessage per request, in queue order.
    // The first message emitted for a given HTTP request carries the response header in
    // buffer 0; later ones carry an empty buffer 0.
    //
    // A single empty raw message is returned when the connection is destroyed, when the piece
    // matches no request, or when nothing is ready to be released yet.
    last_http_activity_time = SystemTime.getCurrentTime();
    BTPiece piece = (BTPiece)message;
    List ready_requests = new ArrayList();
    boolean found = false;
    synchronized( outstanding_requests ){
        if ( destroyed ){
            return( new RawMessage[]{ getEmptyRawMessage( message )});
        }
        for (int i=0;i<outstanding_requests.size();i++){
            pendingRequest req = (pendingRequest)outstanding_requests.get(i);
            if ( req.getPieceNumber() == piece.getPieceNumber() &&
                 req.getStart() == piece.getPieceOffset() &&
                 req.getLength() == piece.getPieceData().remaining( DirectByteBuffer.SS_NET )){
                if ( req.getBTPiece() == null ){
                    req.setBTPiece( piece );
                    found = true;
                    if ( i == 0 ){
                        // head of queue satisfied: release every consecutive request that
                        // already has its data, stopping at the first one still waiting
                        Iterator it = outstanding_requests.iterator();
                        while( it.hasNext()){
                            pendingRequest r = (pendingRequest)it.next();
                            BTPiece btp = r.getBTPiece();
                            if ( btp == null ){
                                break;
                            }
                            it.remove();
                            ready_requests.add( r );
                        }
                    }
                    break;
                }
            }
        }
    }
    if ( !found ){
        Debug.out( "request not matched" );
        return( new RawMessage[]{ getEmptyRawMessage( message )});
    }
    if ( ready_requests.size() == 0 ){
        return( new RawMessage[]{ getEmptyRawMessage( message )});
    }
    try{
        // slots were freed above, so try to top the outstanding queue back up
        submitBTRequests();
    }catch( IOException e ){
        // deliberately ignored: addBTRequest raises this once the connection is destroyed,
        // and the data already released below is still returned to the caller
    }
    RawMessage[] raw_messages = new RawMessage[ ready_requests.size()];
    for (int i=0;i<raw_messages.length;i++){
        pendingRequest req = (pendingRequest)ready_requests.get(i);
        // Resolve the owning HTTP request per released request: a batch may span more than
        // one HTTP request, and each must get its own header ahead of its first block.
        httpRequest http_request = req.getHTTPRequest();
        DirectByteBuffer[] buffers = new DirectByteBuffer[ 2 ];
        if ( !http_request.hasSentFirstReply()){
            http_request.setSentFirstReply();
            String header = encodeHeader( http_request );
            buffers[0] = new DirectByteBuffer( ByteBuffer.wrap( header.getBytes()));
        }else{
            // we have to do this as core code assumes buffer entry 0 is protocol
            buffers[0] = new DirectByteBuffer( ByteBuffer.allocate(0));
        }
        BTPiece this_piece = req.getBTPiece();
        int piece_number = this_piece.getPieceNumber();
        if ( !piece_map.get( piece_number )){
            // kinda crappy as it triggers on first block of piece, however better
            // than nothing
            piece_map.set( piece_number );
            decoder.addMessage( new BTHave( piece_number, (byte)1 ));
        }
        buffers[1] = this_piece.getPieceData();
        req.logQueued();
        if ( request_listeners != null ){
            Iterator it = request_listeners.iterator();
            while( it.hasNext()){
                ((requestListener)it.next()).requestComplete( req );
            }
        }
        raw_messages[i] =
            new RawMessageImpl(
                this_piece,
                buffers,
                RawMessage.PRIORITY_HIGH,
                true,
                new Message[0] );
    }
    return( raw_messages );
|
protected com.aelitis.azureus.core.networkmanager.RawMessage | encodeUnchoke()
    // Leaves the choked state and replays, in order, every request that was parked while
    // choked by handing it to the decoder. Nothing is encoded (returns null).
    synchronized( outstanding_requests ){
        choked = false;
        for ( int idx = 0; idx < choked_requests.size(); idx++ ){
            BTRequest parked = (BTRequest)choked_requests.get( idx );
            decoder.addMessage( parked );
        }
        choked_requests.clear();
    }
    return null;
|
protected void | flushRequests(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$flushListener l)
// Arranges for flushRequestsSupport( l ) to run once the requests that are outstanding at
// the time of this call have completed. With nothing outstanding it runs immediately, on
// the calling thread and outside the lock.
boolean sync_fire = false;
synchronized( outstanding_requests ){
// snapshot of how many completions to wait for
final int request_count = outstanding_requests.size();
if ( request_count == 0 ){
sync_fire = true;
}else{
// listener list is created lazily on first use
if ( request_listeners == null ){
request_listeners = new CopyOnWriteList();
}
request_listeners.add(
new requestListener()
{
int num_to_go = request_count;
public void
requestComplete(
pendingRequest r )
{
// counts every completion reported to this listener, whichever request it was for
// NOTE(review): the decrement is not guarded by any lock in this method - confirm
// that requestComplete is only ever invoked from one thread at a time
num_to_go--;
if ( num_to_go == 0 ){
// one-shot: unregister before triggering the flush
request_listeners.remove( this );
flushRequestsSupport( l );
}
}
});
}
}
if ( sync_fire ){
flushRequestsSupport( l );
}
|
protected void | flushRequestsSupport(com.aelitis.azureus.core.networkmanager.impl.http.HTTPNetworkConnection$flushListener l)
// Queues an empty HTTPMessage as a marker on the outgoing message queue and calls
// l.flushed() once the queue reports that marker as sent, or immediately if the queue
// holds no bytes after the marker has been added.
// NOTE(review): the queue listener registered here is not removed anywhere in this
// method - confirm whether it is cleaned up elsewhere.
OutgoingMessageQueue omq = getConnection().getOutgoingMessageQueue();
final Message http_message = new HTTPMessage( new byte[0] );
omq.registerQueueListener(
new OutgoingMessageQueue.MessageQueueListener()
{
public boolean
messageAdded(
Message message )
{
return( true );
}
public void
messageQueued(
Message message )
{
}
public void
messageRemoved(
Message message )
{
}
public void
messageSent(
Message message )
{
// identity comparison: only the marker queued below triggers the callback
if ( message == http_message ){
l.flushed();
}
}
public void
protocolBytesSent(
int byte_count )
{
}
public void
dataBytesSent(
int byte_count )
{
}
});
omq.addMessage( http_message, false );
// if after adding the message there's no bytes on the queue then we need to trigger an
// immediate flushed event as the queue won't get processed (0 bytes on it...)
if ( omq.getTotalSize() == 0 ){
l.flushed();
}
|
protected com.aelitis.azureus.core.networkmanager.NetworkConnection | getConnection()
    // Accessor for the network connection held by this object.
    return connection;
|
protected com.aelitis.azureus.core.networkmanager.RawMessage | getEmptyRawMessage(com.aelitis.azureus.core.peermanager.messaging.Message message)
return(
new RawMessageImpl(
message,
new DirectByteBuffer[]{ new DirectByteBuffer( ByteBuffer.allocate(0))},
RawMessage.PRIORITY_HIGH,
true,
new Message[0] ));
|
protected HTTPNetworkManager | getManager()
    // Accessor for the HTTP network manager held by this object.
    return manager;
|
protected org.gudy.azureus2.core3.peer.impl.PEPeerTransport | getPeer()
    // Accessor for the peer transport held by this object.
    return peer;
|
protected org.gudy.azureus2.core3.peer.impl.PEPeerControl | getPeerControl()
    // Convenience accessor: the control object obtained from the held peer.
    return peer.getControl();
|
protected int | getRequestCount()
    // Number of HTTP requests currently queued in http_requests, read under the
    // outstanding_requests monitor.
    synchronized( outstanding_requests ){
        int queued_http_requests = http_requests.size();
        return queued_http_requests;
    }
|
protected long | getTimeSinceLastActivity()
long now = SystemTime.getCurrentTime();
if ( now < last_http_activity_time ){
last_http_activity_time = now;
}
return( now - last_http_activity_time );
|
protected boolean | isClosing()
    // True once close(...) has flagged this connection as closing.
    return closing;
|
protected boolean | isSeed()
    // Returns true when the peer's control object reports that it is seeding. Otherwise the
    // condition is logged (when logging is enabled), the manager's not-found response is
    // sent, the connection is closed after that send, and false is returned.
    if ( peer.getControl().isSeeding()){
        return true;
    }
    if ( Logger.isEnabled()){
        Logger.log( new LogEvent( peer, LOGID, "Download is not seeding" ));
    }
    sendAndClose( manager.getNotFound());
    return false;
|
protected void | log(java.lang.String str)
    // Emits the text as a log event attributed to this connection's peer; does nothing when
    // logging is disabled.
    if ( !Logger.isEnabled()){
        return;
    }
    Logger.log( new LogEvent( getPeer(), LOGID, str ));
|
protected void | readWakeup()
    // Flags the connection's transport as ready for read.
    connection.getTransport().setReadyForRead();
|
protected void | sendAndClose(java.lang.String data)
// Queues the given text as an HTTPMessage on the outgoing message queue and closes this
// connection once the queue reports that exact message object as sent.
final Message http_message = new HTTPMessage( data );
// the listener is registered before the message is added below
getConnection().getOutgoingMessageQueue().registerQueueListener(
new OutgoingMessageQueue.MessageQueueListener()
{
public boolean
messageAdded(
Message message )
{
return( true );
}
public void
messageQueued(
Message message )
{
}
public void
messageRemoved(
Message message )
{
}
public void
messageSent(
Message message )
{
// identity comparison: only the message queued by this call triggers the close
if ( message == http_message ){
close( "Close after message send complete" );
}
}
public void
protocolBytesSent(
int byte_count )
{
}
public void
dataBytesSent(
int byte_count )
{
}
});
getConnection().getOutgoingMessageQueue().addMessage( http_message, false );
|
protected void | setContentType(java.lang.String ct)
    // Stores the value that encodeHeader() writes after "Content-Type: ".
    content_type = ct;
|
protected void | submitBTRequests()
// Converts queued HTTP byte ranges into BT block requests until either
// MAX_OUTSTANDING_BT_REQUESTS are outstanding or no HTTP requests remain. Each pass carves
// one block off the current range of the head HTTP request; a block never crosses a piece
// boundary and never exceeds max_read_block_size.
PEPeerControl control = getPeerControl();
// length of piece 0 is used as the nominal piece length when locating a byte offset
// NOTE(review): assumes every piece before the one being addressed has this length - confirm
long piece_size = control.getPieceLength(0);
synchronized( outstanding_requests ){
while( outstanding_requests.size() < MAX_OUTSTANDING_BT_REQUESTS && http_requests.size() > 0 ){
// always work on the head request; it is removed only once fully converted
httpRequest http_request = (httpRequest)http_requests.get(0);
long[] offsets = http_request.getOffsets();
long[] lengths = http_request.getLengths();
// index of the range currently being consumed within this HTTP request
int index = http_request.getIndex();
long offset = offsets[index];
long length = lengths[index];
int this_piece_number = (int)(offset / piece_size);
int this_piece_size = control.getPieceLength( this_piece_number );
int offset_in_piece = (int)( offset - ( this_piece_number * piece_size ));
int space_this_piece = this_piece_size - offset_in_piece;
// clamp to what is left of the range, of the piece, and to the block size limit
int request_size = (int)Math.min( length, space_this_piece );
request_size = Math.min( request_size, max_read_block_size );
addBTRequest(
new BTRequest(
this_piece_number,
offset_in_piece,
request_size,
(byte)1),
http_request );
if ( request_size == length ){
// current range fully consumed: drop the HTTP request if that was its last range,
// otherwise advance to its next range
if ( index == offsets.length - 1 ){
http_requests.remove(0);
}else{
http_request.setIndex( index+1 );
}
}else{
// range only partly consumed: shrink it in place (modifies the arrays returned by
// getOffsets()/getLengths())
offsets[index] += request_size;
lengths[index] -= request_size;
}
}
}
|