Methods Summary |
---|
public org.apache.catalina.ha.ClusterManager | cloneFromTemplate()
// Template cloning is not implemented for this manager; the call always fails.
// The message tells the caller which operation was rejected.
throw new UnsupportedOperationException("cloneFromTemplate() is not supported by this manager");
|
protected org.apache.catalina.Session | createSession(java.lang.String sessionId, boolean notify, boolean setId)Creates an HTTP session backed by a ReplicatedSession.
The logic mirrors the session creation of the StandardManager, which hard-codes
the instantiation of a StandardSession and therefore cannot be reused when a
ReplicatedSession is wanted instead.
When notify is true and a cluster is attached, the new session is flagged dirty.
Throws IllegalStateException when the active-session limit has been reached.
// Enforce the session limit inherited from the basic manager; a negative limit disables the check.
final int limit = getMaxActiveSessions();
if (limit >= 0 && sessions.size() >= limit) {
    throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
}
// Instantiate the replicated flavour and give it the standard initial state.
Session created = new ReplicatedSession(this);
created.setNew(true);
created.setValid(true);
created.setCreationTime(System.currentTimeMillis());
created.setMaxInactiveInterval(this.maxInactiveInterval);
// A missing id is generated here, even if the caller asked us not to assign it.
final String effectiveId = (sessionId != null) ? sessionId : generateSessionId();
if (setId) {
    created.setId(effectiveId);
}
// Only mark the session dirty when the caller wants notification and a cluster is attached.
if (notify && cluster != null) {
    ((ReplicatedSession) created).setIsDirty(true);
}
return created;
|
public org.apache.catalina.Session | createSession(java.lang.String sessionId)Construct and return a new session object, based on the default
settings specified by this Manager's properties. The session
id will be assigned by this method, and available via the getId()
method of the returned session. If a new session cannot be created
for any reason, return null .
//create a session and notify the other nodes in the cluster
Session session = createSession(sessionId,getDistributable(),true);
add(session);
return session;
|
public boolean | doDomainReplication()
// Returns the current value of the sendClusterDomainOnly flag.
return this.sendClusterDomainOnly;
|
public org.apache.catalina.ha.CatalinaCluster | getCluster()
// Returns the cluster stored by setCluster(); may be null if none was set.
return this.cluster;
|
public boolean | getDistributable()
// Returns the flag stored by setDistributable().
return this.distributable;
|
public boolean | getExpireSessionsOnShutdown()
// Returns the flag stored by setExpireSessionsOnShutdown().
return this.mExpireSessionsOnShutdown;
|
public java.lang.String[] | getInvalidatedSessions()
synchronized ( invalidatedSessions ) {
String[] result = new String[invalidatedSessions.size()];
invalidatedSessions.values().toArray(result);
return result;
}
|
public java.lang.String | getName()
// Returns the name stored by setName().
return name;
|
public org.apache.catalina.tribes.io.ReplicationStream | getReplicationStream(byte[] data)Open Stream and use correct ClassLoader (Container) Switch
ThreadClassLoader
return getReplicationStream(data,0,data.length);
|
public org.apache.catalina.tribes.io.ReplicationStream | getReplicationStream(byte[] data, int offset, int length)
// Opens a ReplicationStream over data[offset, offset+length).
// The stream resolves classes with the container's loader when one is available,
// and with the calling thread's context class loader otherwise. This lets the
// manager run stand-alone, without a container.
final Loader webappLoader = (container != null) ? container.getLoader() : null;
final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader primary = (webappLoader != null) ? webappLoader.getClassLoader() : contextLoader;
final ByteArrayInputStream bytesIn = new ByteArrayInputStream(data, offset, length);
// Pass the context loader as a second candidate only when it differs from the primary one.
final ClassLoader[] candidates;
if (primary == contextLoader) {
    candidates = new ClassLoader[] {primary};
} else {
    candidates = new ClassLoader[] {primary, contextLoader};
}
return new ReplicationStream(bytesIn, candidates);
|
public boolean | isDefaultMode()
// Returns the flag stored by setDefaultMode().
return this.defaultMode;
|
public boolean | isManagerRunning()
// True between start() and stop(): start() sets the flag and stop() clears it.
return this.mManagerRunning;
|
public boolean | isNotifyListenersOnReplication()
// Returns the flag stored by setNotifyListenersOnReplication().
return this.notifyListenersOnReplication;
|
public boolean | isStateTransferred()
// Becomes true once an EVT_ALL_SESSION_DATA message has been processed by messageReceived().
return this.stateTransferred;
|
public void | messageDataReceived(org.apache.catalina.ha.ClusterMessage cmsg)
try {
if ( cmsg instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)cmsg;
messageReceived(msg,
msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
} catch(Throwable ex){
log.error("InMemoryReplicationManager.messageDataReceived()", ex);
}//catch
|
protected void | messageReceived(SessionMessage msg, org.apache.catalina.tribes.Member sender)This method is called by the receiver thread when a SessionMessage has
been received from one of the other nodes in the cluster.
// Dispatches on msg.getEventType():
//   EVT_GET_ALL_SESSIONS - serialize every local session and send it back to the sender
//   EVT_ALL_SESSION_DATA - load every session contained in the message, mark state as transferred
//   EVT_SESSION_CREATED  - load (or overwrite) the single session in the message
//   EVT_SESSION_EXPIRED  - expire and remove the named session, if present
//   EVT_SESSION_ACCESSED - touch the named session, if present
// Unknown event types are ignored. Any exception is logged and swallowed.
try {
    // The guard must match the level that is actually logged (debug, not info).
    if (log.isDebugEnabled()) {
        log.debug("Received SessionMessage of type="+msg.getEventTypeString());
        log.debug("Received SessionMessage sender="+sender);
    }
    switch ( msg.getEventType() ) {
        case SessionMessage.EVT_GET_ALL_SESSIONS: {
            //get a list of all the sessions from this manager
            Object[] allSessions = findSessions();
            java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
            java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
            // Payload layout: count, then (id, serialized bytes) per session.
            oout.writeInt(allSessions.length);
            for (int i=0; i<allSessions.length; i++){
                ReplicatedSession ses = (ReplicatedSession)allSessions[i];
                oout.writeUTF(ses.getIdInternal());
                byte[] sessionBytes = writeSession(ses);
                oout.writeObject(sessionBytes);
            }//for
            // Flush and close before taking the bytes so nothing stays buffered.
            oout.flush();
            oout.close();
            byte[] data = bout.toByteArray();
            SessionMessage newmsg = new SessionMessageImpl(name,
                SessionMessage.EVT_ALL_SESSION_DATA,
                data, "SESSION-STATE","SESSION-STATE-"+getName());
            cluster.send(newmsg, sender);
            break;
        }
        case SessionMessage.EVT_ALL_SESSION_DATA: {
            java.io.ByteArrayInputStream bin =
                new java.io.ByteArrayInputStream(msg.getSession());
            java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
            // Mirror of the layout written for EVT_GET_ALL_SESSIONS.
            int size = oin.readInt();
            for ( int i=0; i<size; i++) {
                String id = oin.readUTF();
                byte[] data = (byte[])oin.readObject();
                // readSession is called for its effect on this manager; the returned session is not needed here.
                readSession(data,id);
            }//for
            stateTransferred=true;
            break;
        }
        case SessionMessage.EVT_SESSION_CREATED: {
            Session session = this.readSession(msg.getSession(),msg.getSessionID());
            if ( log.isDebugEnabled() ) {
                log.debug("Received replicated session=" + session +
                    " isValid=" + session.isValid());
            }
            break;
        }
        case SessionMessage.EVT_SESSION_EXPIRED: {
            Session session = findSession(msg.getSessionID());
            if ( session != null ) {
                session.expire();
                this.remove(session);
            }//end if
            break;
        }
        case SessionMessage.EVT_SESSION_ACCESSED :{
            Session session = findSession(msg.getSessionID());
            if ( session != null ) {
                session.access();
                session.endAccess();
            }
            break;
        }
        default: {
            //we didn't recognize the message type, do nothing
            break;
        }
    }//switch
}
catch ( Exception x )
{
    log.error("Unable to receive message through TCP channel",x);
}
|
protected org.apache.catalina.Session | readSession(byte[] data, java.lang.String sessionId)Reinstantiates a serialized session from the data passed in.
This will first call createSession() so that we get a fresh instance with all
the managers set and all the transient fields validated.
Then it calls Session.readObjectData(byte[]) to deserialize the object
// Summary of what the code below does: expire any existing session registered under
// sessionId, create a fresh ReplicatedSession, read an optional principal followed by
// the session data from the stream, then assign sessionId and reset the access state.
// Returns the rebuilt session, or null (after logging) if any exception occurs.
// NOTE(review): the description above mentions readObjectData(byte[]); the code
// actually passes the ReplicationStream, not a byte array.
try
{
ReplicationStream session_in = getReplicationStream(data);
Session session = sessionId!=null?this.findSession(sessionId):null;
// Remember whether the id was previously unknown; this is passed to setId() below.
boolean isNew = (session==null);
//clear the old values from the existing session
if ( session!=null ) {
ReplicatedSession rs = (ReplicatedSession)session;
rs.expire(false); //cleans up the previous values, since we are not doing removes
session = null;
}//end if
// session is always null at this point (never found, or nulled just above),
// so a fresh instance is always created.
if (session==null) {
// notify=false and setId=false: no dirty flag and no id assignment yet.
session = createSession(null,false, false);
// NOTE(review): presumably a defensive cleanup in case the new instance was registered; confirm.
sessions.remove(session.getIdInternal());
}
// Stream layout matches writeSession(): principal flag, optional principal, session data.
boolean hasPrincipal = session_in.readBoolean();
SerializablePrincipal p = null;
if ( hasPrincipal )
p = (SerializablePrincipal)session_in.readObject();
((ReplicatedSession)session).readObjectData(session_in);
// NOTE(review): getContainer() is dereferenced without a null check here, whereas
// getReplicationStream() tolerates a missing container - confirm a container is
// always present when principals are replicated.
if ( hasPrincipal )
session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
((ReplicatedSession)session).setId(sessionId,isNew);
ReplicatedSession rsession = (ReplicatedSession)session;
// The access count is set to 1 here and reset to 0 a few lines below.
rsession.setAccessCount(1);
session.setManager(this);
session.setValid(true);
rsession.setLastAccessedTime(System.currentTimeMillis());
rsession.setThisAccessedTime(System.currentTimeMillis());
((ReplicatedSession)session).setAccessCount(0);
session.setNew(false);
if(log.isTraceEnabled())
log.trace("Session loaded id="+sessionId +
" actualId="+session.getId()+
" exists="+this.sessions.containsKey(sessionId)+
" valid="+rsession.isValid());
return session;
}
catch ( Exception x )
{
// Any failure (I/O, class loading, casts, null data) ends up here; the caller gets null.
log.error("Failed to deserialize the session!",x);
}
return null;
|
public org.apache.catalina.ha.ClusterMessage | requestCompleted(java.lang.String sessionId)
// Builds the cluster message (if any) that should be sent for sessionId.
// Returns:
//   - an EVT_SESSION_EXPIRED message when the session was recorded via sessionInvalidated();
//   - an EVT_SESSION_ACCESSED message when the dirty flag is in use, the session is clean,
//     and the last distributed access is old relative to the max inactive interval;
//   - an EVT_SESSION_CREATED message carrying the serialized session otherwise;
//   - null when the manager is not distributable, the session is unknown, nothing
//     needs to be sent, or an exception occurred (it is logged).
if ( !getDistributable() ) {
    log.warn("Received requestCompleted message, although this context["+
             getName()+"] is not distributable. Ignoring message");
    return null;
}
try
{
    // Check and remove in one step under the map's monitor, like every other access
    // to invalidatedSessions, so the lookup cannot race with a concurrent put or remove.
    boolean wasInvalidated;
    synchronized ( invalidatedSessions ) {
        wasInvalidated = invalidatedSessions.remove(sessionId) != null;
    }
    if ( wasInvalidated ) {
        SessionMessage msg = new SessionMessageImpl(name,
            SessionMessage.EVT_SESSION_EXPIRED,
            null,
            sessionId,
            sessionId);
        return msg;
    }
    ReplicatedSession session = (ReplicatedSession) findSession(sessionId);
    if (session == null) {
        return null;
    }
    //return immediately if the session is not dirty
    if (useDirtyFlag && (!session.isDirty())) {
        //but before we return doing nothing,
        //see if we should send
        //an updated last access message so that
        //sessions across cluster dont expire
        long interval = session.getMaxInactiveInterval();
        long lastaccdist = System.currentTimeMillis() -
            session.getLastAccessWasDistributed();
        // A zero elapsed time means an access was distributed in this very millisecond;
        // skip the division (it would throw ArithmeticException) and send nothing.
        if ( lastaccdist != 0 && ((interval*1000) / lastaccdist) < 3 ) {
            SessionMessage accmsg = new SessionMessageImpl(name,
                SessionMessage.EVT_SESSION_ACCESSED,
                null,
                sessionId,
                sessionId);
            session.setLastAccessWasDistributed(System.currentTimeMillis());
            return accmsg;
        }
        return null;
    }
    // The session is about to be replicated in full, so clear the dirty flag first.
    session.setIsDirty(false);
    if (log.isDebugEnabled()) {
        try {
            log.debug("Sending session to cluster=" + session);
        }
        // Best effort only: a failing toString() must not prevent replication.
        catch (Exception ignore) {}
    }
    SessionMessage msg = new SessionMessageImpl(name,
        SessionMessage.EVT_SESSION_CREATED,
        writeSession(session),
        session.getIdInternal(),
        session.getIdInternal());
    return msg;
}
catch (Exception x )
{
    log.error("Unable to replicate session",x);
}
return null;
|
public void | sessionInvalidated(java.lang.String sessionId)
// Records sessionId (as both key and value) in invalidatedSessions so that a later
// requestCompleted() call for the same id produces an EVT_SESSION_EXPIRED message.
synchronized ( invalidatedSessions ) {
invalidatedSessions.put(sessionId, sessionId);
}
|
public void | setCluster(org.apache.catalina.ha.CatalinaCluster cluster)
// Stores the cluster this manager sends its messages through.
if (log.isDebugEnabled()) {
    log.debug("Cluster associated with SimpleTcpReplicationManager");
}
this.cluster = cluster;
|
public void | setDefaultMode(boolean defaultMode)
// Stores the flag later returned by isDefaultMode().
this.defaultMode = defaultMode;
|
public void | setDistributable(boolean dist)
// Stores the flag read by getDistributable(), which gates replication in
// createSession(String), requestCompleted() and unload().
distributable = dist;
|
public void | setDomainReplication(boolean sendClusterDomainOnly)
// Stores the flag later returned by doDomainReplication().
this.sendClusterDomainOnly = sendClusterDomainOnly;
|
public void | setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
// Stores the flag later returned by getExpireSessionsOnShutdown().
this.mExpireSessionsOnShutdown = expireSessionsOnShutdown;
|
public void | setName(java.lang.String name)
// Stores the manager name; it is returned by getName() and used in the messages built by this manager.
this.name = name;
|
public void | setNotifyListenersOnReplication(boolean notifyListenersOnReplication)
// Stores the flag later returned by isNotifyListenersOnReplication().
this.notifyListenersOnReplication = notifyListenersOnReplication;
|
public void | setPrintToScreen(boolean printtoscreen)
// Logs the requested value at debug level, then stores it in mPrintToScreen.
if (log.isDebugEnabled()) {
    log.debug("Setting screen debug to:"+printtoscreen);
}
this.mPrintToScreen = printtoscreen;
|
public void | setSynchronousReplication(boolean flag)
// Stores the flag in the synchronousReplication field.
this.synchronousReplication = flag;
|
public void | setUseDirtyFlag(boolean usedirtyflag)
// When the flag is set, requestCompleted() skips full replication of sessions that are not dirty.
useDirtyFlag = usedirtyflag;
|
public void | start()Prepare for the beginning of active use of the public methods of this
component. This method should be called after configure() ,
and before any of the public methods of the component are utilized.
Starts the cluster communication channel, this will connect with the other nodes
in the cluster, and request the current session state to be transferred to this node.
// Flow: mark the manager running, start the superclass, register with the cluster, and
// if at least one other member exists, ask the first one for all sessions and poll every
// 100 ms (up to 60 s) until messageReceived() reports that the state arrived.
// Exceptions raised after super.start() are logged, not propagated.
mManagerRunning = true;
super.start();
try {
    //the channel is already running
    if ( mChannelStarted ) return;
    if(log.isInfoEnabled())
        log.info("Starting clustering manager...:"+getName());
    if ( cluster == null ) {
        log.error("Starting... no cluster associated with this context:"+getName());
        return;
    }
    cluster.registerManager(this);
    // Take one snapshot of the membership so the length check and the element
    // access cannot disagree if a member leaves in between.
    final Member[] members = cluster.getMembers();
    if (members.length > 0) {
        Member mbr = members[0];
        SessionMessage msg =
            new SessionMessageImpl(this.getName(),
                SessionMessage.EVT_GET_ALL_SESSIONS,
                null,
                "GET-ALL",
                "GET-ALL-"+this.getName());
        cluster.send(msg, mbr);
        if(log.isWarnEnabled())
            log.warn("Manager["+getName()+"], requesting session state from "+mbr+
                ". This operation will timeout if no session state has been received within "+
                "60 seconds");
        final long timeoutMs = 1000L * 60;
        final long reqStart = System.currentTimeMillis();
        long reqNow = reqStart;
        boolean isTimeout = false;
        boolean interrupted = false;
        do {
            try {
                Thread.sleep(100);
            } catch ( InterruptedException sleep ) {
                // Restore the interrupt status for the caller and stop waiting; looping on
                // would make every further sleep() fail immediately and spin until timeout.
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            reqNow = System.currentTimeMillis();
            isTimeout = ((reqNow-reqStart) > timeoutMs);
        } while ( (!interrupted) && (!isStateTransferred()) && (!isTimeout) );
        // Decide on the transfer flag alone: state that arrived on the final poll is a success.
        if ( !isStateTransferred() ) {
            log.error("Manager["+getName()+"], No session state received, timing out.");
        } else {
            if(log.isInfoEnabled())
                log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
        }
    } else {
        if(log.isInfoEnabled())
            log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
    }//end if
    mChannelStarted = true;
} catch ( Exception x ) {
    log.error("Unable to start SimpleTcpReplicationManager",x);
}
public void | stop()Gracefully terminate the active use of the public methods of this
component. This method should be the last one called on a given
instance of this component.
This clears the local session map and unregisters this manager from the cluster.
// Flags are reset first so isManagerRunning() reports false while shutdown is in progress.
mManagerRunning = false;
mChannelStarted = false;
super.stop();
try
{
    this.sessions.clear();
    // start() tolerates a manager without a cluster, so stop() must as well;
    // otherwise every such shutdown logs a NullPointerException.
    if ( cluster != null ) {
        cluster.removeManager(this);
    }
}
catch ( Exception x )
{
    log.error("Unable to stop SimpleTcpReplicationManager",x);
}
|
public void | unload()Skips session persistence while the manager is distributable, since persistence
and replication do not go hand in hand for now.
// Distributable managers do nothing here.
if (getDistributable()) {
    return;
}
// Non-distributable managers keep the inherited behaviour.
super.unload();
|
protected byte[] | writeSession(org.apache.catalina.Session session)Serialize a session into a byte array
This method simple calls the writeObjectData method on the session
and returns the byte data from that call
try
{
java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
session_out.flush();
boolean hasPrincipal = session.getPrincipal() != null;
session_out.writeBoolean(hasPrincipal);
if ( hasPrincipal )
{
session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
}//end if
((ReplicatedSession)session).writeObjectData(session_out);
return session_data.toByteArray();
}
catch ( Exception x )
{
log.error("Failed to serialize the session!",x);
}
return null;
|