Methods Summary |
---|
public void | addLifecycleListener(org.apache.catalina.LifecycleListener listener)Add a lifecycle event listener to this component.
lifecycle.addLifecycleListener(listener);
|
protected boolean | checkSenderDomain(SessionMessage msg, org.apache.catalina.tribes.Member sender)Test that sender and local domain is the same
boolean sameDomain= true;
if (!sameDomain && log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
new Object[] {getName(),
msg.getEventTypeString(),
sender,
"",
"" }));
}
return sameDomain ;
|
public org.apache.catalina.ha.ClusterManager | cloneFromTemplate()
DeltaManager result = new DeltaManager();
result.name = "Clone-from-"+name;
result.cluster = cluster;
result.replicationValve = replicationValve;
result.maxActiveSessions = maxActiveSessions;
result.expireSessionsOnShutdown = expireSessionsOnShutdown;
result.notifyListenersOnReplication = notifyListenersOnReplication;
result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
result.stateTransferTimeout = stateTransferTimeout;
result.sendAllSessions = sendAllSessions;
result.sendClusterDomainOnly = sendClusterDomainOnly ;
result.sendAllSessionsSize = sendAllSessionsSize;
result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
result.receiverQueue = receiverQueue ;
result.stateTimestampDrop = stateTimestampDrop ;
result.stateTransferCreateSendTime = stateTransferCreateSendTime;
return result;
|
public org.apache.catalina.Session | createEmptySession()Create DeltaSession
return getNewDeltaSession() ;
|
public org.apache.catalina.Session | createSession(java.lang.String sessionId)Construct and return a new session object, based on the default settings
specified by this Manager's properties. The session id will be assigned
by this method, and available via the getId() method of the returned
session. If a new session cannot be created for any reason, return
null .
return createSession(sessionId, true);
|
public org.apache.catalina.Session | createSession(java.lang.String sessionId, boolean distribute)create new session with check maxActiveSessions and send session creation
to other cluster nodes.
if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
rejectedSessions++;
throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
}
DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
if (distribute) {
sendCreateSession(session.getId(), session);
}
if (log.isDebugEnabled())
log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
return (session);
|
protected DeltaRequest | deserializeDeltaRequest(DeltaSession session, byte[] data)Load Deltarequest from external node
Load the Class at container classloader
ReplicationStream ois = getReplicationStream(data);
session.getDeltaRequest().readExternal(ois);
ois.close();
return session.getDeltaRequest();
|
protected void | deserializeSessions(byte[] data)Load sessions from other cluster node.
FIXME replace currently sessions with same id without notifcation.
FIXME SSO handling is not really correct with the session replacement!
// Initialize our internal data structures
//sessions.clear(); //should not do this
// Open an input stream to the specified pathname, if any
ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
ObjectInputStream ois = null;
// Load the previously unloaded active sessions
try {
ois = getReplicationStream(data);
Integer count = (Integer) ois.readObject();
int n = count.intValue();
for (int i = 0; i < n; i++) {
DeltaSession session = (DeltaSession) createEmptySession();
session.readObjectData(ois);
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false);
//in case the nodes in the cluster are out of
//time synch, this will make sure that we have the
//correct timestamp, isValid returns true, cause
// accessCount=1
session.access();
//make sure that the session gets ready to expire if
// needed
session.setAccessCount(0);
session.resetDeltaRequest();
// FIXME How inform other session id cache like SingleSignOn
// increment sessionCounter to correct stats report
if (findSession(session.getIdInternal()) == null ) {
sessionCounter++;
} else {
sessionReplaceCounter++;
// FIXME better is to grap this sessions again !
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
}
add(session);
}
} catch (ClassNotFoundException e) {
log.error(sm.getString("deltaManager.loading.cnfe", e), e);
throw e;
} catch (IOException e) {
log.error(sm.getString("deltaManager.loading.ioe", e), e);
throw e;
} finally {
// Close the input stream
try {
if (ois != null) ois.close();
} catch (IOException f) {
// ignored
}
ois = null;
if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
}
|
public boolean | doDomainReplication()
return sendClusterDomainOnly;
|
public void | expireAllLocalSessions()Exipre all find sessions.
long timeNow = System.currentTimeMillis();
Session sessions[] = findSessions();
int expireDirect = 0 ;
int expireIndirect = 0 ;
if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
for (int i = 0; i < sessions.length; i++) {
if (sessions[i] instanceof DeltaSession) {
DeltaSession session = (DeltaSession) sessions[i];
if (session.isPrimarySession()) {
if (session.isValid()) {
session.expire();
expireDirect++;
} else {
expireIndirect++;
}//end if
}//end if
}//end if
}//for
long timeEnd = System.currentTimeMillis();
if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
|
public org.apache.catalina.LifecycleListener[] | findLifecycleListeners()Get the lifecycle listeners associated with this lifecycle. If this
Lifecycle has no listeners registered, a zero-length array is returned.
return lifecycle.findLifecycleListeners();
|
protected org.apache.catalina.tribes.Member | findSessionMasterMember()Find the master of the session state
Member mbr = null;
Member mbrs[] = cluster.getMembers();
if(mbrs.length != 0 ) mbr = mbrs[0];
if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
return mbr;
|
public synchronized void | getAllClusterSessions()get from first session master the backup from all clustered sessions
if (cluster != null && cluster.getMembers().length > 0) {
long beforeSendTime = System.currentTimeMillis();
Member mbr = findSessionMasterMember();
if(mbr == null) { // No domain member found
return;
}
SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
// set reference time
stateTransferCreateSendTime = beforeSendTime ;
// request session state
counterSend_EVT_GET_ALL_SESSIONS++;
stateTransfered = false ;
// FIXME This send call block the deploy thread, when sender waitForAck is enabled
try {
synchronized(receivedMessageQueue) {
receiverQueue = true ;
}
cluster.send(msg, mbr);
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
// FIXME At sender ack mode this method check only the state transfer and resend is a problem!
waitForSendAllSessions(beforeSendTime);
} finally {
synchronized(receivedMessageQueue) {
for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
SessionMessage smsg = (SessionMessage) iter.next();
if (!stateTimestampDrop) {
messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
// FIXME handle EVT_GET_ALL_SESSIONS later
messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
}
}
}
}
receivedMessageQueue.clear();
receiverQueue = false ;
}
}
} else {
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
}
|
public org.apache.catalina.ha.CatalinaCluster | getCluster()
return cluster;
|
public int | getCounterNoStateTransfered()
return counterNoStateTransfered;
|
public long | getCounterReceive_EVT_ALL_SESSION_DATA()
return counterReceive_EVT_ALL_SESSION_DATA;
|
public int | getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE()
return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
|
public long | getCounterReceive_EVT_GET_ALL_SESSIONS()
return counterReceive_EVT_GET_ALL_SESSIONS;
|
public long | getCounterReceive_EVT_SESSION_ACCESSED()
return counterReceive_EVT_SESSION_ACCESSED;
|
public long | getCounterReceive_EVT_SESSION_CREATED()
return counterReceive_EVT_SESSION_CREATED;
|
public long | getCounterReceive_EVT_SESSION_DELTA()
return counterReceive_EVT_SESSION_DELTA;
|
public long | getCounterReceive_EVT_SESSION_EXPIRED()
return counterReceive_EVT_SESSION_EXPIRED;
|
public long | getCounterSend_EVT_ALL_SESSION_DATA()
return counterSend_EVT_ALL_SESSION_DATA;
|
public int | getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE()
return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
|
public long | getCounterSend_EVT_GET_ALL_SESSIONS()
return counterSend_EVT_GET_ALL_SESSIONS;
|
public long | getCounterSend_EVT_SESSION_ACCESSED()
return counterSend_EVT_SESSION_ACCESSED;
|
public long | getCounterSend_EVT_SESSION_CREATED()
return counterSend_EVT_SESSION_CREATED;
|
public long | getCounterSend_EVT_SESSION_DELTA()
return counterSend_EVT_SESSION_DELTA;
|
public long | getCounterSend_EVT_SESSION_EXPIRED()
return counterSend_EVT_SESSION_EXPIRED;
|
public java.lang.String | getInfo()Return descriptive information about this Manager implementation and the
corresponding version number, in the format
<description>/<version> .
return info;
|
public java.lang.String[] | getInvalidatedSessions()When the manager expires session not tied to a request. The cluster will
periodically ask for a list of sessions that should expire and that
should be sent across the wire.
return new String[0];
|
public int | getMaxActiveSessions()Return the maximum number of active Sessions allowed, or -1 for no limit.
return (this.maxActiveSessions);
|
public java.lang.String | getName()Return the descriptive short name of this Manager implementation.
return name;
|
protected DeltaSession | getNewDeltaSession()Get new session class to be used in the doLoad() method.
return new DeltaSession(this);
|
public long | getProcessingTime()
return processingTime;
|
public int | getReceivedQueueSize()
return receivedMessageQueue.size() ;
|
public int | getRejectedSessions()Number of session creations that failed due to maxActiveSessions
return rejectedSessions;
|
public int | getSendAllSessionsSize()
return sendAllSessionsSize;
|
public int | getSendAllSessionsWaitTime()
return sendAllSessionsWaitTime;
|
public long | getSessionReplaceCounter()
return sessionReplaceCounter;
|
public int | getStateTransferTimeout()
return stateTransferTimeout;
|
public boolean | getStateTransfered()is session state transfered complete?
return stateTransfered;
|
protected void | handleALL_SESSION_DATA(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive sessions from other not ( restart )
counterReceive_EVT_ALL_SESSION_DATA++;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
byte[] data = msg.getSession();
deserializeSessions(data);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
//stateTransferred = true;
|
protected void | handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive session state is complete transfered
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
stateTransferCreateSendTime = msg.getTimestamp() ;
stateTransfered = true ;
|
protected void | handleGET_ALL_SESSIONS(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive that other node want all sessions ( restart )
a) send all sessions with one message
b) send session at blocks
After sending send state is complete transfered
counterReceive_EVT_GET_ALL_SESSIONS++;
//get a list of all the session from this manager
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
// Write the number of active sessions, followed by the details
// get all sessions and serialize without sync
Session[] currentSessions = findSessions();
long findSessionTimestamp = System.currentTimeMillis() ;
if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
} else {
// send session at blocks
int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions,findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0) {
try {
Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
}//end if
}//for
}//end if
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
newmsg.setTimestamp(findSessionTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
cluster.send(newmsg, sender);
|
protected void | handleSESSION_ACCESSED(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive session is access at other node ( primary session is now false)
counterReceive_EVT_SESSION_ACCESSED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
session.access();
session.setPrimarySession(false);
session.endAccess();
}
|
protected void | handleSESSION_CREATED(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive new session is created at other node (create backup - primary false)
counterReceive_EVT_SESSION_CREATED++;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
DeltaSession session = (DeltaSession) createEmptySession();
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false);
session.setCreationTime(msg.getTimestamp());
session.access();
if(notifySessionListenersOnReplication)
session.setId(msg.getSessionID());
else
session.setIdInternal(msg.getSessionID());
session.resetDeltaRequest();
session.endAccess();
|
protected void | handleSESSION_DELTA(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive session delta
counterReceive_EVT_SESSION_DELTA++;
byte[] delta = msg.getSession();
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
DeltaRequest dreq = deserializeDeltaRequest(session, delta);
dreq.execute(session, notifyListenersOnReplication);
session.setPrimarySession(false);
}
|
protected void | handleSESSION_EXPIRED(SessionMessage msg, org.apache.catalina.tribes.Member sender)handle receive session is expire at other node ( expire session also here)
counterReceive_EVT_SESSION_EXPIRED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
session.expire(notifySessionListenersOnReplication, false);
}
|
public boolean | isDefaultMode()
return defaultMode;
|
public boolean | isExpireSessionsOnShutdown()
return expireSessionsOnShutdown;
|
public boolean | isNotifyListenersOnReplication()
return notifyListenersOnReplication;
|
public boolean | isNotifySessionListenersOnReplication()
return notifySessionListenersOnReplication;
|
public boolean | isSendAllSessions()
return sendAllSessions;
|
public boolean | isStateTimestampDrop()
return stateTimestampDrop;
|
public void | load()
|
public void | messageDataReceived(org.apache.catalina.ha.ClusterMessage cmsg)A message was received from another node, this is the callback method to
implement if you are interested in receiving replication messages.
if (cmsg != null && cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg;
switch (msg.getEventType()) {
case SessionMessage.EVT_GET_ALL_SESSIONS:
case SessionMessage.EVT_SESSION_CREATED:
case SessionMessage.EVT_SESSION_EXPIRED:
case SessionMessage.EVT_SESSION_ACCESSED:
case SessionMessage.EVT_SESSION_DELTA: {
synchronized(receivedMessageQueue) {
if(receiverQueue) {
receivedMessageQueue.add(msg);
return ;
}
}
break;
}
default: {
//we didn't queue, do nothing
break;
}
} //switch
messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
|
protected void | messageReceived(SessionMessage msg, org.apache.catalina.tribes.Member sender)This method is called by the received thread when a SessionMessage has
been received from one of the other nodes in the cluster.
if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
return;
}
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader[] loaders = getClassLoaders();
if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
switch (msg.getEventType()) {
case SessionMessage.EVT_GET_ALL_SESSIONS: {
handleGET_ALL_SESSIONS(msg,sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_DATA: {
handleALL_SESSION_DATA(msg,sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_CREATED: {
handleSESSION_CREATED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_EXPIRED: {
handleSESSION_EXPIRED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_ACCESSED: {
handleSESSION_ACCESSED(msg,sender);
break;
}
case SessionMessage.EVT_SESSION_DELTA: {
handleSESSION_DELTA(msg,sender);
break;
}
default: {
//we didn't recognize the message type, do nothing
break;
}
} //switch
} catch (Exception x) {
log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
} finally {
Thread.currentThread().setContextClassLoader(contextLoader);
}
|
public void | propertyChange(java.beans.PropertyChangeEvent event)Process property change events from our associated Context.
// Validate the source of this event
if (!(event.getSource() instanceof Context))
return;
// Process a relevant property change
if (event.getPropertyName().equals("sessionTimeout")) {
try {
setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);
} catch (NumberFormatException e) {
log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
}
}
|
protected void | registerSessionAtReplicationValve(DeltaSession session)Register cross context session at replication valve thread local
if(replicationValve == null) {
if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {
Cluster cluster = getCluster() ;
if(cluster != null && cluster instanceof CatalinaCluster) {
Valve[] valves = ((CatalinaCluster)cluster).getValves();
if(valves != null && valves.length > 0) {
for(int i=0; replicationValve == null && i < valves.length ; i++ ){
if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;
}//for
if(replicationValve == null && log.isDebugEnabled()) {
log.debug("no ReplicationValve found for CrossContext Support");
}//endif
}//end if
}//endif
}//end if
}//end if
if(replicationValve != null) {
replicationValve.registerReplicationSession(session);
}
|
public void | removeLifecycleListener(org.apache.catalina.LifecycleListener listener)Remove a lifecycle event listener from this component.
lifecycle.removeLifecycleListener(listener);
|
public org.apache.catalina.ha.ClusterMessage | requestCompleted(java.lang.String sessionId)When the request has been completed, the replication valve will notify
the manager, and the manager will decide whether any replication is
needed or not. If there is a need for replication, the manager will
create a session message and that will be replicated. The cluster
determines where it gets sent.
try {
DeltaSession session = (DeltaSession) findSession(sessionId);
DeltaRequest deltaRequest = session.getDeltaRequest();
SessionMessage msg = null;
boolean isDeltaRequest = false ;
synchronized(deltaRequest) {
isDeltaRequest = deltaRequest.getSize() > 0 ;
if (isDeltaRequest) {
counterSend_EVT_SESSION_DELTA++;
byte[] data = serializeDeltaRequest(deltaRequest);
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_DELTA,
data,
sessionId,
sessionId + "-" + System.currentTimeMillis());
session.resetDeltaRequest();
}
}
if(!isDeltaRequest) {
if(!session.isPrimarySession()) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
}
}
} else { // log only outside synch block!
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
}
}
session.setPrimarySession(true);
//check to see if we need to send out an access message
if ((msg == null)) {
long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
if (replDelta > (getMaxInactiveInterval() * 1000)) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
}
}
}
//update last replicated time
if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
return msg;
} catch (IOException x) {
log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
return null;
}
|
public synchronized void | resetStatistics()Reset manager statistics
processingTime = 0 ;
expiredSessions = 0 ;
rejectedSessions = 0 ;
sessionReplaceCounter = 0 ;
counterNoStateTransfered = 0 ;
maxActive = getActiveSessions() ;
sessionCounter = getActiveSessions() ;
counterReceive_EVT_ALL_SESSION_DATA = 0;
counterReceive_EVT_GET_ALL_SESSIONS = 0;
counterReceive_EVT_SESSION_ACCESSED = 0 ;
counterReceive_EVT_SESSION_CREATED = 0 ;
counterReceive_EVT_SESSION_DELTA = 0 ;
counterReceive_EVT_SESSION_EXPIRED = 0 ;
counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
counterSend_EVT_ALL_SESSION_DATA = 0;
counterSend_EVT_GET_ALL_SESSIONS = 0;
counterSend_EVT_SESSION_ACCESSED = 0 ;
counterSend_EVT_SESSION_CREATED = 0 ;
counterSend_EVT_SESSION_DELTA = 0 ;
counterSend_EVT_SESSION_EXPIRED = 0 ;
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
|
protected void | send(SessionMessage msg)Send messages to other backup member (domain or all)
if(cluster != null) {
if(doDomainReplication())
cluster.sendClusterDomain(msg);
else
cluster.send(msg);
}
|
protected void | sendCreateSession(java.lang.String sessionId, DeltaSession session)Send create session evt to all backup node
if(cluster.getMembers().length > 0 ) {
SessionMessage msg =
new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_CREATED,
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
msg.setTimestamp(session.getCreationTime());
counterSend_EVT_SESSION_CREATED++;
send(msg);
}
|
protected void | sendSessions(org.apache.catalina.tribes.Member sender, org.apache.catalina.Session[] currentSessions, long sendTimestamp)send a block of session to sender
byte[] data = serializeSessions(currentSessions);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
newmsg.setTimestamp(sendTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
counterSend_EVT_ALL_SESSION_DATA++;
cluster.send(newmsg, sender);
|
protected byte[] | serializeDeltaRequest(DeltaRequest deltaRequest)serialize DeltaRequest
return deltaRequest.serialize();
|
protected byte[] | serializeSessions(org.apache.catalina.Session[] currentSessions)Save any currently active sessions in the appropriate persistence
mechanism, if any. If persistence is not supported, this method returns
without doing anything.
// Open an output stream to the specified pathname, if any
ByteArrayOutputStream fos = null;
ObjectOutputStream oos = null;
try {
fos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(new BufferedOutputStream(fos));
oos.writeObject(new Integer(currentSessions.length));
for(int i=0 ; i < currentSessions.length;i++) {
((DeltaSession)currentSessions[i]).writeObjectData(oos);
}
// Flush and close the output stream
oos.flush();
} catch (IOException e) {
log.error(sm.getString("deltaManager.unloading.ioe", e), e);
throw e;
} finally {
if (oos != null) {
try {
oos.close();
} catch (IOException f) {
;
}
oos = null;
}
}
// send object data as byte[]
return fos.toByteArray();
|
protected void | sessionExpired(java.lang.String id)send session expired to other cluster nodes
counterSend_EVT_SESSION_EXPIRED++ ;
SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
send(msg);
|
public void | setCluster(org.apache.catalina.ha.CatalinaCluster cluster)
this.cluster = cluster;
|
public void | setContainer(org.apache.catalina.Container container)Set the Container with which this Manager has been associated. If it is a
Context (the usual case), listen for changes to the session timeout
property.
// De-register from the old Container (if any)
if ((this.container != null) && (this.container instanceof Context))
((Context) this.container).removePropertyChangeListener(this);
// Default processing provided by our superclass
super.setContainer(container);
// Register with the new Container (if any)
if ((this.container != null) && (this.container instanceof Context)) {
setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
((Context) this.container).addPropertyChangeListener(this);
}
|
public void | setDefaultMode(boolean defaultMode)
this.defaultMode = defaultMode;
|
public void | setDomainReplication(boolean sendClusterDomainOnly)
this.sendClusterDomainOnly = sendClusterDomainOnly;
|
public void | setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
this.expireSessionsOnShutdown = expireSessionsOnShutdown;
|
public void | setMaxActiveSessions(int max)Set the maximum number of actives Sessions allowed, or -1 for no limit.
int oldMaxActiveSessions = this.maxActiveSessions;
this.maxActiveSessions = max;
support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
|
public void | setName(java.lang.String name)
this.name = name;
|
public void | setNotifyListenersOnReplication(boolean notifyListenersOnReplication)
this.notifyListenersOnReplication = notifyListenersOnReplication;
|
public void | setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication)
this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
|
public void | setRejectedSessions(int rejectedSessions)
this.rejectedSessions = rejectedSessions;
|
public void | setSendAllSessions(boolean sendAllSessions)
this.sendAllSessions = sendAllSessions;
|
public void | setSendAllSessionsSize(int sendAllSessionsSize)
this.sendAllSessionsSize = sendAllSessionsSize;
|
public void | setSendAllSessionsWaitTime(int sendAllSessionsWaitTime)
this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
|
public void | setStateTimestampDrop(boolean isTimestampDrop)
this.stateTimestampDrop = isTimestampDrop;
|
public void | setStateTransferTimeout(int timeoutAllSession)
this.stateTransferTimeout = timeoutAllSession;
|
public void | setStateTransfered(boolean stateTransfered)set that state ist complete transfered
this.stateTransfered = stateTransfered;
|
public void | start()Prepare for the beginning of active use of the public methods of this
component. This method should be called after configure() ,
and before any of the public methods of the component are utilized.
if (!initialized) init();
// Validate and update our current component state
if (started) {
return;
}
started = true;
lifecycle.fireLifecycleEvent(START_EVENT, null);
// Force initialization of the random number generator
generateSessionId();
// Load unloaded sessions, if any
try {
//the channel is already running
Cluster cluster = getCluster() ;
// stop remove cluster binding
//wow, how many nested levels of if statements can we have ;)
if(cluster == null) {
Container context = getContainer() ;
if(context != null && context instanceof Context) {
Container host = context.getParent() ;
if(host != null && host instanceof Host) {
cluster = host.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
} else {
Container engine = host.getParent() ;
if(engine != null && engine instanceof Engine) {
cluster = engine.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
}
} else {
cluster = null ;
}
}
}
}
}
if (cluster == null) {
log.error(sm.getString("deltaManager.noCluster", getName()));
return;
} else {
if (log.isInfoEnabled()) {
String type = "unknown" ;
if( cluster.getContainer() instanceof Host){
type = "Host" ;
} else if( cluster.getContainer() instanceof Engine){
type = "Engine" ;
}
log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
}
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
//to survice context reloads, as only a stop/start is called, not
// createManager
cluster.registerManager(this);
getAllClusterSessions();
} catch (Throwable t) {
log.error(sm.getString("deltaManager.managerLoad"), t);
}
|
public void | stop()Gracefully terminate the active use of the public methods of this
component. This method should be the last one called on a given instance
of this component.
if (log.isDebugEnabled())
log.debug(sm.getString("deltaManager.stopped", getName()));
// Validate and update our current component state
if (!started)
throw new LifecycleException(sm.getString("deltaManager.notStarted"));
lifecycle.fireLifecycleEvent(STOP_EVENT, null);
started = false;
// Expire all active sessions
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
Session sessions[] = findSessions();
for (int i = 0; i < sessions.length; i++) {
DeltaSession session = (DeltaSession) sessions[i];
if (!session.isValid())
continue;
try {
session.expire(true, isExpireSessionsOnShutdown());
} catch (Throwable ignore) {
;
}
}
// Require a new random number generator if we are restarted
this.random = null;
getCluster().removeManager(this);
replicationValve = null;
if (initialized) {
destroy();
}
|
public void | unload()
|
protected void | waitForSendAllSessions(long beforeSendTime)Wait that cluster session state is transfer or timeout after 60 Sec
With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
long reqStart = System.currentTimeMillis();
long reqNow = reqStart ;
boolean isTimeout = false;
if(getStateTransferTimeout() > 0) {
// wait that state is transfered with timeout check
do {
try {
Thread.sleep(100);
} catch (Exception sleep) {
//
}
reqNow = System.currentTimeMillis();
isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
} while ((!getStateTransfered()) && (!isTimeout));
} else {
if(getStateTransferTimeout() == -1) {
// wait that state is transfered
do {
try {
Thread.sleep(100);
} catch (Exception sleep) {
}
} while ((!getStateTransfered()));
reqNow = System.currentTimeMillis();
}
}
if (isTimeout || (!getStateTransfered())) {
counterNoStateTransfered++ ;
log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
} else {
if (log.isInfoEnabled())
log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
}
|