Methods Summary |
---|
public long | getExpire()
return expire;
|
public boolean | getForwardExpired()
return forwardExpired;
|
protected org.apache.catalina.tribes.group.interceptors.OrderInterceptor$Counter | getInCounter(org.apache.catalina.tribes.Member mbr)
Counter cnt = (Counter)incounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
cnt.inc(); //always start at 1 for incoming
incounter.put(mbr,cnt);
}
return cnt;
|
public int | getMaxQueue()
return maxQueue;
|
protected org.apache.catalina.tribes.group.interceptors.OrderInterceptor$Counter | getOutCounter(org.apache.catalina.tribes.Member mbr)
Counter cnt = (Counter)outcounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
outcounter.put(mbr,cnt);
}
return cnt;
|
protected int | incCounter(org.apache.catalina.tribes.Member mbr)
Counter cnt = getOutCounter(mbr);
return cnt.inc();
|
public void | memberAdded(org.apache.catalina.tribes.Member member)
//notify upwards
super.memberAdded(member);
|
public void | memberDisappeared(org.apache.catalina.tribes.Member member)
//reset counters - lock free
incounter.remove(member);
outcounter.remove(member);
//clear the remaining queue
processLeftOvers(member,true);
//notify upwards
super.memberDisappeared(member);
|
public void | messageReceived(org.apache.catalina.tribes.ChannelMessage msg)
if ( !okToProcess(msg.getOptions()) ) {
super.messageReceived(msg);
return;
}
int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
msg.getMessage().trim(4);
MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
try {
inLock.writeLock().lock();
if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
}finally {
inLock.writeLock().unlock();
}
|
protected boolean | processIncoming(org.apache.catalina.tribes.group.interceptors.OrderInterceptor$MessageOrder order)
boolean result = false;
Member member = order.getMessage().getAddress();
Counter cnt = getInCounter(member);
MessageOrder tmp = (MessageOrder)incoming.get(member);
if ( tmp != null ) {
order = MessageOrder.add(tmp,order);
}
while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) {
//we are right on target. process orders
if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
super.messageReceived(order.getMessage());
order.setMessage(null);
order = order.next;
}
MessageOrder head = order;
MessageOrder prev = null;
tmp = order;
//flag to empty out the queue when it larger than maxQueue
boolean empty = order!=null?order.getCount()>=maxQueue:false;
while ( tmp != null ) {
//process expired messages or empty out the queue
if ( tmp.isExpired(expire) || empty ) {
//reset the head
if ( tmp == head ) head = tmp.next;
cnt.setCounter(tmp.getMsgNr()+1);
if ( getForwardExpired() )
super.messageReceived(tmp.getMessage());
tmp.setMessage(null);
tmp = tmp.next;
if ( prev != null ) prev.next = tmp;
result = true;
} else {
prev = tmp;
tmp = tmp.next;
}
}
if ( head == null ) incoming.remove(member);
else incoming.put(member, head);
return result;
|
protected void | processLeftOvers(org.apache.catalina.tribes.Member member, boolean force)
MessageOrder tmp = (MessageOrder)incoming.get(member);
if ( force ) {
Counter cnt = getInCounter(member);
cnt.setCounter(Integer.MAX_VALUE);
}
if ( tmp!= null ) processIncoming(tmp);
|
public void | sendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)
if ( !okToProcess(msg.getOptions()) ) {
super.sendMessage(destination, msg, payload);
return;
}
ChannelException cx = null;
for (int i=0; i<destination.length; i++ ) {
try {
int nr = 0;
try {
outLock.writeLock().lock();
nr = incCounter(destination[i]);
} finally {
outLock.writeLock().unlock();
}
//reduce byte copy
msg.getMessage().append(nr);
try {
getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
} finally {
msg.getMessage().trim(4);
}
}catch ( ChannelException x ) {
if ( cx == null ) cx = x;
cx.addFaultyMember(x.getFaultyMembers());
}
}//for
if ( cx != null ) throw cx;
|
public void | setExpire(long expire)
this.expire = expire;
|
public void | setForwardExpired(boolean forwardExpired)
this.forwardExpired = forwardExpired;
|
public void | setMaxQueue(int maxQueue)
this.maxQueue = maxQueue;
|