SecureMessageServiceClientImplpublic class SecureMessageServiceClientImpl extends Object implements com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClient
Fields Summary |
---|
public static final int | STATUS_OK | public static final int | STATUS_LOGON_FAIL | public static final int | STATUS_INVALID_SEQUENCE | public static final int | STATUS_FAILED | public static final int | STATUS_ABORT | public static final String | SERVICE_NAME | private static final long | MIN_RETRY_PERIOD | private static final long | MAX_RETRY_PERIOD | private String | host | private int | port | private int | timeout_secs | private RSAPublicKey | public_key | private com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientAdapter | adapter | private long | retry_millis | private int | connect_failure_count | private org.gudy.azureus2.core3.util.AEMonitor | message_mon | private org.gudy.azureus2.core3.util.AESemaphore | message_sem | private String | last_failed_user_pw | private long | last_failed_user_pw_time | private List | messages | private List | listeners |
Constructors Summary |
---|
public SecureMessageServiceClientImpl(String _host, int _port, int _timeout_secs, RSAPublicKey _key, com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientAdapter _adapter)
host = _host;
port = _port;
timeout_secs = _timeout_secs;
public_key = _key;
adapter = _adapter;
message_mon = new AEMonitor( "SecureService:messages" );
message_sem = new AESemaphore( "SecureService:messages" );
new AEThread( "SecureService::messageSender", true )
{
public void
runSupport()
{
while( true ){
long time = retry_millis;
if ( connect_failure_count > 0 ){
for (int i=0;i<connect_failure_count;i++){
time = time + time;
if ( time > MAX_RETRY_PERIOD ){
time = MAX_RETRY_PERIOD;
break;
}
}
}
message_sem.reserve( time );
try{
sendMessagesSupport();
}catch( Throwable e ){
adapter.log( "Request processing failed", e);
}
}
}
}.start();
|
Methods Summary |
---|
public void | addListener(com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientListener l)
listeners.add( l );
| protected void | cancel(com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientMessage message)
boolean inform = false;
try{
message_mon.enter();
if ( messages.remove( message )){
inform = true;
}
}finally{
message_mon.exit();
}
if ( inform ){
for (Iterator it=listeners.iterator();it.hasNext();){
try{
((SecureMessageServiceClientListener)it.next()).cancelled( message );
}catch( Throwable e ){
e.printStackTrace();
}
}
}
| public com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientMessage[] | getMessages()
try{
message_mon.enter();
return((SecureMessageServiceClientMessage[])messages.toArray( new SecureMessageServiceClientMessage[ messages.size()]));
}finally{
message_mon.exit();
}
| public void | removeListener(com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientListener l)
listeners.remove( l );
| public com.aelitis.azureus.core.clientmessageservice.secure.SecureMessageServiceClientMessage | sendMessage(java.util.Map request, java.lang.Object data, java.lang.String description)
try{
message_mon.enter();
SecureMessageServiceClientMessage res = new SecureMessageServiceClientMessageImpl( this, request, data, description );
messages.add( res );
message_sem.release();
return( res );
}finally{
message_mon.exit();
}
| public void | sendMessages()
message_sem.release();
| protected void | sendMessagesSupport()
String user = adapter.getUser();
byte[] password = adapter.getPassword();
String user_password = user + "/" + new String( password );
// user name must be defined, however we allow a blank password
if ( user.length() == 0 ){
adapter.authenticationFailed();
return;
}
// if user-name + password hasn't changed recently and logon failed then
// don't re-attempt
if ( user_password.equals( last_failed_user_pw )){
final long now =SystemTime.getCurrentTime();
if (now >last_failed_user_pw_time &&now -last_failed_user_pw_time <60 *1000){
adapter.authenticationFailed();
return;
}
}
List outstanding_messages;
try{
message_mon.enter();
outstanding_messages = new ArrayList( messages );
}finally{
message_mon.exit();
}
if ( outstanding_messages.size() == 0 ){
return;
}
List complete_messages = new ArrayList();
boolean failed = false;
try{
Iterator it = outstanding_messages.iterator();
while( it.hasNext() && !failed ){
SecureMessageServiceClientMessageImpl message = (SecureMessageServiceClientMessageImpl)it.next();
boolean retry = true;
int retry_count = 0;
while( retry && !failed ){
retry = false;
ClientMessageService message_service = null;
boolean got_reply = false;
try{
Map content = new HashMap();
long sequence = adapter.getMessageSequence();
content.put( "user", user );
content.put( "password", password );
content.put( "seq", new Long( sequence ));
content.put( "request", message.getRequest());
last_failed_user_pw = "";
message_service = SecureMessageServiceClientHelper.getServerService( host, port, timeout_secs, SERVICE_NAME, public_key );
message_service.sendMessage( content );
Map reply = message_service.receiveMessage();
got_reply = true;
long status = ((Long)reply.get( "status" )).longValue();
Long new_retry = (Long)reply.get( "retry" );
if ( new_retry != null ){
retry_millis = new_retry.longValue();
if ( retry_millis < MIN_RETRY_PERIOD ){
retry_millis = MIN_RETRY_PERIOD;
}
adapter.log( "Server requested retry period of " + (retry_millis/1000) + " seconds" );
}else{
retry_millis = MIN_RETRY_PERIOD;
}
if ( status == STATUS_OK ){
message.setReply( (Map)reply.get( "reply" ));
adapter.log( "Request successfully sent: " + message.getRequest() + "->" + message.getReply());
adapter.setMessageSequence( sequence + 1 );
adapter.serverOK();
for (Iterator l_it=listeners.iterator();l_it.hasNext();){
try{
((SecureMessageServiceClientListener)l_it.next()).complete( message );
}catch( Throwable e ){
e.printStackTrace();
}
}
complete_messages.add( message );
}else if ( status == STATUS_LOGON_FAIL ){
last_failed_user_pw = user_password;
last_failed_user_pw_time = SystemTime.getCurrentTime();
adapter.serverOK();
adapter.authenticationFailed();
failed = true;
}else if ( status == STATUS_INVALID_SEQUENCE ){
if ( retry_count == 1 ){
adapter.serverFailed( new Exception( "Sequence resynchronisation failed" ));
failed = true;
}else{
retry_count++;
retry = true;
long expected_sequence = ((Long)reply.get( "seq" )).longValue();
adapter.log( "Sequence resynchronise: local = " + sequence + ", remote = " + expected_sequence );
adapter.setMessageSequence( expected_sequence );
}
}else if ( status == STATUS_FAILED ){
adapter.serverFailed( new Exception( new String( (byte[])reply.get( "error" ))));
failed = true;
}else{// if ( status == STATUS_ABORT ){
// this is when things have gone badly wrong server-side - we just
// dump the message
adapter.serverFailed( new Exception( "Server requested abort" ));
for (Iterator l_it=listeners.iterator();l_it.hasNext();){
try{
((SecureMessageServiceClientListener)l_it.next()).aborted(
message,
new String( (byte[])reply.get( "error" )));
}catch( Throwable e ){
e.printStackTrace();
}
}
complete_messages.add( message );
}
}catch( Throwable e ){
adapter.serverFailed( e );
failed = true;
}finally{
if ( got_reply ){
connect_failure_count = 0;
}else{
connect_failure_count++;
if ( connect_failure_count > 1 ){
try{
adapter.log( "Failed to contact server " + connect_failure_count + " times in a row" );
}catch( Throwable e ){
e.printStackTrace();
}
}
}
if ( message_service != null ){
message_service.close();
}
}
}
}
}catch( Throwable e ){
adapter.serverFailed( e );
}finally{
try{
message_mon.enter();
messages.removeAll( complete_messages );
}finally{
message_mon.exit();
}
}
|
|