FileDocCategorySizeDatePackage
ClientNotifForwarder.javaAPI DocJava SE 6 API23546Tue Jun 10 00:22:04 BST 2008com.sun.jmx.remote.internal

ClientNotifForwarder

public abstract class ClientNotifForwarder extends Object

Fields Summary
private static int
threadId
private final ClassLoader
defaultClassLoader
private final Executor
executor
private final Map
infoList
private long
clientSequenceNumber
private final int
maxNotifications
private final long
timeout
private Integer
mbeanRemovedNotifID
private Thread
currentFetchThread
private boolean
inited
private static final int
STARTING
This state means that a thread is being created for fetching and forwarding notifications.
private static final int
STARTED
This state tells that a thread has been started for fetching and forwarding notifications.
private static final int
STOPPING
This state means that the fetching thread is informed to stop.
private static final int
STOPPED
This state means that the fetching thread is already stopped.
private static final int
TERMINATED
This state means that this object is terminated and no more thread will be created for fetching notifications.
private int
state
private boolean
beingReconnected
This variable is used to tell whether a connector (RMIConnector or ClientIntermediary) is doing reconnection. This variable will be set to true by the method preReconnection, and set to false by postReconnection. When beingReconnected == true, no thread will be created for fetching notifications.
private static final ClassLogger
logger
Constructors Summary
public ClientNotifForwarder(Map env)

	this(null, env);
    
public ClientNotifForwarder(ClassLoader defaultClassLoader, Map env)

	maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
	timeout = EnvHelp.getFetchTimeout(env);

        /* You can supply an Executor in which the remote call to
           fetchNotifications will be made.  The Executor's execute
           method reschedules another task, so you must not use
           an Executor that executes tasks in the caller's thread.  */
	Executor ex = (Executor)
	    env.get("jmx.remote.x.fetch.notifications.executor");
	if (ex == null)
            ex = new LinearExecutor();
        else if (logger.traceOn())
            logger.trace("ClientNotifForwarder", "executor is " + ex);

	this.defaultClassLoader = defaultClassLoader;
	this.executor = ex;
    
Methods Summary
protected abstract java.lang.IntegeraddListenerForMBeanRemovedNotif()

public synchronized voidaddNotificationListener(java.lang.Integer listenerID, javax.management.ObjectName name, javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback, javax.security.auth.Subject delegationSubject)


	if (logger.traceOn()) {
	    logger.trace("addNotificationListener",
			 "Add the listener "+listener+" at "+name);
	}

	infoList.put(listenerID, 
		     new ClientListenerInfo(listenerID, 
					    name,
					    listener,
					    filter,
					    handback,
					    delegationSubject));
    

	init(false);
    
private synchronized voidbeforeRemove()
Import: should not remove a listener during reconnection, the reconnection needs to change the listener list and that will possibly make removal fail.

        while (beingReconnected) {
	    if (state == TERMINATED) {
		throw new IOException("Terminated.");
	    }

	    try {
		wait();
	    } catch (InterruptedException ire) {
		IOException ioe = new IOException(ire.toString());
		EnvHelp.initCause(ioe, ire);

		throw ioe;
	    }
	}

	if (state == TERMINATED) {
	    throw new IOException("Terminated.");
	}
    
protected abstract javax.management.remote.NotificationResultfetchNotifs(long clientSequenceNumber, int maxNotifications, long timeout)
Called to to fetch notifications from a server.

private synchronized voidinit(boolean reconnected)

	switch (state) {
	case STARTED:
	    return;
	case STARTING:
	    return;
	case TERMINATED:
	    throw new IOException("The ClientNotifForwarder has been terminated.");
	case STOPPING:
	    if (beingReconnected == true) {
		// wait for another thread to do, which is doing reconnection
		return;
	    }

	    while (state == STOPPING) { // make sure only one fetching thread.		
		try {
		    wait();
		} catch (InterruptedException ire) {
		    IOException ioe = new IOException(ire.toString());
		    EnvHelp.initCause(ioe, ire);
		    
		    throw ioe;
		}
	    }
		
	    // re-call this method to check the state again,
	    // the state can be other value like TERMINATED.
	    init(reconnected);

	    return;
	case STOPPED:
	    if (beingReconnected == true) {
		// wait for another thread to do, which is doing reconnection
		return;
	    }

	    if (logger.traceOn()) {
		logger.trace("init", "Initializing...");
	    }

	    // init the clientSequenceNumber if not reconnected.
	    if (!reconnected) {
		try {
		    NotificationResult nr = fetchNotifs(-1, 0, 0);
		    clientSequenceNumber = nr.getNextSequenceNumber();
		} catch (ClassNotFoundException e) {
		    // can't happen
		    logger.warning("init", "Impossible exception: "+ e);
		    logger.debug("init",e);
		}
	    }

	    // for cleaning
	    try {
		mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
	    } catch (Exception e) {
		final String msg =
		    "Failed to register a listener to the mbean " +
		    "server: the client will not do clean when an MBean " +
		    "is unregistered";
		if (logger.traceOn()) {
		    logger.trace("init", msg, e);
		}		 
	    } 

	    setState(STARTING);

	    // start fetching
	    executor.execute(new NotifFetcher());

	    return;
	default:
	    // should not
	    throw new IOException("Unknown state.");
	}
    
protected abstract voidlostNotifs(java.lang.String message, long number)
Used to send out a notification about lost notifs

public synchronized voidpostReconnection(com.sun.jmx.remote.internal.ClientListenerInfo[] listenerInfos)
Called after reconnection is finished. This method is intended to be called only by a client connector: RMIConnector and ClientIntermediary.


	if (state == TERMINATED) {
	    return;
	}

	while (state == STOPPING) {
	    try {
		wait();
	    } catch (InterruptedException ire) {
		IOException ioe = new IOException(ire.toString());
		EnvHelp.initCause(ioe, ire);
		throw ioe;
	    }
	}

	final boolean trace = logger.traceOn();
	final int len   = listenerInfos.length;

	for (int i=0; i<len; i++) {
	    if (trace) {
		logger.trace("addNotificationListeners",
			     "Add a listener at "+
			     listenerInfos[i].getListenerID());
	    }

	    infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
	}

	beingReconnected = false;
	notifyAll();

	if (currentFetchThread == Thread.currentThread()) {
	    // no need to init, simply get the id
	    try {
		mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
	    } catch (Exception e) {
		final String msg =
		    "Failed to register a listener to the mbean " +
		    "server: the client will not do clean when an MBean " +
		    "is unregistered";
		if (logger.traceOn()) {
		    logger.trace("init", msg, e);
		}		 
	    } 
	} else if (listenerInfos.length > 0) { // old listeners re-registered
	    init(true);
	} else if (infoList.size() > 0) {
	    // but new listeners registered during reconnection
	    init(false);
	}
    
public synchronized com.sun.jmx.remote.internal.ClientListenerInfo[]preReconnection()

	if (state == TERMINATED || beingReconnected) { // should never
	    throw new IOException("Illegal state.");
	}

	final ClientListenerInfo[] tmp = (ClientListenerInfo[]) 
            infoList.values().toArray(new ClientListenerInfo[0]);


	beingReconnected = true;

	infoList.clear();

	if (currentFetchThread == Thread.currentThread()) {
	    /* we do not need to stop the fetching thread, because this thread is
	       used to do restarting and it will not be used to do fetching during
	       the re-registering the listeners.*/
	    return tmp;
	}

	while (state == STARTING) {
	    try {
		wait();
	    } catch (InterruptedException ire) {
		IOException ioe = new IOException(ire.toString());
		EnvHelp.initCause(ioe, ire);

		throw ioe;
	    }
	}

	if (state == STARTED) {
	    setState(STOPPING);
	}

	return tmp;
    
protected abstract voidremoveListenerForMBeanRemovedNotif(java.lang.Integer id)

public synchronized java.lang.Integer[]removeNotificationListener(javax.management.ObjectName name)

	if (logger.traceOn()) {
	    logger.trace("removeNotificationListener",
			 "Remove all listeners registered at "+name);
	}

	ArrayList ids = new ArrayList();

	ArrayList values = new ArrayList(infoList.values());
	for (int i=values.size()-1; i>=0; i--) {
	    ClientListenerInfo li = (ClientListenerInfo)values.get(i);
	    if (li.sameAs(name)) {
		ids.add(li.getListenerID());
		    
		infoList.remove(li.getListenerID());
	    }
	}

	return (Integer[]) ids.toArray(new Integer[0]);
    
public synchronized java.lang.Integer[]removeNotificationListener(javax.management.ObjectName name, javax.management.NotificationListener listener)


        beforeRemove();
	    
	if (logger.traceOn()) {
	    logger.trace("removeNotificationListener",
			 "Remove the listener "+listener+" from "+name);
	}
	    
	ArrayList ids = new ArrayList();
	ArrayList values = new ArrayList(infoList.values());
	for (int i=values.size()-1; i>=0; i--) {
	    ClientListenerInfo li = (ClientListenerInfo)values.get(i);

	    if (li.sameAs(name, listener)) {
		ids.add(li.getListenerID());

		infoList.remove(li.getListenerID());
	    }
	}

	if (ids.isEmpty())
	    throw new ListenerNotFoundException("Listener not found");

	return (Integer[])ids.toArray(new Integer[0]);
    
public synchronized java.lang.IntegerremoveNotificationListener(javax.management.ObjectName name, javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)


	if (logger.traceOn()) {
	    logger.trace("removeNotificationListener",
			 "Remove the listener "+listener+" from "+name);
	}

        beforeRemove();

	Integer id = null;

	ArrayList values = new ArrayList(infoList.values());
	for (int i=values.size()-1; i>=0; i--) {
	    ClientListenerInfo li = (ClientListenerInfo)values.get(i);
	    if (li.sameAs(name, listener, filter, handback)) {
		id=li.getListenerID();

		infoList.remove(id);

		break;		    
	    }
	}

	if (id == null)
	    throw new ListenerNotFoundException("Listener not found");

	return id;	
    
private synchronized voidsetState(int newState)

	if (state == TERMINATED) {
	    return;
	}
	
	state = newState;
	this.notifyAll();
    
public synchronized voidterminate()

	if (state == TERMINATED) {
	    return;
	}

	if (logger.traceOn()) {
	    logger.trace("terminate", "Terminating...");
	}

	if (state == STARTED) {
	   infoList.clear();
	}

	setState(TERMINATED);