/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.ejb3.stateful;
import org.jboss.aop.metadata.SimpleMetaData;
import org.jboss.ejb3.BaseContext;
import org.jboss.ejb3.Container;
import org.jboss.ejb3.Ejb3Registry;
import org.jboss.ejb3.ProxyFactoryHelper;
import org.jboss.ejb3.ThreadLocalStack;
import org.jboss.ejb3.cache.StatefulCache;
import org.jboss.ejb3.interceptor.InterceptorInfo;
import org.jboss.ejb3.tx.TxUtil;
import org.jboss.serial.io.MarshalledObject;
import org.jboss.tm.TxUtils;
import javax.ejb.EJBLocalObject;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
/**
* BeanContext for a stateful session bean.
*
* @author <a href="mailto:bill@jboss.org">Bill Burke</a>
* @author Brian Stansberry
*
* @version $Revision: 61595 $
*/
public class StatefulBeanContext extends BaseContext implements Externalizable
{
/** The serialVersionUID */
private static final long serialVersionUID = -102470788178912606L;
protected Object id;
protected boolean txSynchronized = false;
protected boolean inInvocation = false;
protected MarshalledObject beanMO;
protected ReentrantLock lock = new ReentrantLock();
protected boolean discarded;
// these two are needed for propagated extended persistence contexts when one
// SFSB injects another.
public static ThreadLocalStack<StatefulBeanContext> propagatedContainedIn = new ThreadLocalStack<StatefulBeanContext>();
public static ThreadLocalStack<StatefulBeanContext> currentBean = new ThreadLocalStack<StatefulBeanContext>();
protected StatefulBeanContext containedIn;
protected List<StatefulBeanContext> contains;
protected HashMap<String, EntityManager> persistenceContexts;
protected boolean removed;
protected String containerName;
protected boolean replicationIsPassivation = true;
protected transient boolean passivated = false;
public StatefulBeanContext()
{
}
public List<StatefulBeanContext> getContains()
{
if (bean == null)
extractBeanAndInterceptors();
return contains;
}
/**
* Makes a copy of the contains list so nested callback iterators
* can iterate over it without concern that another thread will
* remove the context.
*
* TODO replace contains list with a concurrent collection
*/
private List<StatefulBeanContext> getThreadSafeContains()
{
// Call getContains() to ensure unmarshalling
List<StatefulBeanContext> orig = getContains();
List<StatefulBeanContext> copy = null;
if (orig != null)
{
synchronized (orig)
{
copy = new ArrayList<StatefulBeanContext>(orig);
}
}
return copy;
}
public EntityManager getExtendedPersistenceContext(String id)
{
EntityManager found = null;
Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
if (extendedPCS != null)
{
found = extendedPCS.get(id);
}
if (found != null)
return found;
if (containedIn != null)
{
found = containedIn.getExtendedPersistenceContext(id);
}
return found;
}
public void addExtendedPersistenceContext(String id, EntityManager pc)
{
Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
if (extendedPCS == null)
{
extendedPCS = persistenceContexts = new HashMap<String, EntityManager>();
}
extendedPCS.put(id, pc);
}
public boolean scanForExtendedPersistenceContext(String id, StatefulBeanContext ignore)
{
if (this.equals(ignore))
return false;
if (!removed)
{
Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
if (extendedPCS != null && extendedPCS.containsKey(id))
return true;
}
if (getContains() != null)
{
synchronized (contains)
{
for (StatefulBeanContext contained : contains)
{
if (!contained.equals(ignore))
{
if (contained.scanForExtendedPersistenceContext(id, ignore))
return true;
}
}
}
}
return false;
}
public void removeExtendedPersistenceContext(String id)
{
Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
if (extendedPCS != null)
{
extendedPCS.remove(id);
}
if (getContains() != null)
{
synchronized (contains)
{
for (StatefulBeanContext contained: contains)
{
contained.removeExtendedPersistenceContext(id);
}
}
}
}
public Map<String, EntityManager> getExtendedPersistenceContexts()
{
if (persistenceContexts == null)
{
if (bean == null)
extractBeanAndInterceptors(); // unmarshall
}
return persistenceContexts;
}
public StatefulBeanContext getContainedIn()
{
return containedIn;
}
public StatefulBeanContext getUltimateContainedIn()
{
StatefulBeanContext child = this;
StatefulBeanContext parent = containedIn;
while (parent != null)
{
child = parent;
parent = parent.getContainedIn();
}
if (parent == null && this != child)
{
// Don't hand out a ref to our parent obtained by walking the
// tree. Rather, get it from its cache. This gives the cache
// a chance to activate it if it hasn't been. We don't want
// to mark the parent as in use though.
StatefulCache ultimateCache = ((StatefulContainer)child.getContainer()).getCache();
child = ultimateCache.get(child.getId(), false);
}
return child;
}
public void addContains(StatefulBeanContext ctx)
{
if (getContains() == null)
contains = new ArrayList<StatefulBeanContext>();
synchronized (contains)
{
contains.add(ctx);
ctx.containedIn = this;
}
}
public void removeContains(StatefulBeanContext ctx)
{
if (getContains() != null) // call getContains() to ensure unmarshalling
{
// Need to be thread safe
synchronized (contains)
{
if (contains.remove(ctx))
{
ctx.containedIn = null;
}
}
if (removed)
{
// Close out any XPCs that are no longer referenced
cleanExtendedPCs();
}
if (getCanRemoveFromCache())
{
if (containedIn != null)
{
containedIn.removeContains(this);
}
// Notify our cache to remove us as we no longer have children
((StatefulContainer) getContainer()).getCache().remove(getId());
}
}
}
public StatefulBeanContext pushContainedIn()
{
StatefulBeanContext thisPtr = this;
if (propagatedContainedIn.getList() != null)
{
// This is a nested stateful bean, within another stateful bean.
// We need to create a nested bean context. The nested one will
// be put in the parent's list and owned by it. It is a special
// class because we do not want to put its state in a separate
// marshalled object as we want to maintain object references
// between it and its parent.
// We also do not want to put the nested context within its container's
// cache. If placed in the cache, it could be independently passivated,
// activated and replicated, again breaking object references due to
// independent marshalling. Instead, we return a proxy to it that will
// be stored in its container's cache
containedIn = propagatedContainedIn.get();
NestedStatefulBeanContext nested = new NestedStatefulBeanContext();
nested.id = id;
nested.container = getContainer();
nested.containerName = containerName;
nested.bean = bean;
nested.replicationIsPassivation = replicationIsPassivation;
containedIn.addContains(nested);
thisPtr = new ProxiedStatefulBeanContext(nested);
}
propagatedContainedIn.push(thisPtr);
return thisPtr;
}
/**
* Checks whether this context or any of its children are in use.
*/
public boolean getCanPassivate()
{
boolean canPassivate = (removed || !inUse);
// Just check contains directly; don't call getContains() since
// getContains() will deserialize the beanMO. If the beanMO isn't
// deserialized it's safe to assume the children aren't in use
if (canPassivate && contains != null)
{
synchronized (contains)
{
for (StatefulBeanContext contained : contains)
{
if (!contained.getCanPassivate())
{
canPassivate = false;
break;
}
}
}
}
return canPassivate;
}
/**
* Notification from a non-clustered StatefulCache to inform
* that we are about to be passivated.
*/
public void prePassivate()
{
if (!removed && !passivated)
{
getContainer().invokePrePassivate(this);
passivated = true;
}
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.prePassivate();
}
}
}
/**
* Notification from a non-clustered StatefulCache to inform
* that we have been activated.
*/
public void postActivate()
{
if (!removed && passivated)
{
getContainer().invokePostActivate(this);
passivated = false;
}
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.postActivate();
}
}
}
/**
* Notification from a ClusteredStatefulCache to inform
* that a bean that is stored in the distributed cache is now
* being passivated as well. Something of a misnomer
* as it is possible the bean wasn't replicated (if it implements
* Optimized it may have been activated and then a reference left
* in the cache without the bean ever being replicated).
*/
public void passivateAfterReplication()
{
if (!removed && !passivated)
{
getInstance(); // make sure we're unmarshalled
getContainer().invokePrePassivate(this);
passivated = true;
}
// Only bother informing children if we aren't already serialized.
// If we're serialized, so are they and there's no point.
// Notifying them would cause us to deserialize a beanMO to no purpose.
if (contains != null)
{
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.passivateAfterReplication();
}
}
}
}
public void activateAfterReplication()
{
if (!removed && passivated)
{
getInstance(); // make sure we're unmarshalled
getContainer().invokePostActivate(this);
passivated = false;
}
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.activateAfterReplication();
}
}
}
public boolean getReplicationIsPassivation()
{
return replicationIsPassivation;
}
public void setReplicationIsPassivation(boolean replicationIsPassivation)
{
this.replicationIsPassivation = replicationIsPassivation;
}
/**
* Notification from a ClusteredStatefulCache before a bean is
* replicated.
*/
public void preReplicate()
{
if (!removed && replicationIsPassivation && !passivated)
{
getContainer().invokePrePassivate(this);
passivated = true;
}
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.preReplicate();
}
}
}
/**
* Notification from a ClusteredStatefulCache after the bean
* is fetched from the distributed cache. Something of a misnomer
* as it is possible the bean wasn't replicated (if it implements
* Optimized it can be fetched from the cache twice without ever
* being replicated).
*/
public void postReplicate()
{
// We may not have been replicated, so only invoke @PostActivate
// if we are marked as passivated
if (!removed && passivated)
{
getContainer().invokePostActivate(this);
passivated = false;
}
// Pass the call on to any nested children
List<StatefulBeanContext> children = getThreadSafeContains();
if (children != null)
{
for (StatefulBeanContext contained : children)
{
contained.postReplicate();
}
}
}
public void popContainedIn()
{
propagatedContainedIn.pop();
}
public boolean isInUse()
{
return inUse;
}
public void setInUse(boolean inUse)
{
this.inUse = inUse;
}
public boolean isDiscarded()
{
return discarded;
}
public void setDiscarded(boolean discarded)
{
this.discarded = discarded;
}
public ReentrantLock getLock()
{
return lock;
}
public boolean isInInvocation()
{
return inInvocation;
}
public void setInInvocation(boolean inInvocation)
{
this.inInvocation = inInvocation;
}
public Object getId()
{
return id;
}
public void setId(Object id)
{
this.id = id;
}
public boolean isTxSynchronized()
{
return txSynchronized;
}
public void setTxSynchronized(boolean txSynchronized)
{
this.txSynchronized = txSynchronized;
}
public boolean isRemoved()
{
return removed;
}
public void remove()
{
if (removed)
return;
removed = true;
RuntimeException exceptionThrown = null;
// Close any XPCs that haven't been injected into live
// beans in our family
try
{
cleanExtendedPCs();
}
catch (RuntimeException e)
{
// we still need to remove ourself from any parent, so save
// the thrown exception and rethrow it after we have cleaned up.
if (exceptionThrown == null)
exceptionThrown = e;
}
if (containedIn != null && getCanRemoveFromCache())
{
try
{
containedIn.removeContains(this);
}
catch (RuntimeException e)
{
// we still need to clean internal state, so save the
// thrown exception and rethrow it after we have cleaned up.
if (exceptionThrown == null)
exceptionThrown = e;
}
}
// Clear out refs to our bean and interceptors, to reduce our footprint
// in case we are still cached for our refs to any XPCs
bean = null;
interceptorInstances = null;
if (exceptionThrown != null) throw new RuntimeException("exception thrown while removing SFSB", exceptionThrown);
}
public boolean getCanRemoveFromCache()
{
boolean canRemove = removed;
if (canRemove && getContains() != null) // call getContains() to ensure unmarshalling
{
synchronized (contains)
{
canRemove = (contains.size() == 0);
}
}
return canRemove;
}
private void cleanExtendedPCs()
{
try
{
Transaction tx = TxUtil.getTransactionManager().getTransaction();
if (tx != null && TxUtils.isActive(tx))
{
tx.registerSynchronization(new XPCCloseSynchronization(this));
}
else
{
closeExtendedPCs();
}
}
catch (RuntimeException e)
{
throw e;
}
catch (Exception e)
{
throw new RuntimeException("Error cleaning PersistenceContexts in SFSB removal", e);
}
}
private void closeExtendedPCs()
{
Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
if (extendedPCS != null)
{
RuntimeException exceptionThrown = null;
List<String> closedXPCs = new ArrayList<String>();
StatefulBeanContext topCtx = getUltimateContainedIn();
for (Iterator<Map.Entry<String,EntityManager>> iter = extendedPCS.entrySet().iterator();
iter.hasNext();)
{
Map.Entry<String,EntityManager> entry = iter.next();
String id = entry.getKey();
EntityManager xpc = entry.getValue();
// Only close the XPC if our live parent(s) or cousins
// don't also have a ref to it
boolean canClose = topCtx.scanForExtendedPersistenceContext(id, this);
if (canClose && getContains() != null)
{
// Only close the XPC if our live childrenScan don't have a ref
synchronized (contains)
{
for (StatefulBeanContext contained : contains)
{
if (contained.scanForExtendedPersistenceContext(id, null))
{
canClose = false;
break;
}
}
}
}
if (canClose)
{
try
{
xpc.close();
closedXPCs.add(id);
}
catch (RuntimeException e)
{
exceptionThrown = e;
}
}
}
// Clean all refs to the closed XPCs from the tree
for (String id : closedXPCs)
{
topCtx.removeExtendedPersistenceContext(id);
}
if (exceptionThrown != null) throw new RuntimeException("Error closing PersistenceContexts in SFSB removal", exceptionThrown);
}
}
public void setContainer(Container container)
{
super.setContainer(container);
containerName = container.getObjectName().getCanonicalName();
}
public Container getContainer()
{
if (container == null)
{
container = Ejb3Registry.getContainer(containerName);
}
return container;
}
@Override
public Object getInstance()
{
if (bean == null)
{
extractBeanAndInterceptors();
}
return bean;
}
@Override
public SimpleMetaData getMetaData()
{
return super.getMetaData();
}
// these are public for fast concurrent access/update
public volatile boolean markedForPassivation = false;
public volatile boolean markedForReplication = false;
// BES 2007/02/16 make private and use a getter/setter as
// ProxiedStatefulBeanContext needs to pass the value on
// to its NestedStatefulBeanContext
private volatile boolean inUse = false;
public long lastUsed = System.currentTimeMillis();
@Override
public Object[] getInterceptorInstances(InterceptorInfo[] interceptorInfos)
{
if (bean == null)
{
extractBeanAndInterceptors();
}
return super.getInterceptorInstances(interceptorInfos);
}
protected synchronized void extractBeanAndInterceptors()
{
if (beanMO == null)
return;
try
{
Object[] beanAndInterceptors = (Object[]) beanMO.get();
bean = beanAndInterceptors[0];
persistenceContexts = (HashMap<String, EntityManager>) beanAndInterceptors[1];
ArrayList list = (ArrayList) beanAndInterceptors[2];
interceptorInstances = new HashMap<Class, Object>();
if (list != null)
{
for (Object o : list)
{
interceptorInstances.put(o.getClass(), o);
}
}
contains = (List<StatefulBeanContext>) beanAndInterceptors[3];
// Reestablish links to our children; if they serialize a link
// to us for some reason serialization blows up
if (contains != null)
{
for (StatefulBeanContext contained : contains)
{
contained.containedIn = this;
}
}
// Don't hold onto the beanMo, as its contents are mutable
// and we don't want to serialize a stale version of them
beanMO = null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
catch (ClassNotFoundException e)
{
throw new RuntimeException(e);
}
}
public void writeExternal(ObjectOutput out) throws IOException
{
out.writeUTF(containerName);
out.writeObject(id);
out.writeObject(metadata);
out.writeLong(lastUsed);
if (beanMO == null)
{
Object[] beanAndInterceptors = new Object[4];
beanAndInterceptors[0] = bean;
beanAndInterceptors[1] = persistenceContexts;
if (interceptorInstances != null && interceptorInstances.size() > 0)
{
ArrayList list = new ArrayList();
list.addAll(interceptorInstances.values());
beanAndInterceptors[2] = list;
}
beanAndInterceptors[3] = contains;
// BES 2007/02/12 Previously we were trying to hold a ref to
// beanMO after we created it, but that exposes the risk of
// two different versions of the constituent state that
// can fall out of sync. So now we just write a local variable.
MarshalledObject mo = new MarshalledObject(beanAndInterceptors);
out.writeObject(mo);
}
else
{
// We've been deserialized and are now being re-serialized, but
// extractBeanAndInterceptors hasn't been called in between.
// This can happen if a passivated session is involved in a
// JBoss Cache state transfer to a newly deployed node.
out.writeObject(beanMO);
}
out.writeBoolean(removed);
out.writeBoolean(replicationIsPassivation);
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException
{
containerName = in.readUTF();
id = in.readObject();
metadata = (SimpleMetaData) in.readObject();
lastUsed = in.readLong();
beanMO = (MarshalledObject) in.readObject();
removed = in.readBoolean();
replicationIsPassivation = in.readBoolean();
// If we've just been deserialized, we are passivated
passivated = true;
}
public Object getInvokedMethodKey()
{
return this.getId();
}
public EJBLocalObject getEJBLocalObject() throws IllegalStateException
{
try
{
Object proxy = ((StatefulContainer)container).createLocalProxy(getId());
return (EJBLocalObject)proxy;
}
catch (Exception e)
{
throw new IllegalStateException(e);
}
}
private static class XPCCloseSynchronization implements Synchronization
{
private StatefulBeanContext ctx;
private XPCCloseSynchronization(StatefulBeanContext context)
{
ctx = context;
}
public void beforeCompletion()
{
}
public void afterCompletion(int status)
{
ctx.closeExtendedPCs();
// Clean ref to ctx, as some TMs leak Synchronization refs
ctx = null;
}
}
}
|