FileDocCategorySizeDatePackage
SimpleTcpReplicationManager.javaAPI DocApache Tomcat 6.0.1427735Fri Jul 20 04:20:32 BST 2007org.apache.catalina.ha.session

SimpleTcpReplicationManager

public class SimpleTcpReplicationManager extends org.apache.catalina.session.StandardManager implements org.apache.catalina.ha.ClusterManager
Title: Tomcat Session Replication for Tomcat 4.0
Description: A very simple straight forward implementation of session replication of servers in a cluster.
This session replication is implemented "live". By live I mean, when a session attribute is added into a session on Node A a message is broadcasted to other messages and setAttribute is called on the replicated sessions.
A full description of this implementation can be found under Filip's Tomcat Page
Copyright: See apache license Company: www.filip.net
author
Filip Hanik
author
Bela Ban (modifications for synchronous replication)
version
1.0 for TC 4.0 Description: The InMemoryReplicationManager is a session manager that replicated session information in memory.

The InMemoryReplicationManager extends the StandardManager hence it allows for us to inherit all the basic session management features like expiration, session listeners etc

To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages all defined in the SessionMessage class.
When a session is replicated (not an attribute added/removed) the session is serialized into a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.

Fields Summary
public static org.apache.juli.logging.Log
log
protected String
mChannelConfig
protected String
mGroupName
protected boolean
mChannelStarted
protected boolean
mPrintToScreen
protected boolean
defaultMode
protected boolean
mManagerRunning
protected boolean
synchronousReplication
Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc) will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for all responses.
protected boolean
mExpireSessionsOnShutdown
Set to true if we don't want the sessions to expire on shutdown
protected boolean
useDirtyFlag
protected String
name
protected boolean
distributable
protected org.apache.catalina.ha.CatalinaCluster
cluster
protected HashMap
invalidatedSessions
protected boolean
stateTransferred
Flag to keep track if the state has been transferred or not Assumes false.
private boolean
notifyListenersOnReplication
private boolean
sendClusterDomainOnly
Constructors Summary
public SimpleTcpReplicationManager()
Constructor, just calls super()


             
     
    
        super();
    
Methods Summary
public org.apache.catalina.ha.ClusterManagercloneFromTemplate()

        throw new UnsupportedOperationException();
    
protected org.apache.catalina.SessioncreateSession(java.lang.String sessionId, boolean notify, boolean setId)
Creates a HTTP session. Most of the code in here is copied from the StandardManager. This is not pretty, yeah I know, but it was necessary since the StandardManager had hard coded the session instantiation to the a StandardSession, when we actually want to instantiate a ReplicatedSession
If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other nodes in the cluster that this session has been created.

param
notify - if set to true the other nodes in the cluster will be notified. This flag is needed so that we can create a session before we deserialize a replicated one
see
ReplicatedSession


        //inherited from the basic manager
        if ((getMaxActiveSessions() >= 0) &&
           (sessions.size() >= getMaxActiveSessions()))
            throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));


        Session session = new ReplicatedSession(this);

        // Initialize the properties of the new session and return it
        session.setNew(true);
        session.setValid(true);
        session.setCreationTime(System.currentTimeMillis());
        session.setMaxInactiveInterval(this.maxInactiveInterval);
        if(sessionId == null)
            sessionId = generateSessionId();
        if ( setId ) session.setId(sessionId);
        if ( notify && (cluster!=null) ) {
            ((ReplicatedSession)session).setIsDirty(true);
        }
        return (session);
    
public org.apache.catalina.SessioncreateSession(java.lang.String sessionId)
Construct and return a new session object, based on the default settings specified by this Manager's properties. The session id will be assigned by this method, and available via the getId() method of the returned session. If a new session cannot be created for any reason, return null.

exception
IllegalStateException if a new session cannot be instantiated for any reason

        //create a session and notify the other nodes in the cluster
        Session session =  createSession(sessionId,getDistributable(),true);
        add(session);
        return session;
    
public booleandoDomainReplication()

        return sendClusterDomainOnly;
    
public org.apache.catalina.ha.CatalinaClustergetCluster()

        return cluster;
    
public booleangetDistributable()

        return distributable;
    
public booleangetExpireSessionsOnShutdown()

        return mExpireSessionsOnShutdown;
    
public java.lang.String[]getInvalidatedSessions()

        synchronized ( invalidatedSessions ) {
            String[] result = new String[invalidatedSessions.size()];
            invalidatedSessions.values().toArray(result);
            return result;
        }

    
public java.lang.StringgetName()

        return this.name;
    
public org.apache.catalina.tribes.io.ReplicationStreamgetReplicationStream(byte[] data)
Open Stream and use correct ClassLoader (Container) Switch ThreadClassLoader

param
data
return
The object input stream
throws
IOException

        return getReplicationStream(data,0,data.length);
    
public org.apache.catalina.tribes.io.ReplicationStreamgetReplicationStream(byte[] data, int offset, int length)

        ByteArrayInputStream fis =null;
        ReplicationStream ois = null;
        Loader loader = null;
        ClassLoader classLoader = null;
        //fix to be able to run the DeltaManager
        //stand alone without a container.
        //use the Threads context class loader
        if (container != null)
            loader = container.getLoader();
        if (loader != null)
            classLoader = loader.getClassLoader();
        else
            classLoader = Thread.currentThread().getContextClassLoader();
        //end fix
        fis = new ByteArrayInputStream(data, offset, length);
        if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
            ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
        } else {
            ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()});
        }
        return ois;
    
public booleanisDefaultMode()

return
Returns the defaultMode.

        return defaultMode;
    
public booleanisManagerRunning()

        return mManagerRunning;
    
public booleanisNotifyListenersOnReplication()

        return notifyListenersOnReplication;
    
public booleanisStateTransferred()

        return stateTransferred;
    
public voidmessageDataReceived(org.apache.catalina.ha.ClusterMessage cmsg)

        try {
            if ( cmsg instanceof SessionMessage ) {
                SessionMessage msg = (SessionMessage)cmsg;
                messageReceived(msg,
                                msg.getAddress() != null ? (Member) msg.getAddress() : null);
            }
        } catch(Throwable ex){
            log.error("InMemoryReplicationManager.messageDataReceived()", ex);
        }//catch
    
protected voidmessageReceived(SessionMessage msg, org.apache.catalina.tribes.Member sender)
This method is called by the received thread when a SessionMessage has been received from one of the other nodes in the cluster.

param
msg - the message received
param
sender - the sender of the message, this is used if we receive a EVT_GET_ALL_SESSION message, so that we only reply to the requesting node

        try  {
            if(log.isInfoEnabled()) {
                log.debug("Received SessionMessage of type="+msg.getEventTypeString());
                log.debug("Received SessionMessage sender="+sender);
            }
            switch ( msg.getEventType() ) {
                case SessionMessage.EVT_GET_ALL_SESSIONS: {
                    //get a list of all the session from this manager
                    Object[] sessions = findSessions();
                    java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
                    java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
                    oout.writeInt(sessions.length);
                    for (int i=0; i<sessions.length; i++){
                        ReplicatedSession ses = (ReplicatedSession)sessions[i];
                        oout.writeUTF(ses.getIdInternal());
                        byte[] data = writeSession(ses);
                        oout.writeObject(data);
                    }//for
                    //don't send a message if we don't have to
                    oout.flush();
                    oout.close();
                    byte[] data = bout.toByteArray();
                    SessionMessage newmsg = new SessionMessageImpl(name,
                        SessionMessage.EVT_ALL_SESSION_DATA,
                        data, "SESSION-STATE","SESSION-STATE-"+getName());
                    cluster.send(newmsg, sender);
                    break;
                }
                case SessionMessage.EVT_ALL_SESSION_DATA: {
                    java.io.ByteArrayInputStream bin =
                        new java.io.ByteArrayInputStream(msg.getSession());
                    java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
                    int size = oin.readInt();
                    for ( int i=0; i<size; i++) {
                        String id = oin.readUTF();
                        byte[] data = (byte[])oin.readObject();
                        Session session = readSession(data,id);
                    }//for
                    stateTransferred=true;
                    break;
                }
                case SessionMessage.EVT_SESSION_CREATED: {
                    Session session = this.readSession(msg.getSession(),msg.getSessionID());
                    if ( log.isDebugEnabled() ) {
                        log.debug("Received replicated session=" + session +
                            " isValid=" + session.isValid());
                    }
                    break;
                }
                case SessionMessage.EVT_SESSION_EXPIRED: {
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.expire();
                        this.remove(session);
                    }//end if
                    break;
                }
                case SessionMessage.EVT_SESSION_ACCESSED :{
                    Session session = findSession(msg.getSessionID());
                    if ( session != null ) {
                        session.access();
                        session.endAccess();
                    }
                    break;
                }
                default:  {
                    //we didn't recognize the message type, do nothing
                    break;
                }
            }//switch
        }
        catch ( Exception x )
        {
            log.error("Unable to receive message through TCP channel",x);
        }
    
protected org.apache.catalina.SessionreadSession(byte[] data, java.lang.String sessionId)
Reinstantiates a serialized session from the data passed in. This will first call createSession() so that we get a fresh instance with all the managers set and all the transient fields validated. Then it calls Session.readObjectData(byte[]) to deserialize the object

param
data - a byte array containing session data
return
a valid Session object, null if an error occurs

        try
        {
            ReplicationStream session_in = getReplicationStream(data);

            Session session = sessionId!=null?this.findSession(sessionId):null;
            boolean isNew = (session==null);
            //clear the old values from the existing session
            if ( session!=null ) {
                ReplicatedSession rs = (ReplicatedSession)session;
                rs.expire(false);  //cleans up the previous values, since we are not doing removes
                session = null;
            }//end if

            if (session==null) {
                session = createSession(null,false, false);
                sessions.remove(session.getIdInternal());
            }
            
            
            boolean hasPrincipal = session_in.readBoolean();
            SerializablePrincipal p = null;
            if ( hasPrincipal )
                p = (SerializablePrincipal)session_in.readObject();
            ((ReplicatedSession)session).readObjectData(session_in);
            if ( hasPrincipal )
                session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
            ((ReplicatedSession)session).setId(sessionId,isNew);
            ReplicatedSession rsession = (ReplicatedSession)session; 
            rsession.setAccessCount(1);
            session.setManager(this);
            session.setValid(true);
            rsession.setLastAccessedTime(System.currentTimeMillis());
            rsession.setThisAccessedTime(System.currentTimeMillis());
            ((ReplicatedSession)session).setAccessCount(0);
            session.setNew(false);
            if(log.isTraceEnabled())
                 log.trace("Session loaded id="+sessionId +
                               " actualId="+session.getId()+ 
                               " exists="+this.sessions.containsKey(sessionId)+
                               " valid="+rsession.isValid());
            return session;

        }
        catch ( Exception x )
        {
            log.error("Failed to deserialize the session!",x);
        }
        return null;
    
public org.apache.catalina.ha.ClusterMessagerequestCompleted(java.lang.String sessionId)

        if (  !getDistributable() ) {
            log.warn("Received requestCompleted message, although this context["+
                     getName()+"] is not distributable. Ignoring message");
            return null;
        }
        try
        {
            if ( invalidatedSessions.get(sessionId) != null ) {
                synchronized ( invalidatedSessions ) {
                    invalidatedSessions.remove(sessionId);
                    SessionMessage msg = new SessionMessageImpl(name,
                    SessionMessage.EVT_SESSION_EXPIRED,
                    null,
                    sessionId,
                    sessionId);
                return msg;
                }
            } else {
                ReplicatedSession session = (ReplicatedSession) findSession(
                    sessionId);
                if (session != null) {
                    //return immediately if the session is not dirty
                    if (useDirtyFlag && (!session.isDirty())) {
                        //but before we return doing nothing,
                        //see if we should send
                        //an updated last access message so that
                        //sessions across cluster dont expire
                        long interval = session.getMaxInactiveInterval();
                        long lastaccdist = System.currentTimeMillis() -
                            session.getLastAccessWasDistributed();
                        if ( ((interval*1000) / lastaccdist)< 3 ) {
                            SessionMessage accmsg = new SessionMessageImpl(name,
                                SessionMessage.EVT_SESSION_ACCESSED,
                                null,
                                sessionId,
                                sessionId);
                            session.setLastAccessWasDistributed(System.currentTimeMillis());
                            return accmsg;
                        }
                        return null;
                    }

                    session.setIsDirty(false);
                    if (log.isDebugEnabled()) {
                        try {
                            log.debug("Sending session to cluster=" + session);
                        }
                        catch (Exception ignore) {}
                    }
                    SessionMessage msg = new SessionMessageImpl(name,
                        SessionMessage.EVT_SESSION_CREATED,
                        writeSession(session),
                        session.getIdInternal(),
                        session.getIdInternal());
                    return msg;
                } //end if
            }//end if
        }
        catch (Exception x )
        {
            log.error("Unable to replicate session",x);
        }
        return null;
    
public voidsessionInvalidated(java.lang.String sessionId)

        synchronized ( invalidatedSessions ) {
            invalidatedSessions.put(sessionId, sessionId);
        }
    
public voidsetCluster(org.apache.catalina.ha.CatalinaCluster cluster)

        if(log.isDebugEnabled())
            log.debug("Cluster associated with SimpleTcpReplicationManager");
        this.cluster = cluster;
    
public voidsetDefaultMode(boolean defaultMode)

param
defaultMode The defaultMode to set.

        this.defaultMode = defaultMode;
    
public voidsetDistributable(boolean dist)

        this.distributable = dist;
    
public voidsetDomainReplication(boolean sendClusterDomainOnly)

param
sendClusterDomainOnly The sendClusterDomainOnly to set.

        this.sendClusterDomainOnly = sendClusterDomainOnly;
    
public voidsetExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)

        mExpireSessionsOnShutdown = expireSessionsOnShutdown;
    
public voidsetName(java.lang.String name)

        this.name = name;
    
public voidsetNotifyListenersOnReplication(boolean notifyListenersOnReplication)

        this.notifyListenersOnReplication = notifyListenersOnReplication;
    
public voidsetPrintToScreen(boolean printtoscreen)

        if(log.isDebugEnabled())
            log.debug("Setting screen debug to:"+printtoscreen);
        mPrintToScreen = printtoscreen;
    
public voidsetSynchronousReplication(boolean flag)

        synchronousReplication=flag;
    
public voidsetUseDirtyFlag(boolean usedirtyflag)

        this.useDirtyFlag = usedirtyflag;
    
public voidstart()
Prepare for the beginning of active use of the public methods of this component. This method should be called after configure(), and before any of the public methods of the component are utilized.
Starts the cluster communication channel, this will connect with the other nodes in the cluster, and request the current session state to be transferred to this node.

exception
IllegalStateException if this component has already been started
exception
LifecycleException if this component detects a fatal error that prevents this component from being used

        mManagerRunning = true;
        super.start();
        try {
            //the channel is already running
            if ( mChannelStarted ) return;
            if(log.isInfoEnabled())
                log.info("Starting clustering manager...:"+getName());
            if ( cluster == null ) {
                log.error("Starting... no cluster associated with this context:"+getName());
                return;
            }
            cluster.registerManager(this);

            if (cluster.getMembers().length > 0) {
                Member mbr = cluster.getMembers()[0];
                SessionMessage msg =
                    new SessionMessageImpl(this.getName(),
                                       SessionMessage.EVT_GET_ALL_SESSIONS,
                                       null,
                                       "GET-ALL",
                                       "GET-ALL-"+this.getName());
                cluster.send(msg, mbr);
                if(log.isWarnEnabled())
                     log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                         ". This operation will timeout if no session state has been received within "+
                         "60 seconds");
                long reqStart = System.currentTimeMillis();
                long reqNow = 0;
                boolean isTimeout=false;
                do {
                    try {
                        Thread.sleep(100);
                    }catch ( Exception sleep) {}
                    reqNow = System.currentTimeMillis();
                    isTimeout=((reqNow-reqStart)>(1000*60));
                } while ( (!isStateTransferred()) && (!isTimeout));
                if ( isTimeout || (!isStateTransferred()) ) {
                    log.error("Manager["+getName()+"], No session state received, timing out.");
                }else {
                    if(log.isInfoEnabled())
                        log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
                }
            } else {
                if(log.isInfoEnabled())
                    log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
            }//end if
            mChannelStarted = true;
        }  catch ( Exception x ) {
            log.error("Unable to start SimpleTcpReplicationManager",x);
        }
    
public voidstop()
Gracefully terminate the active use of the public methods of this component. This method should be the last one called on a given instance of this component.
This will disconnect the cluster communication channel and stop the listener thread.

exception
IllegalStateException if this component has not been started
exception
LifecycleException if this component detects a fatal error that needs to be reported

        mManagerRunning = false;
        mChannelStarted = false;
        super.stop();
        try
        {
            this.sessions.clear();
            cluster.removeManager(this);
        }
        catch ( Exception x )
        {
            log.error("Unable to stop SimpleTcpReplicationManager",x);
        }
    
public voidunload()
Override persistence since they don't go hand in hand with replication for now.

        if ( !getDistributable() ) {
            super.unload();
        }
    
protected byte[]writeSession(org.apache.catalina.Session session)
Serialize a session into a byte array
This method simple calls the writeObjectData method on the session and returns the byte data from that call

param
session - the session to be serialized
return
a byte array containing the session data, null if the serialization failed

        try
        {
            java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
            java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
            session_out.flush();
            boolean hasPrincipal = session.getPrincipal() != null;
            session_out.writeBoolean(hasPrincipal);
            if ( hasPrincipal )
            {
                session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
            }//end if
            ((ReplicatedSession)session).writeObjectData(session_out);
            return session_data.toByteArray();

        }
        catch ( Exception x )
        {
            log.error("Failed to serialize the session!",x);
        }
        return null;