ArrayNotificationBufferpublic class ArrayNotificationBuffer extends Object implements NotificationBufferA circular buffer of notifications received from an MBean server. |
Fields Summary |
---|
public static final int | DEFAULT_BUFFER_SIZE | public static final String | BUFFER_SIZE_PROPERTY | private boolean | disposed | private static final HashMap | mbsToBuffer | private final Collection | sharers | private final NotificationListener | queryBufferListener | private final NotificationListener | creationBufferListener | private static final QueryExp | broadcasterQuery | private static final NotificationFilter | creationFilter | private final NotificationListener | creationListener | private static final ClassLogger | logger | private static final ObjectName | delegateName | private final MBeanServer | mBeanServer | private final ArrayQueue | queue | private int | queueSize | private long | earliestSequenceNumber | private long | nextSequenceNumber | private boolean | destroyed | static final String | broadcasterClass |
Constructors Summary |
---|
private ArrayNotificationBuffer(MBeanServer mbs, int queueSize)
if (logger.traceOn())
logger.trace("Constructor", "queueSize=" + queueSize);
if (mbs == null || queueSize < 1)
throw new IllegalArgumentException("Bad args");
this.mBeanServer = mbs;
this.queueSize = queueSize;
this.queue = new ArrayQueue(queueSize);
this.earliestSequenceNumber = System.currentTimeMillis();
this.nextSequenceNumber = this.earliestSequenceNumber;
createListeners();
logger.trace("Constructor", "ends");
|
Methods Summary |
---|
private void | addBufferListener(javax.management.ObjectName name, javax.management.NotificationListener bufferListener)
if (logger.debugOn())
logger.debug("addBufferListener", ""+name);
try {
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Object run() throws InstanceNotFoundException {
mBeanServer.addNotificationListener(name,
bufferListener,
null,
name);
return null;
}
});
} catch (Exception e) {
logger.trace("addBufferListener", extractException(e));
/* This can happen if the MBean was unregistered just
after the query. Or user NotificationBroadcaster might
throw unexpected exception. */
}
| synchronized void | addNotification(com.sun.jmx.remote.internal.ArrayNotificationBuffer$NamedNotification notif)
if (logger.traceOn())
logger.trace("addNotification", notif.toString());
while (queue.size() >= queueSize) {
dropNotification();
if (logger.debugOn()) {
logger.debug("addNotification",
"dropped oldest notif, earliestSeq=" +
earliestSequenceNumber);
}
}
queue.add(notif);
nextSequenceNumber++;
if (logger.debugOn())
logger.debug("addNotification", "nextSeq=" + nextSequenceNumber);
notifyAll();
| synchronized void | addSharer(com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer sharer)
if (sharer.getSize() > queueSize)
resize(sharer.getSize());
sharers.add(sharer);
| private synchronized void | createListeners()
logger.debug("createListeners", "starts");
try {
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Object run() throws InstanceNotFoundException {
mBeanServer.addNotificationListener(delegateName,
creationListener,
creationFilter,
null);
return null;
}
});
logger.debug("createListeners", "added creationListener");
} catch (Exception pe) {
final Exception e = extractException(pe);
final String msg = "Can't add listener to MBean server delegate: ";
RuntimeException re = new IllegalArgumentException(msg + e);
EnvHelp.initCause(re, e);
logger.fine("createListeners", msg + e);
logger.debug("createListeners", e);
throw re;
}
Set names;
try {
names = (Set)
AccessController.doPrivileged(new PrivilegedAction() {
public Object run() {
return mBeanServer.queryNames(null, broadcasterQuery);
}
});
} catch (RuntimeException e) {
logger.fine("createListeners", "Failed to query names: " + e);
logger.debug("createListeners", e);
throw e;
}
for (Iterator it = names.iterator(); it.hasNext(); ) {
ObjectName name = (ObjectName) it.next();
addBufferListener(name, queryBufferListener);
}
logger.debug("createListeners", "ends");
| private synchronized void | createdNotification(javax.management.MBeanServerNotification n)
if (destroyed) {
logger.trace("createNotification",
"NotificationBuffer was destroyed");
return;
}
final String shouldEqual =
MBeanServerNotification.REGISTRATION_NOTIFICATION;
if (!n.getType().equals(shouldEqual)) {
logger.warning("createNotification", "bad type: " + n.getType());
return;
}
final ObjectName name = n.getMBeanName();
if (logger.debugOn())
logger.debug("createdNotification", "for: " + name);
try {
Boolean instanceOf = (Boolean)
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Object run() throws InstanceNotFoundException {
return new Boolean(
mBeanServer.isInstanceOf(name,
broadcasterClass));
}
});
if (!instanceOf.booleanValue()) {
logger.debug("createdNotification",
"not a NotificationBroadcaster");
return;
}
} catch (Exception e) {
logger.trace("createdNotification", extractException(e));
/* Could happen if the MBean was immediately unregistered. */
return;
}
addBufferListener(name, creationBufferListener);
try {
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Object run() throws
InstanceNotFoundException, ListenerNotFoundException {
mBeanServer.removeNotificationListener(
name,
queryBufferListener);
return null;
}
});
logger.trace("createdNotification",
"remove queryBufferListener worked!");
} catch (PrivilegedActionException pe) {
final Exception e = extractException(pe);
if (e instanceof ListenerNotFoundException) {
logger.debug("createdNotification",
"remove queryBufferListener got " +
"ListenerNotFoundException as expected");
// Expected: see comment before createListeners()
} else {
logger.trace("createdNotification", e);
}
}
| private synchronized void | destroyListeners()
logger.debug("destroyListeners", "starts");
destroyed = true;
Set names = (Set)
AccessController.doPrivileged(new PrivilegedAction() {
public Object run() {
return mBeanServer.queryNames(null, broadcasterQuery);
}
});
for (Iterator it = names.iterator(); it.hasNext(); ) {
final ObjectName name = (ObjectName) it.next();
if (logger.debugOn())
logger.debug("destroyListeners",
"remove listener from " + name);
// remove creationBufferListener or queryBufferListener
for (int i = 0; i < 2; i++) {
final boolean creation = (i == 0);
final NotificationListener listener =
creation ? creationBufferListener : queryBufferListener;
final String what =
(creation ?
"creationBufferListener" :
"queryBufferListener");
try {
AccessController.doPrivileged(
new PrivilegedExceptionAction() {
public Object run()
throws
InstanceNotFoundException,
ListenerNotFoundException {
mBeanServer.removeNotificationListener(
name,
listener);
return null;
}
});
if (logger.debugOn()) {
logger.debug("destroyListeners", "removed " + what);
}
} catch (PrivilegedActionException pe) {
final Exception e = extractException(pe);
if (e instanceof ListenerNotFoundException) {
if (logger.debugOn()) {
logger.debug("destroyListeners",
"ListenerNotFoundException for " + what +
" (normal)");
}
} else {
logger.trace("destroyListeners", e);
}
}
}
}
logger.debug("destroyListeners", "ends");
| public void | dispose()
logger.trace("dispose", "starts");
synchronized(this) {
removeNotificationBuffer(mBeanServer);
disposed = true;
//Notify potential waiting fetchNotification call
notifyAll();
}
destroyListeners();
logger.trace("dispose", "ends");
| private void | dropNotification()
queue.remove(0);
earliestSequenceNumber++;
| synchronized long | earliestSequenceNumber()
return earliestSequenceNumber;
| private static java.lang.Exception | extractException(java.lang.Exception e)Iterate until we extract the real exception
from a stack of PrivilegedActionExceptions.
while (e instanceof PrivilegedActionException) {
e = ((PrivilegedActionException)e).getException();
}
return e;
| public javax.management.remote.NotificationResult | fetchNotifications(java.util.Set listeners, long startSequenceNumber, long timeout, int maxNotifications)Fetch notifications that match the given listeners.
The operation only considers notifications with a sequence
number at least startSequenceNumber . It will take
no longer than timeout , and will return no more
than maxNotifications different notifications.
If there are no notifications matching the criteria, the
operation will block until one arrives, subject to the
timeout.
logger.trace("fetchNotifications", "starts");
if (startSequenceNumber < 0 || isDisposed()) {
synchronized(this) {
return new NotificationResult(earliestSequenceNumber(),
nextSequenceNumber(),
new TargetedNotification[0]);
}
}
// Check arg validity
if (listeners == null
|| startSequenceNumber < 0 || timeout < 0
|| maxNotifications < 0) {
logger.trace("fetchNotifications", "Bad args");
throw new IllegalArgumentException("Bad args to fetch");
}
if (logger.debugOn()) {
logger.trace("fetchNotifications",
"listener-length=" + listeners.size() + "; startSeq=" +
startSequenceNumber + "; timeout=" + timeout +
"; max=" + maxNotifications);
}
if (startSequenceNumber > nextSequenceNumber()) {
final String msg = "Start sequence number too big: " +
startSequenceNumber + " > " + nextSequenceNumber();
logger.trace("fetchNotifications", msg);
throw new IllegalArgumentException(msg);
}
/* Determine the end time corresponding to the timeout value.
Caller may legitimately supply Long.MAX_VALUE to indicate no
timeout. In that case the addition will overflow and produce
a negative end time. Set end time to Long.MAX_VALUE in that
case. We assume System.currentTimeMillis() is positive. */
long endTime = System.currentTimeMillis() + timeout;
if (endTime < 0) // overflow
endTime = Long.MAX_VALUE;
if (logger.debugOn())
logger.debug("fetchNotifications", "endTime=" + endTime);
/* We set earliestSeq the first time through the loop. If we
set it here, notifications could be dropped before we
started examining them, so earliestSeq might not correspond
to the earliest notification we examined. */
long earliestSeq = -1;
long nextSeq = startSequenceNumber;
List/*<TargetedNotification>*/ notifs = new ArrayList();
/* On exit from this loop, notifs, earliestSeq, and nextSeq must
all be correct values for the returned NotificationResult. */
while (true) {
logger.debug("fetchNotifications", "main loop starts");
NamedNotification candidate;
/* Get the next available notification regardless of filters,
or wait for one to arrive if there is none. */
synchronized (this) {
/* First time through. The current earliestSequenceNumber
is the first one we could have examined. */
if (earliestSeq < 0) {
earliestSeq = earliestSequenceNumber();
if (logger.debugOn()) {
logger.debug("fetchNotifications",
"earliestSeq=" + earliestSeq);
}
if (nextSeq < earliestSeq) {
nextSeq = earliestSeq;
logger.debug("fetchNotifications",
"nextSeq=earliestSeq");
}
} else
earliestSeq = earliestSequenceNumber();
/* If many notifications have been dropped since the
last time through, nextSeq could now be earlier
than the current earliest. If so, notifications
may have been lost and we return now so the caller
can see this next time it calls. */
if (nextSeq < earliestSeq) {
logger.trace("fetchNotifications",
"nextSeq=" + nextSeq + " < " + "earliestSeq=" +
earliestSeq + " so may have lost notifs");
break;
}
if (nextSeq < nextSequenceNumber()) {
candidate = notificationAt(nextSeq);
if (logger.debugOn()) {
logger.debug("fetchNotifications", "candidate: " +
candidate);
logger.debug("fetchNotifications", "nextSeq now " +
nextSeq);
}
} else {
/* nextSeq is the largest sequence number. If we
already got notifications, return them now.
Otherwise wait for some to arrive, with
timeout. */
if (notifs.size() > 0) {
logger.debug("fetchNotifications",
"no more notifs but have some so don't wait");
break;
}
long toWait = endTime - System.currentTimeMillis();
if (toWait <= 0) {
logger.debug("fetchNotifications", "timeout");
break;
}
/* dispose called */
if (isDisposed()) {
if (logger.debugOn())
logger.debug("fetchNotifications",
"dispose callled, no wait");
return new NotificationResult(earliestSequenceNumber(),
nextSequenceNumber(),
new TargetedNotification[0]);
}
if (logger.debugOn())
logger.debug("fetchNotifications",
"wait(" + toWait + ")");
wait(toWait);
continue;
}
}
/* We have a candidate notification. See if it matches
our filters. We do this outside the synchronized block
so we don't hold up everyone accessing the buffer
(including notification senders) while we evaluate
potentially slow filters. */
ObjectName name = candidate.getObjectName();
Notification notif = candidate.getNotification();
List/*<TargetedNotification>*/ matchedNotifs = new ArrayList();
logger.debug("fetchNotifications",
"applying filters to candidate");
synchronized (listeners) {
for (Iterator it = listeners.iterator(); it.hasNext(); ) {
ListenerInfo li = (ListenerInfo) it.next();
ObjectName pattern = li.getObjectName();
NotificationFilter filter = li.getNotificationFilter();
if (logger.debugOn()) {
logger.debug("fetchNotifications",
"pattern=<" + pattern + ">; filter=" + filter);
}
if (pattern.apply(name)) {
logger.debug("fetchNotifications", "pattern matches");
if (filter == null
|| filter.isNotificationEnabled(notif)) {
logger.debug("fetchNotifications",
"filter matches");
Integer listenerID = li.getListenerID();
TargetedNotification tn =
new TargetedNotification(notif, listenerID);
matchedNotifs.add(tn);
}
}
}
}
if (matchedNotifs.size() > 0) {
/* We only check the max size now, so that our
returned nextSeq is as large as possible. This
prevents the caller from thinking it missed
interesting notifications when in fact we knew they
weren't. */
if (maxNotifications <= 0) {
logger.debug("fetchNotifications",
"reached maxNotifications");
break;
}
--maxNotifications;
if (logger.debugOn())
logger.debug("fetchNotifications", "add: " +
matchedNotifs);
notifs.addAll(matchedNotifs);
}
++nextSeq;
} // end while
/* Construct and return the result. */
int nnotifs = notifs.size();
TargetedNotification[] resultNotifs =
new TargetedNotification[nnotifs];
notifs.toArray(resultNotifs);
NotificationResult nr =
new NotificationResult(earliestSeq, nextSeq, resultNotifs);
if (logger.debugOn())
logger.debug("fetchNotifications", nr.toString());
logger.trace("fetchNotifications", "ends");
return nr;
| public static synchronized com.sun.jmx.remote.internal.NotificationBuffer | getNotificationBuffer(javax.management.MBeanServer mbs, java.util.Map env)
//Find out queue size
int defaultQueueSize = DEFAULT_BUFFER_SIZE;
try {
String s = (String)
AccessController.doPrivileged(new PrivilegedAction() {
public Object run() {
return System.getProperty(BUFFER_SIZE_PROPERTY);
}
});
if (s != null)
defaultQueueSize = Integer.parseInt(s);
} catch (RuntimeException e) {
logger.warning("ServerNotifForwarder",
"Can't use System property "+
BUFFER_SIZE_PROPERTY+ ": " + e);
logger.debug("ServerNotifForwarder", e);
}
int queueSize = defaultQueueSize;
try {
queueSize = (int)
EnvHelp.getIntegerAttribute(env,BUFFER_SIZE_PROPERTY,
defaultQueueSize,0,
Integer.MAX_VALUE);
} catch (RuntimeException e) {
logger.warning("ServerNotifForwarder",
"Can't determine queuesize (using default): "+
e);
logger.debug("ServerNotifForwarder", e);
}
ArrayNotificationBuffer buf =
(ArrayNotificationBuffer) mbsToBuffer.get(mbs);
if (buf == null) {
buf = new ArrayNotificationBuffer(mbs, queueSize);
mbsToBuffer.put(mbs, buf);
}
return buf.new ShareBuffer(queueSize);
| private synchronized boolean | isDisposed()
return disposed;
| synchronized long | nextSequenceNumber()
return nextSequenceNumber;
| synchronized com.sun.jmx.remote.internal.ArrayNotificationBuffer$NamedNotification | notificationAt(long seqNo)
long index = seqNo - earliestSequenceNumber;
if (index < 0 || index > Integer.MAX_VALUE) {
final String msg = "Bad sequence number: " + seqNo + " (earliest "
+ earliestSequenceNumber + ")";
logger.trace("notificationAt", msg);
throw new IllegalArgumentException(msg);
}
return (NamedNotification) queue.get((int) index);
| public static synchronized void | removeNotificationBuffer(javax.management.MBeanServer mbs)
mbsToBuffer.remove(mbs);
| synchronized void | removeSharer(com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer sharer)
sharers.remove(sharer);
if (sharers.isEmpty())
dispose();
else {
int max = 0;
for (Iterator it = sharers.iterator(); it.hasNext(); ) {
ShareBuffer buf = (ShareBuffer) it.next();
int bufsize = buf.getSize();
if (bufsize > max)
max = bufsize;
}
if (max < queueSize)
resize(max);
}
| private void | resize(int newSize)
if (newSize == queueSize)
return;
while (queue.size() > newSize)
dropNotification();
queue.resize(newSize);
queueSize = newSize;
|
|