RpcChannelpublic class RpcChannel extends Object implements org.apache.catalina.tribes.ChannelListenerA channel to handle RPC messaging |
Fields Summary |
---|
protected static org.apache.juli.logging.Log | log | public static final int | FIRST_REPLY | public static final int | MAJORITY_REPLY | public static final int | ALL_REPLY | public static final int | NO_REPLY | private org.apache.catalina.tribes.Channel | channel | private RpcCallback | callback | private byte[] | rpcId | private HashMap | responseMap |
Constructors Summary |
---|
public RpcChannel(byte[] rpcId, org.apache.catalina.tribes.Channel channel, RpcCallback callback)Create an RPC channel. You can have several RPC channels attached to a group
all separated out by the uniqueness
this.channel = channel;
this.callback = callback;
this.rpcId = rpcId;
channel.addChannelListener(this);
|
Methods Summary |
---|
public boolean | accept(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
if ( msg instanceof RpcMessage ) {
RpcMessage rmsg = (RpcMessage)msg;
return Arrays.equals(rmsg.rpcId,rpcId);
}else return false;
| protected static java.lang.String | bToS(byte[] data)
StringBuffer buf = new StringBuffer(4*16);
buf.append("{");
for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");
buf.append("}");
return buf.toString();
| public void | breakdown()
channel.removeChannelListener(this);
| public void | finalize()
breakdown();
| public RpcCallback | getCallback()
return callback;
| public org.apache.catalina.tribes.Channel | getChannel()
return channel;
| public byte[] | getRpcId()
return rpcId;
| public void | messageReceived(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
RpcMessage rmsg = (RpcMessage)msg;
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
if ( rmsg.reply ) {
RpcCollector collector = (RpcCollector)responseMap.get(key);
if (collector == null) {
callback.leftOver(rmsg.message, sender);
} else {
synchronized (collector) {
//make sure it hasn't been removed
if ( responseMap.containsKey(key) ) {
if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) )
collector.destcnt--;
else
collector.addResponse(rmsg.message, sender);
if (collector.isComplete()) collector.notifyAll();
} else {
if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) )
callback.leftOver(rmsg.message, sender);
}
}//synchronized
}//end if
} else{
Serializable reply = callback.replyRequest(rmsg.message,sender);
rmsg.reply = true;
rmsg.message = reply;
try {
channel.send(new Member[] {sender}, rmsg,0);
}catch ( Exception x ) {
log.error("Unable to send back reply in RpcChannel.",x);
}
}//end if
| public Response[] | send(org.apache.catalina.tribes.Member[] destination, java.io.Serializable message, int rpcOptions, int channelOptions, long timeout)Send a message and wait for the response.
if ( destination==null || destination.length == 0 ) return new Response[0];
//avoid dead lock
channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);
try {
synchronized (collector) {
if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
channel.send(destination, rmsg, channelOptions);
if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
}
} catch ( InterruptedException ix ) {
Thread.currentThread().interrupted();
//throw new ChannelException(ix);
}finally {
responseMap.remove(key);
}
return collector.getResponses();
| public void | setCallback(RpcCallback callback)
this.callback = callback;
| public void | setChannel(org.apache.catalina.tribes.Channel channel)
this.channel = channel;
| public void | setRpcId(byte[] rpcId)
this.rpcId = rpcId;
|
|