FileDocCategorySizeDatePackage
ArrayNotificationBuffer.javaAPI DocJava SE 6 API30793Tue Jun 10 00:22:04 BST 2008com.sun.jmx.remote.internal

ArrayNotificationBuffer

public class ArrayNotificationBuffer extends Object implements NotificationBuffer
A circular buffer of notifications received from an MBean server.

Fields Summary
private boolean
disposed
private static final Object
globalLock
private static final HashMap
mbsToBuffer
private final Collection
sharers
private final NotificationListener
bufferListener
private static final QueryExp
broadcasterQuery
private static final NotificationFilter
creationFilter
private final NotificationListener
creationListener
private static final ClassLogger
logger
private final MBeanServer
mBeanServer
private final ArrayQueue
queue
private int
queueSize
private long
earliestSequenceNumber
private long
nextSequenceNumber
private Set
createdDuringQuery
static final String
broadcasterClass
Constructors Summary
private ArrayNotificationBuffer(MBeanServer mbs, int queueSize)

        if (logger.traceOn())
            logger.trace("Constructor", "queueSize=" + queueSize);

        if (mbs == null || queueSize < 1)
            throw new IllegalArgumentException("Bad args");

        this.mBeanServer = mbs;
        this.queueSize = queueSize;
        this.queue = new ArrayQueue<NamedNotification>(queueSize);
        this.earliestSequenceNumber = System.currentTimeMillis();
        this.nextSequenceNumber = this.earliestSequenceNumber;

        logger.trace("Constructor", "ends");
    
Methods Summary
private voidaddBufferListener(javax.management.ObjectName name)

        checkNoLocks();
        if (logger.debugOn())
            logger.debug("addBufferListener", name.toString());
        try {
            addNotificationListener(name, bufferListener, null, name);
        } catch (Exception e) {
            logger.trace("addBufferListener", e);
            /* This can happen if the MBean was unregistered just
               after the query.  Or user NotificationBroadcaster might
               throw unexpected exception.  */
        }
    
synchronized voidaddNotification(com.sun.jmx.remote.internal.ArrayNotificationBuffer$NamedNotification notif)

        if (logger.traceOn())
            logger.trace("addNotification", notif.toString());

        while (queue.size() >= queueSize) {
	    dropNotification();
            if (logger.debugOn()) {
                logger.debug("addNotification",
                      "dropped oldest notif, earliestSeq=" +
                      earliestSequenceNumber);
            }
        }
        queue.add(notif);
        nextSequenceNumber++;
        if (logger.debugOn())
            logger.debug("addNotification", "nextSeq=" + nextSequenceNumber);
        notifyAll();
    
private voidaddNotificationListener(javax.management.ObjectName name, javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)

        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction() {
                public Object run() throws InstanceNotFoundException {
                    mBeanServer.addNotificationListener(name,
                                                        listener,
                                                        filter,
                                                        handback);
                    return null;
                }
            });
        } catch (Exception e) {
            throw extractException(e);
        }
    
voidaddSharer(com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer sharer)

        synchronized (globalLock) {
            synchronized (this) {
                if (sharer.getSize() > queueSize)
                    resize(sharer.getSize());
            }
            sharers.add(sharer);
        }
    
private voidcheckNoLocks()

        if (Thread.holdsLock(this) || Thread.holdsLock(globalLock))
            logger.warning("checkNoLocks", "lock protocol violation");
    
private voidcreateListeners()

        logger.debug("createListeners", "starts");

        synchronized (this) {
            createdDuringQuery = new HashSet<ObjectName>();
        }

        try {
            addNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
                                    creationListener, creationFilter, null);
            logger.debug("createListeners", "added creationListener");
        } catch (Exception e) {
            final String msg = "Can't add listener to MBean server delegate: ";
            RuntimeException re = new IllegalArgumentException(msg + e);
            EnvHelp.initCause(re, e);
            logger.fine("createListeners", msg + e);
            logger.debug("createListeners", e);
            throw re;
        }

        /* Spec doesn't say whether Set returned by QueryNames can be modified
           so we clone it. */
        Set<ObjectName> names = queryNames(null, broadcasterQuery);
        names = new HashSet<ObjectName>(names);

        synchronized (this) {
            names.addAll(createdDuringQuery);
            createdDuringQuery = null;
        }

        for (ObjectName name : names)
            addBufferListener(name);
        logger.debug("createListeners", "ends");
    
private voidcreatedNotification(javax.management.MBeanServerNotification n)

        final String shouldEqual =
            MBeanServerNotification.REGISTRATION_NOTIFICATION;
        if (!n.getType().equals(shouldEqual)) {
            logger.warning("createNotification", "bad type: " + n.getType());
            return;
        }

        ObjectName name = n.getMBeanName();
        if (logger.debugOn())
            logger.debug("createdNotification", "for: " + name);

        synchronized (this) {
            if (createdDuringQuery != null) {
                createdDuringQuery.add(name);
                return;
            }
        }

        if (isInstanceOf(mBeanServer, name, broadcasterClass)) {
            addBufferListener(name);
            if (isDisposed())
                removeBufferListener(name);
        }
    
private voiddestroyListeners()


       
        checkNoLocks();
        logger.debug("destroyListeners", "starts");
        try {
            removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
                                       creationListener);
        } catch (Exception e) {
            logger.warning("remove listener from MBeanServer delegate", e);
        }
        Set<ObjectName> names = queryNames(null, broadcasterQuery);
        for (final ObjectName name : names) {
            if (logger.debugOn())
                logger.debug("destroyListeners",
			     "remove listener from " + name);
            removeBufferListener(name);
        }
        logger.debug("destroyListeners", "ends");
    
public voiddispose()

        throw new UnsupportedOperationException();
    
private voiddropNotification()

	queue.remove(0);
	earliestSequenceNumber++;
    
synchronized longearliestSequenceNumber()

        return earliestSequenceNumber;
    
private static java.lang.ExceptionextractException(java.lang.Exception e)
Iterate until we extract the real exception from a stack of PrivilegedActionExceptions.

        while (e instanceof PrivilegedActionException) {
            e = ((PrivilegedActionException)e).getException();
        }
        return e;
    
public javax.management.remote.NotificationResultfetchNotifications(com.sun.jmx.remote.internal.NotificationBufferFilter filter, long startSequenceNumber, long timeout, int maxNotifications)

Fetch notifications that match the given listeners.

The operation only considers notifications with a sequence number at least startSequenceNumber. It will take no longer than timeout, and will return no more than maxNotifications different notifications.

If there are no notifications matching the criteria, the operation will block until one arrives, subject to the timeout.

param
filter an object that will add notifications to a {@code List} if they match the current listeners with their filters.
param
startSequenceNumber the first sequence number to consider.
param
timeout the maximum time to wait. May be 0 to indicate not to wait if there are no notifications.
param
maxNotifications the maximum number of notifications to return. May be 0 to indicate a wait for eligible notifications that will return a usable nextSequenceNumber. The {@link TargetedNotification} array in the returned {@link NotificationResult} may contain more than this number of elements but will not contain more than this number of different notifications.


        logger.trace("fetchNotifications", "starts");

	if (startSequenceNumber < 0 || isDisposed()) {
	    synchronized(this) {
		return new NotificationResult(earliestSequenceNumber(),
					      nextSequenceNumber(),
					      new TargetedNotification[0]);
	    }
	}

        // Check arg validity
        if (filter == null
            || startSequenceNumber < 0 || timeout < 0
            || maxNotifications < 0) {
            logger.trace("fetchNotifications", "Bad args");
            throw new IllegalArgumentException("Bad args to fetch");
        }

        if (logger.debugOn()) {
            logger.trace("fetchNotifications",
                  "filter=" + filter + "; startSeq=" +
                  startSequenceNumber + "; timeout=" + timeout +
                  "; max=" + maxNotifications);
        }

        if (startSequenceNumber > nextSequenceNumber()) {
            final String msg = "Start sequence number too big: " +
                startSequenceNumber + " > " + nextSequenceNumber();
            logger.trace("fetchNotifications", msg);
            throw new IllegalArgumentException(msg);
        }

        /* Determine the end time corresponding to the timeout value.
           Caller may legitimately supply Long.MAX_VALUE to indicate no
           timeout.  In that case the addition will overflow and produce
           a negative end time.  Set end time to Long.MAX_VALUE in that
           case.  We assume System.currentTimeMillis() is positive.  */
        long endTime = System.currentTimeMillis() + timeout;
        if (endTime < 0) // overflow
            endTime = Long.MAX_VALUE;

        if (logger.debugOn())
            logger.debug("fetchNotifications", "endTime=" + endTime);

        /* We set earliestSeq the first time through the loop.  If we
           set it here, notifications could be dropped before we
           started examining them, so earliestSeq might not correspond
           to the earliest notification we examined.  */
        long earliestSeq = -1;
        long nextSeq = startSequenceNumber;
        List<TargetedNotification> notifs =
            new ArrayList<TargetedNotification>();

        /* On exit from this loop, notifs, earliestSeq, and nextSeq must
           all be correct values for the returned NotificationResult.  */
        while (true) {
            logger.debug("fetchNotifications", "main loop starts");

            NamedNotification candidate;

            /* Get the next available notification regardless of filters,
               or wait for one to arrive if there is none.  */
            synchronized (this) {

                /* First time through.  The current earliestSequenceNumber
                   is the first one we could have examined.  */
                if (earliestSeq < 0) {
                    earliestSeq = earliestSequenceNumber();
                    if (logger.debugOn()) {
                        logger.debug("fetchNotifications",
                              "earliestSeq=" + earliestSeq);
                    }
                    if (nextSeq < earliestSeq) {
                        nextSeq = earliestSeq;
                        logger.debug("fetchNotifications",
				     "nextSeq=earliestSeq");
                    }
                } else
                    earliestSeq = earliestSequenceNumber();

                /* If many notifications have been dropped since the
                   last time through, nextSeq could now be earlier
                   than the current earliest.  If so, notifications
                   may have been lost and we return now so the caller
                   can see this next time it calls.  */
                if (nextSeq < earliestSeq) {
                    logger.trace("fetchNotifications",
                          "nextSeq=" + nextSeq + " < " + "earliestSeq=" +
                          earliestSeq + " so may have lost notifs");
                    break;
                }

                if (nextSeq < nextSequenceNumber()) {
                    candidate = notificationAt(nextSeq);
                    if (logger.debugOn()) {
                        logger.debug("fetchNotifications", "candidate: " +
				     candidate);
                        logger.debug("fetchNotifications", "nextSeq now " +
				     nextSeq);
                    }
                } else {
                    /* nextSeq is the largest sequence number.  If we
                       already got notifications, return them now.
                       Otherwise wait for some to arrive, with
                       timeout.  */
                    if (notifs.size() > 0) {
                        logger.debug("fetchNotifications",
                              "no more notifs but have some so don't wait");
                        break;
                    }
                    long toWait = endTime - System.currentTimeMillis();
                    if (toWait <= 0) {
                        logger.debug("fetchNotifications", "timeout");
                        break;
                    }

		    /* dispose called */
		    if (isDisposed()) {
			if (logger.debugOn())
			    logger.debug("fetchNotifications",
					 "dispose callled, no wait");
			return new NotificationResult(earliestSequenceNumber(),
						  nextSequenceNumber(),
						  new TargetedNotification[0]);
		    }

		    if (logger.debugOn())
			logger.debug("fetchNotifications",
				     "wait(" + toWait + ")");
		    wait(toWait);

                    continue;
                }
            }

            /* We have a candidate notification.  See if it matches
               our filters.  We do this outside the synchronized block
               so we don't hold up everyone accessing the buffer
               (including notification senders) while we evaluate
               potentially slow filters.  */
            ObjectName name = candidate.getObjectName();
            Notification notif = candidate.getNotification();
            List<TargetedNotification> matchedNotifs =
                new ArrayList<TargetedNotification>();
            logger.debug("fetchNotifications",
			 "applying filter to candidate");
            filter.apply(matchedNotifs, name, notif);

            if (matchedNotifs.size() > 0) {
                /* We only check the max size now, so that our
                   returned nextSeq is as large as possible.  This
                   prevents the caller from thinking it missed
                   interesting notifications when in fact we knew they
                   weren't.  */
                if (maxNotifications <= 0) {
                    logger.debug("fetchNotifications",
				 "reached maxNotifications");
                    break;
                }
                --maxNotifications;
                if (logger.debugOn())
                    logger.debug("fetchNotifications", "add: " +
				 matchedNotifs);
                notifs.addAll(matchedNotifs);
            }

            ++nextSeq;
        } // end while

        /* Construct and return the result.  */
        int nnotifs = notifs.size();
        TargetedNotification[] resultNotifs =
            new TargetedNotification[nnotifs];
        notifs.toArray(resultNotifs);
        NotificationResult nr =
            new NotificationResult(earliestSeq, nextSeq, resultNotifs);
        if (logger.debugOn())
            logger.debug("fetchNotifications", nr.toString());
        logger.trace("fetchNotifications", "ends");

        return nr;
    
public static com.sun.jmx.remote.internal.NotificationBuffergetNotificationBuffer(javax.management.MBeanServer mbs, java.util.Map env)


       
                

        if (env == null)
            env = Collections.emptyMap();

	//Find out queue size
	int queueSize = EnvHelp.getNotifBufferSize(env);

        ArrayNotificationBuffer buf;
        boolean create;
        NotificationBuffer sharer;
        synchronized (globalLock) {
            buf = mbsToBuffer.get(mbs);
            create = (buf == null);
            if (create) {
                buf = new ArrayNotificationBuffer(mbs, queueSize);
                mbsToBuffer.put(mbs, buf);
            }
            sharer = buf.new ShareBuffer(queueSize);
        }
        /* We avoid holding any locks while calling createListeners.
         * This prevents possible deadlocks involving user code, but
         * does mean that a second ConnectorServer created and started
         * in this window will return before all the listeners are ready,
         * which could lead to surprising behaviour.  The alternative
         * would be to block the second ConnectorServer until the first
         * one has finished adding all the listeners, but that would then
         * be subject to deadlock.
         */
        if (create)
            buf.createListeners();
        return sharer;
    
private synchronized booleanisDisposed()

	return disposed;
    
private static booleanisInstanceOf(javax.management.MBeanServer mbs, javax.management.ObjectName name, java.lang.String className)

        PrivilegedExceptionAction<Boolean> act =
            new PrivilegedExceptionAction<Boolean>() {
                public Boolean run() throws InstanceNotFoundException {
                    return mbs.isInstanceOf(name, className);
                }
            };
        try {
            return AccessController.doPrivileged(act);
        } catch (Exception e) {
            logger.fine("isInstanceOf", "failed: " + e);
            logger.debug("isInstanceOf", e);
            return false;
        }
    
synchronized longnextSequenceNumber()

        return nextSequenceNumber;
    
synchronized com.sun.jmx.remote.internal.ArrayNotificationBuffer$NamedNotificationnotificationAt(long seqNo)

        long index = seqNo - earliestSequenceNumber;
        if (index < 0 || index > Integer.MAX_VALUE) {
            final String msg = "Bad sequence number: " + seqNo + " (earliest "
                + earliestSequenceNumber + ")";
            logger.trace("notificationAt", msg);
            throw new IllegalArgumentException(msg);
        }
        return queue.get((int) index);
    
private java.util.SetqueryNames(javax.management.ObjectName name, javax.management.QueryExp query)

        PrivilegedAction<Set<ObjectName>> act =
            new PrivilegedAction<Set<ObjectName>>() {
                public Set<ObjectName> run() {
                    return mBeanServer.queryNames(name, query);
                }
            };
        try {
            return AccessController.doPrivileged(act);
        } catch (RuntimeException e) {
            logger.fine("queryNames", "Failed to query names: " + e);
	    logger.debug("queryNames", e);
            throw e;
        }
    
private voidremoveBufferListener(javax.management.ObjectName name)

        checkNoLocks();
        if (logger.debugOn())
            logger.debug("removeBufferListener", name.toString());
        try {
            removeNotificationListener(name, bufferListener);
        } catch (Exception e) {
            logger.trace("removeBufferListener", e);
        }
    
static voidremoveNotificationBuffer(javax.management.MBeanServer mbs)

        synchronized (globalLock) {
            mbsToBuffer.remove(mbs);
        }
    
private voidremoveNotificationListener(javax.management.ObjectName name, javax.management.NotificationListener listener)

        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction() {
                public Object run() throws Exception {
                    mBeanServer.removeNotificationListener(name, listener);
                    return null;
                }
            });
        } catch (Exception e) {
            throw extractException(e);
        }
    
private voidremoveSharer(com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer sharer)

        boolean empty;
        synchronized (globalLock) {
            sharers.remove(sharer);
            empty = sharers.isEmpty();
            if (empty)
                removeNotificationBuffer(mBeanServer);
            else {
                int max = 0;
                for (ShareBuffer buf : sharers) {
                    int bufsize = buf.getSize();
                    if (bufsize > max)
                        max = bufsize;
                }
                if (max < queueSize)
                    resize(max);
            }
        }
        if (empty) {
            synchronized (this) {
                disposed = true;
                // Notify potential waiting fetchNotification call
                notifyAll();
            }
            destroyListeners();
        }
    
private synchronized voidresize(int newSize)

	if (newSize == queueSize)
	    return;
	while (queue.size() > newSize)
	    dropNotification();
	queue.resize(newSize);
	queueSize = newSize;