DistributedTxCache
public class DistributedTxCache extends Object implements org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer
This is an LRU cache. The TxCache itself is not transactional,
but any accesses to objects within the cache ARE transactional. |
Fields Summary |
---|
protected static Logger | log | protected long | lockTimeout | protected DistributedSynchronizationManager | synchManager | protected DistributedVersionManager | versionManager | protected String | partitionName | protected org.jboss.ha.framework.interfaces.HAPartition | partition | protected String | cacheName | protected LRUCache | cache | protected int | maxSize |
Constructors Summary |
---|
public DistributedTxCache(int maxSize, long lockTimeout, String cacheName)
// Convenience constructor: delegates to the four-argument constructor,
// passing "DefaultPartition" as the partition name.
this(maxSize, lockTimeout, cacheName, "DefaultPartition");
| public DistributedTxCache(int maxSize, long lockTimeout, String cacheName, String pName)
// Records configuration only. The partition itself is looked up later in
// create(), and the cache field is assigned in start()/setCurrentState().
this.maxSize = maxSize;
this.lockTimeout = lockTimeout;
// The stored name carries a "DistributedTxCache/" prefix; it is the name used
// for RPC handler registration and for every cluster call made by this class.
this.cacheName = "DistributedTxCache/" + cacheName;
this.partitionName = pName;
|
Methods Summary |
---|
| public synchronized void | _flush()
// Local half of flush(): empties this instance's cache while holding this
// instance's monitor. flush() requests it by the name "_flush" through
// partition.callMethodOnCluster.
// NOTE(review): in the visible code the cache field is only assigned in
// start()/setCurrentState() — presumably this must not run before start(); confirm.
cache.clear();
| public synchronized void | _insert(java.lang.Object key, java.lang.Object obj)
// Local half of insert(): stores obj under key in this instance's cache while
// holding this instance's monitor. insert() requests it by the name "_insert"
// and passes either the object itself or, for versioned objects, its GUID.
cache.put(key, obj);
| public synchronized void | _remove(java.lang.Object key)
// Local half of remove(): drops the entry for key from this instance's cache
// while holding this instance's monitor. remove() requests it by the name
// "_remove" through partition.callMethodOnCluster.
cache.remove(key);
| public void | create()
// Looks up the HAPartition named partitionName, registers this cache as the
// RPC handler under cacheName, then builds the synchronization and version
// managers and wires them to each other before calling synchManager.create().
// Throws IllegalStateException when no partition with that name can be found.
this.partition = findHAPartitionWithName(partitionName);
// findHAPartitionWithName returns null when its MBean query finds nothing;
// fail here with a descriptive message instead of a bare NullPointerException
// from the registerRPCHandler call below.
if (partition == null)
{
throw new IllegalStateException("HAPartition not found: " + partitionName);
}
//REVISIT: doesn't really buy us anything until JGroups synchronizes
// initial state correctly
//partition.subscribeToStateTransferEvents(cacheName, this);
//REVISIT AGAIN: Actually I talked to Bela about this. I can change the
//Clustering framework to do state transfer correctly
partition.registerRPCHandler(cacheName, this);
// The synchronization manager is constructed without a version manager (null)
// because the version manager itself needs the synchronization manager; the
// back-reference is filled in right after both exist.
synchManager = new DistributedSynchronizationManager(cacheName, null, partition);
versionManager = new DistributedVersionManager(lockTimeout, synchManager);
synchManager.versionManager = versionManager;
synchManager.create();
| protected org.jboss.ha.framework.interfaces.HAPartition | findHAPartitionWithName(java.lang.String name)
// Queries the located MBeanServer for MBeans whose class attribute equals the
// ClusterPartition class name and whose "PartitionName" attribute matches the
// given name. For the first element returned by the result set's iterator, a
// ClusterPartitionMBean proxy is created and its HAPartition is returned.
// Returns null when the query result is null or empty.
MBeanServer mbeanServer = MBeanServerLocator.locate();
QueryExp isClusterPartition = Query.eq(
Query.classattr(),
Query.value(ClusterPartition.class.getName()));
QueryExp hasRequestedName = Query.match(
Query.attr("PartitionName"),
Query.value(name));
Set matches = mbeanServer.queryMBeans(null, Query.and(isClusterPartition, hasRequestedName));
if (matches == null || matches.isEmpty())
{
return null;
}
ObjectInstance first = (ObjectInstance) matches.iterator().next();
ClusterPartitionMBean proxy = (ClusterPartitionMBean) MBeanProxyExt.create(
ClusterPartitionMBean.class,
first.getObjectName(),
mbeanServer);
return proxy.getHAPartition();
| public void | flush(java.lang.Object key)
// Issues a "_flush" call for this cache's name over the partition with an
// empty argument list. Any Exception raised by the cluster call is rethrown
// wrapped in a RuntimeException.
// NOTE(review): the key argument is never read, and the _flush() handler
// clears the whole cache — confirm that a per-key flush was not intended.
try
{
partition.callMethodOnCluster(cacheName, "_flush", new Object[0], false);
}
catch (Exception failure)
{
throw new RuntimeException(failure);
}
| public synchronized java.lang.Object | get(java.lang.Object key)
// Looks key up in the cache while holding this instance's monitor. A stored
// GUID is resolved to its object through synchManager; any other value
// (including null for a missing key) is returned as found.
Object cached = cache.get(key);
if (!(cached instanceof GUID))
{
return cached;
}
return synchManager.getObject((GUID) cached);
| public java.io.Serializable | getCurrentState()
// Returns the cache field itself (not a copy) as the state to hand over.
// pullState() requests this method by the name "getCurrentState" through
// partition.callMethodOnCluster.
// NOTE(review): unlike get()/_insert()/_remove()/_flush(), this method is not
// synchronized — confirm that reading the cache without the monitor is intended.
log.trace("getCurrentState called on cache");
return cache;
| public void | insert(java.lang.Object key, java.lang.Object obj)
// Passes obj through versionManager.makeVersioned and then issues an
// "_insert" call for (key, value) over the partition. When the result is
// versioned, the value sent is the object's GUID rather than the object;
// get() resolves such GUIDs back through synchManager.
// Any Exception is logged through the class logger and rethrown unchanged.
try
{
obj = versionManager.makeVersioned(obj);
if (versionManager.isVersioned(obj))
{
log.trace("Inserting versioned object");
// Replace the object by its GUID so that only the identifier is cached.
obj = VersionManager.getGUID((InstanceAdvised)obj);
}
else
{
log.trace("Inserting a non-Versioned object");
}
Object[] args = {key, obj};
partition.callMethodOnCluster(cacheName, "_insert", args, false);
}
catch (Exception ex)
{
// Report through the logger already used by this class instead of dumping
// the stack trace to stderr, then let the caller see the original exception.
log.error("Insert into " + cacheName + " failed", ex);
throw ex;
}
| protected void | pullState()
// Requests "getCurrentState" for this cache's name over the partition and,
// when the response list is not empty, installs its first element through
// setCurrentState. With an empty response list the cache field is left untouched.
List responses = partition.callMethodOnCluster(cacheName, "getCurrentState", new Object[0], true);
if (responses.isEmpty())
{
return;
}
setCurrentState((Serializable) responses.get(0));
| public void | remove(java.lang.Object key)
// Issues a "_remove" call for key over the partition under this cache's name.
// Any Exception raised by the cluster call is rethrown wrapped in a
// RuntimeException.
try
{
partition.callMethodOnCluster(cacheName, "_remove", new Object[] {key}, false);
}
catch (Exception failure)
{
throw new RuntimeException(failure);
}
| public void | setCurrentState(java.io.Serializable newState)
// Replaces the cache field with newState, cast to LRUCache. pullState() calls
// this with the first response to its "getCurrentState" request.
// A newState that is not an LRUCache makes the cast throw ClassCastException;
// a null newState sets the cache field to null.
log.trace("setCurrentState called on cache");
// The swap is done under this instance's monitor, the same lock the
// synchronized accessors (get, _insert, _remove, _flush) hold.
synchronized (this)
{
this.cache = (LRUCache)newState;
}
| public synchronized void | start()
// Starts the synchronization manager, then tries to obtain existing cache
// state from the partition, all while holding this instance's monitor.
synchManager.start();
pullState();
// pullState() only assigns the cache when it received a response; if the
// field is still null afterwards, begin with an empty LRUCache of maxSize.
if (cache == null)
{
cache = new LRUCache(maxSize);
}
|
|