McastServiceImplpublic class McastServiceImpl extends Object A membership implementation using simple multicast.
This is the representation of a multicast membership service.
This class is responsible for maintaining a list of active cluster nodes in the cluster.
If a node fails to send out a heartbeat, the node will be dismissed.
This is the low level implementation that handles the multicasting sockets.
Need to fix this, could use java.nio and only need one thread to send and receive, or
just use a timeout on the receive |
Fields Summary |
---|
private static org.apache.juli.logging.Log | log | protected static int | MAX_PACKET_SIZE | protected boolean | doRunSenderInternal flag used for the listen thread that listens to the multicasting socket. | protected boolean | doRunReceiver | protected int | startLevel | protected MulticastSocket | socketSocket that we intend to listen to | protected MemberImpl | memberThe local member that we intend to broadcast over and over again | protected InetAddress | addressThe multicast address | protected int | portThe multicast port | protected long | timeToExpirationThe time it takes for a member to expire. | protected long | sendFrequencyHow often do we send out a broadcast saying we are alive, must be smaller than timeToExpiration | protected DatagramPacket | sendPacketReuse the sendPacket, no need to create a new one every time | protected DatagramPacket | receivePacketReuse the receivePacket, no need to create a new one every time | protected Membership | membershipThe membership, used so that we calculate memberships when they arrive or don't arrive | protected org.apache.catalina.tribes.MembershipListener | serviceThe actual listener, for callbacks when members are added or disappear | protected ReceiverThread | receiverThread to listen for pings | protected SenderThread | senderThread to send pings | protected long | serviceStartTimeWhen was the service started | protected int | mcastTTLTime to live for the multicast packets that are being sent out | protected int | mcastSoTimeoutRead timeout on the mcast socket | protected InetAddress | mcastBindAddressbind address | protected Object | expiredMutex |
Constructors Summary |
---|
public McastServiceImpl(MemberImpl member, long sendFrequency, long expireTime, int port, InetAddress bind, InetAddress mcastAddress, int ttl, int soTimeout, org.apache.catalina.tribes.MembershipListener service)Create a new mcast service impl
// The local member that is announced, and the listener that receives membership callbacks.
this.member = member;
this.service = service;
// Multicast endpoint and socket tuning; setupSocket() reads these fields, so they
// must all be assigned before it is called.
this.address = mcastAddress;
this.port = port;
this.mcastBindAddress = bind;
this.mcastTTL = ttl;
this.mcastSoTimeout = soTimeout;
// Heartbeat interval and the expiration time handed to membership.expire().
this.sendFrequency = sendFrequency;
this.timeToExpiration = expireTime;
setupSocket();
// Pre-allocate one MAX_PACKET_SIZE packet per direction, both addressed to the multicast group.
sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE, address, port);
receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE, address, port);
membership = new Membership(member);
|
Methods Summary |
---|
protected void | checkExpired()
// Asks the membership for members that have expired (per timeToExpiration) and notifies
// the listener about each one on its own thread. The whole pass is serialized on
// expiredMutex so that concurrent callers do not run it at the same time.
synchronized (expiredMutex) {
    MemberImpl[] expired = membership.expire(timeToExpiration);
    for (int i = 0; i < expired.length; i++) {
        final MemberImpl member = expired[i];
        if (log.isDebugEnabled())
            log.debug("Mcast exipre member " + member);
        try {
            Thread t = new Thread() {
                public void run() {
                    // The callback runs on this new thread, so its failures must be
                    // caught here; the try/catch around start() cannot see them.
                    try {
                        service.memberDisappeared(member);
                    } catch (Exception x) {
                        log.error("Unable to process member disappeared message.", x);
                    }
                }
            };
            t.start();
        } catch (Exception x) {
            // Failure to create/start the notification thread; keep going with the
            // remaining expired members.
            log.error("Unable to process member disappeared message.", x);
        }
    }
}
| public long | getServiceStartTime()
// Returns the serviceStartTime field as-is: start() sets it to the current time when the
// sender is started, and stop() sets it to Long.MAX_VALUE once everything is stopped.
return serviceStartTime;
| public void | receive()Receive a datagram packet, locking wait
// Waits for one datagram on the multicast socket, decodes it into a member and updates
// the membership accordingly. Listener callbacks are dispatched on a separate thread.
// Whether or not a packet arrived, the expiration check runs afterwards.
try {
    socket.receive(receivePacket);
    final int length = receivePacket.getLength();
    if (length <= MAX_PACKET_SIZE) {
        // Copy exactly the received bytes out of the reused receive buffer.
        final byte[] payload = new byte[length];
        System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), payload, 0, length);
        final MemberImpl remote = MemberImpl.getMember(payload);
        if (log.isTraceEnabled()) {
            log.trace("Mcast receive ping from member " + remote);
        }
        final boolean shuttingDown = Arrays.equals(remote.getCommand(), Member.SHUTDOWN_PAYLOAD);
        Thread notifier = null;
        if (shuttingDown) {
            // The member announced its own shutdown: drop it and report it as gone.
            if (log.isDebugEnabled()) {
                log.debug("Member has shutdown:" + remote);
            }
            membership.removeMember(remote);
            notifier = new Thread() {
                public void run() {
                    service.memberDisappeared(remote);
                }
            };
        } else if (membership.memberAlive(remote)) {
            // memberAlive() returned true, so the listener is told a member was added.
            if (log.isDebugEnabled()) {
                log.debug("Mcast add member " + remote);
            }
            notifier = new Thread() {
                public void run() {
                    service.memberAdded(remote);
                }
            };
        }
        if (notifier != null) {
            notifier.start();
        }
    } else {
        log.error("Multicast packet received was too long, dropping package:"+length);
    }
} catch (SocketTimeoutException x ) {
    // Expected: the socket has a read timeout so this thread does not block forever,
    // because the same thread also performs membership expiration below.
}
checkExpired();
| public void | send(boolean checkexpired)Send a ping
// Sends one heartbeat for the local member to the multicast group, optionally followed
// by an expiration check of the known members.
//ignore if we haven't started the sender
//if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
member.inc();
if (log.isTraceEnabled()) {
    log.trace("Mcast send ping from member " + member);
}
// A fresh packet is built for every ping from the member's current data.
final byte[] payload = member.getData();
socket.send(new DatagramPacket(payload, payload.length, address, port));
if (checkexpired) {
    checkExpired();
}
| protected void | setupSocket()
if (mcastBindAddress != null) socket = new MulticastSocket(new InetSocketAddress(mcastBindAddress, port));
else socket = new MulticastSocket(port);
socket.setLoopbackMode(false); //hint that we don't need loop back messages
if (mcastBindAddress != null) {
if(log.isInfoEnabled())
log.info("Setting multihome multicast interface to:" +mcastBindAddress);
socket.setInterface(mcastBindAddress);
} //end if
//force a so timeout so that we don't block forever
if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency;
if(log.isInfoEnabled())
log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
socket.setSoTimeout(mcastSoTimeout);
if ( mcastTTL >= 0 ) {
if(log.isInfoEnabled())
log.info("Setting cluster mcast TTL to " + mcastTTL);
socket.setTimeToLive(mcastTTL);
}
| public synchronized void | start(int level)Start the service
boolean valid = false;
if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
if ( receiver != null ) throw new IllegalStateException("McastService.receive already running.");
if ( sender == null ) socket.joinGroup(address);
doRunReceiver = true;
receiver = new ReceiverThread();
receiver.setDaemon(true);
receiver.start();
valid = true;
}
if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
if ( sender != null ) throw new IllegalStateException("McastService.send already running.");
if ( receiver == null ) socket.joinGroup(address);
//make sure at least one packet gets out there
send(false);
doRunSender = true;
serviceStartTime = System.currentTimeMillis();
sender = new SenderThread(sendFrequency);
sender.setDaemon(true);
sender.start();
//we have started the receiver, but not yet waited for membership to establish
valid = true;
}
if (!valid) {
throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
//pause, once or twice
waitForMembers(level);
startLevel = (startLevel | level);
| public synchronized boolean | stop(int level)Stops the service
// Stops the receiver and/or sender thread depending on the bits set in level. Once no
// component is left running, a shutdown ping is sent and the multicast group is left.
// Returns true when the service is completely stopped (startLevel == 0).
// Throws IllegalArgumentException when level selects neither component.
boolean valid = false;
if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
    valid = true;
    doRunReceiver = false;
    if ( receiver !=null ) receiver.interrupt();
    receiver = null;
}
if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
    valid = true;
    doRunSender = false;
    if ( sender != null )sender.interrupt();
    sender = null;
}
if (!valid) {
    throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
startLevel = (startLevel & (~level));
//we're shutting down, send a shutdown message and leave the group
if ( startLevel == 0 ) {
    try {
        //send a stop message
        member.setCommand(Member.SHUTDOWN_PAYLOAD);
        member.getData(true, true);
        send(false);
    } finally {
        // Run the cleanup even if the shutdown ping could not be sent; otherwise a
        // failed send would leave the socket in the group and the start time stale.
        //leave mcast group
        try {
            socket.leaveGroup(address);
        } catch ( Exception x ) {
            // Best effort: nothing more can be done here, but keep it visible.
            if (log.isDebugEnabled()) log.debug("Unable to leave multicast group " + address, x);
        }
        serviceStartTime = Long.MAX_VALUE;
        // NOTE(review): the socket itself is not closed here; confirm whether the
        // owner of this object is expected to close it or whether it should be closed now.
    }
}
return (startLevel == 0);
| private void | waitForMembers(int level)
// Sleeps for two send intervals so that pings from other members have a chance to be
// received. The level argument is only used in the log messages.
long memberwait = sendFrequency*2;
if(log.isInfoEnabled())
    log.info("Sleeping for "+memberwait+" milliseconds to establish cluster membership, start level:"+level);
try {
    Thread.sleep(memberwait);
} catch (InterruptedException interrupted) {
    // Stop waiting, but restore the interrupt flag so the caller can still observe
    // that this thread was interrupted.
    Thread.currentThread().interrupt();
}
if(log.isInfoEnabled())
    log.info("Done sleeping, membership established, start level:"+level);
|
|