FileDocCategorySizeDatePackage
DHTDBMapping.javaAPI DocAzureus 3.0.3.423286Tue May 23 05:34:14 BST 2006com.aelitis.azureus.core.dht.db.impl

DHTDBMapping

public class DHTDBMapping extends Object
author
parg

Fields Summary
// When true, addDirectValue/addIndirectValue print a trace line to System.out
// for every store they process.
private static final boolean
TRACE_ADDS
// Owning database; used here to reach the storage adapter, the local contact,
// logging and contact banning.
private DHTDBImpl
db
// The key this mapping holds values for (returned by getKey).
private org.gudy.azureus2.core3.util.HashWrapper
key
// Handle returned by the adapter's keyCreated call; stays null when there is no
// adapter or key creation failed, and every adapter callback is skipped then.
private DHTStorageKey
adapter_key
// Values stored by their originator, keyed by a HashWrapper of the originator ID.
private Map
direct_originator_map
// Values stored on behalf of an originator by another contact, keyed by the
// result of getOriginatorValueID.
private Map
indirect_originator_value_map
// Counter bumped by addHit.
private int
hits
// Running total of value byte lengths held in direct_originator_map.
private int
direct_data_size
// Running total of value byte lengths held in indirect_originator_value_map.
private int
indirect_data_size
// Running total of byte lengths of values whose isLocal() is true.
private int
local_size
// Last diversification type read from adapter_key; refreshed after adapter callbacks.
private byte
diversification_state
// Amount added to the bloom filter size by rebuildIPBloomFilter( true ).
private static final int
IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
// Counting filter fed with originator IP address bytes of direct, non-local
// values; consulted by addToBloom for flood detection.
private com.aelitis.azureus.core.util.bloom.BloomFilter
ip_count_bloom_filter
Constructors Summary
protected DHTDBMapping(DHTDBImpl _db, org.gudy.azureus2.core3.util.HashWrapper _key, boolean _local)

		// Remember the owning database and the key, then register the key with the
		// storage adapter (when one is configured) and pick up its initial
		// diversification type. Adapter failures are logged and otherwise ignored.

		this.db		= _db;
		this.key	= _key;

		try {
			if (db.getAdapter() == null) {

				return;
			}

			adapter_key = db.getAdapter().keyCreated(key, _local);

			if (adapter_key == null) {

				return;
			}

			diversification_state = adapter_key.getDiversificationType();

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
Methods Summary
protected voidadd(DHTDBValueImpl new_value)

		// Stores a value, deciding between the direct and indirect maps.
		//
		// don't replace a closer cache value with a further away one. in particular
		// we have to avoid the case where the original publisher of a key happens to
		// be close to it and be asked by another node to cache it!

		final DHTTransportContact	from	= new_value.getOriginator();
		final DHTTransportContact	via		= new_value.getSender();

		final HashWrapper	from_id = new HashWrapper(from.getID());

		if (!Arrays.equals(from.getID(), via.getID())) {

				// forwarded by a third party. a value received straight from the
				// originator always wins, so drop the forward if we hold one

			if (direct_originator_map.get(from_id) != null) {

				return;
			}

				// rule (b) - one entry per originator/value pair

			HashWrapper	indirect_id = getOriginatorValueID(new_value);

			boolean	replacing = indirect_originator_value_map.get(indirect_id) != null;

				// existing entries may always be refreshed; brand new ones are only
				// accepted while the key is not diversified

			if (replacing || diversification_state == DHT.DT_NONE) {

				addIndirectValue(indirect_id, new_value);
			}

			return;
		}

			// sent by the originator itself: store it as a direct value...

		addDirectValue(from_id, new_value);

			// ...and purge every indirect value held for the same originator. keys are
			// collected first because removal modifies the map being iterated

		List	stale_keys = new ArrayList();

		for (Iterator entries = indirect_originator_value_map.entrySet().iterator(); entries.hasNext();) {

			Map.Entry	entry = (Map.Entry)entries.next();

			HashWrapper		held_key	= (HashWrapper)entry.getKey();

			DHTDBValueImpl	held_value	= (DHTDBValueImpl)entry.getValue();

			if (Arrays.equals(held_value.getOriginator().getID(), from.getID())) {

				stale_keys.add(held_key);
			}
		}

		for (Iterator keys = stale_keys.iterator(); keys.hasNext();) {

			removeIndirectValue((HashWrapper)keys.next());
		}
	
protected voidaddDirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key, DHTDBValueImpl value)

		// Stores a value in the direct map, keeps the size counters in step and
		// notifies listeners. The value is put first and the displaced entry is
		// inspected afterwards; if the displaced entry should win it is put back.

		DHTDBValueImpl	previous = (DHTDBValueImpl)direct_originator_map.put(value_key, value);

		if (previous == null) {

				// nothing displaced - a plain addition

			if (TRACE_ADDS) {
				System.out.println("addDirect:[new]" +  value.getString());
			}

			direct_data_size += value.getValue().length;

			if (value.isLocal()) {

				local_size += value.getValue().length;
			}

			informAdded(value);

			return;
		}

		int	held_version		= previous.getVersion();
		int	incoming_version	= value.getVersion();

		boolean	both_versioned = held_version != -1 && incoming_version != -1;

		if (both_versioned && held_version >= incoming_version) {

			boolean	same_version = held_version == incoming_version;

				// its important to ignore old versions as a peer's increasing version sequence may
				// have been reset and if this is the case we want the "future" values to timeout

			if (TRACE_ADDS) {
				System.out.println((same_version ? "addDirect[reset]:" : "addDirect[ignore]:") + previous.getString() + "/" + value.getString());
			}

			if (same_version) {

					// update store time as this means we don't need to republish
					// as someone else has just done it

				previous.reset();
			}

				// put the old value back!

			direct_originator_map.put(value_key, previous);

			return;
		}

			// genuine replacement: swap the displaced value's contribution to the
			// counters for the new value's

		if (TRACE_ADDS) {
			System.out.println("addDirect:" + previous.getString() + "/" + value.getString());
		}

		direct_data_size -= previous.getValue().length;

		if (previous.isLocal()) {

			local_size -= previous.getValue().length;
		}

		direct_data_size += value.getValue().length;

		if (value.isLocal()) {

			local_size += value.getValue().length;
		}

		informUpdated(previous, value);
	
protected voidaddHit()

		// Counts one more hit against this mapping.

		hits += 1;
	
protected voidaddIndirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key, DHTDBValueImpl value)

		// Stores a value in the indirect map, keeps the size counters in step and
		// notifies listeners. The value is put first and the displaced entry is
		// inspected afterwards; if the displaced entry should win it is put back.

		DHTDBValueImpl	previous = (DHTDBValueImpl)indirect_originator_value_map.put(value_key, value);

		if (previous == null) {

				// nothing displaced - a plain addition

			if (TRACE_ADDS) {
				System.out.println("addIndirect:[new]" +  value.getString());
			}

			indirect_data_size += value.getValue().length;

			if (value.isLocal()) {

				local_size += value.getValue().length;
			}

			informAdded(value);

			return;
		}

		int	held_version		= previous.getVersion();
		int	incoming_version	= value.getVersion();

		boolean	both_versioned = held_version != -1 && incoming_version != -1;

			// discard updates that are older than current value

		if (both_versioned && held_version >= incoming_version) {

			boolean	same_version = held_version == incoming_version;

			if (TRACE_ADDS) {
				System.out.println((same_version ? "addIndirect[reset]:" : "addIndirect[ignore]:") + previous.getString() + "/" + value.getString());
			}

			if (same_version) {

					// update store time as this means we don't need to republish
					// as someone else has just done it

				previous.reset();
			}

				// put the old value back!

			indirect_originator_value_map.put(value_key, previous);

			return;
		}

			// vague backwards compatability - if the creation date of the "new" value is significantly
			// less than the old then we ignore it (given that creation date is adjusted for time-skew you can
			// see the problem with this approach...)

		if (!both_versioned && previous.getCreationTime() > value.getCreationTime() + 30000) {

			if (TRACE_ADDS) {
				System.out.println("backward compat: ignoring store: " + previous.getString() + "/" + value.getString());
			}

				// put the old value back!

			indirect_originator_value_map.put(value_key, previous);

			return;
		}

			// genuine replacement: swap the displaced value's contribution to the
			// counters for the new value's

		if (TRACE_ADDS) {
			System.out.println("addIndirect:" + previous.getString() + "/" + value.getString());
		}

		indirect_data_size -= previous.getValue().length;

		if (previous.isLocal()) {

			local_size -= previous.getValue().length;
		}

		indirect_data_size += value.getValue().length;

		if (value.isLocal()) {

			local_size += value.getValue().length;
		}

		informUpdated(previous, value);
	
protected voidaddToBloom(DHTDBValueImpl value)

		// Registers the originator's IP address with the counting bloom filter,
		// rebuilds the filter with more room when it is getting crowded, and bans
		// the originator once the count reported for its address reaches 15.
		//
		// we don't check for flooding on indirect stores as this could be used to force a
		// direct store to be bounced (flood a node with indirect stores before the direct
		// store occurs)

		DHTTransportContact	originator = value.getOriginator();
		
		int	hit_count = ip_count_bloom_filter.add( originator.getAddress().getAddress().getAddress());
		
		if ( DHTLog.LOCAL_BLOOM_TRACE ){
		
			System.out.println( "direct local add from " + originator.getAddress() + ", hit count = " + hit_count );
		}

			// allow up to 10% bloom filter utilisation. the entry count is tested first
			// so that a filter reporting zero entries cannot cause an integer division
			// by zero (which would throw ArithmeticException out of the store path)
		
		if ( 	ip_count_bloom_filter.getEntryCount() != 0 &&
				ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10 ){
			
			rebuildIPBloomFilter( true );
		}
		
		if ( hit_count >= 15 ){
		
			db.banContact( originator, "local flood on '" + DHTLog.getFullString( key.getBytes()) + "'" );
		}
	
protected voiddestroy()

		// Tears the mapping down: when an adapter key exists, every value is removed
		// through the value iterator and the adapter is then told the key has gone.
		// Failures are logged and otherwise ignored.

		try {
			if (adapter_key == null) {

				return;
			}

			for (Iterator values = getValues(); values.hasNext();) {

				values.next();

				values.remove();
			}

			db.getAdapter().keyDeleted(adapter_key);

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
protected DHTDBValueImpl[]get(com.aelitis.azureus.core.dht.transport.DHTTransportContact by_who, int max, byte flags)

		// Returns the values held for this key. With DHT.FLAG_STATS set, the result is
		// instead a single synthetic value carrying the adapter key's serialised stats
		// (or an empty array when there is no adapter key or serialisation fails).
		// Otherwise up to 'max' values are returned (0 means no limit), direct values
		// first, skipping duplicate payloads and zero-length payloads.

		if ((flags & DHT.FLAG_STATS) != 0) {

			if (adapter_key == null) {

				return new DHTDBValueImpl[0];
			}

			try {
				ByteArrayOutputStream	buffer = new ByteArrayOutputStream(64);

				DataOutputStream	out = new DataOutputStream(buffer);

				adapter_key.serialiseStats(out);

				out.close();

				DHTDBValueImpl	stats_value =
					new DHTDBValueImpl(
						SystemTime.getCurrentTime(),
						buffer.toByteArray(),
						0,
						db.getLocalContact(),
						db.getLocalContact(),
						true,
						DHT.FLAG_STATS);

				return new DHTDBValueImpl[]{ stats_value };

			} catch (Throwable e) {

				Debug.printStackTrace(e);
			}

			return new DHTDBValueImpl[0];
		}

		List	found			= new ArrayList();

		Set		seen_payloads	= new HashSet();

		Map[]	sources = new Map[]{ direct_originator_map, indirect_originator_value_map };

		for (int m = 0; m < sources.length; m++) {

			Map		source	= sources[m];

			List	touched	= new ArrayList();

			Iterator	entries = source.entrySet().iterator();

			while (entries.hasNext()) {

				if (max != 0 && found.size() >= max) {

					break;
				}

				Map.Entry	entry = (Map.Entry)entries.next();

				HashWrapper		entry_key	= (HashWrapper)entry.getKey();

				DHTDBValueImpl	candidate	= (DHTDBValueImpl)entry.getValue();

				byte[]	payload = candidate.getValue();

					// Set.add reports false for a payload already seen, in which case
					// the entry is skipped

				boolean	first_sighting = seen_payloads.add(new HashWrapper(payload));

					// zero length values imply deleted values so don't return them

				if (first_sighting && payload.length > 0) {

					found.add(candidate);

					touched.add(entry_key);
				}
			}

				// now update the access order so values get cycled. this has to wait
				// until iteration over the map has finished

			for (Iterator keys = touched.iterator(); keys.hasNext();) {

				source.get(keys.next());
			}
		}

		informRead(by_who);

		return (DHTDBValueImpl[])found.toArray(new DHTDBValueImpl[found.size()]);
	
protected DHTDBValueImplget(com.aelitis.azureus.core.dht.transport.DHTTransportContact originator)

			// local get
		
		HashWrapper originator_id = new HashWrapper( originator.getID());
		
		DHTDBValueImpl	res = (DHTDBValueImpl)direct_originator_map.get( originator_id );
		
		return( res );
	
protected intgetDirectSize()

			// our direct count includes local so remove that here

		int	non_local = direct_data_size - local_size;

		return non_local;
	
protected java.util.IteratorgetDirectValues()

		return( new valueIterator( true, false ));
	
protected bytegetDiversificationType()

		// Most recently cached diversification type for this key.

		return diversification_state;
	
protected intgetHits()

		// Number of hits recorded through addHit.

		return hits;
	
protected intgetIndirectSize()

		// Total byte length of the values held in the indirect map.

		return indirect_data_size;
	
protected java.util.IteratorgetIndirectValues()

		return( new valueIterator( false, true ));
	
protected org.gudy.azureus2.core3.util.HashWrappergetKey()

		// The key this mapping was created for.

		return key;
	
protected intgetLocalSize()

		// Total byte length of the values flagged as local.

		return local_size;
	
private org.gudy.azureus2.core3.util.HashWrappergetOriginatorValueID(DHTDBValueImpl value)

		// Builds the key under which an indirect value is filed: currently just the
		// originator's ID.
		//
		// relaxed this due to problems caused by multiple publishes by an originator
		// with the same key but variant values (e.g. seed/peer counts). Seeing as we
		// only accept cache-forwards from contacts that are "close" enough to us to
		// be performing such a forward, the DOS possibilities here are limited (a nasty
		// contact can only trash originator values for things it happens to be close to)
		//
		// The earlier, stricter scheme (no longer active) keyed on the SHA1 hash of
		// the originator ID bytes followed by the value bytes.

		byte[]	id_bytes = value.getOriginator().getID();

		return new HashWrapper(id_bytes);
	
protected intgetValueCount()

		return( direct_originator_map.size() + indirect_originator_value_map.size());
	
protected java.util.IteratorgetValues()

		return( new valueIterator( true, true ));
	
private voidinformAdded(DHTDBValueImpl value)

		// Called after a brand new value has been stored. A non-local value whose
		// sender is its originator is registered with the IP bloom filter; the
		// adapter (if any) is then told and the diversification type re-read.
		// Adapter failures are logged and otherwise ignored.

		if (!value.isLocal() && Arrays.equals(value.getOriginator().getID(), value.getSender().getID())) {

			addToBloom(value);
		}

		try {
			if (adapter_key == null) {

				return;
			}

			db.getAdapter().valueAdded(adapter_key, value);

			diversification_state = adapter_key.getDiversificationType();

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
private voidinformDeleted(DHTDBValueImpl value)

		// Called after a value has been removed. A non-local value whose sender is
		// its originator is withdrawn from the IP bloom filter; the adapter (if any)
		// is then told and the diversification type re-read. Adapter failures are
		// logged and otherwise ignored.

		if (!value.isLocal() && Arrays.equals(value.getOriginator().getID(), value.getSender().getID())) {

			removeFromBloom(value);
		}

		try {
			if (adapter_key == null) {

				return;
			}

			db.getAdapter().valueDeleted(adapter_key, value);

			diversification_state = adapter_key.getDiversificationType();

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
private voidinformRead(com.aelitis.azureus.core.dht.transport.DHTTransportContact contact)

		// Reports a read of this key to the adapter and re-reads the diversification
		// type. Nothing happens without an adapter key or without a contact; adapter
		// failures are logged and otherwise ignored.

		try {
			if (adapter_key == null || contact == null) {

				return;
			}

			db.getAdapter().keyRead(adapter_key, contact);

			diversification_state = adapter_key.getDiversificationType();

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
private voidinformUpdated(DHTDBValueImpl old_value, DHTDBValueImpl new_value)

		// Called after one value has replaced another. "Direct" here means non-local
		// with sender equal to originator. The bloom filter only gains an entry when
		// the replacement is direct and the value it replaced was not. The adapter
		// (if any) is then told and the diversification type re-read; adapter
		// failures are logged and otherwise ignored.

		boolean	was_direct = false;

		if (!old_value.isLocal()) {

			was_direct = Arrays.equals(old_value.getOriginator().getID(), old_value.getSender().getID());
		}

		boolean	is_direct = false;

		if (!new_value.isLocal()) {

			is_direct = Arrays.equals(new_value.getOriginator().getID(), new_value.getSender().getID());
		}

		if (is_direct && !was_direct) {

			addToBloom(new_value);
		}

		try {
			if (adapter_key == null) {

				return;
			}

			db.getAdapter().valueUpdated(adapter_key, old_value, new_value);

			diversification_state = adapter_key.getDiversificationType();

		} catch (Throwable e) {

			Debug.printStackTrace(e);
		}
	
protected voidrebuildIPBloomFilter(boolean increase_size)

		// Creates a fresh IP bloom filter - the current size, or that size plus
		// IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK when increase_size is set - and refills
		// it from the non-local direct values. The new filter replaces the old one
		// even if refilling fails part way through.

		BloomFilter	replacement =
			BloomFilterFactory.createAddRemove4Bit(
				increase_size
					? ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
					: ip_count_bloom_filter.getSize());

		try {
				// only do flood prevention on direct stores as we can't trust the originator
				// details for indirect and this can be used to DOS a direct store later

			int	highest_count = 0;

			for (Iterator values = getDirectValues(); values.hasNext();) {

				DHTDBValueImpl	candidate = (DHTDBValueImpl)values.next();

				if (candidate.isLocal()) {

					continue;
				}

				int	count = replacement.add(candidate.getOriginator().getAddress().getAddress().getAddress());

				highest_count = Math.max(highest_count, count);
			}

			if (DHTLog.LOCAL_BLOOM_TRACE) {

				db.log("Rebuilt local IP bloom filter, size = " + replacement.getSize() + ", entries =" + replacement.getEntryCount()+", max hits = " + highest_count);
			}

		} finally {

			ip_count_bloom_filter = replacement;
		}
	
protected DHTDBValueImplremove(com.aelitis.azureus.core.dht.transport.DHTTransportContact originator)

			// local remove
		
		HashWrapper originator_id = new HashWrapper( originator.getID());
		
		DHTDBValueImpl	res = removeDirectValue( originator_id );
		
		return( res );
	
protected DHTDBValueImplremoveDirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key)

		// Removes a value from the direct map, adjusts the size counters and notifies
		// listeners. Returns the removed value, or null when the key was not present.

		DHTDBValueImpl	removed = (DHTDBValueImpl)direct_originator_map.remove(value_key);

		if (removed == null) {

			return null;
		}

		int	freed = removed.getValue().length;

		direct_data_size -= freed;

		if (removed.isLocal()) {

			local_size -= freed;
		}

		informDeleted(removed);

		return removed;
	
protected voidremoveFromBloom(DHTDBValueImpl value)

		// Withdraws the originator's IP address from the counting bloom filter,
		// tracing the resulting count when bloom tracing is enabled.

		DHTTransportContact	source = value.getOriginator();

		byte[]	ip_bytes = source.getAddress().getAddress().getAddress();

		int	remaining = ip_count_bloom_filter.remove(ip_bytes);

		if (DHTLog.LOCAL_BLOOM_TRACE) {

			System.out.println("direct local remove from " + source.getAddress() + ", hit count = " + remaining);
		}
	
protected voidremoveIndirectValue(org.gudy.azureus2.core3.util.HashWrapper value_key)

		// Removes a value from the indirect map, adjusts the size counters and
		// notifies listeners. Does nothing when the key is not present.

		DHTDBValueImpl	removed = (DHTDBValueImpl)indirect_originator_value_map.remove(value_key);

		if (removed == null) {

			return;
		}

		int	freed = removed.getValue().length;

		indirect_data_size -= freed;

		if (removed.isLocal()) {

			local_size -= freed;
		}

		informDeleted(removed);
	
protected voidupdateLocalContact(com.aelitis.azureus.core.dht.transport.DHTTransportContact contact)

			// pull out all the local values, reset the originator and then
			// re-add them

		List	relocated = new ArrayList();

		for (Iterator values = direct_originator_map.values().iterator(); values.hasNext();) {

			DHTDBValueImpl	candidate = (DHTDBValueImpl)values.next();

			if (!candidate.isLocal()) {

				continue;
			}

			candidate.setOriginatorAndSender(contact);

			relocated.add(candidate);

			int	freed = candidate.getValue().length;

			direct_data_size	-= freed;

			local_size			-= freed;

				// removal goes through the iterator because the map is being walked

			values.remove();

			informDeleted(candidate);
		}

		for (Iterator pending = relocated.iterator(); pending.hasNext();) {

			add((DHTDBValueImpl)pending.next());
		}