DDBaseImpl
public class DDBaseImpl extends Object implements DistributedDatabase
Fields Summary |
---|
private static DDBaseImpl | singleton | protected static org.gudy.azureus2.core3.util.AEMonitor | class_mon | protected static Map | transfer_map | private static DDBaseTTTorrent | torrent_transfer | private com.aelitis.azureus.core.AzureusCore | azureus_core | private com.aelitis.azureus.plugins.dht.DHTPlugin | dht_use_accessor |
Constructors Summary |
---|
protected DDBaseImpl(com.aelitis.azureus.core.AzureusCore _azureus_core)
// Builds the database facade around the supplied core. Instances are
// normally obtained through getSingleton() rather than constructed directly.
azureus_core = _azureus_core;
// torrent_transfer is a static field: it must be assigned before grabDHT()
// runs, because grabDHT() registers it as a transfer handler.
torrent_transfer = new DDBaseTTTorrent( _azureus_core, this );
// Eagerly try to locate the DHT plugin; the result is cached by grabDHT().
grabDHT();
|
Methods Summary |
---|
public void | addTransferHandler(DistributedDatabaseTransferType type, DistributedDatabaseTransferHandler handler)
/*
 * Registers 'handler' as the handler for transfers of the given type and
 * exposes it to the DHT plugin through an adapter.
 *
 * Handlers are keyed by the class of 'type'; a second registration for the
 * same class is rejected with a DistributedDatabaseException. Also throws
 * DistributedDatabaseException when the DHT is not available.
 */
throwIfNotAvailable();

final HashWrapper type_key = DDBaseHelpers.getKey( type.getClass());

if ( transfer_map.get( type_key ) != null ){
    throw new DistributedDatabaseException( "Handler for class '" + type.getClass().getName() + "' already defined" );
}

transfer_map.put( type_key, handler );

// Name reported to the DHT plugin: the built-in torrent transfer type is
// distinguished from everything else.
final String handler_name;

if ( type == torrent_transfer ){
    handler_name = "Torrent Transfer";
}else{
    handler_name = "Plugin Defined";
}

DHTPlugin dht = getDHT();

dht.registerHandler(
    type_key.getHash(),
    new DHTPluginTransferHandler()
    {
        public String getName()
        {
            return handler_name;
        }

        // Translates a DHT-level read into a handler read; any failure is
        // logged and reported to the DHT as "no value" (null).
        public byte[] handleRead( DHTPluginContact originator, byte[] xfer_key )
        {
            try{
                DDBaseContactImpl requester = new DDBaseContactImpl( DDBaseImpl.this, originator );

                DDBaseKeyImpl requested_key = new DDBaseKeyImpl( xfer_key );

                DDBaseValueImpl found = (DDBaseValueImpl)handler.read( requester, type, requested_key );

                return found == null ? null : found.getBytes();

            }catch( Throwable e ){

                Debug.printStackTrace(e);

                return null;
            }
        }

        // Translates a DHT-level write into a handler write; failures are
        // logged and otherwise ignored.
        public void handleWrite( DHTPluginContact originator, byte[] xfer_key, byte[] value )
        {
            try{
                DDBaseContactImpl writer = new DDBaseContactImpl( DDBaseImpl.this, originator );

                DDBaseKeyImpl written_key = new DDBaseKeyImpl( xfer_key );

                DDBaseValueImpl written_value =
                    new DDBaseValueImpl( writer, value, SystemTime.getCurrentTime(), -1 );

                handler.write( writer, type, written_key, written_value );

            }catch( Throwable e ){

                Debug.printStackTrace(e);
            }
        }
    });
| public DistributedDatabaseKey | createKey(java.lang.Object key, java.lang.String description)
throwIfNotAvailable();
return( new DDBaseKeyImpl( key, description ));
| public DistributedDatabaseKey | createKey(java.lang.Object key)
throwIfNotAvailable();
return( new DDBaseKeyImpl( key ));
| public DistributedDatabaseValue | createValue(java.lang.Object value)
throwIfNotAvailable();
return( new DDBaseValueImpl( new DDBaseContactImpl( this, getDHT().getLocalAddress()), value, SystemTime.getCurrentTime(), -1));
| public void | delete(DistributedDatabaseListener listener, DistributedDatabaseKey key)
throwIfNotAvailable();
getDHT().remove( ((DDBaseKeyImpl)key).getBytes(),
key.getDescription(),
new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_DELETED, key, 0, false, false ));
| protected com.aelitis.azureus.plugins.dht.DHTPlugin | getDHT()
throwIfNotAvailable();
return( grabDHT());
| public static DistributedDatabase | getSingleton(com.aelitis.azureus.core.AzureusCore azureus_core)
// Returns the shared database instance, creating it on first use under
// class_mon. 'azureus_core' is only consulted when the instance is created;
// later calls return the existing instance regardless of the argument.
try{
    class_mon.enter();

    if ( singleton == null ){

        singleton = new DDBaseImpl( azureus_core );
    }

    return singleton;

}finally{

    class_mon.exit();
}
| public DistributedDatabaseTransferType | getStandardTransferType(int standard_type)
// Maps a standard transfer type constant to its transfer type object.
// Only ST_TORRENT is recognised; anything else raises
// DistributedDatabaseException.
if ( standard_type != DistributedDatabaseTransferType.ST_TORRENT ){

    throw new DistributedDatabaseException( "unknown type" );
}

return torrent_transfer;
| protected com.aelitis.azureus.plugins.dht.DHTPlugin | grabDHT()
// Lazily locates the DHT plugin and caches it in dht_use_accessor.
// Returns the cached plugin, or null when no DHT plugin interface is found.
// Unlike getDHT(), this never throws for an unavailable DHT.
//
// Fast path: once the plugin has been found it is returned without locking.
// NOTE(review): this unsynchronised read is only safe if dht_use_accessor is
// safely published (e.g. volatile) - confirm against the field declaration.
if ( dht_use_accessor != null ){
return( dht_use_accessor );
}
try{
class_mon.enter();
// Re-check under the monitor: another thread may have completed the lookup.
if( dht_use_accessor == null ){
PluginInterface dht_pi =
azureus_core.getPluginManager().getPluginInterfaceByClass(
DHTPlugin.class );
if ( dht_pi != null ){
// Cache before registering the handler: addTransferHandler() re-enters
// grabDHT() via throwIfNotAvailable(), and that nested call must hit the
// fast path above.
dht_use_accessor = (DHTPlugin)dht_pi.getPlugin();
// The torrent transfer handler is registered only if the plugin is enabled
// at this moment; since the plugin is now cached, this method will not
// attempt the registration again on later calls.
if ( dht_use_accessor.isEnabled()){
try{
addTransferHandler( torrent_transfer, torrent_transfer );
}catch( Throwable e ){
// Registration failure is logged but does not prevent use of the plugin.
Debug.printStackTrace(e);
}
}
}
}
}finally{
class_mon.exit();
}
return( dht_use_accessor );
| public DistributedDatabaseContact | importContact(java.net.InetSocketAddress address)
// Imports the node at 'address' into the DHT and wraps it as a database
// contact. Throws DistributedDatabaseException when the DHT is not available
// or when the DHT reports no contact for the address.
throwIfNotAvailable();

DHTPluginContact imported = getDHT().importContact( address );

if ( imported != null ){

    return new DDBaseContactImpl( this, imported );
}

throw new DistributedDatabaseException( "import of '" + address + "' failed" );
| public boolean | isAvailable()
// The database is available when a DHT plugin has been found and that
// plugin reports itself enabled.
DHTPlugin dht = grabDHT();

return dht != null && dht.isEnabled();
| public boolean | isExtendedUseAllowed()
// Delegates to the DHT plugin's own answer; without a DHT plugin extended
// use is reported as not allowed.
DHTPlugin dht = grabDHT();

return dht != null && dht.isExtendedUseAllowed();
| protected void | log(java.lang.String str)
// Forwards 'str' to the DHT plugin's log; silently does nothing when no DHT
// plugin has been found.
DHTPlugin dht = grabDHT();

if ( dht == null ){

    return;
}

dht.log( str );
| public void | read(DistributedDatabaseListener listener, DistributedDatabaseKey key, long timeout)
read( listener, key, timeout, OP_NONE );
| public void | read(DistributedDatabaseListener listener, DistributedDatabaseKey key, long timeout, int options)
// Starts a DHT lookup of 'key'; results are delivered to 'listener' as
// ET_VALUE_READ events. 'options' is a bit mask in which OP_EXHAUSTIVE_READ
// and OP_PRIORITY_HIGH are honoured.
// Throws DistributedDatabaseException when the DHT is not available.
throwIfNotAvailable();

boolean want_exhaustive = ( options & OP_EXHAUSTIVE_READ ) != 0;
boolean want_high_priority = ( options & OP_PRIORITY_HIGH ) != 0;

DHTPlugin dht = getDHT();

byte[] key_bytes = ((DDBaseKeyImpl)key).getBytes();

// TODO: max values? (256 is passed as a fixed limit)
dht.get(
    key_bytes,
    key.getDescription(),
    (byte)0,
    256,
    timeout,
    want_exhaustive,
    want_high_priority,
    new listenerMapper(
        listener,
        DistributedDatabaseEvent.ET_VALUE_READ,
        key,
        timeout,
        want_exhaustive,
        want_high_priority ));
| protected DistributedDatabaseValue | read(DDBaseContactImpl contact, DistributedDatabaseProgressListener listener, DistributedDatabaseTransferType type, DistributedDatabaseKey key, long timeout)
// Reads the value for 'key' directly from 'contact' using the given
// transfer type. Torrent transfers are delegated to the torrent transfer
// implementation; every other type goes through the DHT plugin's read, with
// progress relayed to 'listener'. Returns null when the DHT yields no data.

if ( type == torrent_transfer ){

    return torrent_transfer.read( contact, listener, type, key, timeout );
}

DHTPlugin dht = getDHT();

// Adapter relaying the DHT plugin's progress callbacks to the caller's
// listener unchanged.
DHTPluginProgressListener progress_relay =
    new DHTPluginProgressListener()
    {
        public void reportSize( long size )
        {
            listener.reportSize( size );
        }

        public void reportActivity( String str )
        {
            listener.reportActivity( str );
        }

        public void reportCompleteness( int percent )
        {
            listener.reportCompleteness( percent );
        }
    };

byte[] received =
    dht.read(
        progress_relay,
        contact.getContact(),
        DDBaseHelpers.getKey( type.getClass()).getHash(),
        ((DDBaseKeyImpl)key).getBytes(),
        timeout );

if ( received != null ){

    return new DDBaseValueImpl( contact, received, SystemTime.getCurrentTime(), -1 );
}

return null;
| public void | readKeyStats(DistributedDatabaseListener listener, DistributedDatabaseKey key, long timeout)
throwIfNotAvailable();
getDHT().get(
((DDBaseKeyImpl)key).getBytes(),
key.getDescription(),
DHTPlugin.FLAG_STATS,
256,
timeout,
false,
false,
new listenerMapper( listener, DistributedDatabaseEvent.ET_KEY_STATS_READ, key, timeout, false, false ));
| protected void | throwIfNotAvailable()
// Guard used by the public operations: raises DistributedDatabaseException
// unless isAvailable() currently reports true.
if ( isAvailable()){

    return;
}

throw new DistributedDatabaseException( "DHT not available" );
| public void | write(DistributedDatabaseListener listener, DistributedDatabaseKey key, DistributedDatabaseValue value)
// Convenience overload: writes a single value by delegating to the
// array-based write with a one-element array.
DistributedDatabaseValue[] single_value = { value };

write( listener, key, single_value );
| public void | write(DistributedDatabaseListener listener, DistributedDatabaseKey key, DistributedDatabaseValue[] values)
/*
 * Publishes 'values' under 'key'.
 *
 *   - no values   : equivalent to delete( listener, key )
 *   - one value   : stored as-is with DHTPlugin.FLAG_SINGLE_VALUE
 *   - many values : packed into one or more chunks, each stored with
 *                   DHTPlugin.FLAG_MULTI_VALUE. The first chunk goes under the
 *                   key itself and every further chunk under the SHA1 hash of
 *                   the previous chunk's key.
 *
 * Chunk format: <continuation> then, per value, <len hi><len lo><data>.
 * The continuation byte is 1 when another chunk follows and 0 on the last.
 *
 * One put is issued per chunk, each with its own listener mapping, so a
 * multi-chunk write can raise several ET_VALUE_WRITTEN events on 'listener'.
 *
 * All size validation happens before the first put. Throws
 * DistributedDatabaseException when the DHT is not available or when a value
 * is too large.
 */
throwIfNotAvailable();

// Largest value that fits into an otherwise empty multi-value chunk: one
// continuation byte plus two length bytes are overhead, and the two length
// bytes cannot express more than 0xffff.
final int multi_value_limit = Math.min( DHTPlugin.MAX_VALUE_SIZE - 3, 0xffff );

for (int i=0;i<values.length;i++){

    int value_length = ((DDBaseValueImpl)values[i]).getBytes().length;

    if ( value_length > DDBaseValueImpl.MAX_VALUE_SIZE ){

        throw( new DistributedDatabaseException("Value size limited to " + DDBaseValueImpl.MAX_VALUE_SIZE + " bytes" ));
    }

    // A value that cannot fit into an empty chunk could never be placed by the
    // packing loop below, which would then flush empty chunks forever. Reject
    // it up front, before anything has been published.
    if ( values.length > 1 && value_length > multi_value_limit ){

        throw( new DistributedDatabaseException("Value size limited to " + multi_value_limit + " bytes when writing multiple values" ));
    }
}

if ( values.length == 0 ){

    delete( listener, key );

}else if ( values.length == 1 ){

    getDHT().put(
        ((DDBaseKeyImpl)key).getBytes(),
        key.getDescription(),
        ((DDBaseValueImpl)values[0]).getBytes(),
        DHTPlugin.FLAG_SINGLE_VALUE,
        new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_WRITTEN, key, 0, false, false ));

}else{

    // TODO: optimise re-publishing to avoid republishing everything each time

    byte[] current_key = ((DDBaseKeyImpl)key).getBytes();

    // format is: <continuation> <len><len><data>

    byte[] payload = new byte[DHTPlugin.MAX_VALUE_SIZE];

    // index 0 is reserved for the continuation byte
    int payload_length = 1;

    int pos = 0;

    while( pos < values.length ){

        DDBaseValueImpl value = (DDBaseValueImpl)values[pos];

        byte[] bytes = value.getBytes();

        int len = bytes.length;

        // Exact fit test: two length bytes plus the data must fit in what is
        // left of the chunk.
        if ( payload_length + 2 + len <= payload.length ){

            payload[payload_length++] = (byte)(( len & 0x0000ff00 ) >> 8);
            payload[payload_length++] = (byte) ( len & 0x000000ff );

            System.arraycopy( bytes, 0, payload, payload_length, len );

            payload_length += len;

            pos++;

        }else{

            // Chunk is full: publish it marked "more to follow" and retry the
            // same value against a fresh chunk under the next key. The up-front
            // validation guarantees the value fits an empty chunk, so this
            // branch is never taken twice in a row for one value.
            payload[0] = 1;

            final byte[] copy = new byte[payload_length];

            System.arraycopy( payload, 0, copy, 0, copy.length );

            getDHT().put(
                current_key,
                key.getDescription(),
                copy,
                DHTPlugin.FLAG_MULTI_VALUE,
                new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_WRITTEN, key, 0, false, false ));

            payload_length = 1;

            current_key = new SHA1Simple().calculateHash( current_key );
        }
    }

    if ( payload_length > 1 ){

        // Final chunk: continuation byte 0 terminates the chain.
        payload[0] = 0;

        final byte[] copy = new byte[payload_length];

        System.arraycopy( payload, 0, copy, 0, copy.length );

        getDHT().put(
            current_key,
            key.getDescription(),
            copy,
            DHTPlugin.FLAG_MULTI_VALUE,
            new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_WRITTEN, key, 0, false, false ));
    }
}
|
|