DHTDBMapping.javaAPI DocAzureus May 23 05:34:14 BST 2006com.aelitis.azureus.core.dht.db.impl


public class DHTDBMapping extends Object

Fields Summary
private static final boolean
private DHTDBImpl
private org.gudy.azureus2.core3.util.HashWrapper
private DHTStorageKey
private Map
private Map
private int
private int
private int
private int
private byte
private static final int
private com.aelitis.azureus.core.util.bloom.BloomFilter
Constructors Summary
/**
 * Creates the mapping for a single key.
 *
 * Stores the owning database and the key. When the database exposes an
 * adapter, the adapter is told about the new key via keyCreated and, if it
 * returns a non-null adapter key, the diversification state is taken from it.
 *
 * NOTE(review): the opening "try" and the body of the catch clause are not
 * visible in this extract.
 *
 * @param _db     owning database
 * @param _key    key this mapping holds values for
 * @param _local  forwarded unchanged to the adapter's keyCreated call
 */
protected DHTDBMapping(DHTDBImpl _db, org.gudy.azureus2.core3.util.HashWrapper _key, boolean _local)

		db			= _db;
		key			= _key;
			// adapter is optional - only notify it when one is present
			if ( db.getAdapter() != null ){
				adapter_key = db.getAdapter().keyCreated( key, _local );
				if ( adapter_key != null ){
					diversification_state	= adapter_key.getDiversificationType();
		}catch( Throwable e ){
Methods Summary
/**
 * Adds a value to this mapping, treating it as "direct" when the value's
 * originator ID equals its sender ID and as "indirect" otherwise.
 *
 * Direct values are stored via addDirectValue, after which the keys of any
 * indirect entries from the same originator are collected for removal.
 * Indirect values are looked up by the ID from getOriginatorValueID: an
 * existing entry is passed to addIndirectValue, a new one only when
 * diversification_state is DHT.DT_NONE.
 *
 * NOTE(review): the else/return lines and the body of the removal loop are
 * missing from this extract, so the exact branch structure must be
 * confirmed against the full source.
 *
 * @param new_value value to add
 */
protected voidadd(DHTDBValueImpl new_value)

		// don't replace a closer cache value with a further away one. In particular
		// we have to avoid the case where the original publisher of a key happens to
		// be close to it and is asked by another node to cache it!

		DHTTransportContact	originator 		= new_value.getOriginator();
		DHTTransportContact	sender 			= new_value.getSender();

		HashWrapper	originator_id = new HashWrapper( originator.getID());
		// "direct" means the sender is the originator itself
		boolean	direct = Arrays.equals( originator.getID(), sender.getID());
		if ( direct ){
				// direct contact from the originator is straightforward
			addDirectValue( originator_id, new_value );
				// remove any indirect values we might already have for this
			Iterator	it = indirect_originator_value_map.entrySet().iterator();
			// keys are collected first and processed in the loop that follows
			List	to_remove = new ArrayList();
			while( it.hasNext()){
				Map.Entry	entry = (Map.Entry);
				HashWrapper		existing_key		= (HashWrapper)entry.getKey();
				DHTDBValueImpl	existing_value	= (DHTDBValueImpl)entry.getValue();
				if ( Arrays.equals( existing_value.getOriginator().getID(), originator.getID())){
					to_remove.add( existing_key );
			for (int i=0;i<to_remove.size();i++){
				// not direct. If we have a value already for this originator then
				// we drop the value as the originator-originated one takes precedence
			if ( direct_originator_map.get( originator_id ) != null ){
				// rule (b) - one entry per originator/value pair
			HashWrapper	originator_value_id = getOriginatorValueID( new_value );

			DHTDBValueImpl existing_value = (DHTDBValueImpl)indirect_originator_value_map.get( originator_value_id );
			if ( existing_value != null ){
				addIndirectValue( originator_value_id, new_value );
					//System.out.println( "    replacing existing" );
					// only add new values if not diversified
				if ( diversification_state == DHT.DT_NONE ){
					addIndirectValue( originator_value_id, new_value );
/**
 * Stores a value in direct_originator_map under value_key and keeps the
 * size counters in step.
 *
 * If an entry was replaced and both versions are set (not -1) with the old
 * version >= the new one, the old entry is put back; when the versions are
 * equal the old entry is additionally reset(). Otherwise the old value's
 * length is subtracted from direct_data_size (and local_size if it was
 * local), the new value's length is added, and informAdded or
 * informUpdated is called depending on whether an old entry existed.
 *
 * NOTE(review): the else/return lines are missing from this extract;
 * presumably the "put the old value back" path returns early - confirm.
 *
 * @param value_key key to store the value under
 * @param value     value to store
 */
protected voidaddDirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key, DHTDBValueImpl value)

		// put() returns the previous entry for this key, if any
		DHTDBValueImpl	old = (DHTDBValueImpl)direct_originator_map.put( value_key, value );
		if ( old != null ){
			int	old_version = old.getVersion();
			int new_version = value.getVersion();
			// -1 is treated as "no version"; only compare when both are set
			if ( old_version != -1 && new_version != -1 && old_version >= new_version ){
				if ( old_version == new_version ){
					if ( TRACE_ADDS ){
						System.out.println( "addDirect[reset]:" + old.getString() + "/" + value.getString());

					old.reset();	// update store time as this means we don't need to republish
									// as someone else has just done it
						// it's important to ignore old versions as a peer's increasing version sequence may
						// have been reset and if this is the case we want the "future" values to time out
					if ( TRACE_ADDS ){
						System.out.println( "addDirect[ignore]:" + old.getString() + "/" + value.getString());
					// put the old value back!
				direct_originator_map.put( value_key, old );
			if ( TRACE_ADDS ){
				System.out.println( "addDirect:" + old.getString() + "/" + value.getString());
			// the replaced value no longer counts towards the totals
			direct_data_size -= old.getValue().length;
			if ( old.isLocal()){
				local_size -= old.getValue().length;
			if ( TRACE_ADDS ){
				System.out.println( "addDirect:[new]" +  value.getString());
		// account for the newly stored value
		direct_data_size += value.getValue().length;
		if ( value.isLocal()){
			local_size += value.getValue().length;
		if ( old == null ){
			informAdded( value );
			informUpdated( old, value );
/**
 * NOTE(review): the body of this method is not visible in this extract.
 * Presumably it increments the counter returned by getHits() - confirm
 * against the full source.
 */
protected voidaddHit()

/**
 * Stores a value in indirect_originator_value_map under value_key and
 * keeps the size counters in step.
 *
 * If an entry was replaced and both versions are set (not -1) with the old
 * version >= the new one, the old entry is put back; when the versions are
 * equal the old entry is additionally reset(). If either version is -1,
 * creation times are compared instead: the old entry is put back when it
 * is more than 30000 newer than the new value. Otherwise the old value's
 * length is subtracted from indirect_data_size (and local_size if it was
 * local), the new value's length is added, and informAdded or
 * informUpdated is called depending on whether an old entry existed.
 *
 * NOTE(review): the else/return lines are missing from this extract;
 * presumably each "put the old value back" path returns early - confirm.
 * The unit of the 30000 offset is not shown here (likely milliseconds).
 *
 * @param value_key key to store the value under
 * @param value     value to store
 */
protected voidaddIndirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key, DHTDBValueImpl value)

		// put() returns the previous entry for this key, if any
		DHTDBValueImpl	old = (DHTDBValueImpl)indirect_originator_value_map.put( value_key, value );
		if ( old != null ){
				// discard updates that are older than current value
			int	old_version = old.getVersion();
			int new_version = value.getVersion();
			// -1 is treated as "no version"; only compare when both are set
			if ( old_version != -1 && new_version != -1 && old_version >= new_version ){
				if ( old_version == new_version ){

					if ( TRACE_ADDS ){
						System.out.println( "addIndirect[reset]:" + old.getString() + "/" + value.getString());

					old.reset();	// update store time as this means we don't need to republish
									// as someone else has just done it
					if ( TRACE_ADDS ){
						System.out.println( "addIndirect[ignore]:" + old.getString() + "/" + value.getString());
					// put the old value back!
				indirect_originator_value_map.put( value_key, old );
			// vague backwards compatibility - if the creation date of the "new" value is significantly
			// less than the old then we ignore it (given that creation date is adjusted for time-skew you can
			// see the problem with this approach...)
			if ( old_version == -1 || new_version == -1 ){
				if ( old.getCreationTime() > value.getCreationTime() + 30000 ){
					if ( TRACE_ADDS ){
						System.out.println( "backward compat: ignoring store: " + old.getString() + "/" + value.getString());
					// put the old value back!
					indirect_originator_value_map.put( value_key, old );

			if ( TRACE_ADDS ){
				System.out.println( "addIndirect:" + old.getString() + "/" + value.getString());
			// the replaced value no longer counts towards the totals
			indirect_data_size -= old.getValue().length;
			if ( old.isLocal()){
				local_size -= old.getValue().length;
			if ( TRACE_ADDS ){
				System.out.println( "addIndirect:[new]" +  value.getString());
		// account for the newly stored value
		indirect_data_size += value.getValue().length;
		if ( value.isLocal()){
			local_size += value.getValue().length;
		if ( old == null ){
			informAdded( value );
			informUpdated( old, value );
/**
 * Records the originator's IP address bytes in ip_count_bloom_filter and
 * reacts to the resulting hit count.
 *
 * When the filter's size divided by its entry count (integer division)
 * falls below 10 the filter is rebuilt with an increased size. When the
 * hit count returned by add() reaches 15 the originator is banned via
 * db.banContact with a "local flood" reason naming this key.
 *
 * NOTE(review): the println below is indented as if guarded by a condition
 * that is not visible in this extract; creation of the filter itself is
 * also not shown.
 *
 * @param value value whose originator address is counted
 */
protected voidaddToBloom(DHTDBValueImpl value)

		// we don't check for flooding on indirect stores as this could be used to force a
		// direct store to be bounced (flood a node with indirect stores before the direct
		// store occurs)

		DHTTransportContact	originator = value.getOriginator();
		// the filter is keyed on the raw address bytes of the originator
		int	hit_count = ip_count_bloom_filter.add( originator.getAddress().getAddress().getAddress());
			System.out.println( "direct local add from " + originator.getAddress() + ", hit count = " + hit_count );

			// allow up to 10% bloom filter utilisation
		if ( ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10 ){
			rebuildIPBloomFilter( true );
		// too many direct stores counted for the same address: ban the originator
		if ( hit_count >= 15 ){
			db.banContact( originator, "local flood on '" + DHTLog.getFullString( key.getBytes()) + "'" );
/**
 * Tears down this mapping's adapter state: when an adapter key exists, the
 * values from getValues() are iterated and the adapter is told the key was
 * deleted via keyDeleted.
 *
 * NOTE(review): the opening "try", the body of the while loop and the
 * catch body are not visible in this extract - what is done with each
 * value must be confirmed against the full source.
 */
protected voiddestroy()

			if ( adapter_key != null ){
				Iterator	it = getValues();
				while( it.hasNext()){
				db.getAdapter().keyDeleted( adapter_key );
		}catch( Throwable e ){
/**
 * Returns values held for this key.
 *
 * When DHT.FLAG_STATS is set in flags and an adapter key exists, the
 * adapter key's stats are serialised into a byte stream and wrapped in a
 * single DHTDBValueImpl flagged DHT.FLAG_STATS; the stats path otherwise
 * returns an empty array.
 *
 * Without the stats flag, the direct map and then the indirect map are
 * walked, collecting values until max results are gathered (max == 0
 * means no limit). Values are tracked in a set keyed on their bytes for
 * duplicate detection, and zero-length values are not returned. The keys
 * of returned entries are then re-read from their map, and informRead is
 * called before the results are returned as an array.
 *
 * NOTE(review): several lines are missing from this extract (the try, the
 * return wrapping the stats value, most DHTDBValueImpl constructor
 * arguments, and the body of the duplicate check - presumably a
 * "continue"). The re-read of used keys only cycles values if the maps
 * are access-ordered; their construction is not shown here.
 *
 * @param by_who contact performing the read, passed to informRead
 * @param max    maximum number of values to return, 0 for unlimited
 * @param flags  request flags; DHT.FLAG_STATS selects the stats path
 * @return the selected values, possibly empty
 */
protected DHTDBValueImpl[]get(com.aelitis.azureus.core.dht.transport.DHTTransportContact by_who, int max, byte flags)

		if ((flags & DHT.FLAG_STATS) != 0 ){
			if ( adapter_key != null ){
					ByteArrayOutputStream	baos = new ByteArrayOutputStream(64);
					DataOutputStream	dos = new DataOutputStream( baos );
					adapter_key.serialiseStats( dos );
						new DHTDBValueImpl[]{
							new DHTDBValueImpl(
								DHT.FLAG_STATS )});
				}catch( Throwable e ){
			// stats requested but nothing could be produced
			return( new DHTDBValueImpl[0] );
		List	res 		= new ArrayList();
		// holds the bytes of every value seen so far, across both maps
		Set		duplicate_check = new HashSet();
		// direct values are considered before indirect ones
		Map[]	maps = new Map[]{ direct_originator_map, indirect_originator_value_map };
		for (int i=0;i<maps.length;i++){
			List	keys_used 	= new ArrayList();

			Map			map	= maps[i];
			Iterator	it = map.entrySet().iterator();
			while( it.hasNext() && ( max==0 || res.size()< max )){
				Map.Entry	entry = (Map.Entry);
				HashWrapper		entry_key		= (HashWrapper)entry.getKey();
				DHTDBValueImpl	entry_value = (DHTDBValueImpl)entry.getValue();
				HashWrapper	x = new HashWrapper( entry_value.getValue());
				if ( duplicate_check.contains( x )){
				duplicate_check.add( x );
					// zero length values imply deleted values so don't return them
				if ( entry_value.getValue().length > 0 ){
					res.add( entry_value );
					keys_used.add( entry_key );
				// now update the access order so values get cycled
			for (int j=0;j<keys_used.size();j++){
				map.get( keys_used.get(j));
		informRead( by_who );
		DHTDBValueImpl[]	v = new DHTDBValueImpl[res.size()];
		res.toArray( v );
		return( v );
/**
 * Looks up the direct value stored for the given originator.
 *
 * @param originator contact whose ID is used as the lookup key
 * @return the entry in direct_originator_map for that ID, or null if the
 *         map holds none
 */
protected DHTDBValueImplget(com.aelitis.azureus2.core.dht.transport.DHTTransportContact originator)

			// local get
		// the direct map is keyed on the originator ID wrapped in a HashWrapper
		HashWrapper originator_id = new HashWrapper( originator.getID());
		DHTDBValueImpl	res = (DHTDBValueImpl)direct_originator_map.get( originator_id );
		return( res );
/**
 * Returns the size of the direct data excluding local data.
 *
 * @return direct_data_size minus local_size
 */
protected intgetDirectSize()

			// our direct count includes local so remove that here
		return( direct_data_size - local_size );
/**
 * Returns an iterator built as valueIterator( true, false ).
 *
 * NOTE(review): valueIterator is not visible in this extract; presumably
 * the two flags select the direct and indirect maps respectively, making
 * this an iterator over direct values only - confirm.
 *
 * @return a new valueIterator
 */
protected java.util.IteratorgetDirectValues()

		return( new valueIterator( true, false ));
/**
 * Returns the current diversification state of this mapping.
 *
 * @return the value of diversification_state
 */
protected bytegetDiversificationType()

		return( diversification_state );
/**
 * Returns the hit counter for this mapping.
 *
 * @return the value of hits
 */
protected intgetHits()

		return( hits );
/**
 * Returns the running total of indirect data held by this mapping.
 *
 * @return the value of indirect_data_size
 */
protected intgetIndirectSize()

		return( indirect_data_size );
/**
 * Returns an iterator built as valueIterator( false, true ).
 *
 * NOTE(review): valueIterator is not visible in this extract; presumably
 * the two flags select the direct and indirect maps respectively, making
 * this an iterator over indirect values only - confirm.
 *
 * @return a new valueIterator
 */
protected java.util.IteratorgetIndirectValues()

		return( new valueIterator( false, true ));
/**
 * Returns the key this mapping was created for.
 *
 * @return the value of key
 */
protected org.gudy.azureus2.core3.util.HashWrappergetKey()

		return( key );
/**
 * Returns the running total of local data held by this mapping.
 *
 * @return the value of local_size
 */
protected intgetLocalSize()

		return( local_size );
/**
 * Computes the key under which an indirect value is stored.
 *
 * As shown, the method returns the originator ID wrapped in a HashWrapper.
 * The code after that return builds a SHA1 hash over the originator ID
 * concatenated with the value bytes.
 *
 * NOTE(review): the code following the first return is unreachable as it
 * appears here. A guard around the first return has presumably been lost
 * from this extract, leaving the hash-based ID as a disabled legacy
 * path - confirm against the full source.
 *
 * @param value value to derive the ID for
 * @return the ID used as the key in the indirect map
 */
private org.gudy.azureus2.core3.util.HashWrappergetOriginatorValueID(DHTDBValueImpl value)

		DHTTransportContact	originator	= value.getOriginator();
		byte[]	originator_id	= originator.getID();
			// relaxed this due to problems caused by multiple publishes by an originator
			// with the same key but variant values (e.g. seed/peer counts). Seeing as we
			// only accept cache-forwards from contacts that are "close" enough to us to
			// be performing such a forward, the DOS possibilities here are limited (a nasty
			// contact can only trash originator values for things it happens to be close to)
		return( new HashWrapper( originator_id ));
		byte[]	value_bytes 	= value.getValue();

		// originator ID followed by the value bytes, hashed as one buffer
		byte[]	x = new byte[originator_id.length + value_bytes.length];
		System.arraycopy( originator_id, 0, x, 0, originator_id.length );
		System.arraycopy( value_bytes, 0, x, originator_id.length, value_bytes.length );
		HashWrapper	originator_value_id = new HashWrapper( new SHA1Hasher().calculateHash( x ));
		return( originator_value_id );
/**
 * Returns the number of entries held across both maps.
 *
 * @return size of direct_originator_map plus size of
 *         indirect_originator_value_map
 */
protected intgetValueCount()

		return( direct_originator_map.size() + indirect_originator_value_map.size());
/**
 * Returns an iterator built as valueIterator( true, true ).
 *
 * NOTE(review): valueIterator is not visible in this extract; presumably
 * the two flags select the direct and indirect maps respectively, making
 * this an iterator over all values - confirm.
 *
 * @return a new valueIterator
 */
protected java.util.IteratorgetValues()

		return( new valueIterator( true, true ));
/**
 * Bookkeeping performed after a new value has been stored.
 *
 * A value that is not local and whose originator ID equals its sender ID
 * is counted in the IP bloom filter via addToBloom. When an adapter key
 * exists the adapter is told via valueAdded and the diversification
 * state is refreshed from the adapter key.
 *
 * NOTE(review): the opening "try" and the catch body are not visible in
 * this extract.
 *
 * @param value the value that was added
 */
private voidinformAdded(DHTDBValueImpl value)

		// direct = non-local value sent by its own originator
		boolean	direct = 
			(!value.isLocal()) && 		
			Arrays.equals( value.getOriginator().getID(), value.getSender().getID());
		if ( direct ){
			addToBloom( value );

			if ( adapter_key != null ){
				db.getAdapter().valueAdded( adapter_key, value );
				diversification_state	= adapter_key.getDiversificationType();
		}catch( Throwable e ){
/**
 * Bookkeeping performed after a value has been removed.
 *
 * A value whose originator ID equals its sender ID is removed from the IP
 * bloom filter via removeFromBloom. When an adapter key exists the adapter
 * is told via valueDeleted and the diversification state is refreshed
 * from the adapter key.
 *
 * NOTE(review): unlike informAdded, the "direct" test here has no
 * !value.isLocal() term, so a local value that was never added to the
 * bloom filter would still be removed from it - confirm whether this
 * asymmetry is intended. The opening "try" and the catch body are not
 * visible in this extract.
 *
 * @param value the value that was deleted
 */
private voidinformDeleted(DHTDBValueImpl value)

		boolean	direct = 
			Arrays.equals( value.getOriginator().getID(), value.getSender().getID());
		if ( direct ){
			removeFromBloom( value );
			if ( adapter_key != null ){
				db.getAdapter().valueDeleted( adapter_key, value );
				diversification_state	= adapter_key.getDiversificationType();
		}catch( Throwable e ){
/**
 * Tells the adapter that this key has been read.
 *
 * Only acts when both an adapter key and a contact are present: the
 * adapter's keyRead is called and the diversification state is refreshed
 * from the adapter key.
 *
 * NOTE(review): the opening "try" and the catch body are not visible in
 * this extract.
 *
 * @param contact contact that performed the read; null suppresses the
 *                notification
 */
private voidinformRead(com.aelitis.azureus.core.dht.transport.DHTTransportContact contact)

			if ( adapter_key != null && contact != null ){
				db.getAdapter().keyRead( adapter_key, contact );
				diversification_state	= adapter_key.getDiversificationType();
		}catch( Throwable e ){
/**
 * Bookkeeping performed after a stored value has been replaced.
 *
 * Each value is classed as direct when it is not local and its originator
 * ID equals its sender ID. The new value is counted in the IP bloom filter
 * only when it is direct and the old one was not. When an adapter key
 * exists the adapter is told via valueUpdated and the diversification
 * state is refreshed from the adapter key.
 *
 * NOTE(review): the opening "try" and the catch body are not visible in
 * this extract.
 *
 * @param old_value the value that was replaced
 * @param new_value the value now stored
 */
private voidinformUpdated(DHTDBValueImpl old_value, DHTDBValueImpl new_value)

		boolean	old_direct = 
			(!old_value.isLocal()) &&				
			Arrays.equals( old_value.getOriginator().getID(), old_value.getSender().getID());
		boolean	new_direct = 
			(!new_value.isLocal()) &&			
			Arrays.equals( new_value.getOriginator().getID(), new_value.getSender().getID());
		// only count the transition from not-direct to direct
		if ( new_direct && !old_direct ){
			addToBloom( new_value );
			if ( adapter_key != null ){
				db.getAdapter().valueUpdated( adapter_key, old_value, new_value );
				diversification_state	= adapter_key.getDiversificationType();
		}catch( Throwable e ){
/**
 * Replaces ip_count_bloom_filter with a freshly populated filter.
 *
 * The new filter is created with the current size, or with the current
 * size plus IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK when increase_size is true.
 * The originator address bytes of every non-local value from
 * getDirectValues() are added to it, tracking the highest hit count
 * returned, and the result is logged through db.log.
 *
 * NOTE(review): the else, try/finally and closing lines are missing from
 * this extract, so the exact scope of the log call and of the final
 * assignment must be confirmed against the full source.
 *
 * @param increase_size true to grow the filter by
 *                      IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
 */
protected voidrebuildIPBloomFilter(boolean increase_size)

		BloomFilter	new_filter;
		if ( increase_size ){
			new_filter = BloomFilterFactory.createAddRemove4Bit( ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK );
			new_filter = BloomFilterFactory.createAddRemove4Bit( ip_count_bloom_filter.getSize());
				// only do flood prevention on direct stores as we can't trust the originator
				// details for indirect and this can be used to DOS a direct store later
			Iterator	it = getDirectValues();
			// highest per-address count seen while repopulating, used for logging only
			int	max_hits	= 0;
			while( it.hasNext()){
				DHTDBValueImpl	val = (DHTDBValueImpl);
				if ( !val.isLocal()){
					// logger.log( "    adding " + val.getOriginator().getAddress());
					int	hits = new_filter.add( val.getOriginator().getAddress().getAddress().getAddress());
					if ( hits > max_hits ){
						max_hits	= hits;

				db.log( "Rebuilt local IP bloom filter, size = " + new_filter.getSize() + ", entries =" + new_filter.getEntryCount()+", max hits = " + max_hits );

			// swap in the rebuilt filter
			ip_count_bloom_filter = new_filter;
/**
 * Removes the direct value stored for the given originator.
 *
 * @param originator contact whose ID identifies the value to remove
 * @return whatever removeDirectValue returns for that ID (the removed
 *         value, or null if none was stored)
 */
protected DHTDBValueImplremove(com.aelitis.azureus.core.dht.transport.DHTTransportContact originator)

			// local remove
		HashWrapper originator_id = new HashWrapper( originator.getID());
		DHTDBValueImpl	res = removeDirectValue( originator_id );
		return( res );
/**
 * Removes an entry from direct_originator_map and updates the counters.
 *
 * When an entry was present, its value length is subtracted from
 * direct_data_size (and from local_size if the value is local) and
 * informDeleted is called for it.
 *
 * @param value_key key of the entry to remove
 * @return the removed value, or null if the map held no entry for the key
 */
protected DHTDBValueImplremoveDirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key)

		DHTDBValueImpl	old = (DHTDBValueImpl)direct_originator_map.remove( value_key );
		if ( old != null ){
			// the removed value no longer counts towards the totals
			direct_data_size -= old.getValue().length;
			if ( old.isLocal()){
				local_size -= old.getValue().length;
			informDeleted( old );
		return( old );
/**
 * Removes the originator's IP address bytes from ip_count_bloom_filter.
 *
 * NOTE(review): the println below is indented as if guarded by a condition
 * that is not visible in this extract; hit_count is only used by that
 * trace output.
 *
 * @param value value whose originator address is removed from the filter
 */
protected voidremoveFromBloom(DHTDBValueImpl value)

		DHTTransportContact	originator = value.getOriginator();
		int	hit_count = ip_count_bloom_filter.remove( originator.getAddress().getAddress().getAddress());
			System.out.println( "direct local remove from " + originator.getAddress() + ", hit count = " + hit_count );
/**
 * Removes an entry from indirect_originator_value_map and updates the
 * counters.
 *
 * When an entry was present, its value length is subtracted from
 * indirect_data_size (and from local_size if the value is local) and
 * informDeleted is called for it.
 *
 * @param value_key key of the entry to remove
 */
protected voidremoveIndirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key)

		DHTDBValueImpl	old = (DHTDBValueImpl)indirect_originator_value_map.remove( value_key );
		if ( old != null ){
			// the removed value no longer counts towards the totals
			indirect_data_size -= old.getValue().length;
			if ( old.isLocal()){
				local_size -= old.getValue().length;
			informDeleted( old );
protected voidupdateLocalContact(com.aelitis.azureus.core.dht.transport.DHTTransportContact contact)

			// pull out all the local values, reset the originator and then
			// re-add them
		List	changed = new ArrayList();
		Iterator	it = direct_originator_map.values().iterator();
		while( it.hasNext()){
			DHTDBValueImpl	value = (DHTDBValueImpl);
			if ( value.isLocal()){
				value.setOriginatorAndSender( contact );
				changed.add( value );
				direct_data_size -= value.getValue().length;

				local_size	-= value.getValue().length;
				informDeleted( value );
		for (int i=0;i<changed.size();i++){