FileDocCategorySizeDatePackage
ConcurrencyManager.javaAPI DocGlassfish v2 API18489Tue May 22 16:54:34 BST 2007oracle.toplink.essentials.internal.helper

ConcurrencyManager

public class ConcurrencyManager extends Object implements Serializable
INTERNAL:

Purpose: To maintain concurrency for a paticular task. It is a wrappers of a semaphore that allows recursive waits by a single thread.

Responsibilities:

  • Keep track of the active thread.
  • Wait all other threads until the first thread is done.
  • Maintain the depth of the active thread.

Fields Summary
protected int
numberOfReaders
protected int
depth
protected int
numberOfWritersWaiting
protected transient Thread
activeThread
public static Hashtable
deferredLockManagers
protected boolean
lockedByMergeManager
protected CacheKey
ownerCacheKey
Cachkey owner set when ConcurrencyMananger is used within an cachekey on an idenity map Used to store the owner so that the object involved can be retrieved from the cachekey
Constructors Summary
public ConcurrencyManager()
Initialize the newly allocated instance of this class. Set the depth to zero.

        this.depth = 0;
        this.numberOfReaders = 0;
        this.numberOfWritersWaiting = 0;
    
public ConcurrencyManager(CacheKey cacheKey)
Initialize a new ConcurrencyManger, seting depth to zero and setting the owner cacheKey.

        this();
        this.ownerCacheKey = cacheKey;
    
Methods Summary
public synchronized voidacquire()
Wait for all threads except the active thread. If the active thread just increament the depth. This should be called before entering a critical section.

        this.acquire(false);
    
public synchronized voidacquire(boolean forMerge)
Wait for all threads except the active thread. If the active thread just increament the depth. This should be called before entering a critical section. called with true from the merge process, if true then the refresh will not refresh the object

        while (!((getActiveThread() == Thread.currentThread()) || ((getActiveThread() == null) && (getNumberOfReaders() == 0)))) {
            // This must be in a while as multiple threads may be released, or another thread may rush the acquire after one is released.
            try {
                setNumberOfWritersWaiting(getNumberOfWritersWaiting() + 1);
                wait();
                setNumberOfWritersWaiting(getNumberOfWritersWaiting() - 1);
            } catch (InterruptedException exception) {
                throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
            }
        }
        if (getActiveThread() == null) {
            setActiveThread(Thread.currentThread());
        }
        setIsLockedByMergeManager(forMerge);
        setDepth(getDepth() + 1);

    
public voidacquireDeferredLock()
Add deferred lock into a hashtable to avoid deadlock

        Thread currentThread = Thread.currentThread();
        DeferredLockManager lockManager = getDeferredLockManager(currentThread);
        if (lockManager == null) {
            lockManager = new DeferredLockManager();
            putDeferredLock(currentThread, lockManager);
        }
        lockManager.incrementDepth();
        synchronized (this) {
            while (!(getNumberOfReaders() == 0)) {
                // There are readers of this object, wait until they are done before determining if
                //there are any other writers.  If not we will wait on the readers for acquire.  If another
                //thread is also waiting on the acquire then a deadlock could occur.  See bug 3049635
                //We could release all active locks before relesing defered but the object may not be finished building
                //we could make the readers get a hard lock, but then we would just build a defered lock even though
                //the object is not being built.
                try {
                    setNumberOfWritersWaiting(getNumberOfWritersWaiting() + 1);
                    wait();
                    setNumberOfWritersWaiting(getNumberOfWritersWaiting() - 1);
                } catch (InterruptedException exception) {
                    throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
                }
            }
            if ((getActiveThread() == currentThread) || (!isAcquired())) {
                lockManager.addActiveLock(this);
                acquire();
            } else {
                lockManager.addDeferredLock(this);
                Object[] params = new Object[2];
                params[0] = this.getOwnerCacheKey().getObject();
                params[1] = currentThread.getName();
                AbstractSessionLog.getLog().log(SessionLog.FINER, "acquiring_deferred_lock", params, true);
            }
        }
    
public synchronized booleanacquireNoWait()
If the lock is not acquired allready acquire it and return true. If it has been acquired allready return false Added for CR 2317

        if (!isAcquired() || getActiveThread() == Thread.currentThread()) {
            //if I own the lock increment depth
            acquire(false);
            return true;
        } else {
            return false;
        }
    
public synchronized booleanacquireNoWait(boolean forMerge)
If the lock is not acquired allready acquire it and return true. If it has been acquired allready return false Added for CR 2317 called with true from the merge process, if true then the refresh will not refresh the object

        if (!isAcquired() || getActiveThread() == Thread.currentThread()) {
            //if I own the lock increment depth
            acquire(forMerge);
            return true;
        } else {
            return false;
        }
    
public synchronized voidacquireReadLock()
Wait on any writer. Allow concurrent reads.

        // Cannot check for starving writers as will lead to deadlocks.
        while (!((getActiveThread() == Thread.currentThread()) || (getActiveThread() == null))) {
            try {
                wait();
            } catch (InterruptedException exception) {
                throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
            }
        }

        setNumberOfReaders(getNumberOfReaders() + 1);
    
public synchronized booleanacquireReadLockNoWait()
If this is acquired return false otherwise acquire readlock and return true

        if (!isAcquired()) {
            acquireReadLock();
            return true;
        } else {
            return false;
        }
    
public voidcheckReadLock()
Check the lock state, if locked, acquire and release a read lock. This optimizes out the normal read-lock check if not locked.

        // If it is not locked, then just return.
        if (getActiveThread() == null) {
            return;
        }
        acquireReadLock();
        releaseReadLock();
    
public java.lang.ThreadgetActiveThread()
Return the active thread.

        return activeThread;
    
public static synchronized oracle.toplink.essentials.internal.helper.DeferredLockManagergetDeferredLockManager(java.lang.Thread thread)
Return the deferred lock manager from the thread

        return (DeferredLockManager)getDeferredLockManagers().get(thread);
    
protected static java.util.HashtablegetDeferredLockManagers()
Return the deferred lock manager hashtable (thread - DeferredLockManager).

        if (deferredLockManagers == null) {
            deferredLockManagers = new Hashtable(50);
        }

        return deferredLockManagers;
    
public intgetDepth()
Return the current depth of the active thread.

        return depth;
    
public intgetNumberOfReaders()
Number of writer that want the lock. This is used to ensure that a writer is not starved.

        return numberOfReaders;
    
public intgetNumberOfWritersWaiting()
Number of writers that want the lock. This is used to ensure that a writer is not starved.

        return numberOfWritersWaiting;
    
public oracle.toplink.essentials.internal.identitymaps.CacheKeygetOwnerCacheKey()
Returns the owner cache key for this concurrency manager

        return this.ownerCacheKey;
    
public booleanisAcquired()
Return if a thread has aquire this manager.

        return depth > 0;
    
public static synchronized booleanisBuildObjectOnThreadComplete(java.lang.Thread thread, oracle.toplink.essentials.internal.helper.IdentityHashtable recursiveSet)
Check if the deferred locks of a thread are all released

        if (recursiveSet.containsKey(thread)) {
            return true;
        }
        recursiveSet.put(thread, thread);

        DeferredLockManager lockManager = getDeferredLockManager(thread);
        if (lockManager == null) {
            return true;
        }

        Vector deferredLocks = lockManager.getDeferredLocks();
        for (Enumeration deferredLocksEnum = deferredLocks.elements();
                 deferredLocksEnum.hasMoreElements();) {
            ConcurrencyManager deferedLock = (ConcurrencyManager)deferredLocksEnum.nextElement();
            Thread activeThread = null;
            if (deferedLock.isAcquired()) {
                activeThread = deferedLock.getActiveThread();

                // the active thread may be set to null at anypoint
                // if added for CR 2330 
                if (activeThread != null) {
                    DeferredLockManager currentLockManager = getDeferredLockManager(activeThread);
                    if (currentLockManager == null) {
                        return false;
                    } else if (currentLockManager.isThreadComplete()) {
                        activeThread = deferedLock.getActiveThread();
                        // The lock may suddenly finish and no longer have an active thread.
                        if (activeThread != null) {
                            if (!isBuildObjectOnThreadComplete(activeThread, recursiveSet)) {
                                return false;
                            }
                        }
                    } else {
                        return false;
                    }
                }
            }
        }
        return true;
    
public booleanisLockedByMergeManager()
INTERNAL: Used byt the refresh process to determine if this concurrency manager is locked by the merge process. If it is then the refresh should not refresh the object

        return this.lockedByMergeManager;
    
public booleanisNested()
Return if this manager is within a nested aquire.

        return depth > 1;
    
public synchronized voidputDeferredLock(java.lang.Thread thread, oracle.toplink.essentials.internal.helper.DeferredLockManager lockManager)

        getDeferredLockManagers().put(thread, lockManager);
    
public synchronized voidrelease()
Decrement the depth for the active thread. Assume the current thread is the active one. Raise an error if the depth become < 0. The notify will release the first thread waiting on the object, if no threads are waiting it will do nothing.

        if (getDepth() == 0) {
            throw ConcurrencyException.signalAttemptedBeforeWait();
        } else {
            setDepth(getDepth() - 1);
        }
        if (getDepth() == 0) {
            setActiveThread(null);
            setIsLockedByMergeManager(false);
            notifyAll();
        }
    
public voidreleaseDeferredLock()
Release the deferred lock. This uses a deadlock detection and resoultion algorthm to avoid cache deadlocks. The deferred lock manager keeps track of the lock for a thread, so that other thread know when a deadlock has occured and can resolve it.

        Thread currentThread = Thread.currentThread();
        DeferredLockManager lockManager = getDeferredLockManager(currentThread);
        if (lockManager == null) {
            return;
        }
        int depth = lockManager.getThreadDepth();

        if (depth > 1) {
            lockManager.decrementDepth();
            return;
        }

        // If the set is null or empty, means there is no deferred lock for this thread, return.
        if (!lockManager.hasDeferredLock()) {
            lockManager.releaseActiveLocksOnThread();
            removeDeferredLockManager(currentThread);
            return;
        }

        lockManager.setIsThreadComplete(true);

        // Thread have three stages, one where they are doing work (i.e. building objects)
        // two where they are done their own work but may be waiting on other threads to finish their work,
        // and a third when they and all the threads they are waiting on are done.
        // This is essentially a busy wait to determine if all the other threads are done.
        while (true) {
            // 2612538 - the default size of IdentityHashtable (32) is appropriate
            IdentityHashtable recursiveSet = new IdentityHashtable();
            if (isBuildObjectOnThreadComplete(currentThread, recursiveSet)) {// Thread job done.
                lockManager.releaseActiveLocksOnThread();
                removeDeferredLockManager(currentThread);
                Object[] params = new Object[1];
                params[0] = currentThread.getName();
                AbstractSessionLog.getLog().log(SessionLog.FINER, "deferred_locks_released", params, true);
                return;
            } else {// Not done yet, wait and check again.
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignoreAndContinue) {
                }
            }
        }
    
public synchronized voidreleaseReadLock()
Decrement the number of readers. Used to allow concurrent reads.

        if (getNumberOfReaders() == 0) {
            throw ConcurrencyException.signalAttemptedBeforeWait();
        } else {
            setNumberOfReaders(getNumberOfReaders() - 1);
        }
        if (getNumberOfReaders() == 0) {
            notifyAll();
        }
    
public static synchronized oracle.toplink.essentials.internal.helper.DeferredLockManagerremoveDeferredLockManager(java.lang.Thread thread)
Remove the deferred lock manager for the thread

        return (DeferredLockManager)getDeferredLockManagers().remove(thread);
    
public voidsetActiveThread(java.lang.Thread activeThread)
Set the active thread.

        this.activeThread = activeThread;
    
protected voidsetDepth(int depth)
Set the current depth of the active thread.

        this.depth = depth;
    
public voidsetIsLockedByMergeManager(boolean state)
INTERNAL: Used by the mergemanager to let the read know not to refresh this object as it is being loaded by the merge process.

        this.lockedByMergeManager = state;
    
protected voidsetNumberOfReaders(int numberOfReaders)
Track the number of readers.

        this.numberOfReaders = numberOfReaders;
    
protected voidsetNumberOfWritersWaiting(int numberOfWritersWaiting)
Number of writers that want the lock. This is used to ensure that a writer is not starved.

        this.numberOfWritersWaiting = numberOfWritersWaiting;
    
public java.lang.StringtoString()
Print the nested depth.

        Object[] args = { new Integer(getDepth()) };
        return Helper.getShortClassName(getClass()) + ToStringLocalization.buildMessage("nest_level", args);