File | Doc | Category | Size | Date | Package
DeltaManager.java | API Doc | Apache Tomcat 6.0.14 | 59360 | Fri Jul 20 04:20:34 BST 2007 | org.apache.catalina.ha.session

DeltaManager

public class DeltaManager extends ClusterManagerBase
The DeltaManager manages replicated sessions by only replicating the deltas in data. For applications written to handle this, the DeltaManager is the optimal way of replicating data. This code is almost identical to StandardManager with a difference in how it persists sessions and some modifications to it. IMPLEMENTATION NOTE : Correct behavior of session storing and reloading depends upon external calls to the start() and stop() methods of this class at the correct times.
author
Filip Hanik
author
Craig R. McClanahan
author
Jean-Francois Arcand
author
Peter Rossbach
version
$Revision: 467222 $ $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $

Fields Summary
public static org.apache.juli.logging.Log
log
protected static org.apache.catalina.util.StringManager
sm
The string manager for this package.
private static final String
info
The descriptive information about this implementation.
private boolean
started
Has this component been started yet?
protected static String
managerName
The descriptive name of this Manager implementation (for logging).
protected String
name
protected boolean
defaultMode
private org.apache.catalina.ha.CatalinaCluster
cluster
private org.apache.catalina.ha.tcp.ReplicationValve
replicationValve
Cached ReplicationValve, looked up from the cluster's valves (used for cross-context support).
protected org.apache.catalina.util.LifecycleSupport
lifecycle
The lifecycle event support for this component.
private int
maxActiveSessions
The maximum number of active Sessions allowed, or -1 for no limit.
private boolean
expireSessionsOnShutdown
private boolean
notifyListenersOnReplication
private boolean
notifySessionListenersOnReplication
private boolean
stateTransfered
private int
stateTransferTimeout
private boolean
sendAllSessions
private boolean
sendClusterDomainOnly
private int
sendAllSessionsSize
private int
sendAllSessionsWaitTime
Wait time between sending session blocks (default 2 sec).
private ArrayList
receivedMessageQueue
private boolean
receiverQueue
private boolean
stateTimestampDrop
private long
stateTransferCreateSendTime
int
rejectedSessions
private long
sessionReplaceCounter
long
processingTime
private long
counterReceive_EVT_GET_ALL_SESSIONS
private long
counterSend_EVT_ALL_SESSION_DATA
private long
counterReceive_EVT_ALL_SESSION_DATA
private long
counterReceive_EVT_SESSION_CREATED
private long
counterReceive_EVT_SESSION_EXPIRED
private long
counterReceive_EVT_SESSION_ACCESSED
private long
counterReceive_EVT_SESSION_DELTA
private long
counterSend_EVT_GET_ALL_SESSIONS
private long
counterSend_EVT_SESSION_CREATED
private long
counterSend_EVT_SESSION_DELTA
private long
counterSend_EVT_SESSION_ACCESSED
private long
counterSend_EVT_SESSION_EXPIRED
private int
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE
private int
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE
private int
counterNoStateTransfered
Constructors Summary
public DeltaManager()



    // ------------------------------------------------------------- Constructor
      
        super();
    
Methods Summary
public voidaddLifecycleListener(org.apache.catalina.LifecycleListener listener)
Add a lifecycle event listener to this component.

param
listener The listener to add

        lifecycle.addLifecycleListener(listener);
    
protected booleancheckSenderDomain(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Test that the sender's domain and the local domain are the same.

        // NOTE(review): sameDomain is hard-coded to true, so the warning branch
        // below can never execute and every sender is reported as being in the
        // local domain. Presumably a placeholder for a real domain comparison —
        // confirm before relying on this check.
        boolean sameDomain= true;
        if (!sameDomain && log.isWarnEnabled()) {
                // The last two message arguments are passed as empty strings.
                log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
                         new Object[] {getName(), 
                         msg.getEventTypeString(), 
                         sender,
                         "",
                         "" }));
        }
        return sameDomain ;
    
public org.apache.catalina.ha.ClusterManagercloneFromTemplate()

        // Build a fresh manager and copy this template's configuration into it.
        DeltaManager result = new DeltaManager();
        // The clone gets a derived name so it can be told apart from the template.
        result.name = "Clone-from-"+name;
        // Cluster and cached valve references are shared, not duplicated.
        result.cluster = cluster;
        result.replicationValve = replicationValve;
        result.maxActiveSessions = maxActiveSessions;
        result.expireSessionsOnShutdown = expireSessionsOnShutdown;
        result.notifyListenersOnReplication = notifyListenersOnReplication;
        result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
        // State-transfer settings.
        result.stateTransferTimeout = stateTransferTimeout;
        result.sendAllSessions = sendAllSessions;
        result.sendClusterDomainOnly = sendClusterDomainOnly ;
        result.sendAllSessionsSize = sendAllSessionsSize;
        result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; 
        result.receiverQueue = receiverQueue ;
        result.stateTimestampDrop = stateTimestampDrop ;
        result.stateTransferCreateSendTime = stateTransferCreateSendTime; 
        // NOTE(review): defaultMode, stateTransfered and the statistics counters
        // are not copied here — presumably intentional; confirm.
        return result;
    
public org.apache.catalina.SessioncreateEmptySession()
Create DeltaSession

see
org.apache.catalina.Manager#createEmptySession()

        return getNewDeltaSession() ;
    
public org.apache.catalina.SessioncreateSession(java.lang.String sessionId)
Construct and return a new session object, based on the default settings specified by this Manager's properties. The session id will be assigned by this method, and available via the getId() method of the returned session. If a new session cannot be created for any reason, return null.

exception
IllegalStateException if a new session cannot be instantiated for any reason
exception
IllegalStateException if a new session cannot be instantiated for any reason

        return createSession(sessionId, true);
    
public org.apache.catalina.SessioncreateSession(java.lang.String sessionId, boolean distribute)
Create a new session, enforcing the maxActiveSessions limit, and send the session creation to the other cluster nodes.

param
distribute
return
The session

        if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
            rejectedSessions++;
            throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
        }
        DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
        if (distribute) {
            sendCreateSession(session.getId(), session);
        }
        if (log.isDebugEnabled())
            log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
        return (session);

    
protected DeltaRequestdeserializeDeltaRequest(DeltaSession session, byte[] data)
Load a DeltaRequest received from another node. Classes are loaded with the container class loader.

see
DeltaRequest#readExternal(java.io.ObjectInput)
param
session
param
data message data
return
The request
throws
ClassNotFoundException
throws
IOException

        // Deserialize the received bytes straight into the session's own
        // DeltaRequest instance.
        ReplicationStream ois = getReplicationStream(data);
        boolean readOk = false;
        try {
            session.getDeltaRequest().readExternal(ois);
            readOk = true;
        } finally {
            if (readOk) {
                // Normal path: a close failure is reported to the caller,
                // exactly as before.
                ois.close();
            } else {
                // readExternal failed: still release the stream (previously it
                // was left open), but do not let a close failure mask the
                // original exception.
                try {
                    ois.close();
                } catch (IOException ignored) {
                    // the read failure is the error worth reporting
                }
            }
        }
        return session.getDeltaRequest();
    
protected voiddeserializeSessions(byte[] data)
Load sessions from another cluster node. FIXME: currently replaces sessions with the same id without notification. FIXME: SSO handling is not really correct with the session replacement!

exception
ClassNotFoundException if a serialized class cannot be found during the reload
exception
IOException if an input/output error occurs


        // Existing sessions are deliberately kept: the received sessions are
        // added on top of (or replace entries in) the current session map.
        //sessions.clear(); //should not do this
        // Remember the context class loader so it can be restored in the finally
        // block. NOTE(review): nothing visible in this method changes it —
        // presumably a callee may; confirm.
        ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
        ObjectInputStream ois = null;
        // The payload is an Integer session count followed by that many
        // serialized sessions.
        try {
            ois = getReplicationStream(data);
            Integer count = (Integer) ois.readObject();
            int n = count.intValue();
            for (int i = 0; i < n; i++) {
                // Each received session becomes a valid, non-primary (backup)
                // session owned by this manager.
                DeltaSession session = (DeltaSession) createEmptySession();
                session.readObjectData(ois);
                session.setManager(this);
                session.setValid(true);
                session.setPrimarySession(false);
                //in case the nodes in the cluster are out of
                //time synch, this will make sure that we have the
                //correct timestamp, isValid returns true, cause
                // accessCount=1
                session.access();
                //make sure that the session gets ready to expire if
                // needed
                session.setAccessCount(0);
                // Discard any delta recorded while the session was being set up.
                session.resetDeltaRequest();
                // FIXME How inform other session id cache like SingleSignOn
                // increment sessionCounter to correct stats report
                if (findSession(session.getIdInternal()) == null ) {
                    // Unknown id: counts as a newly created session.
                    sessionCounter++;
                } else {
                    // Known id: the add() below replaces the existing session.
                    sessionReplaceCounter++;
                    // FIXME better is to grab this session again!
                    if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
                }
                add(session);
            }
        } catch (ClassNotFoundException e) {
            // Log and rethrow so the caller sees the failure.
            log.error(sm.getString("deltaManager.loading.cnfe", e), e);
            throw e;
        } catch (IOException e) {
            // Log and rethrow so the caller sees the failure.
            log.error(sm.getString("deltaManager.loading.ioe", e), e);
            throw e;
        } finally {
            // Close the input stream
            try {
                if (ois != null) ois.close();
            } catch (IOException f) {
                // ignored
            }
            ois = null;
            // Put back the context class loader captured at the start.
            if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
        }

    
public booleandoDomainReplication()

return
Returns the sendClusterDomainOnly.

        return sendClusterDomainOnly;
    
public voidexpireAllLocalSessions()
Expire all local sessions, i.e. the valid sessions for which this node is the primary.

        long timeNow = System.currentTimeMillis();
        Session sessions[] = findSessions();
        // expireDirect: primary sessions expired by this call.
        // expireIndirect: primary sessions that were already invalid.
        int expireDirect = 0;
        int expireIndirect = 0;

        if (log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
        for (int i = 0; i < sessions.length; i++) {
            // Only DeltaSessions owned by this node (primary) are considered;
            // backup copies are left untouched.
            if (!(sessions[i] instanceof DeltaSession)) {
                continue;
            }
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isPrimarySession()) {
                continue;
            }
            if (session.isValid()) {
                session.expire();
                expireDirect++;
            } else {
                expireIndirect++;
            }
        }
        long timeEnd = System.currentTimeMillis();
        // Fix: the second counter was labelled "expired direct sessions" as well,
        // which made the indirect count indistinguishable in the log.
        if (log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired indirect sessions: " + expireIndirect);
      
    
public org.apache.catalina.LifecycleListener[]findLifecycleListeners()
Get the lifecycle listeners associated with this lifecycle. If this Lifecycle has no listeners registered, a zero-length array is returned.

        return lifecycle.findLifecycleListeners();
    
protected org.apache.catalina.tribes.MemberfindSessionMasterMember()
Find the master of the session state

return
master member of sessions

        // The first member reported by the cluster is used as the session
        // master; null is returned when the cluster has no members.
        Member mbr = null;
        Member mbrs[] = cluster.getMembers();
        if (mbrs.length != 0) {
            mbr = mbrs[0];
        }
        if (mbr == null) {
            if (log.isWarnEnabled()) {
                log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
            }
        } else if (log.isDebugEnabled()) {
            // Fix: this was log.warn(...) behind an isDebugEnabled() guard, so a
            // routine "found master" message surfaced at WARN level whenever
            // debug logging was enabled. Log it at the level that guards it.
            log.debug(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
        }
        return mbr;
    
public synchronized voidgetAllClusterSessions()
Get the backup of all clustered sessions from the first session master.

see
#findSessionMasterMember()

        // Only request state when a cluster is configured and has other members.
        if (cluster != null && cluster.getMembers().length > 0) {
            long beforeSendTime = System.currentTimeMillis();
            Member mbr = findSessionMasterMember();
            if(mbr == null) { // No domain member found
                 return;
            }
            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
            // set reference time
            stateTransferCreateSendTime = beforeSendTime ;
            // request session state
            counterSend_EVT_GET_ALL_SESSIONS++;
            stateTransfered = false ;
            // FIXME This send call block the deploy thread, when sender waitForAck is enabled
            try {
                // Switch on queueing: while this flag is set, messageDataReceived
                // parks the queueable event types in receivedMessageQueue instead
                // of processing them.
                synchronized(receivedMessageQueue) {
                     receiverQueue = true ;
                }
                cluster.send(msg, mbr);
                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
                // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
                waitForSendAllSessions(beforeSendTime);
            } finally {
                // Always replay whatever was queued meanwhile and switch
                // queueing off again, even if the send or the wait failed.
                synchronized(receivedMessageQueue) {
                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
                        SessionMessage smsg = (SessionMessage) iter.next();
                        if (!stateTimestampDrop) {
                            // No timestamp filtering: replay every queued message.
                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                        } else {
                            // Timestamp filtering: replay only messages that are not
                            // EVT_GET_ALL_SESSIONS and are not older than the
                            // state-transfer reference time; everything else is
                            // dropped with a warning.
                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
                                // FIXME handle EVT_GET_ALL_SESSIONS later
                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                            } else {
                                if (log.isWarnEnabled()) {
                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
                                }
                            }
                        }
                    }        
                    receivedMessageQueue.clear();
                    receiverQueue = false ;
                }
           }
        } else {
            // No cluster or no other members: nothing to fetch.
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
        }
    
public org.apache.catalina.ha.CatalinaClustergetCluster()

        return cluster;
    
public intgetCounterNoStateTransfered()

return
Returns the counterNoStateTransfered.

        return counterNoStateTransfered;
    
public longgetCounterReceive_EVT_ALL_SESSION_DATA()

return
Returns the counterReceive_EVT_ALL_SESSION_DATA.

        return counterReceive_EVT_ALL_SESSION_DATA;
    
public intgetCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE()

return
Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.

        return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
    
public longgetCounterReceive_EVT_GET_ALL_SESSIONS()

return
Returns the counterReceive_EVT_GET_ALL_SESSIONS.

        return counterReceive_EVT_GET_ALL_SESSIONS;
    
public longgetCounterReceive_EVT_SESSION_ACCESSED()

return
Returns the counterReceive_EVT_SESSION_ACCESSED.

        return counterReceive_EVT_SESSION_ACCESSED;
    
public longgetCounterReceive_EVT_SESSION_CREATED()

return
Returns the counterReceive_EVT_SESSION_CREATED.

        return counterReceive_EVT_SESSION_CREATED;
    
public longgetCounterReceive_EVT_SESSION_DELTA()

return
Returns the counterReceive_EVT_SESSION_DELTA.

        return counterReceive_EVT_SESSION_DELTA;
    
public longgetCounterReceive_EVT_SESSION_EXPIRED()

return
Returns the counterReceive_EVT_SESSION_EXPIRED.

        return counterReceive_EVT_SESSION_EXPIRED;
    
public longgetCounterSend_EVT_ALL_SESSION_DATA()

return
Returns the counterSend_EVT_ALL_SESSION_DATA.

        return counterSend_EVT_ALL_SESSION_DATA;
    
public intgetCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE()

return
Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.

        return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
    
public longgetCounterSend_EVT_GET_ALL_SESSIONS()

return
Returns the counterSend_EVT_GET_ALL_SESSIONS.

        return counterSend_EVT_GET_ALL_SESSIONS;
    
public longgetCounterSend_EVT_SESSION_ACCESSED()

return
Returns the counterSend_EVT_SESSION_ACCESSED.

        return counterSend_EVT_SESSION_ACCESSED;
    
public longgetCounterSend_EVT_SESSION_CREATED()

return
Returns the counterSend_EVT_SESSION_CREATED.

        return counterSend_EVT_SESSION_CREATED;
    
public longgetCounterSend_EVT_SESSION_DELTA()

return
Returns the counterSend_EVT_SESSION_DELTA.

        return counterSend_EVT_SESSION_DELTA;
    
public longgetCounterSend_EVT_SESSION_EXPIRED()

return
Returns the counterSend_EVT_SESSION_EXPIRED.

        return counterSend_EVT_SESSION_EXPIRED;
    
public java.lang.StringgetInfo()
Return descriptive information about this Manager implementation and the corresponding version number, in the format <description>/<version>.

        return info;
    
public java.lang.String[]getInvalidatedSessions()
When the manager expires session not tied to a request. The cluster will periodically ask for a list of sessions that should expire and that should be sent across the wire.

return
The invalidated sessions array

        return new String[0];
    
public intgetMaxActiveSessions()
Return the maximum number of active Sessions allowed, or -1 for no limit.

        return (this.maxActiveSessions);
    
public java.lang.StringgetName()
Return the descriptive short name of this Manager implementation.

        return name;
    
protected DeltaSessiongetNewDeltaSession()
Get new session class to be used in the doLoad() method.

        return new DeltaSession(this);
    
public longgetProcessingTime()

return
Returns the processingTime.

        return processingTime;
    
public intgetReceivedQueueSize()

        return receivedMessageQueue.size() ;
    
public intgetRejectedSessions()
Number of session creations that failed due to maxActiveSessions

return
The count

        return rejectedSessions;
    
public intgetSendAllSessionsSize()

return
Returns the sendAllSessionsSize.

        return sendAllSessionsSize;
    
public intgetSendAllSessionsWaitTime()

return
Returns the sendAllSessionsWaitTime in msec

        return sendAllSessionsWaitTime;
    
public longgetSessionReplaceCounter()

return
Returns the sessionReplaceCounter.

        return sessionReplaceCounter;
    
public intgetStateTransferTimeout()

return
Returns the stateTransferTimeout.

        return stateTransferTimeout;
    
public booleangetStateTransfered()
Has the session state transfer completed?

        return stateTransfered;
    
protected voidhandleALL_SESSION_DATA(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle the receipt of all session data from another node (restart).

param
msg
param
sender
throws
ClassNotFoundException
throws
IOException

        counterReceive_EVT_ALL_SESSION_DATA++;
        final boolean debug = log.isDebugEnabled();
        if (debug) {
            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
        }
        // The message payload carries the serialized sessions of the sender.
        deserializeSessions(msg.getSession());
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
        }
        // The state-transfer flag is intentionally not set here.
    
protected voidhandleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle the notification that the session state transfer is complete.

param
msg
param
sender

        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",
                                   getName(),
                                   sender.getHost(),
                                   new Integer(sender.getPort())));
        }
        // Take over the sender's timestamp as the new state-transfer reference
        // time, then flag the transfer as finished.
        stateTransferCreateSendTime = msg.getTimestamp();
        stateTransfered = true;
    
protected voidhandleGET_ALL_SESSIONS(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle a request from another node for all sessions (restart): a) send all sessions in one message, or b) send the sessions in blocks. After sending, signal that the state transfer is complete.

param
msg
param
sender
throws
IOException

        counterReceive_EVT_GET_ALL_SESSIONS++;
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
        // Snapshot the sessions without synchronization. The timestamp taken
        // right after the snapshot is attached to the transfer-complete message.
        Session[] currentSessions = findSessions();
        long findSessionTimestamp = System.currentTimeMillis();
        if (isSendAllSessions()) {
            // a) everything in one message
            sendSessions(sender, currentSessions, findSessionTimestamp);
        } else {
            // b) send the sessions in blocks
            int blockSize = getSendAllSessionsSize();
            if (blockSize <= 0) {
                // A non-positive block size used to loop forever (0) or fail on
                // array allocation (negative); fall back to one single block.
                blockSize = Math.max(currentSessions.length, 1);
            }
            for (int i = 0; i < currentSessions.length; i += blockSize) {
                int len = Math.min(blockSize, currentSessions.length - i);
                // Fix: allocate an exactly-sized array per block. The previous
                // code reused one full-size array, so a shorter final block
                // still carried (and resent) sessions left over from the
                // preceding block.
                Session[] block = new Session[len];
                System.arraycopy(currentSessions, i, block, 0, len);
                sendSessions(sender, block, findSessionTimestamp);
                if (getSendAllSessionsWaitTime() > 0) {
                    try {
                        Thread.sleep(getSendAllSessionsWaitTime());
                    } catch (Exception sleep) {
                        // A failed or interrupted pause is ignored; the next
                        // block is simply sent right away.
                    }
                }
            }
        }

        // Tell the requester that the whole state has been sent.
        SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
        newmsg.setTimestamp(findSessionTimestamp);
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
        cluster.send(newmsg, sender);
    
protected voidhandleSESSION_ACCESSED(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle the notification that a session was accessed on another node (primary session is now false).

param
msg
param
sender
throws
IOException

        counterReceive_EVT_SESSION_ACCESSED++;
        DeltaSession accessed = (DeltaSession) findSession(msg.getSessionID());
        // Unknown session id: nothing to update on this node.
        if (accessed == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
        }
        // Mirror the remote access and mark this copy as non-primary.
        accessed.access();
        accessed.setPrimarySession(false);
        accessed.endAccess();
    
protected voidhandleSESSION_CREATED(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle the notification that a new session was created on another node (create a backup session with primary set to false).

param
msg
param
sender

        counterReceive_EVT_SESSION_CREATED++;
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
        }

        // Build the local backup copy: valid, non-primary, created at the
        // time carried by the message.
        DeltaSession backup = (DeltaSession) createEmptySession();
        backup.setManager(this);
        backup.setValid(true);
        backup.setPrimarySession(false);
        backup.setCreationTime(msg.getTimestamp());
        backup.access();

        // Which id setter is used depends on notifySessionListenersOnReplication.
        String replicatedId = msg.getSessionID();
        if (notifySessionListenersOnReplication) {
            backup.setId(replicatedId);
        } else {
            backup.setIdInternal(replicatedId);
        }

        backup.resetDeltaRequest();
        backup.endAccess();

    
protected voidhandleSESSION_DELTA(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle a received session delta.

param
msg
param
sender
throws
IOException
throws
ClassNotFoundException

        counterReceive_EVT_SESSION_DELTA++;
        byte[] payload = msg.getSession();
        DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
        // A delta for a session this node does not know is ignored.
        if (target == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
        }
        // Rebuild the delta request from the payload and apply it to the
        // local copy, which is then marked as non-primary.
        deserializeDeltaRequest(target, payload).execute(target, notifyListenersOnReplication);
        target.setPrimarySession(false);
    
protected voidhandleSESSION_EXPIRED(SessionMessage msg, org.apache.catalina.tribes.Member sender)
Handle the notification that a session expired on another node (expire the session here as well).

param
msg
param
sender
throws
IOException

        counterReceive_EVT_SESSION_EXPIRED++;
        DeltaSession expired = (DeltaSession) findSession(msg.getSessionID());
        // Nothing to expire when the session is not known on this node.
        if (expired == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
        }
        expired.expire(notifySessionListenersOnReplication, false);
    
public booleanisDefaultMode()

return
Returns the defaultMode.

        return defaultMode;
    
public booleanisExpireSessionsOnShutdown()

        return expireSessionsOnShutdown;
    
public booleanisNotifyListenersOnReplication()

        return notifyListenersOnReplication;
    
public booleanisNotifySessionListenersOnReplication()

return
Returns the notifySessionListenersOnReplication.

        return notifySessionListenersOnReplication;
    
public booleanisSendAllSessions()

return
Returns the sendAllSessions.

        return sendAllSessions;
    
public booleanisStateTimestampDrop()

return
Returns the stateTimestampDrop.

        return stateTimestampDrop;
    
public voidload()


    
public voidmessageDataReceived(org.apache.catalina.ha.ClusterMessage cmsg)
A message was received from another node, this is the callback method to implement if you are interested in receiving replication messages.

param
cmsg - the message received.

        // Anything that is not a SessionMessage (including null) is ignored.
        if (!(cmsg instanceof SessionMessage)) {
            return;
        }
        SessionMessage msg = (SessionMessage) cmsg;

        // These event types are parked in the queue while receiverQueue is
        // set; every other type is always processed immediately.
        int eventType = msg.getEventType();
        boolean queueable = eventType == SessionMessage.EVT_GET_ALL_SESSIONS
                || eventType == SessionMessage.EVT_SESSION_CREATED
                || eventType == SessionMessage.EVT_SESSION_EXPIRED
                || eventType == SessionMessage.EVT_SESSION_ACCESSED
                || eventType == SessionMessage.EVT_SESSION_DELTA;
        if (queueable) {
            synchronized (receivedMessageQueue) {
                if (receiverQueue) {
                    receivedMessageQueue.add(msg);
                    return;
                }
            }
        }

        // Not queued: dispatch right away, passing the sender when known.
        messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
    
protected voidmessageReceived(SessionMessage msg, org.apache.catalina.tribes.Member sender)
This method is called by the receiver thread when a SessionMessage has been received from one of the other nodes in the cluster.

param
msg - the message received
param
sender - the sender of the message; this is used when an EVT_GET_ALL_SESSIONS message is received, so that the reply goes only to the requesting node

        // With domain-only replication enabled, ignore messages whose sender
        // does not pass the domain check.
        if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
            return;
        }
        // Remember the current context class loader; it is restored in the
        // finally block below.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        try {
            
            // Run the handlers with the first class loader returned by
            // getClassLoaders(), when there is one.
            ClassLoader[] loaders = getClassLoaders();
            if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
            if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
 
            // Dispatch to the handler matching the event type.
            switch (msg.getEventType()) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    handleGET_ALL_SESSIONS(msg,sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    handleALL_SESSION_DATA(msg,sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
                    handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    handleSESSION_CREATED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    handleSESSION_EXPIRED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED: {
                    handleSESSION_ACCESSED(msg,sender);
                    break;
                }
                case SessionMessage.EVT_SESSION_DELTA: {
                   handleSESSION_DELTA(msg,sender);
                   break;
                }
                default: {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            } //switch
        } catch (Exception x) {
            // Handler failures are logged here and not propagated to the caller.
            log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
        } finally {
            Thread.currentThread().setContextClassLoader(contextLoader);
        }
    
public voidpropertyChange(java.beans.PropertyChangeEvent event)
Process property change events from our associated Context.

param
event The property change event that has occurred


        // Validate the source of this event
        if (!(event.getSource() instanceof Context)) {
            return;
        }
        // Process a relevant property change. The constant is the receiver of
        // equals() because an event's property name may be null.
        if ("sessionTimeout".equals(event.getPropertyName())) {
            Object newValue = event.getNewValue();
            // Fix: a non-Integer (or null) value made the cast below throw
            // ClassCastException/NullPointerException, which the
            // NumberFormatException handler never caught. Report it through
            // the error message that handler was meant to produce.
            if (!(newValue instanceof Integer)) {
                log.error(sm.getString("deltaManager.sessionTimeout", newValue));
                return;
            }
            try {
                // The property is in minutes; the interval is set in seconds.
                setMaxInactiveInterval(((Integer) newValue).intValue() * 60);
            } catch (NumberFormatException e) {
                log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
            }
        }

    
protected voidregisterSessionAtReplicationValve(DeltaSession session)
Register cross context session at replication valve thread local

param
session cross context session

        // Lazily look up the ReplicationValve once; the lookup is only
        // attempted for a StandardContext with cross-context enabled.
        if (replicationValve == null
                && container instanceof StandardContext
                && ((StandardContext) container).getCrossContext()) {
            Cluster cluster = getCluster();
            if (cluster instanceof CatalinaCluster) {
                Valve[] candidates = ((CatalinaCluster) cluster).getValves();
                if (candidates != null && candidates.length > 0) {
                    // Keep the first valve that is a ReplicationValve.
                    int idx = 0;
                    while (replicationValve == null && idx < candidates.length) {
                        if (candidates[idx] instanceof ReplicationValve) {
                            replicationValve = (ReplicationValve) candidates[idx];
                        }
                        idx++;
                    }
                    if (replicationValve == null && log.isDebugEnabled()) {
                        log.debug("no ReplicationValve found for CrossContext Support");
                    }
                }
            }
        }

        // Register the session with the valve when one is available.
        if (replicationValve != null) {
            replicationValve.registerReplicationSession(session);
        }
    
public voidremoveLifecycleListener(org.apache.catalina.LifecycleListener listener)
Remove a lifecycle event listener from this component.

param
listener The listener to remove

        // Delegate the removal to the lifecycle event support object.
        lifecycle.removeLifecycleListener(listener);
    
public org.apache.catalina.ha.ClusterMessagerequestCompleted(java.lang.String sessionId)
When the request has been completed, the replication valve will notify the manager, and the manager will decide whether any replication is needed or not. If there is a need for replication, the manager will create a session message and that will be replicated. The cluster determines where it gets sent.

param
sessionId - the sessionId that just completed.
return
a SessionMessage to be sent, or null if no replication is needed or the delta request could not be serialized

        try {
            DeltaSession session = (DeltaSession) findSession(sessionId);
            if (session == null) {
                // The session is not (or no longer) known to this manager,
                // so there is nothing to replicate. Without this guard the
                // next line would throw a NullPointerException.
                return null;
            }
            DeltaRequest deltaRequest = session.getDeltaRequest();
            SessionMessage msg = null;
            boolean isDeltaRequest = false ;
            // Serialize and reset under the delta request's lock so that the
            // snapshot and the reset are seen as one step.
            synchronized(deltaRequest) {
                isDeltaRequest = deltaRequest.getSize() > 0 ;
                if (isDeltaRequest) {    
                    counterSend_EVT_SESSION_DELTA++;
                    byte[] data = serializeDeltaRequest(deltaRequest);
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_DELTA, 
                                                 data, 
                                                 sessionId,
                                                 sessionId + "-" + System.currentTimeMillis());
                    session.resetDeltaRequest();
                }  
            }
            if(!isDeltaRequest) {
                // No recorded changes: if this node was not the primary for
                // the session, send an "accessed" message instead.
                if(!session.isPrimarySession()) {               
                    counterSend_EVT_SESSION_ACCESSED++;
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_ACCESSED, 
                                                 null, 
                                                 sessionId,
                                                 sessionId + "-" + System.currentTimeMillis());
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
                    }
                }    
            } else { // log only outside synch block!
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
                }
            }
            session.setPrimarySession(true);
            //check to see if we need to send out an access message
            if ((msg == null)) {
                long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
                // 1000L forces long arithmetic; the int product overflowed for
                // intervals above roughly 24.8 days worth of seconds.
                if (replDelta > (getMaxInactiveInterval() * 1000L)) {
                    counterSend_EVT_SESSION_ACCESSED++;
                    msg = new SessionMessageImpl(getName(),
                                                 SessionMessage.EVT_SESSION_ACCESSED, 
                                                 null,
                                                 sessionId, 
                                                 sessionId + "-" + System.currentTimeMillis());
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
                    }
                }

            }

            //update last replicated time
            if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
            return msg;
        } catch (IOException x) {
            // Looking up the session or serializing the delta failed:
            // log it and tell the caller there is nothing to send.
            log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
            return null;
        }

    
public synchronized voidresetStatistics()
Reset manager statistics

        // Session gauges restart from the current number of active sessions.
        maxActive = getActiveSessions();
        sessionCounter = getActiveSessions();

        // General counters.
        processingTime = 0;
        expiredSessions = 0;
        rejectedSessions = 0;
        sessionReplaceCounter = 0;
        counterNoStateTransfered = 0;

        // Counters for messages sent to other nodes.
        counterSend_EVT_GET_ALL_SESSIONS = 0;
        counterSend_EVT_ALL_SESSION_DATA = 0;
        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        counterSend_EVT_SESSION_CREATED = 0;
        counterSend_EVT_SESSION_ACCESSED = 0;
        counterSend_EVT_SESSION_DELTA = 0;
        counterSend_EVT_SESSION_EXPIRED = 0;

        // Counters for messages received from other nodes.
        counterReceive_EVT_GET_ALL_SESSIONS = 0;
        counterReceive_EVT_ALL_SESSION_DATA = 0;
        counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
        counterReceive_EVT_SESSION_CREATED = 0;
        counterReceive_EVT_SESSION_ACCESSED = 0;
        counterReceive_EVT_SESSION_DELTA = 0;
        counterReceive_EVT_SESSION_EXPIRED = 0;
        
    
protected voidsend(SessionMessage msg)
Send the message to the other backup members (the cluster domain only, or all members).

param
msg Session message

        // Without a cluster there is nowhere to send the message.
        if (cluster == null) {
            return;
        }
        // Restrict delivery to the cluster domain when domain replication is on.
        if (doDomainReplication()) {
            cluster.sendClusterDomain(msg);
        } else {
            cluster.send(msg);
        }
    
protected voidsendCreateSession(java.lang.String sessionId, DeltaSession session)
Send a session-created event to all backup nodes.

param
sessionId
param
session

        // send() tolerates a missing cluster and start() can leave the
        // cluster unset, so guard here too instead of failing with a
        // NullPointerException. With no other members there is also nothing
        // to notify.
        if(cluster == null || cluster.getMembers().length == 0) {
            return;
        }
        SessionMessage msg = 
            new SessionMessageImpl(getName(),
                                   SessionMessage.EVT_SESSION_CREATED, 
                                   null, 
                                   sessionId,
                                   sessionId + "-" + System.currentTimeMillis());
        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
        // The message carries the session's creation time as its timestamp.
        msg.setTimestamp(session.getCreationTime());
        counterSend_EVT_SESSION_CREATED++;
        send(msg);
    
protected voidsendSessions(org.apache.catalina.tribes.Member sender, org.apache.catalina.Session[] currentSessions, long sendTimestamp)
Send a block of sessions to the given member (sender).

param
sender
param
currentSessions
param
sendTimestamp
throws
IOException

        // Turn the session block into a single byte[] payload.
        byte[] payload = serializeSessions(currentSessions);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
        }
        // Wrap the payload in an ALL_SESSION_DATA message stamped with the given time.
        SessionMessage stateMsg = new SessionMessageImpl(name,
                                                         SessionMessage.EVT_ALL_SESSION_DATA,
                                                         payload,
                                                         "SESSION-STATE",
                                                         "SESSION-STATE-" + getName());
        stateMsg.setTimestamp(sendTimestamp);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
        }
        counterSend_EVT_ALL_SESSION_DATA++;
        // Deliver to the one given member only.
        cluster.send(stateMsg, sender);
    
protected byte[]serializeDeltaRequest(DeltaRequest deltaRequest)
serialize DeltaRequest

see
DeltaRequest#writeExternal(java.io.ObjectOutput)
param
deltaRequest
return
serialized delta request
throws
IOException

        // Delegate to the delta request's own serialize() and return its result.
        return deltaRequest.serialize();
    
protected byte[]serializeSessions(org.apache.catalina.Session[] currentSessions)
Serialize the given sessions into a byte array: the number of sessions is written first, followed by the object data of each session.

exception
IOException if an input/output error occurs


        // Everything is written to an in-memory buffer; no file is involved.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream out = null;

        try {
            out = new ObjectOutputStream(new BufferedOutputStream(buffer));
            // The session count goes first, then each session's object data.
            out.writeObject(Integer.valueOf(currentSessions.length));
            int index = 0;
            while (index < currentSessions.length) {
                ((DeltaSession) currentSessions[index]).writeObjectData(out);
                index++;
            }
            // Push buffered bytes through to the byte array
            out.flush();
        } catch (IOException e) {
            log.error(sm.getString("deltaManager.unloading.ioe", e), e);
            throw e;
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException ignored) {
                    // a failure while closing is deliberately ignored
                }
            }
        }
        // Hand back the serialized sessions as byte[]
        return buffer.toByteArray();
    
protected voidsessionExpired(java.lang.String id)
send session expired to other cluster nodes

param
id session id

        // Count the notification, then build and send the EXPIRED message.
        counterSend_EVT_SESSION_EXPIRED++;
        SessionMessage expiredMsg = new SessionMessageImpl(getName(),
                                                           SessionMessage.EVT_SESSION_EXPIRED,
                                                           null,
                                                           id,
                                                           id+ "-EXPIRED-MSG");
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
        }
        send(expiredMsg);
    
public voidsetCluster(org.apache.catalina.ha.CatalinaCluster cluster)

        // Store the given cluster in this manager's cluster field.
        this.cluster = cluster;
    
public voidsetContainer(org.apache.catalina.Container container)
Set the Container with which this Manager has been associated. If it is a Context (the usual case), listen for changes to the session timeout property.

param
container The associated Container

        // Stop listening to the previous Container when it was a Context
        // (instanceof is false for null, so no separate null check is needed).
        if (this.container instanceof Context) {
            ((Context) this.container).removePropertyChangeListener(this);
        }

        // Delegate to the superclass implementation.
        super.setContainer(container);

        // When the Container is now a Context, take over its session timeout
        // (scaled by 60) and listen for later property changes.
        if (this.container instanceof Context) {
            Context ctx = (Context) this.container;
            setMaxInactiveInterval(ctx.getSessionTimeout() * 60);
            ctx.addPropertyChangeListener(this);
        }

    
public voidsetDefaultMode(boolean defaultMode)

param
defaultMode The defaultMode to set.

        // Store the new defaultMode flag.
        this.defaultMode = defaultMode;
    
public voidsetDomainReplication(boolean sendClusterDomainOnly)

param
sendClusterDomainOnly The sendClusterDomainOnly to set.

        // The flag is kept in the sendClusterDomainOnly field.
        this.sendClusterDomainOnly = sendClusterDomainOnly;
    
public voidsetExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)

        // Store the new expireSessionsOnShutdown flag.
        this.expireSessionsOnShutdown = expireSessionsOnShutdown;
    
public voidsetMaxActiveSessions(int max)
Set the maximum number of active Sessions allowed, or -1 for no limit.

param
max The new maximum number of sessions

        // Remember the previous limit so listeners receive both old and new values.
        final int previous = this.maxActiveSessions;
        this.maxActiveSessions = max;
        support.firePropertyChange("maxActiveSessions",
                                   Integer.valueOf(previous),
                                   Integer.valueOf(this.maxActiveSessions));
    
public voidsetName(java.lang.String name)

        // Store the new manager name.
        this.name = name;
    
public voidsetNotifyListenersOnReplication(boolean notifyListenersOnReplication)

        // Store the new notifyListenersOnReplication flag.
        this.notifyListenersOnReplication = notifyListenersOnReplication;
    
public voidsetNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication)

param
notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.

        // The parameter name differs from the field it sets: notifySessionListenersOnReplication.
        this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
    
public voidsetRejectedSessions(int rejectedSessions)

        // Overwrite the rejected-sessions counter with the given value.
        this.rejectedSessions = rejectedSessions;
    
public voidsetSendAllSessions(boolean sendAllSessions)

param
sendAllSessions The sendAllSessions to set.

        // Store the new sendAllSessions flag.
        this.sendAllSessions = sendAllSessions;
    
public voidsetSendAllSessionsSize(int sendAllSessionsSize)

param
sendAllSessionsSize The sendAllSessionsSize to set.

        // Store the new sendAllSessionsSize value.
        this.sendAllSessionsSize = sendAllSessionsSize;
    
public voidsetSendAllSessionsWaitTime(int sendAllSessionsWaitTime)

param
sendAllSessionsWaitTime The sendAllSessionsWaitTime to set, in msec.

        // Store the new sendAllSessionsWaitTime value.
        this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
    
public voidsetStateTimestampDrop(boolean isTimestampDrop)

param
isTimestampDrop The new flag value

        // The flag is kept in the stateTimestampDrop field.
        this.stateTimestampDrop = isTimestampDrop;
    
public voidsetStateTransferTimeout(int timeoutAllSession)

param
timeoutAllSession The timeout

        // The timeout is kept in the stateTransferTimeout field.
        this.stateTransferTimeout = timeoutAllSession;
    
public voidsetStateTransfered(boolean stateTransfered)
Set whether the session state has been completely transferred.

param
stateTransfered

        // Store the new stateTransfered flag.
        this.stateTransfered = stateTransfered;
    
public voidstart()
Prepare for the beginning of active use of the public methods of this component. This method should be called after configure(), and before any of the public methods of the component are utilized.

exception
LifecycleException if this component detects a fatal error that prevents this component from being used

        if (!initialized) {
            init();
        }

        // A second start() on a running manager is a silent no-op.
        if (started) {
            return;
        }
        started = true;
        lifecycle.fireLifecycleEvent(START_EVENT, null);

        // Force initialization of the random number generator
        generateSessionId();

        try {
            // Use the cluster already set on this manager, if any; otherwise
            // look for one on the parent Host and then on its parent Engine.
            Cluster resolved = getCluster();
            if (resolved == null) {
                Container ctx = getContainer();
                if (ctx instanceof Context) {
                    Container hostContainer = ctx.getParent();
                    if (hostContainer instanceof Host) {
                        resolved = hostContainer.getCluster();
                        if (resolved instanceof CatalinaCluster) {
                            setCluster((CatalinaCluster) resolved);
                        } else {
                            Container engineContainer = hostContainer.getParent();
                            if (engineContainer instanceof Engine) {
                                resolved = engineContainer.getCluster();
                                if (resolved instanceof CatalinaCluster) {
                                    setCluster((CatalinaCluster) resolved);
                                }
                            } else {
                                resolved = null;
                            }
                        }
                    }
                }
            }

            // No cluster at all: report it and give up on clustering.
            if (resolved == null) {
                log.error(sm.getString("deltaManager.noCluster", getName()));
                return;
            }

            if (log.isInfoEnabled()) {
                // Describe the kind of container the cluster is attached to.
                String ownerType;
                if (resolved.getContainer() instanceof Host) {
                    ownerType = "Host";
                } else if (resolved.getContainer() instanceof Engine) {
                    ownerType = "Engine";
                } else {
                    ownerType = "unknown";
                }
                log.info(sm.getString("deltaManager.registerCluster", getName(), ownerType, resolved.getClusterName()));
            }
            if (log.isInfoEnabled()) {
                log.info(sm.getString("deltaManager.startClustering", getName()));
            }

            // Register on every start: a context reload only calls stop/start
            // and does not create a new manager.
            resolved.registerManager(this);

            getAllClusterSessions();

        } catch (Throwable t) {
            log.error(sm.getString("deltaManager.managerLoad"), t);
        }
    
public voidstop()
Gracefully terminate the active use of the public methods of this component. This method should be the last one called on a given instance of this component.

exception
LifecycleException if this component detects a fatal error that needs to be reported


        if (log.isDebugEnabled())
            log.debug(sm.getString("deltaManager.stopped", getName()));


        // Validate and update our current component state
        if (!started)
            throw new LifecycleException(sm.getString("deltaManager.notStarted"));
        lifecycle.fireLifecycleEvent(STOP_EVENT, null);
        started = false;

        // Expire all active sessions
        if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
        Session sessions[] = findSessions();
        for (int i = 0; i < sessions.length; i++) {
            DeltaSession session = (DeltaSession) sessions[i];
            if (!session.isValid())
                continue;
            try {
                session.expire(true, isExpireSessionsOnShutdown());
            } catch (Throwable ignored) {
                // Best effort: one session failing to expire must not stop
                // the remaining sessions from being expired.
            } 
        }

        // Require a new random number generator if we are restarted
        this.random = null;
        // start() marks the manager as started even when it finds no cluster,
        // so the cluster may be missing here. Guard the call so that stop()
        // does not fail with a NullPointerException before destroy() runs.
        if (getCluster() != null) {
            getCluster().removeManager(this);
        }
        // Drop the cached valve so it is looked up again after a restart.
        replicationValve = null;
        if (initialized) {
            destroy();
        }
    
public voidunload()


    
protected voidwaitForSendAllSessions(long beforeSendTime)
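Wait until the cluster session state has been transferred, or until the state transfer timeout (in seconds; 60 sec by default) has elapsed. With stateTransferTimeout == -1, wait without a time limit until the state has been transferred (forever mode).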

        long reqStart = System.currentTimeMillis();
        long reqNow = reqStart ;
        boolean isTimeout = false;
        if(getStateTransferTimeout() > 0) {
            // wait that state is transfered with timeout check
            do {
                try {
                    Thread.sleep(100);
                } catch (Exception sleep) {
                    //
                }
                reqNow = System.currentTimeMillis();
                isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
            } while ((!getStateTransfered()) && (!isTimeout));
        } else {
            if(getStateTransferTimeout() == -1) {
                // wait that state is transfered
                do {
                    try {
                        Thread.sleep(100);
                    } catch (Exception sleep) {
                    }
                } while ((!getStateTransfered()));
                reqNow = System.currentTimeMillis();
            }
        }
        if (isTimeout || (!getStateTransfered())) {
            counterNoStateTransfered++ ;
            log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
        } else {
            if (log.isInfoEnabled())
                log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
        }