Methods Summary |
---|
public void | checkMembers(boolean checkAll)
    // Runs one round of member verification while holding the membership lock.
    // checkAll == true  -> performForcedCheck()
    // checkAll == false -> performBasicCheck()
    // Any exception is logged as a warning and not propagated to the caller.
    try {
        if (membership == null) {
            setupMembership();
        }
        synchronized (membership) {
            if ( checkAll ) {
                performForcedCheck();
            } else {
                performBasicCheck();
            }
        }
    } catch ( Exception x ) {
        log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
    }
|
public org.apache.catalina.tribes.Member | getLocalMember(boolean incAlive)
// Pure delegation: returns whatever the parent reports for the local member;
// incAlive is passed through unchanged.
return super.getLocalMember(incAlive);
|
public org.apache.catalina.tribes.Member | getMember(org.apache.catalina.tribes.Member mbr)
    // Looks mbr up in this detector's own membership view, creating that view
    // on first use.
    if ( membership == null ) {
        setupMembership();
    }
    Member found = membership.getMember(mbr);
    return found;
|
public org.apache.catalina.tribes.Member[] | getMembers()
    // Returns the members held by this detector's own membership view,
    // creating that view on first use.
    if ( membership == null ) {
        setupMembership();
    }
    Member[] known = membership.getMembers();
    return known;
|
public boolean | hasMembers()
    // Answers from this detector's own membership view, creating that view on
    // first use.
    if ( membership == null ) {
        setupMembership();
    }
    boolean any = membership.hasMembers();
    return any;
|
public void | heartbeat()
// Runs the parent's heartbeat first, then a non-forced member check.
super.heartbeat();
// false makes checkMembers take the performBasicCheck() path
checkMembers(false);
|
public void | memberAdded(org.apache.catalina.tribes.Member member)
    // Handles a "member added" notification coming from below.
    //  - If the member was a removal suspect, the suspicion is cleared and
    //    nothing is propagated upwards.
    //  - If the member is not yet in our membership, it is probed with
    //    memberAlive(member): when reachable it is recorded and the parent is
    //    notified; otherwise it is parked in addSuspects with the current time.
    //  - If the member is already in our membership, nothing happens.
    if ( membership == null ) {
        setupMembership();
    }
    boolean notify = false;
    synchronized (membership) {
        if (removeSuspects.containsKey(member)) {
            //previously marked suspect, system below picked up the member again
            removeSuspects.remove(member);
        } else if (membership.getMember( (MemberImpl) member) == null) {
            //if we add it here, then add it upwards too
            //check to see if it is alive
            if (memberAlive(member)) {
                membership.memberAlive( (MemberImpl) member);
                notify = true;
            } else {
                // Long.valueOf instead of the deprecated Long(long) constructor
                addSuspects.put(member, Long.valueOf(System.currentTimeMillis()));
            }
        }
    }
    // the parent is notified outside the membership lock
    if ( notify ) {
        super.memberAdded(member);
    }
|
protected boolean | memberAlive(org.apache.catalina.tribes.Member mbr)
// Probes mbr by delegating to the static overload with this instance's settings:
// TCP_FAIL_DETECT as the probe payload, plus performSendTest, performReadTest,
// readTestTimeout, connectTimeout and getOptionFlag().
return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
|
protected static boolean | memberAlive(org.apache.catalina.tribes.Member mbr, byte[] msgData, boolean sendTest, boolean readTest, long readTimeout, long conTimeout, int optionFlag)
    // Probes a member over TCP and reports whether it answered.
    //   mbr         member to probe (its host/port are the connect target)
    //   msgData     payload written when sendTest is true
    //   sendTest    also write a data package after connecting
    //   readTest    also request an ack and wait for at least one byte back
    //   readTimeout socket read timeout (narrowed to int)
    //   conTimeout  connect timeout (narrowed to int)
    //   optionFlag  base option bits OR-ed into the probe message options
    // Returns false for a member whose command is the shutdown payload, and
    // false whenever any exception occurs during the probe.

    // a member carrying the shutdown payload is reported as not alive without probing
    if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) {
        return false;
    }
    Socket socket = new Socket();
    try {
        InetAddress host = InetAddress.getByAddress(mbr.getHost());
        InetSocketAddress target = new InetSocketAddress(host, mbr.getPort());
        socket.setSoTimeout((int)readTimeout);
        socket.connect(target, (int) conTimeout);
        if ( !sendTest ) {
            // connecting successfully is the whole test
            return true;
        }
        ChannelData probe = new ChannelData(true);
        probe.setAddress(mbr);
        probe.setMessage(new XByteBuffer(msgData,false));
        probe.setTimestamp(System.currentTimeMillis());
        int flags = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
        // the ack bit is forced on for a read test and forced off otherwise
        flags = readTest ? (flags | Channel.SEND_OPTIONS_USE_ACK)
                         : (flags & (~Channel.SEND_OPTIONS_USE_ACK));
        probe.setOptions(flags);
        byte[] packet = XByteBuffer.createDataPackage(probe);
        socket.getOutputStream().write(packet);
        if ( !readTest ) {
            return true;
        }
        // the outgoing buffer is reused as scratch space for the reply
        int received = socket.getInputStream().read(packet);
        return received > 0;
    } catch ( SocketTimeoutException sx) {
        //timed out: fall through and report the member as down
    } catch ( ConnectException cx) {
        //connection refused: fall through and report the member as down
    } catch (Exception x ) {
        log.error("Unable to perform failure detection check, assuming member down.",x);
    } finally {
        try {
            socket.close();
        } catch ( Exception ignore ) {
            // nothing useful can be done if close fails
        }
    }
    return false;
|
public void | memberDisappeared(org.apache.catalina.tribes.Member member)
    // Handles a "member disappeared" notification.
    //  - If the member's command is the shutdown payload, it is removed
    //    immediately without probing.
    //  - Otherwise the member is probed with memberAlive(member): if it does
    //    not respond it is removed and the parent is notified; if it does
    //    respond it is kept and recorded in removeSuspects with the current time.
    if ( membership == null ) {
        setupMembership();
    }
    boolean notify = false;
    boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
    if ( !shutdown && log.isInfoEnabled() ) {
        log.info("Received memberDisappeared["+member+"] message. Will verify.");
    }
    synchronized (membership) {
        //check to see if the member really is gone
        //if the payload is not a shutdown message
        if (shutdown || !memberAlive(member)) {
            //the member is gone: drop it from our view and clear any pending suspicion
            membership.removeMember( (MemberImpl) member);
            removeSuspects.remove(member);
            notify = true;
        } else {
            //add the member as suspect
            // Long.valueOf instead of the deprecated Long(long) constructor
            removeSuspects.put(member, Long.valueOf(System.currentTimeMillis()));
        }
    }
    // logging and the upward notification happen outside the membership lock
    if ( notify ) {
        if (log.isInfoEnabled()) {
            log.info("Verification complete. Member disappeared["+member+"]");
        }
        super.memberDisappeared(member);
    } else {
        if (log.isInfoEnabled()) {
            log.info("Verification complete. Member still alive["+member+"]");
        }
    }
|
public void | messageReceived(org.apache.catalina.tribes.ChannelMessage msg)
    // Filters out failure-detection probe packets.
    // A message is treated as a probe when its options pass okToProcess and its
    // payload has the same length and bytes as TCP_FAIL_DETECT. Probes are
    // consumed here (debug-logged only); everything else goes to the parent.
    boolean probe = okToProcess(msg.getOptions())
            && msg.getMessage().getLength() == TCP_FAIL_DETECT.length
            && Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes());
    if ( !probe ) {
        super.messageReceived(msg);
    } else if ( log.isDebugEnabled() ) {
        log.debug("Received a failure detector packet:"+msg);
    }
|
protected void | performBasicCheck()
//update all alive times
Member[] members = super.getMembers();
for (int i = 0; members != null && i < members.length; i++) {
if (membership.memberAlive( (MemberImpl) members[i])) {
//we don't have this one in our membership, check to see if he/she is alive
if (memberAlive(members[i])) {
log.warn("Member added, even though we werent notified:" + members[i]);
super.memberAdded(members[i]);
} else {
membership.removeMember( (MemberImpl) members[i]);
} //end if
} //end if
} //for
//check suspect members if they are still alive,
//if not, simply issue the memberDisappeared message
MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
for (int i = 0; i < keys.length; i++) {
MemberImpl m = (MemberImpl) keys[i];
if (membership.getMember(m) != null && (!memberAlive(m))) {
membership.removeMember(m);
super.memberDisappeared(m);
removeSuspects.remove(m);
if(log.isInfoEnabled())
log.info("Suspect member, confirmed dead.["+m+"]");
} //end if
}
//check add suspects members if they are alive now,
//if they are, simply issue the memberAdded message
keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
for (int i = 0; i < keys.length; i++) {
MemberImpl m = (MemberImpl) keys[i];
if ( membership.getMember(m) == null && (memberAlive(m))) {
membership.memberAlive(m);
super.memberAdded(m);
addSuspects.remove(m);
if(log.isInfoEnabled())
log.info("Suspect member, confirmed alive.["+m+"]");
} //end if
}
|
protected void | performForcedCheck()
    // Probes every member the parent reports, regardless of suspect state.
    // Reachable members are touched in our membership (and announced upwards
    // when membership.memberAlive(..) returns true) and dropped from
    // addSuspects. Unreachable members that we still hold are removed, cleared
    // from removeSuspects and reported as disappeared.
    Member[] current = super.getMembers();
    if (current == null) {
        return;
    }
    for (int idx = 0; idx < current.length; idx++) {
        Member candidate = current[idx];
        if (memberAlive(candidate)) {
            boolean flagged = membership.memberAlive((MemberImpl)candidate);
            if (flagged) {
                super.memberAdded(candidate);
            }
            addSuspects.remove(candidate);
            continue;
        }
        // not reachable: only act if we still hold the member
        if (membership.getMember(candidate) == null) {
            continue;
        }
        membership.removeMember((MemberImpl)candidate);
        removeSuspects.remove(candidate);
        super.memberDisappeared((MemberImpl)candidate);
    }
|
public void | sendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)
    // Sends through the parent. When the send fails with a ChannelException,
    // every faulty member whose cause is non-null and is not a
    // RemoteProcessException is run through memberDisappeared(..) before the
    // original exception is rethrown to the caller.
    try {
        super.sendMessage(destination, msg, payload);
    } catch ( ChannelException cx ) {
        FaultyMember[] faulty = cx.getFaultyMembers();
        for ( int k = 0; k < faulty.length; k++ ) {
            if ( faulty[k].getCause() == null ) {
                continue;
            }
            //RemoteProcessException's are ok
            if ( faulty[k].getCause() instanceof RemoteProcessException ) {
                continue;
            }
            this.memberDisappeared(faulty[k].getMember());
        }
        throw cx;
    }
|
protected synchronized void | setupMembership()
    // Creates the membership view once, seeded with the parent's local member;
    // later calls are no-ops.
    if ( membership != null ) {
        return;
    }
    membership = new Membership((MemberImpl)super.getLocalMember(true));
|