/*
* Created on 12-Jan-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.dht.control.impl;
import java.io.*;
import java.math.BigInteger;
import java.util.*;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.ListenerManager;
import org.gudy.azureus2.core3.util.ListenerManagerDispatcher;
import org.gudy.azureus2.core3.util.SHA1Simple;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.ThreadPool;
import org.gudy.azureus2.core3.util.ThreadPoolTask;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTOperationAdapter;
import com.aelitis.azureus.core.dht.DHTOperationListener;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.impl.*;
import com.aelitis.azureus.core.dht.control.*;
import com.aelitis.azureus.core.dht.db.*;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
import com.aelitis.azureus.core.dht.router.*;
import com.aelitis.azureus.core.dht.transport.*;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
/**
* @author parg
*
*/
public class
DHTControlImpl
implements DHTControl, DHTTransportRequestHandler
{
private static final int EXTERNAL_LOOKUP_CONCURRENCY = 16;
private static final int EXTERNAL_PUT_CONCURRENCY = 8;
private static final int RANDOM_QUERY_PERIOD = 5*60*1000;
private static final int INTEGRATION_TIME_MAX = 15*1000;
private DHTControlAdapter adapter;
private DHTTransport transport;
private DHTTransportContact local_contact;
private DHTRouter router;
private DHTDB database;
private DHTControlStatsImpl stats;
private DHTLogger logger;
private int node_id_byte_count;
private int search_concurrency;
private int lookup_concurrency;
private int cache_at_closest_n;
private int K;
private int B;
private int max_rep_per_node;
private long router_start_time;
private int router_count;
private ThreadPool internal_lookup_pool;
private ThreadPool external_lookup_pool;
private ThreadPool internal_put_pool;
private ThreadPool external_put_pool;
private Map imported_state = new HashMap();
private long last_lookup;
private ListenerManager listeners = ListenerManager.createAsyncManager(
"DHTControl:listenDispatcher",
new ListenerManagerDispatcher()
{
public void
dispatch(
Object _listener,
int type,
Object value )
{
DHTControlListener target = (DHTControlListener)_listener;
target.activityChanged((DHTControlActivity)value, type );
}
});
private List activities = new ArrayList();
private AEMonitor activity_mon = new AEMonitor( "DHTControl:activities" );
protected AEMonitor estimate_mon = new AEMonitor( "DHTControl:estimate" );
private long last_dht_estimate_time;
private long local_dht_estimate;
private long combined_dht_estimate;
private static final int LOCAL_ESTIMATE_HISTORY = 32;
private Map local_estimate_values =
new LinkedHashMap(LOCAL_ESTIMATE_HISTORY,0.75f,true)
{
protected boolean
removeEldestEntry(
Map.Entry eldest)
{
return( size() > LOCAL_ESTIMATE_HISTORY );
}
};
private static final int REMOTE_ESTIMATE_HISTORY = 128;
private List remote_estimate_values = new LinkedList();
protected AEMonitor spoof_mon = new AEMonitor( "DHTControl:spoof" );
private Cipher spoof_cipher;
private SecretKey spoof_key;
public
DHTControlImpl(
DHTControlAdapter _adapter,
DHTTransport _transport,
int _K,
int _B,
int _max_rep_per_node,
int _search_concurrency,
int _lookup_concurrency,
int _original_republish_interval,
int _cache_republish_interval,
int _cache_at_closest_n,
DHTLogger _logger )
{
adapter = _adapter;
transport = _transport;
logger = _logger;
K = _K;
B = _B;
max_rep_per_node = _max_rep_per_node;
search_concurrency = _search_concurrency;
lookup_concurrency = _lookup_concurrency;
cache_at_closest_n = _cache_at_closest_n;
// set this so we don't do initial calculation until reasonably populated
last_dht_estimate_time = SystemTime.getCurrentTime();
database = DHTDBFactory.create(
adapter.getStorageAdapter(),
_original_republish_interval,
_cache_republish_interval,
logger );
internal_lookup_pool = new ThreadPool("DHTControl:internallookups", lookup_concurrency );
internal_put_pool = new ThreadPool("DHTControl:internalputs", lookup_concurrency );
// external pools queue when full ( as opposed to blocking )
external_lookup_pool = new ThreadPool("DHTControl:externallookups", EXTERNAL_LOOKUP_CONCURRENCY, true );
external_put_pool = new ThreadPool("DHTControl:puts", EXTERNAL_PUT_CONCURRENCY, true );
createRouter( transport.getLocalContact());
node_id_byte_count = router.getID().length;
stats = new DHTControlStatsImpl( this );
// don't bother computing anti-spoof stuff if we don't support value storage
if ( transport.supportsStorage()){
try{
spoof_cipher = Cipher.getInstance("DESede/ECB/PKCS5Padding");
KeyGenerator keyGen = KeyGenerator.getInstance("DESede");
spoof_key = keyGen.generateKey();
}catch( Throwable e ){
Debug.printStackTrace( e );
logger.log( e );
}
}
transport.setRequestHandler( this );
transport.addListener(
new DHTTransportListener()
{
public void
localContactChanged(
DHTTransportContact new_local_contact )
{
logger.log( "Transport ID changed, recreating router" );
List old_contacts = router.findBestContacts( 0 );
byte[] old_router_id = router.getID();
createRouter( new_local_contact );
// sort for closeness to new router id
Set sorted_contacts = new sortedTransportContactSet( router.getID(), true ).getSet();
for (int i=0;i<old_contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)old_contacts.get(i);
if ( !Arrays.equals( old_router_id, contact.getID())){
if ( contact.isAlive()){
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
sorted_contacts.add( t_contact );
}
}
}
// fill up with non-alive ones to lower limit in case this is a start-of-day
// router change and we only have imported contacts in limbo state
for (int i=0;sorted_contacts.size() < 32 && i<old_contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)old_contacts.get(i);
if ( !Arrays.equals( old_router_id, contact.getID())){
if ( !contact.isAlive()){
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
sorted_contacts.add( t_contact );
}
}
}
Iterator it = sorted_contacts.iterator();
int added = 0;
// don't add them all otherwise we can skew the smallest-subtree. better
// to seed with some close ones and then let the normal seeding process
// populate it correctly
while( it.hasNext() && added < 128 ){
DHTTransportContact contact = (DHTTransportContact)it.next();
router.contactAlive( contact.getID(), new DHTControlContactImpl( contact ));
added++;
}
seed( false );
}
public void
currentAddress(
String address )
{
}
public void
reachabilityChanged(
boolean reacheable )
{
}
});
}
protected void
createRouter(
DHTTransportContact _local_contact)
{
router_start_time = SystemTime.getCurrentTime();
router_count++;
local_contact = _local_contact;
if ( router != null ){
router.destroy();
}
router = DHTRouterFactory.create(
K, B, max_rep_per_node,
local_contact.getID(),
new DHTControlContactImpl( local_contact ),
logger);
router.setAdapter(
new DHTRouterAdapter()
{
public void
requestPing(
DHTRouterContact contact )
{
DHTControlImpl.this.requestPing( contact );
}
public void
requestLookup(
byte[] id,
String description )
{
lookup( internal_lookup_pool, false,
id,
description,
(byte)0,
false,
0,
search_concurrency,
1,
router.getK(), // (parg - removed this) decrease search accuracy for refreshes
new lookupResultHandler(new DHTOperationAdapter())
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
}
public void
closest(
List res )
{
}
});
}
public void
requestAdd(
DHTRouterContact contact )
{
nodeAddedToRouter( contact );
}
});
database.setControl( this );
}
public long
getRouterUptime()
{
long now = SystemTime.getCurrentTime();
if ( now < router_start_time ){
router_start_time = now;
}
return( now - router_start_time );
}
public int
getRouterCount()
{
return( router_count );
}
public DHTControlStats
getStats()
{
return( stats );
}
public DHTTransport
getTransport()
{
return( transport );
}
public DHTRouter
getRouter()
{
return( router );
}
public DHTDB
getDataBase()
{
return( database );
}
public void
contactImported(
DHTTransportContact contact )
{
router.contactKnown( contact.getID(), new DHTControlContactImpl(contact));
}
public void
contactRemoved(
DHTTransportContact contact )
{
// obviously we don't want to remove ourselves
if ( !router.isID( contact.getID())){
router.contactDead( contact.getID(), true );
}
}
public void
exportState(
DataOutputStream daos,
int max )
throws IOException
{
/*
* We need to be a bit smart about exporting state to deal with the situation where a
* DHT is started (with good import state) and then stopped before the goodness of the
* state can be re-established. So we remember what we imported and take account of this
* on a re-export
*/
// get all the contacts
List contacts = router.findBestContacts( 0 );
// give priority to any that were alive before and are alive now
List to_save = new ArrayList();
List reserves = new ArrayList();
//System.out.println( "Exporting" );
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
Object[] imported = (Object[])imported_state.get( new HashWrapper( contact.getID()));
if ( imported != null ){
if ( contact.isAlive()){
// definitely want to keep this one
to_save.add( contact );
}else if ( !contact.isFailing()){
// dunno if its still good or not, however its got to be better than any
// new ones that we didn't import who aren't known to be alive
reserves.add( contact );
}
}
}
//System.out.println( " initial to_save = " + to_save.size() + ", reserves = " + reserves.size());
// now pull out any live ones
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
if ( contact.isAlive() && !to_save.contains( contact )){
to_save.add( contact );
}
}
//System.out.println( " after adding live ones = " + to_save.size());
// now add any reserve ones
for (int i=0;i<reserves.size();i++){
DHTRouterContact contact = (DHTRouterContact)reserves.get(i);
if ( !to_save.contains( contact )){
to_save.add( contact );
}
}
//System.out.println( " after adding reserves = " + to_save.size());
// now add in the rest!
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
if (!to_save.contains( contact )){
to_save.add( contact );
}
}
// and finally remove the invalid ones
Iterator it = to_save.iterator();
while( it.hasNext()){
DHTRouterContact contact = (DHTRouterContact)it.next();
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
if ( !t_contact.isValid()){
it.remove();
}
}
//System.out.println( " finally = " + to_save.size());
int num_to_write = Math.min( max, to_save.size());
daos.writeInt( num_to_write );
for (int i=0;i<num_to_write;i++){
DHTRouterContact contact = (DHTRouterContact)to_save.get(i);
//System.out.println( "export:" + contact.getString());
daos.writeLong( contact.getTimeAlive());
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
try{
t_contact.exportContact( daos );
}catch( DHTTransportException e ){
// shouldn't fail as for a contact to make it to the router
// it should be valid...
Debug.printStackTrace( e );
throw( new IOException( e.getMessage()));
}
}
daos.flush();
}
public void
importState(
DataInputStream dais )
throws IOException
{
int num = dais.readInt();
for (int i=0;i<num;i++){
try{
long time_alive = dais.readLong();
DHTTransportContact contact = transport.importContact( dais );
imported_state.put( new HashWrapper( contact.getID()), new Object[]{ new Long( time_alive ), contact });
}catch( DHTTransportException e ){
Debug.printStackTrace( e );
}
}
}
public void
seed(
final boolean full_wait )
{
final AESemaphore sem = new AESemaphore( "DHTControl:seed" );
lookup( internal_lookup_pool, false,
router.getID(),
"Seeding DHT",
(byte)0,
false,
0,
search_concurrency*4,
1,
router.getK(),
new lookupResultHandler(new DHTOperationAdapter())
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
}
public void
closest(
List res )
{
if ( !full_wait ){
sem.release();
}
try{
router.seed();
}finally{
if ( full_wait ){
sem.release();
}
}
}
});
// we always wait at least a minimum amount of time before returning
long start = SystemTime.getCurrentTime();
sem.reserve( INTEGRATION_TIME_MAX );
long now = SystemTime.getCurrentTime();
if ( now < start ){
start = now;
}
long remaining = INTEGRATION_TIME_MAX - ( now - start );
if ( remaining > 500 && !full_wait ){
logger.log( "Initial integration completed, waiting " + remaining + " ms for second phase to start" );
try{
Thread.sleep( remaining );
}catch( Throwable e ){
Debug.out(e);
}
}
}
protected void
poke()
{
long now = SystemTime.getCurrentTime();
if ( now < last_lookup ||
now - last_lookup > RANDOM_QUERY_PERIOD ){
last_lookup = now;
// we don't want this to be blocking as it'll stuff the stats
external_lookup_pool.run(
new task(external_lookup_pool)
{
private byte[] target = {};
public void
runSupport()
{
target = router.refreshRandom();
}
public byte[]
getTarget()
{
return( target );
}
public String
getDescription()
{
return( "Random Query" );
}
});
}
}
public void
put(
byte[] _unencoded_key,
String _description,
byte[] _value,
byte _flags,
DHTOperationListener _listener )
{
// public entry point for explicit publishes
if ( _value.length == 0 ){
// zero length denotes value removal
throw( new RuntimeException( "zero length values not supported"));
}
byte[] encoded_key = encodeKey( _unencoded_key );
DHTLog.log( "put for " + DHTLog.getString( encoded_key ));
DHTDBValue value = database.store( new HashWrapper( encoded_key ), _value, _flags );
put( external_put_pool,
encoded_key,
_description,
value,
0,
true,
new HashSet(),
_listener instanceof DHTOperationListenerDemuxer?
(DHTOperationListenerDemuxer)_listener:
new DHTOperationListenerDemuxer(_listener));
}
public void
putEncodedKey(
byte[] encoded_key,
String description,
DHTTransportValue value,
long timeout,
boolean original_mappings )
{
put( internal_put_pool,
encoded_key,
description,
value,
timeout,
original_mappings,
new HashSet(),
new DHTOperationListenerDemuxer( new DHTOperationAdapter()));
}
protected void
put(
ThreadPool thread_pool,
byte[] initial_encoded_key,
String description,
DHTTransportValue value,
long timeout,
boolean original_mappings,
Set keys_written,
DHTOperationListenerDemuxer listener )
{
put( thread_pool,
initial_encoded_key,
description,
new DHTTransportValue[]{ value },
timeout,
original_mappings,
keys_written,
listener );
}
protected void
put(
final ThreadPool thread_pool,
final byte[] initial_encoded_key,
final String description,
final DHTTransportValue[] values,
final long timeout,
final boolean original_mappings,
final Set keys_written,
final DHTOperationListenerDemuxer listener )
{
// get the initial starting point for the put - may have previously been diversified
byte[][] encoded_keys =
adapter.diversify(
null,
true,
true,
initial_encoded_key,
DHT.DT_NONE,
original_mappings );
// may be > 1 if diversification is replicating (for load balancing)
for (int i=0;i<encoded_keys.length;i++){
final byte[] encoded_key = encoded_keys[i];
HashWrapper hw = new HashWrapper( encoded_key );
if ( keys_written.contains( hw )){
// System.out.println( "put: skipping key as already written" );
continue;
}
keys_written.add( hw );
final String this_description =
Arrays.equals( encoded_key, initial_encoded_key )?
description:
("Diversification of [" + description + "]" );
lookup( thread_pool, false,
encoded_key,
this_description,
(byte)0,
false,
timeout,
search_concurrency,
1,
router.getK(),
new lookupResultHandler(listener)
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
Debug.out( "Shouldn't get a diversify on a lookup-node" );
}
public void
closest(
List _closest )
{
put( thread_pool,
new byte[][]{ encoded_key },
"Store of [" + this_description + "]",
new DHTTransportValue[][]{ values },
_closest,
timeout,
listener,
true,
keys_written );
}
});
}
}
public void
putDirectEncodedKeys(
byte[][] encoded_keys,
String description,
DHTTransportValue[][] value_sets,
List contacts )
{
// we don't consider diversification for direct puts (these are for republishing
// of cached mappings and we maintain these as normal - its up to the original
// publisher to diversify as required)
put( internal_put_pool,
encoded_keys,
description,
value_sets,
contacts,
0,
new DHTOperationListenerDemuxer( new DHTOperationAdapter()),
false,
new HashSet());
}
protected void
put(
final ThreadPool thread_pool,
byte[][] initial_encoded_keys,
final String description,
final DHTTransportValue[][] initial_value_sets,
final List contacts,
final long timeout,
final DHTOperationListenerDemuxer listener,
final boolean consider_diversification,
final Set keys_written )
{
boolean[] ok = new boolean[initial_encoded_keys.length];
int failed = 0;
for (int i=0;i<initial_encoded_keys.length;i++){
if ( ! (ok[i] = !database.isKeyBlocked( initial_encoded_keys[i]))){
failed++;
}
}
// if all failed then nothing to do
if ( failed == ok.length ){
listener.incrementCompletes();
listener.complete( false );
return;
}
final byte[][] encoded_keys = failed==0?initial_encoded_keys:new byte[ok.length-failed][];
final DHTTransportValue[][] value_sets = failed==0?initial_value_sets:new DHTTransportValue[ok.length-failed][];
if ( failed > 0 ){
int pos = 0;
for (int i=0;i<ok.length;i++){
if ( ok[i] ){
encoded_keys[ pos ] = initial_encoded_keys[i];
value_sets[ pos ] = initial_value_sets[i];
pos++;
}
}
}
// only diversify on one hit as we're storing at closest 'n' so we only need to
// do it once for each key
final boolean[] diversified = new boolean[encoded_keys.length];
for (int i=0;i<contacts.size();i++){
DHTTransportContact contact = (DHTTransportContact)contacts.get(i);
if ( router.isID( contact.getID())){
// don't send to ourselves!
}else{
try{
for (int j=0;j<value_sets.length;j++){
for (int k=0;k<value_sets[j].length;k++){
listener.wrote( contact, value_sets[j][k] );
}
}
// each store is going to report its complete event
listener.incrementCompletes();
contact.sendStore(
new DHTTransportReplyHandlerAdapter()
{
public void
storeReply(
DHTTransportContact _contact,
byte[] _diversifications )
{
try{
DHTLog.log( "Store OK " + DHTLog.getString( _contact ));
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
// can be null for old protocol versions
if ( consider_diversification && _diversifications != null ){
for (int j=0;j<_diversifications.length;j++){
if ( _diversifications[j] != DHT.DT_NONE && !diversified[j] ){
diversified[j] = true;
byte[][] diversified_keys =
adapter.diversify( _contact, true, false, encoded_keys[j], _diversifications[j], false );
for (int k=0;k<diversified_keys.length;k++){
put( thread_pool,
diversified_keys[k],
"Diversification of [" + description + "]",
value_sets[j],
timeout,
false,
keys_written,
listener );
}
}
}
}
}finally{
listener.complete( false );
}
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
try{
DHTLog.log( "Store failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false );
}finally{
listener.complete( true );
}
}
public void
keyBlockRequest(
DHTTransportContact contact,
byte[] request,
byte[] key_signature )
{
DHTStorageBlock key_block = database.keyBlockRequest( null, request, key_signature );
if ( key_block != null ){
// remove this key for any subsequent publishes. Quickest hack
// is to change it into a random key value - this will be rejected
// by the recipient as not being close enough anyway
for (int i=0;i<encoded_keys.length;i++){
if ( Arrays.equals( encoded_keys[i], key_block.getKey())){
byte[] dummy = new byte[encoded_keys[i].length];
new Random().nextBytes( dummy );
encoded_keys[i] = dummy;
}
}
}
}
},
encoded_keys,
value_sets );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
}
public DHTTransportValue
getLocalValue(
byte[] unencoded_key )
{
final byte[] encoded_key = encodeKey( unencoded_key );
DHTLog.log( "getLocalValue for " + DHTLog.getString( encoded_key ));
DHTDBValue res = database.get( new HashWrapper( encoded_key ));
if ( res == null ){
return( null );
}
return( res );
}
public void
get(
byte[] unencoded_key,
String description,
byte flags,
int max_values,
long timeout,
boolean exhaustive,
boolean high_priority,
final DHTOperationListener get_listener )
{
final byte[] encoded_key = encodeKey( unencoded_key );
DHTLog.log( "get for " + DHTLog.getString( encoded_key ));
getSupport( encoded_key, description, flags, max_values, timeout, exhaustive, high_priority, new DHTOperationListenerDemuxer( get_listener ));
}
public boolean
isDiversified(
byte[] unencoded_key )
{
final byte[] encoded_key = encodeKey( unencoded_key );
return( adapter.isDiversified( encoded_key ));
}
public boolean
lookup(
byte[] unencoded_key,
long timeout,
final DHTOperationListener lookup_listener )
{
final byte[] encoded_key = encodeKey( unencoded_key );
DHTLog.log( "lookup for " + DHTLog.getString( encoded_key ));
final AESemaphore sem = new AESemaphore( "DHTControl:lookup" );
final boolean[] diversified = { false };
DHTOperationListener delegate =
new DHTOperationListener()
{
public void
searching(
DHTTransportContact contact,
int level,
int active_searches )
{
lookup_listener.searching( contact, level, active_searches );
}
public void
found(
DHTTransportContact contact )
{
}
public void
diversified()
{
lookup_listener.diversified();
}
public void
read(
DHTTransportContact contact,
DHTTransportValue value )
{
}
public void
wrote(
DHTTransportContact contact,
DHTTransportValue value )
{
}
public void
complete(
boolean timeout )
{
lookup_listener.complete( timeout );
sem.release();
}
};
lookup( external_lookup_pool, false,
encoded_key,
"lookup",
(byte)0,
false,
timeout,
search_concurrency,
1,
router.getK(),
new lookupResultHandler( delegate )
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
diversified();
diversified[0] = true;
}
public void
closest(
List closest )
{
for (int i=0;i<closest.size();i++){
lookup_listener.found((DHTTransportContact)closest.get(i));
}
}
});
sem.reserve();
return( diversified[0] );
}
protected void
getSupport(
final byte[] initial_encoded_key,
final String description,
final byte flags,
final int max_values,
final long timeout,
final boolean exhaustive,
final boolean high_priority,
final DHTOperationListenerDemuxer get_listener )
{
// get the initial starting point for the get - may have previously been diversified
byte[][] encoded_keys = adapter.diversify( null, false, true, initial_encoded_key, DHT.DT_NONE, exhaustive );
for (int i=0;i<encoded_keys.length;i++){
final boolean[] diversified = { false };
final byte[] encoded_key = encoded_keys[i];
boolean div = !Arrays.equals( encoded_key, initial_encoded_key );
if ( div ){
get_listener.diversified();
}
final String this_description =
div?("Diversification of [" + description + "]" ):description;
lookup( external_lookup_pool,
high_priority,
encoded_key,
this_description,
flags,
true,
timeout,
search_concurrency,
max_values,
router.getK(),
new lookupResultHandler( get_listener )
{
private List found_values = new ArrayList();
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
diversified();
// we only want to follow one diversification
if ( !diversified[0]){
diversified[0] = true;
int rem = max_values==0?0:( max_values - found_values.size());
if ( max_values == 0 || rem > 0 ){
byte[][] diversified_keys = adapter.diversify( cause, false, false, encoded_key, diversification_type, exhaustive );
// should return a max of 1 (0 if diversification refused)
// however, could change one day to search > 1
for (int j=0;j<diversified_keys.length;j++){
getSupport( diversified_keys[j], "Diversification of [" + this_description + "]", flags, rem, timeout, exhaustive, high_priority, get_listener );
}
}
}
}
public void
read(
DHTTransportContact contact,
DHTTransportValue value )
{
found_values.add( value );
super.read( contact, value );
}
public void
closest(
List closest )
{
/* we don't use teh cache-at-closest kad feature
if ( found_values.size() > 0 ){
DHTTransportValue[] values = new DHTTransportValue[found_values.size()];
found_values.toArray( values );
// cache the values at the 'n' closest seen locations
for (int k=0;k<Math.min(cache_at_closest_n,closest.size());k++){
DHTTransportContact contact = (DHTTransportContact)(DHTTransportContact)closest.get(k);
for (int j=0;j<values.length;j++){
wrote( contact, values[j] );
}
contact.sendStore(
new DHTTransportReplyHandlerAdapter()
{
public void
storeReply(
DHTTransportContact _contact,
byte[] _diversifications )
{
// don't consider diversification for cache stores as we're not that
// bothered
DHTLog.log( "Cache store OK " + DHTLog.getString( _contact ));
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
DHTLog.log( "Cache store failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false );
}
},
new byte[][]{ encoded_key },
new DHTTransportValue[][]{ values });
}
}
*/
}
});
}
}
public byte[]
remove(
byte[] unencoded_key,
String description,
DHTOperationListener listener )
{
final byte[] encoded_key = encodeKey( unencoded_key );
DHTLog.log( "remove for " + DHTLog.getString( encoded_key ));
DHTDBValue res = database.remove( local_contact, new HashWrapper( encoded_key ));
if ( res == null ){
// not found locally, nothing to do
return( null );
}else{
// we remove a key by pushing it back out again with zero length value
put( external_put_pool,
encoded_key,
description,
res,
0,
true,
new HashSet(),
new DHTOperationListenerDemuxer( listener ));
return( res.getValue());
}
}
/**
* The lookup method returns up to K closest nodes to the target
* @param lookup_id
* @return
*/
protected void
lookup(
ThreadPool thread_pool,
boolean high_priority,
final byte[] lookup_id,
final String description,
final byte flags,
final boolean value_search,
final long timeout,
final int concurrency,
final int max_values,
final int search_accuracy,
final lookupResultHandler handler )
{
thread_pool.run(
new task(thread_pool)
{
public void
runSupport()
{
try{
lookupSupportSync( lookup_id, flags, value_search, timeout, concurrency, max_values, search_accuracy, handler );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
public byte[]
getTarget()
{
return( lookup_id );
}
public String
getDescription()
{
return( description );
}
}, high_priority );
}
protected void
lookupSupportSync(
final byte[] lookup_id,
byte flags,
boolean value_search,
long timeout,
int concurrency,
int max_values,
final int search_accuracy,
final lookupResultHandler result_handler )
{
boolean timeout_occurred = false;
last_lookup = SystemTime.getCurrentTime();
result_handler.incrementCompletes();
try{
DHTLog.log( "lookup for " + DHTLog.getString( lookup_id ));
if ( value_search ){
if ( database.isKeyBlocked( lookup_id )){
DHTLog.log( "lookup: terminates - key blocked" );
// bail out and pretend everything worked with zero results
return;
}
}
// keep querying successively closer nodes until we have got responses from the K
// closest nodes that we've seen. We might get a bunch of closer nodes that then
// fail to respond, which means we have reconsider further away nodes
// we keep a list of nodes that we have queried to avoid re-querying them
// we keep a list of nodes discovered that we have yet to query
// we have a parallel search limit of A. For each A we effectively loop grabbing
// the currently closest unqueried node, querying it and adding the results to the
// yet-to-query-set (unless already queried)
// we terminate when we have received responses from the K closest nodes we know
// about (excluding failed ones)
// Note that we never widen the root of our search beyond the initial K closest
// that we know about - this could be relaxed
// contacts remaining to query
// closest at front
final Set contacts_to_query = getClosestContactsSet( lookup_id, false );
final AEMonitor contacts_to_query_mon = new AEMonitor( "DHTControl:ctq" );
final Map level_map = new HashMap();
Iterator it = contacts_to_query.iterator();
while( it.hasNext()){
DHTTransportContact contact = (DHTTransportContact)it.next();
result_handler.found( contact );
level_map.put( contact , new Integer(0));
}
// record the set of contacts we've queried to avoid re-queries
final Map contacts_queried = new HashMap();
// record the set of contacts that we've had a reply from
// furthest away at front
final Set ok_contacts = new sortedTransportContactSet( lookup_id, false ).getSet();
// this handles the search concurrency
final AESemaphore search_sem = new AESemaphore( "DHTControl:search", concurrency );
final int[] idle_searches = { 0 };
final int[] active_searches = { 0 };
final int[] values_found = { 0 };
final int[] value_replies = { 0 };
final Set values_found_set = new HashSet();
final boolean[] key_blocked = { false };
long start = SystemTime.getCurrentTime();
while( true ){
if ( timeout > 0 ){
long now = SystemTime.getCurrentTime();
// check for clock being set back
if ( now < start ){
start = now;
}
long remaining = timeout - ( now - start );
if ( remaining <= 0 ){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
// get permission to kick off another search
if ( !search_sem.reserve( remaining )){
DHTLog.log( "lookup: terminates - timeout" );
timeout_occurred = true;
break;
}
}else{
search_sem.reserve();
}
try{
contacts_to_query_mon.enter();
if ( values_found[0] >= max_values ||
value_replies[0]>= 2 ){ // all hits should have the same values anyway...
break;
}
// if we've received a key block then easiest way to terminate the query is to
// dump any outstanding targets
if ( key_blocked[0] ){
contacts_to_query.clear();
}
// if nothing pending then we need to wait for the results of a previous
// search to arrive. Of course, if there are no searches active then
// we've run out of things to do
if ( contacts_to_query.size() == 0 ){
if ( active_searches[0] == 0 ){
DHTLog.log( "lookup: terminates - no contacts left to query" );
break;
}
idle_searches[0]++;
continue;
}
// select the next contact to search
DHTTransportContact closest = (DHTTransportContact)contacts_to_query.iterator().next();
// if the next closest is further away than the furthest successful hit so
// far and we have K hits, we're done
if ( ok_contacts.size() == search_accuracy ){
DHTTransportContact furthest_ok = (DHTTransportContact)ok_contacts.iterator().next();
int distance = computeAndCompareDistances( furthest_ok.getID(), closest.getID(), lookup_id );
if ( distance <= 0 ){
DHTLog.log( "lookup: terminates - we've searched the closest " + search_accuracy + " contacts" );
break;
}
}
// we optimise the first few entries based on their Vivaldi distance. Only a few
// however as we don't want to start too far away from the target.
if ( contacts_queried.size() < concurrency ){
DHTNetworkPosition[] loc_nps = local_contact.getNetworkPositions();
DHTTransportContact vp_closest = null;
Iterator vp_it = contacts_to_query.iterator();
int vp_count_limit = (concurrency*2) - contacts_queried.size();
int vp_count = 0;
float best_dist = Float.MAX_VALUE;
while( vp_it.hasNext() && vp_count < vp_count_limit ){
vp_count++;
DHTTransportContact entry = (DHTTransportContact)vp_it.next();
DHTNetworkPosition[] rem_nps = entry.getNetworkPositions();
float dist = DHTNetworkPositionManager.estimateRTT( loc_nps, rem_nps );
if ( (!Float.isNaN(dist)) && dist < best_dist ){
best_dist = dist;
vp_closest = entry;
// System.out.println( start + ": lookup for " + DHTLog.getString2( lookup_id ) + ": vp override (dist = " + dist + ")");
}
if ( vp_closest != null ){
// override ID closest with VP closes
closest = vp_closest;
}
}
}
contacts_to_query.remove( closest );
contacts_queried.put( new HashWrapper( closest.getID()), closest );
// never search ourselves!
if ( router.isID( closest.getID())){
search_sem.release();
continue;
}
final int search_level = ((Integer)level_map.get(closest)).intValue();
active_searches[0]++;
result_handler.searching( closest, search_level, active_searches[0] );
DHTTransportReplyHandlerAdapter handler =
new DHTTransportReplyHandlerAdapter()
{
private boolean value_reply_received = false;
public void
findNodeReply(
DHTTransportContact target_contact,
DHTTransportContact[] reply_contacts )
{
try{
DHTLog.log( "findNodeReply: " + DHTLog.getString( reply_contacts ));
router.contactAlive( target_contact.getID(), new DHTControlContactImpl(target_contact));
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
continue;
}
// dunno if its alive or not, however record its existance
router.contactKnown( contact.getID(), new DHTControlContactImpl(contact));
}
try{
contacts_to_query_mon.enter();
ok_contacts.add( target_contact );
if ( ok_contacts.size() > search_accuracy ){
// delete the furthest away
Iterator ok_it = ok_contacts.iterator();
ok_it.next();
ok_it.remove();
}
for (int i=0;i<reply_contacts.length;i++){
DHTTransportContact contact = reply_contacts[i];
// ignore responses that are ourselves
if ( compareDistances( router.getID(), contact.getID()) == 0 ){
continue;
}
if ( contacts_queried.get( new HashWrapper( contact.getID())) == null &&
(!contacts_to_query.contains( contact ))){
DHTLog.log( " new contact for query: " + DHTLog.getString( contact ));
contacts_to_query.add( contact );
result_handler.found( contact );
level_map.put( contact, new Integer( search_level+1));
if ( idle_searches[0] > 0 ){
idle_searches[0]--;
search_sem.release();
}
}else{
// DHTLog.log( " already queried: " + DHTLog.getString( contact ));
}
}
}finally{
contacts_to_query_mon.exit();
}
}finally{
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
public void
findValueReply(
DHTTransportContact contact,
DHTTransportValue[] values,
byte diversification_type,
boolean more_to_come )
{
DHTLog.log( "findValueReply: " + DHTLog.getString( values ) + ",mtc=" + more_to_come + ", dt=" + diversification_type );
try{
if ( !key_blocked[0] ){
if ( diversification_type != DHT.DT_NONE ){
// diversification instruction
result_handler.diversify( contact, diversification_type );
}
}
value_reply_received = true;
router.contactAlive( contact.getID(), new DHTControlContactImpl(contact));
int new_values = 0;
if ( !key_blocked[0] ){
for (int i=0;i<values.length;i++){
DHTTransportValue value = values[i];
DHTTransportContact originator = value.getOriginator();
// can't just use originator id as this value can be DOSed (see DB code)
byte[] originator_id = originator.getID();
byte[] value_bytes = value.getValue();
byte[] value_id = new byte[originator_id.length + value_bytes.length];
System.arraycopy( originator_id, 0, value_id, 0, originator_id.length );
System.arraycopy( value_bytes, 0, value_id, originator_id.length, value_bytes.length );
HashWrapper x = new HashWrapper( value_id );
if ( !values_found_set.contains( x )){
new_values++;
values_found_set.add( x );
result_handler.read( contact, values[i] );
}
}
}
try{
contacts_to_query_mon.enter();
if ( !more_to_come ){
value_replies[0]++;
}
values_found[0] += new_values;
}finally{
contacts_to_query_mon.exit();
}
}finally{
if ( !more_to_come ){
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
}
public void
findValueReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
findNodeReply( contact, contacts );
}
public void
failed(
DHTTransportContact target_contact,
Throwable error )
{
try{
// if at least one reply has been received then we
// don't treat subsequent failure as indication of
// a contact failure (just packet loss)
if ( !value_reply_received ){
DHTLog.log( "findNode/findValue " + DHTLog.getString( target_contact ) + " -> failed: " + error.getMessage());
router.contactDead( target_contact.getID(), false );
}
}finally{
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
public void
keyBlockRequest(
DHTTransportContact contact,
byte[] request,
byte[] key_signature )
{
// we don't want to kill the contact due to this so indicate that
// it is ok by setting the flag
if ( database.keyBlockRequest( null, request, key_signature ) != null ){
key_blocked[0] = true;
}
}
};
router.recordLookup( lookup_id );
if ( value_search ){
int rem = max_values - values_found[0];
if ( rem <= 0 ){
Debug.out( "eh?" );
rem = 1;
}
closest.sendFindValue( handler, lookup_id, rem, flags );
}else{
closest.sendFindNode( handler, lookup_id );
}
}finally{
contacts_to_query_mon.exit();
}
}
// maybe unterminated searches still going on so protect ourselves
// against concurrent modification of result set
List closest_res = null;
try{
contacts_to_query_mon.enter();
if ( DHTLog.isOn()){
DHTLog.log( "lookup complete for " + DHTLog.getString( lookup_id ));
DHTLog.log( " queried = " + DHTLog.getString( contacts_queried ));
DHTLog.log( " to query = " + DHTLog.getString( contacts_to_query ));
DHTLog.log( " ok = " + DHTLog.getString( ok_contacts ));
}
closest_res = new ArrayList( ok_contacts );
// we need to reverse the list as currently closest is at
// the end
Collections.reverse( closest_res );
if ( timeout <= 0 && !value_search ){
// we can use the results of this to estimate the DHT size
estimateDHTSize( lookup_id, contacts_queried, search_accuracy );
}
}finally{
contacts_to_query_mon.exit();
}
result_handler.closest( closest_res );
}finally{
result_handler.complete( timeout_occurred );
}
}
// Request methods
public void
pingRequest(
DHTTransportContact originating_contact )
{
DHTLog.log( "pingRequest from " + DHTLog.getString( originating_contact.getID()));
router.contactAlive( originating_contact.getID(), new DHTControlContactImpl(originating_contact));
}
public void
keyBlockRequest(
DHTTransportContact originating_contact,
byte[] request,
byte[] sig )
{
DHTLog.log( "keyBlockRequest from " + DHTLog.getString( originating_contact.getID()));
router.contactAlive( originating_contact.getID(), new DHTControlContactImpl(originating_contact));
database.keyBlockRequest( originating_contact, request, sig );
}
public DHTTransportStoreReply
storeRequest(
DHTTransportContact originating_contact,
byte[][] keys,
DHTTransportValue[][] value_sets )
{
router.contactAlive( originating_contact.getID(), new DHTControlContactImpl(originating_contact));
DHTLog.log( "storeRequest from " + DHTLog.getString( originating_contact )+ ", keys = " + keys.length );
byte[] diverse_res = new byte[ keys.length];
Arrays.fill( diverse_res, DHT.DT_NONE );
if ( keys.length != value_sets.length ){
Debug.out( "DHTControl:storeRequest - invalid request received from " + originating_contact.getString() + ", keys and values length mismatch");
return( new DHTTransportStoreReplyImpl( diverse_res ));
}
// System.out.println( "storeRequest: received " + originating_contact.getRandomID() + " from " + originating_contact.getAddress());
DHTStorageBlock blocked_details = null;
for (int i=0;i<keys.length;i++){
HashWrapper key = new HashWrapper( keys[i] );
DHTTransportValue[] values = value_sets[i];
DHTLog.log( " key=" + DHTLog.getString(key) + ", value=" + DHTLog.getString(values));
diverse_res[i] = database.store( originating_contact, key, values );
if ( blocked_details == null ){
blocked_details = database.getKeyBlockDetails( keys[i] );
}
}
// fortunately we can get away with this as diversifications are only taken note of by initial, single value stores
// and not by the multi-value cache forwards...
if ( blocked_details == null ){
return( new DHTTransportStoreReplyImpl( diverse_res ));
}else{
return( new DHTTransportStoreReplyImpl( blocked_details.getRequest(), blocked_details.getCertificate()));
}
}
public DHTTransportContact[]
findNodeRequest(
DHTTransportContact originating_contact,
byte[] id )
{
DHTLog.log( "findNodeRequest from " + DHTLog.getString( originating_contact.getID()));
router.contactAlive( originating_contact.getID(), new DHTControlContactImpl(originating_contact));
List l;
if ( id.length == router.getID().length ){
l = getClosestKContactsList( id, false );
}else{
// this helps both protect against idiot queries and also saved bytes when we use findNode
// to just get a random ID prior to cache-forwards
l = new ArrayList();
}
final DHTTransportContact[] res = new DHTTransportContact[l.size()];
l.toArray( res );
int rand = generateSpoofID( originating_contact );
originating_contact.setRandomID( rand );
return( res );
}
public DHTTransportFindValueReply
findValueRequest(
DHTTransportContact originating_contact,
byte[] key,
int max_values,
byte flags )
{
DHTLog.log( "findValueRequest from " + DHTLog.getString( originating_contact.getID()));
DHTDBLookupResult result = database.get( originating_contact, new HashWrapper( key ), max_values, flags, true );
if ( result != null ){
router.contactAlive( originating_contact.getID(), new DHTControlContactImpl(originating_contact));
DHTStorageBlock block_details = database.getKeyBlockDetails( key );
if ( block_details == null ){
return( new DHTTransportFindValueReplyImpl( result.getDiversificationType(), result.getValues()));
}else{
return( new DHTTransportFindValueReplyImpl( block_details.getRequest(), block_details.getCertificate()));
}
}else{
return( new DHTTransportFindValueReplyImpl( findNodeRequest( originating_contact, key )));
}
}
public DHTTransportFullStats
statsRequest(
DHTTransportContact contact )
{
return( stats );
}
protected void
requestPing(
DHTRouterContact contact )
{
((DHTControlContactImpl)contact.getAttachment()).getTransportContact().sendPing(
new DHTTransportReplyHandlerAdapter()
{
public void
pingReply(
DHTTransportContact _contact )
{
DHTLog.log( "ping OK " + DHTLog.getString( _contact ));
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
DHTLog.log( "ping " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false );
}
});
}
protected void
nodeAddedToRouter(
DHTRouterContact new_contact )
{
// ignore ourselves
if ( router.isID( new_contact.getID())){
return;
}
// when a new node is added we must check to see if we need to transfer
// any of our values to it.
Map keys_to_store = new HashMap();
DHTStorageBlock[] direct_key_blocks = database.getDirectKeyBlocks();
if ( database.isEmpty() && direct_key_blocks.length == 0 ){
// nothing to do, ping it if it isn't known to be alive
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// see if we're one of the K closest to the new node
List closest_contacts = getClosestKContactsList( new_contact.getID(), false );
boolean close = false;
for (int i=0;i<closest_contacts.size();i++){
if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
close = true;
break;
}
}
if ( !close ){
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// ok, we're close enough to worry about transferring values
Iterator it = database.getKeys();
while( it.hasNext()){
HashWrapper key = (HashWrapper)it.next();
byte[] encoded_key = key.getHash();
if ( database.isKeyBlocked( encoded_key )){
continue;
}
DHTDBLookupResult result = database.get( null, key, 0, (byte)0, false );
if ( result == null ){
// deleted in the meantime
continue;
}
// even if a result has been diversified we continue to maintain the base value set
// until the original publisher picks up the diversification (next publish period) and
// publishes to the correct place
DHTDBValue[] values = result.getValues();
List values_to_store = new ArrayList();
for (int i=0;i<values.length;i++){
DHTDBValue value = values[i];
// we don't consider any cached further away than the initial location, for transfer
// however, we *do* include ones we originate as, if we're the closest, we have to
// take responsibility for xfer (as others won't)
List sorted_contacts = getClosestKContactsList( encoded_key, false );
// if we're closest to the key, or the new node is closest and
// we're second closest, then we take responsibility for storing
// the value
boolean store_it = false;
if ( sorted_contacts.size() > 0 ){
DHTTransportContact first = (DHTTransportContact)sorted_contacts.get(0);
if ( router.isID( first.getID())){
store_it = true;
}else if ( Arrays.equals( first.getID(), new_contact.getID()) && sorted_contacts.size() > 1 ){
store_it = router.isID(((DHTTransportContact)sorted_contacts.get(1)).getID());
}
}
if ( store_it ){
values_to_store.add( value );
}
}
if ( values_to_store.size() > 0 ){
keys_to_store.put( key, values_to_store );
}
}
final DHTTransportContact t_contact = ((DHTControlContactImpl)new_contact.getAttachment()).getTransportContact();
final boolean[] anti_spoof_done = { false };
if ( keys_to_store.size() > 0 ){
it = keys_to_store.entrySet().iterator();
final byte[][] keys = new byte[keys_to_store.size()][];
final DHTTransportValue[][] value_sets = new DHTTransportValue[keys.length][];
int index = 0;
while( it.hasNext()){
Map.Entry entry = (Map.Entry)it.next();
HashWrapper key = (HashWrapper)entry.getKey();
List values = (List)entry.getValue();
keys[index] = key.getHash();
value_sets[index] = new DHTTransportValue[values.size()];
for (int i=0;i<values.size();i++){
value_sets[index][i] = ((DHTDBValue)values.get(i)).getValueForRelay( local_contact );
}
index++;
}
// move to anti-spoof for cache forwards. we gotta do a findNode to update the
// contact's latest random id
t_contact.sendFindNode(
new DHTTransportReplyHandlerAdapter()
{
public void
findNodeReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
// System.out.println( "nodeAdded: pre-store findNode OK" );
anti_spoof_done[0] = true;
t_contact.sendStore(
new DHTTransportReplyHandlerAdapter()
{
public void
storeReply(
DHTTransportContact _contact,
byte[] _diversifications )
{
// System.out.println( "nodeAdded: store OK" );
// don't consider diversifications for node additions as they're not interested
// in getting values from us, they need to get them from nodes 'near' to the
// diversification targets or the originator
DHTLog.log( "add store ok" );
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: store Failed" );
DHTLog.log( "add store failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
public void
keyBlockRequest(
DHTTransportContact contact,
byte[] request,
byte[] signature )
{
database.keyBlockRequest( null, request, signature );
}
},
keys,
value_sets );
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: pre-store findNode Failed" );
DHTLog.log( "pre-store findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
},
t_contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
}else{
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
}
// finally transfer any key-blocks
if ( t_contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
for (int i=0;i<direct_key_blocks.length;i++){
final DHTStorageBlock key_block = direct_key_blocks[i];
List contacts = getClosestKContactsList( key_block.getKey(), false );
boolean forward_it = false;
// ensure that the key is close enough to us
for (int j=0;j<contacts.size();j++){
final DHTTransportContact contact = (DHTTransportContact)contacts.get(j);
if ( router.isID( contact.getID())){
forward_it = true;
break;
}
}
if ( !forward_it || key_block.hasBeenSentTo( t_contact )){
continue;
}
final Runnable task =
new Runnable()
{
public void
run()
{
t_contact.sendKeyBlock(
new DHTTransportReplyHandlerAdapter()
{
public void
keyBlockReply(
DHTTransportContact _contact )
{
DHTLog.log( "key block forward ok " + DHTLog.getString( _contact ));
key_block.sentTo( _contact );
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
DHTLog.log( "key block forward failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
}
},
key_block.getRequest(),
key_block.getCertificate());
}
};
if ( anti_spoof_done[0] ){
task.run();
}else{
t_contact.sendFindNode(
new DHTTransportReplyHandlerAdapter()
{
public void
findNodeReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
task.run();
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: pre-store findNode Failed" );
DHTLog.log( "pre-kb findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
},
t_contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
}
}
}
}
protected Set
getClosestContactsSet(
byte[] id,
boolean live_only )
{
List l = router.findClosestContacts( id, live_only );
Set sorted_set = new sortedTransportContactSet( id, true ).getSet();
for (int i=0;i<l.size();i++){
sorted_set.add(((DHTControlContactImpl)((DHTRouterContact)l.get(i)).getAttachment()).getTransportContact());
}
return( sorted_set );
}
public List
getClosestKContactsList(
byte[] id,
boolean live_only )
{
Set sorted_set = getClosestContactsSet( id, live_only );
List res = new ArrayList(K);
Iterator it = sorted_set.iterator();
while( it.hasNext() && res.size() < K ){
res.add( it.next());
}
return( res );
}
protected byte[]
encodeKey(
byte[] key )
{
byte[] temp = new SHA1Simple().calculateHash( key );
byte[] result = new byte[node_id_byte_count];
System.arraycopy( temp, 0, result, 0, node_id_byte_count );
return( result );
}
public int
computeAndCompareDistances(
byte[] t1,
byte[] t2,
byte[] pivot )
{
return( computeAndCompareDistances2( t1, t2, pivot ));
}
protected static int
computeAndCompareDistances2(
byte[] t1,
byte[] t2,
byte[] pivot )
{
for (int i=0;i<t1.length;i++){
byte d1 = (byte)( t1[i] ^ pivot[i] );
byte d2 = (byte)( t2[i] ^ pivot[i] );
int diff = (d1&0xff) - (d2&0xff);
if ( diff != 0 ){
return( diff );
}
}
return( 0 );
}
public byte[]
computeDistance(
byte[] n1,
byte[] n2 )
{
return( computeDistance2( n1, n2 ));
}
protected static byte[]
computeDistance2(
byte[] n1,
byte[] n2 )
{
byte[] res = new byte[n1.length];
for (int i=0;i<res.length;i++){
res[i] = (byte)( n1[i] ^ n2[i] );
}
return( res );
}
/**
* -ve -> n1 < n2
* @param n1
* @param n2
* @return
*/
public int
compareDistances(
byte[] n1,
byte[] n2 )
{
return( compareDistances2( n1,n2 ));
}
protected static int
compareDistances2(
byte[] n1,
byte[] n2 )
{
for (int i=0;i<n1.length;i++){
int diff = (n1[i]&0xff) - (n2[i]&0xff);
if ( diff != 0 ){
return( diff );
}
}
return( 0 );
}
public void
addListener(
DHTControlListener l )
{
try{
activity_mon.enter();
listeners.addListener( l );
for (int i=0;i<activities.size();i++){
listeners.dispatch( DHTControlListener.CT_ADDED, activities.get(i));
}
}finally{
activity_mon.exit();
}
}
public void
removeListener(
DHTControlListener l )
{
listeners.removeListener( l );
}
public DHTControlActivity[]
getActivities()
{
List res;
try{
activity_mon.enter();
res = new ArrayList( activities );
}finally{
activity_mon.exit();
}
DHTControlActivity[] x = new DHTControlActivity[res.size()];
res.toArray( x );
return( x );
}
public void
setTransportEstimatedDHTSize(
int size )
{
if ( size > 0 ){
try{
estimate_mon.enter();
remote_estimate_values.add( new Integer( size ));
if ( remote_estimate_values.size() > REMOTE_ESTIMATE_HISTORY ){
remote_estimate_values.remove(0);
}
}finally{
estimate_mon.exit();
}
}
// System.out.println( "estimated dht size: " + size );
}
public int
getTransportEstimatedDHTSize()
{
return((int)local_dht_estimate );
}
public int
getEstimatedDHTSize()
{
// public method, trigger actual computation periodically
long now = SystemTime.getCurrentTime();
long diff = now - last_dht_estimate_time;
if ( diff < 0 || diff > 60*1000 ){
estimateDHTSize( router.getID(), null, router.getK());
}
return((int)combined_dht_estimate );
}
protected void
estimateDHTSize(
byte[] id,
Map contacts,
int contacts_to_use )
{
// if called with contacts then this is in internal estimation based on lookup values
long now = SystemTime.getCurrentTime();
long diff = now - last_dht_estimate_time;
// 5 second limiter here
if ( diff < 0 || diff > 5*1000 ){
try{
estimate_mon.enter();
last_dht_estimate_time = now;
List l;
if ( contacts == null ){
l = getClosestKContactsList( id, false );
}else{
Set sorted_set = new sortedTransportContactSet( id, true ).getSet();
sorted_set.addAll( contacts.values());
l = new ArrayList( sorted_set );
if ( l.size() > 0 ){
// algorithm works relative to a starting point in the ID space so we grab
// the first here rather than using the initial lookup target
id = ((DHTTransportContact)l.get(0)).getID();
}
/*
String str = "";
for (int i=0;i<l.size();i++){
str += (i==0?"":",") + DHTLog.getString2( ((DHTTransportContact)l.get(i)).getID());
}
System.out.println( "trace: " + str );
*/
}
// can't estimate with less than 2
if ( l.size() > 2 ){
/*
<Gudy> if you call N0 yourself, N1 the nearest peer, N2 the 2nd nearest peer ... Np the pth nearest peer that you know (for example, N1 .. N20)
<Gudy> and if you call D1 the Kad distance between you and N1, D2 between you and N2 ...
<Gudy> then you have to compute :
<Gudy> Dc = sum(i * Di) / sum( i * i)
<Gudy> and then :
<Gudy> NbPeers = 2^160 / Dc
*/
BigInteger sum1 = new BigInteger("0");
BigInteger sum2 = new BigInteger("0");
// first entry should be us
for (int i=1;i<Math.min( l.size(), contacts_to_use );i++){
DHTTransportContact node = (DHTTransportContact)l.get(i);
byte[] dist = computeDistance( id, node.getID());
BigInteger b_dist = IDToBigInteger( dist );
BigInteger b_i = new BigInteger(""+i);
sum1 = sum1.add( b_i.multiply(b_dist));
sum2 = sum2.add( b_i.multiply( b_i ));
}
byte[] max = new byte[id.length+1];
max[0] = 0x01;
long this_estimate;
if ( sum1.compareTo( new BigInteger("0")) == 0 ){
this_estimate = 0;
}else{
this_estimate = IDToBigInteger(max).multiply( sum2 ).divide( sum1 ).longValue();
}
// there's always us!!!!
if ( this_estimate < 1 ){
this_estimate = 1;
}
local_estimate_values.put( new HashWrapper( id ), new Long( this_estimate ));
long new_estimate = 0;
Iterator it = local_estimate_values.values().iterator();
String sizes = "";
while( it.hasNext()){
long estimate = ((Long)it.next()).longValue();
sizes += (sizes.length()==0?"":",") + estimate;
new_estimate += estimate;
}
local_dht_estimate = new_estimate/local_estimate_values.size();
// System.out.println( "getEstimatedDHTSize: " + sizes + "->" + dht_estimate + " (id=" + DHTLog.getString2(id) + ",cont=" + (contacts==null?"null":(""+contacts.size())) + ",use=" + contacts_to_use );
}
List rems = new ArrayList(new TreeSet( remote_estimate_values ));
// ignore largest and smallest few values
long rem_average = local_dht_estimate;
int rem_vals = 1;
for (int i=3;i<rems.size()-3;i++){
rem_average += ((Integer)rems.get(i)).intValue();
rem_vals++;
}
combined_dht_estimate = rem_average / rem_vals;
// System.out.println( "estimateDHTSize: loc =" + local_dht_estimate + ", comb = " + combined_dht_estimate + " [" + remote_estimate_values.size() + "]");
}finally{
estimate_mon.exit();
}
}
}
protected BigInteger
IDToBigInteger(
byte[] data )
{
String str_key = "";
for (int i=0;i<data.length;i++){
String hex = Integer.toHexString( data[i]&0xff );
while( hex.length() < 2 ){
hex = "0" + hex;
}
str_key += hex;
}
BigInteger res = new BigInteger( str_key, 16 );
return( res );
}
protected int
generateSpoofID(
DHTTransportContact contact )
{
if ( spoof_cipher == null ){
return( 0 );
}
try{
spoof_mon.enter();
spoof_cipher.init(Cipher.ENCRYPT_MODE, spoof_key );
byte[] address = contact.getAddress().getAddress().getAddress();
byte[] data_out = spoof_cipher.doFinal( address );
int res = (data_out[0]<<24)&0xff000000 |
(data_out[1] << 16)&0x00ff0000 |
(data_out[2] << 8)&0x0000ff00 |
data_out[3]&0x000000ff;
// System.out.println( "anti-spoof: generating " + res + " for " + contact.getAddress());
return( res );
}catch( Throwable e ){
logger.log(e);
}finally{
spoof_mon.exit();
}
return( 0 );
}
public boolean
verifyContact(
DHTTransportContact c,
boolean direct )
{
boolean ok = c.getRandomID() == generateSpoofID( c );
if ( DHTLog.CONTACT_VERIFY_TRACE ){
System.out.println( " net " + transport.getNetwork() +"," + (direct?"direct":"indirect") + " verify for " + c.getName() + " -> " + ok + ", version = " + c.getProtocolVersion());
}
return( ok );
}
public List
getContacts()
{
List contacts = router.getAllContacts();
List res = new ArrayList( contacts.size());
for (int i=0;i<contacts.size();i++){
DHTRouterContact rc = (DHTRouterContact)contacts.get(i);
res.add( rc.getAttachment());
}
return( res );
}
public void
print()
{
DHTNetworkPosition[] nps = transport.getLocalContact().getNetworkPositions();
String np_str = "";
for (int j=0;j<nps.length;j++){
np_str += (j==0?"":",") + nps[j];
}
logger.log( "DHT Details: external IP = " + transport.getLocalContact().getAddress() +
", network = " + transport.getNetwork() +
", protocol = V" + transport.getProtocolVersion() +
", nps = " + np_str );
router.print();
database.print();
/*
List c = getContacts();
for (int i=0;i<c.size();i++){
DHTControlContact cc = (DHTControlContact)c.get(i);
System.out.println( " " + cc.getTransportContact().getVivaldiPosition());
}
*/
}
public List
sortContactsByDistance(
List contacts )
{
Set sorted_contacts = new sortedTransportContactSet( router.getID(), true ).getSet();
sorted_contacts.addAll( contacts );
return( new ArrayList( sorted_contacts ));
}
protected static class
sortedTransportContactSet
{
private TreeSet tree_set;
private byte[] pivot;
private boolean ascending;
protected
sortedTransportContactSet(
byte[] _pivot,
boolean _ascending )
{
pivot = _pivot;
ascending = _ascending;
tree_set = new TreeSet(
new Comparator()
{
public int
compare(
Object o1,
Object o2 )
{
// this comparator ensures that the closest to the key
// is first in the iterator traversal
DHTTransportContact t1 = (DHTTransportContact)o1;
DHTTransportContact t2 = (DHTTransportContact)o2;
int distance = computeAndCompareDistances2( t1.getID(), t2.getID(), pivot );
if ( ascending ){
return( distance );
}else{
return( -distance );
}
}
});
}
public Set
getSet()
{
return( tree_set );
}
}
protected static class
DHTOperationListenerDemuxer
implements DHTOperationListener
{
private AEMonitor this_mon = new AEMonitor( "DHTOperationListenerDemuxer" );
private DHTOperationListener delegate;
private boolean complete_fired;
private boolean complete_included_ok;
private int complete_count = 0;
protected
DHTOperationListenerDemuxer(
DHTOperationListener _delegate )
{
delegate = _delegate;
if ( delegate == null ){
Debug.out( "invalid: null delegate" );
}
}
public void
incrementCompletes()
{
try{
this_mon.enter();
complete_count++;
}finally{
this_mon.exit();
}
}
public void
searching(
DHTTransportContact contact,
int level,
int active_searches )
{
delegate.searching( contact, level, active_searches );
}
public void
diversified()
{
delegate.diversified();
}
public void
found(
DHTTransportContact contact )
{
delegate.found( contact );
}
public void
read(
DHTTransportContact contact,
DHTTransportValue value )
{
delegate.read( contact, value );
}
public void
wrote(
DHTTransportContact contact,
DHTTransportValue value )
{
delegate.wrote( contact, value );
}
public void
complete(
boolean timeout )
{
boolean fire = false;
try{
this_mon.enter();
if ( !timeout ){
complete_included_ok = true;
}
complete_count--;
if (complete_count <= 0 && !complete_fired ){
complete_fired = true;
fire = true;
}
}finally{
this_mon.exit();
}
if ( fire ){
delegate.complete( !complete_included_ok );
}
}
}
abstract static class
lookupResultHandler
extends DHTOperationListenerDemuxer
{
protected
lookupResultHandler(
DHTOperationListener delegate )
{
super( delegate );
}
public abstract void
closest(
List res );
public abstract void
diversify(
DHTTransportContact cause,
byte diversification_type );
}
protected static class
DHTTransportFindValueReplyImpl
implements DHTTransportFindValueReply
{
private byte dt = DHT.DT_NONE;
private DHTTransportValue[] values;
private DHTTransportContact[] contacts;
private byte[] blocked_key;
private byte[] blocked_sig;
protected
DHTTransportFindValueReplyImpl(
byte _dt,
DHTTransportValue[] _values )
{
dt = _dt;
values = _values;
}
protected
DHTTransportFindValueReplyImpl(
DHTTransportContact[] _contacts )
{
contacts = _contacts;
}
protected
DHTTransportFindValueReplyImpl(
byte[] _blocked_key,
byte[] _blocked_sig )
{
blocked_key = _blocked_key;
blocked_sig = _blocked_sig;
}
public byte
getDiversificationType()
{
return( dt );
}
public boolean
hit()
{
return( values != null );
}
public boolean
blocked()
{
return( blocked_key != null );
}
public DHTTransportValue[]
getValues()
{
return( values );
}
public DHTTransportContact[]
getContacts()
{
return( contacts );
}
public byte[]
getBlockedKey()
{
return( blocked_key );
}
public byte[]
getBlockedSignature()
{
return( blocked_sig );
}
}
protected static class
DHTTransportStoreReplyImpl
implements DHTTransportStoreReply
{
private byte[] divs;
private byte[] block_request;
private byte[] block_sig;
protected
DHTTransportStoreReplyImpl(
byte[] _divs )
{
divs = _divs;
}
protected
DHTTransportStoreReplyImpl(
byte[] _bk,
byte[] _bs )
{
block_request = _bk;
block_sig = _bs;
}
public byte[]
getDiversificationTypes()
{
return( divs );
}
public boolean
blocked()
{
return( block_request != null );
}
public byte[]
getBlockRequest()
{
return( block_request );
}
public byte[]
getBlockSignature()
{
return( block_sig );
}
}
protected abstract class
task
extends ThreadPoolTask
{
private controlActivity activity;
protected
task(
ThreadPool thread_pool )
{
activity = new controlActivity( thread_pool, this );
try{
activity_mon.enter();
activities.add( activity );
listeners.dispatch( DHTControlListener.CT_ADDED, activity );
// System.out.println( "activity added:" + activities.size());
}finally{
activity_mon.exit();
}
}
public void
taskStarted()
{
listeners.dispatch( DHTControlListener.CT_CHANGED, activity );
//System.out.println( "activity changed:" + activities.size());
}
public void
taskCompleted()
{
try{
activity_mon.enter();
activities.remove( activity );
listeners.dispatch( DHTControlListener.CT_REMOVED, activity );
// System.out.println( "activity removed:" + activities.size());
}finally{
activity_mon.exit();
}
}
public void
interruptTask()
{
}
public abstract byte[]
getTarget();
public abstract String
getDescription();
}
protected class
controlActivity
implements DHTControlActivity
{
protected ThreadPool tp;
protected task task;
protected int type;
protected
controlActivity(
ThreadPool _tp,
task _task )
{
tp = _tp;
task = _task;
if ( _tp == internal_lookup_pool ){
type = DHTControlActivity.AT_INTERNAL_GET;
}else if ( _tp == external_lookup_pool ){
type = DHTControlActivity.AT_EXTERNAL_GET;
}else if ( _tp == internal_put_pool ){
type = DHTControlActivity.AT_INTERNAL_PUT;
}else{
type = DHTControlActivity.AT_EXTERNAL_PUT;
}
}
public byte[]
getTarget()
{
return( task.getTarget());
}
public String
getDescription()
{
return( task.getDescription());
}
public int
getType()
{
return( type );
}
public boolean
isQueued()
{
return( tp.isQueued( task ));
}
public String
getString()
{
return( type + ":" + DHTLog.getString( getTarget()) + "/" + getDescription() + ", q = " + isQueued());
}
}
}
|