File | Doc | Category | Size | Date | Package
NotificationEmitterSupport.java | API Doc | Glassfish v2 API | 10954 | Fri May 04 22:31:04 BST 2007 | com.sun.appserv.management.util.jmx

NotificationEmitterSupport.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */
package com.sun.appserv.management.util.jmx;

import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Collections;

import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.NotificationFilter;
import javax.management.relation.MBeanServerNotificationFilter;
import javax.management.AttributeChangeNotificationFilter;
import javax.management.NotificationFilterSupport;
import javax.management.Notification;
import javax.management.AttributeChangeNotification;
import javax.management.MBeanServerNotification;
import javax.management.ListenerNotFoundException;


import com.sun.appserv.management.util.misc.ListUtil;

/**
	A NotificationBroadcasterSupport with these additional features:
	<ul>
	<li>Maintains information on all NotificationListeners so that queries can
	be made on the number of listeners, and the number of listeners of each type</li>
	<li>optionally sends all Notifications asynchronously via a separate Thread</li>
	</ul>
	<p>
	When asynchronous delivery is enabled, the sender Thread is created lazily by
	the first Notification that needs it, and it retires itself when it wakes up
	and finds nothing to send; a later Notification then creates a fresh one.
 */
public class NotificationEmitterSupport
    extends NotificationBroadcasterSupport
{
	/** Key under which listeners having no filter, or an unrecognized filter, are counted. */
	private static final String     WILDCARD_TYPE  = "***";
	private static final String[]   ALL_TYPES  = new String[] { WILDCARD_TYPE };
	
	private static final String[]   ATTRIBUTE_CHANGE_TYPES  = new String[]
	{
	    AttributeChangeNotification.ATTRIBUTE_CHANGE
	};
	
	private final boolean		mAsyncDelivery;
	
	/**
		Written only while holding this object's lock; volatile so that
		sendAll() can take a snapshot of it without locking.
	 */
	private volatile SenderThread		mSenderThread;
	
	/**
		Maps a Notification type (or WILDCARD_TYPE) to the number of listener
		registrations counted for it.  Compound read/modify/write operations
		synchronize on the Map itself.
	 */
	private final Map<String,Integer>   mListenerTypeCounts;
	private final NotificationListenerTracking   mListeners;
	
	
	/**
		@param asyncDelivery	if true, Notifications are queued and sent by a
		separate Thread; if false, they are sent on the caller's Thread
	 */
		public
	NotificationEmitterSupport(
		final boolean	asyncDelivery)
	{
		mAsyncDelivery	= asyncDelivery;
		// don't create a thread until needed
		mSenderThread	= null;
		
		mListenerTypeCounts = Collections.synchronizedMap( new HashMap<String,Integer>() );
		
		mListeners = new NotificationListenerTracking( true );
	}
	
	/**
		Get the sender Thread, creating and starting it if there is none and
		asynchronous delivery is enabled.
		
		@return the sender Thread, or null if delivery is synchronous
	 */
		private synchronized SenderThread
	getSenderThread()
	{
		if ( mSenderThread == null && mAsyncDelivery )
		{
			final SenderThread	sender	= new SenderThread();
			sender.start();
			mSenderThread	= sender;
		}
		
		return( mSenderThread );
	}
	
	/**
		Tell the sender Thread (if any) to quit and forget about it.  The Thread
		delivers whatever is still queued before it exits.
	 */
		public synchronized void
	cleanup()
	{
		final SenderThread	sender	= mSenderThread;
		if ( sender != null )
		{
			mSenderThread	= null;
			sender.quit();
		}
	}
	
	/**
		Called by a sender Thread which has decided to exit.  Unlike cleanup(),
		this leaves a newer replacement Thread (if one has been installed in the
		meantime) untouched.
	 */
		private synchronized void
	retireSenderThread( final SenderThread sender )
	{
		if ( mSenderThread == sender )
		{
			mSenderThread	= null;
		}
		sender.quit();
	}
	
	/**
		Synchronously (on current thread), ensure that all Notifications
		have been delivered.
	 */
		public void
	sendAll( )
	{
		// use a snapshot: cleanup() may null the field between a check and a use
		final SenderThread	sender	= mSenderThread;
		if ( sender != null )
		{
			sender.sendAll();
		}
	}
	
	/**
		@return the number of listeners as reported by the listener tracking
	 */
		public int
	getListenerCount()
	{
		return( mListeners.getListenerCount() );
	}
	
	/**
		Get the number of listener registrations which are counted for a
		Notification type.  Registrations counted under the wildcard (no filter,
		or a filter whose types cannot be determined) are included, since such
		listeners may receive any type.
		
		@param type	the Notification type
		@return the count for 'type' plus the wildcard count; 0 if there are none
	 */
		public int
	getNotificationTypeListenerCount( final String type )
	{
		// lock so that both lookups see the same state of the Map
		synchronized( mListenerTypeCounts )
		{
			final Integer	typeCount	= mListenerTypeCounts.get( type );
			int resultCount	= ( typeCount == null ) ? 0 : typeCount.intValue();
			
			// don't count the wildcard twice if it was itself asked for
			if ( ! WILDCARD_TYPE.equals( type ) )
			{
				final Integer	allCount	= mListenerTypeCounts.get( WILDCARD_TYPE );
				if ( allCount != null )
				{
					resultCount	+= allCount.intValue();
				}
			}
			
			return( resultCount );
		}
	}
	
	/**
		Add 1 to the count for 'type', starting at 1 if there is no count yet.
	 */
		private void
	incrementListenerCountForType( final String type )
	{
		synchronized( mListenerTypeCounts )
		{
			final Integer	count	= mListenerTypeCounts.get( type );
			final int		newCount	= ( count == null ) ? 1 : count.intValue() + 1;
			
			mListenerTypeCounts.put( type, Integer.valueOf( newCount ) );
		}
	}
	
	/**
		Subtract 1 from the count for 'type'; the entry is removed when its
		count would become 0.
		
		@throws IllegalArgumentException if there is no count for 'type'
	 */
		private void
	decrementListenerCountForType( final String type )
	{
		synchronized( mListenerTypeCounts )
		{
			final Integer	count	= mListenerTypeCounts.get( type );
			if ( count == null )
			{
				throw new IllegalArgumentException( type );
			}
			
			final int	oldValue	= count.intValue();
			if ( oldValue == 1 )
			{
				mListenerTypeCounts.remove( type );
			}
			else
			{
				mListenerTypeCounts.put( type, Integer.valueOf( oldValue - 1 ) );
			}
		}
	}
	
	/**
		Determine the Notification types under which a filter is counted.
		
		@param filter	the filter, may be null
		@return the enabled types of a NotificationFilterSupport, the attribute
		change type for an AttributeChangeNotificationFilter, otherwise the wildcard
	 */
		private String[]
	getTypes(
		final NotificationFilter filter )
	{
		final String[]	types;
		
		if ( filter instanceof NotificationFilterSupport )
		{
			// this also covers MBeanServerNotificationFilter, which
			// extends NotificationFilterSupport
			final NotificationFilterSupport fs  = (NotificationFilterSupport)filter;
			
			types   = ListUtil.toStringArray( fs.getEnabledTypes() );
		}
		else if ( filter instanceof AttributeChangeNotificationFilter )
		{
			types   = ATTRIBUTE_CHANGE_TYPES;
		}
		else
		{
			// no filter, or non-standard one, have to assume all types
			types   = ALL_TYPES;
		}
		
		return types;
	}
	
	/**
		Increment the count of every type under which 'filter' is counted.
	 */
		private void
	addFilterTypeCounts( final NotificationFilter filter )
	{
		for( final String type : getTypes( filter ) )
		{
			incrementListenerCountForType( type );
		}
	}
	
	/**
		Decrement the count of every type under which 'filter' is counted.
	 */
		private void
	removeFilterTypeCounts( final NotificationFilter filter )
	{
		for( final String type : getTypes( filter ) )
		{
			decrementListenerCountForType( type );
		}
	}
	
	/**
		Decrement the counts for the filter of each registration in 'infos'.
	 */
		private void
	removeFilterTypeCounts( final List<NotificationListenerInfo> infos )
	{
		for( final NotificationListenerInfo info : infos )
		{
			removeFilterTypeCounts( info.getFilter() );
		}
	}
	
	/**
		Add the listener to the broadcaster, then record it in the listener
		tracking and in the per-type counts.
	 */
		@Override
		public void
	addNotificationListener(
		final NotificationListener listener,
		final NotificationFilter filter,
		final Object handback)
	{
		super.addNotificationListener( listener, filter, handback );
		
		mListeners.addNotificationListener( listener, filter, handback );
		addFilterTypeCounts( filter );
	}
	
	/**
		Remove the listener from the broadcaster, then from the listener
		tracking, and decrement the per-type counts for each registration
		the tracking reports as removed.
		
		@throws ListenerNotFoundException if thrown by the superclass
	 */
		@Override
		public void
	removeNotificationListener(final NotificationListener listener)
		throws ListenerNotFoundException
	{
		super.removeNotificationListener( listener );
		
		final List<NotificationListenerInfo>    infos =
		    mListeners.removeNotificationListener( listener );
		removeFilterTypeCounts( infos );
	}
	
	/**
		Remove one registration from the broadcaster and decrement the
		per-type counts for its filter.
		
		@throws ListenerNotFoundException if thrown by the superclass
	 */
		@Override
		public void
	removeNotificationListener(
		final NotificationListener	listener,
		final NotificationFilter	filter,
		final Object				handback)
		throws ListenerNotFoundException
	{
		super.removeNotificationListener( listener, filter, handback );
		
		// NOTE(review): this call takes only the listener and returns a List, so
		// it presumably drops every tracked registration of the listener, whereas
		// the superclass removed just the one above -- confirm whether
		// NotificationListenerTracking offers a removal specific to the triple.
		mListeners.removeNotificationListener( listener );
		
		// A null filter was counted under the wildcard when the listener was
		// added, so it must be uncounted the same way.
		removeFilterTypeCounts( filter );
	}
	
	/**
		Send the Notification on the calling Thread by way of the superclass.
	 */
		protected void
	internalSendNotification( final Notification notif )
	{
		super.sendNotification( notif );
	}

	/**
		Send the Notification.  If created with async=true,
		then this routine returns immediately and the Notification is sent
		on a separate Thread.  Nothing is done if there are no listeners.
	 */
		@Override
		public void
	sendNotification( final Notification notif )
	{
		if ( getListenerCount() != 0 )
		{
			if ( mAsyncDelivery )
			{
				enqueueForAsyncDelivery( notif );
			}
			else
			{
				internalSendNotification( notif );
			}
		}
	}
	
	/**
		Queue the Notification on the sender Thread.  This holds the same lock as
		cleanup() and retireSenderThread() so that the Thread cannot be retired
		between being obtained and being given the Notification; a retiring
		Thread therefore still sees the Notification in its final drain.
	 */
		private synchronized void
	enqueueForAsyncDelivery( final Notification notif )
	{
		// never null here: only called when mAsyncDelivery is true
		getSenderThread().enqueue( notif );
	}
	
	/**
		Thread which delivers queued Notifications.
	 */
	private final class SenderThread extends Thread
	{
		/** set by quit(), read by run() on the sender Thread */
		private volatile boolean	mQuit;
		private final List<Notification>	mPendingNotifications;
		
			public
		SenderThread()
		{
			mQuit	= false;
			mPendingNotifications	=
			    Collections.synchronizedList( new LinkedList<Notification>() );
		}
		
		/**
			Ask the Thread to exit and wake it up if it is waiting.
		 */
			public void
		quit()
		{
			mQuit	= true;
			notifySelf();
		}
		
		
			private void
		notifySelf()
		{
			synchronized( this )
			{
				this.notify();
			}
		}
		
		/**
			Queue a Notification and wake up the Thread.
		 */
			private void
		enqueue( final Notification notif )
		{
			mPendingNotifications.add( notif );
			notifySelf();
		}
		
		/**
			Deliver queued Notifications, on the calling Thread, until the
			queue is empty.  May be called by several Threads at once.
			
			@return true if at least one Notification was taken from the queue
		 */
			public boolean
		sendAll()
		{
			boolean			sentSomething	= false;
			
			while ( true )
			{
				final Notification	notif;
				
				// test and removal must be atomic since more than one Thread
				// can be in here
				synchronized( mPendingNotifications )
				{
					if ( mPendingNotifications.isEmpty() )
					{
						break;
					}
					notif = mPendingNotifications.remove( 0 );
				}
				
				sentSomething	= true;
				// deliver outside the lock so that enqueue() is never blocked
				// by a listener
				internalSendNotification( notif );
			}
			
			return( sentSomething );
		}
		
		/**
			Wait for work, deliver it, and exit once told to quit or once a
			wakeup finds nothing to send.  Anything still queued is delivered
			before exiting.
		 */
			@Override
			public void
		run()
		{
			// wake up at least every 5 seconds (value is in milliseconds)
			final int	INTERVAL	=  5 * 1000;
			
			while ( ! mQuit )
			{
				try
				{
					synchronized( this )
					{
						// Check while holding the monitor: enqueue() and quit()
						// notify under the same monitor, so a wakeup issued
						// before we get here is not lost.
						if ( ! mQuit && mPendingNotifications.isEmpty() )
						{
							wait( INTERVAL );
						}
					}
				}
				catch( InterruptedException ignored )
				{
					// treated like any other wakeup; exit is requested via quit()
				}
				
				if ( mQuit )
				{
					break;
				}
				
				final boolean	sentSomething	= sendAll();
				
				// nothing available, get rid of ourself till needed
				if ( ! sentSomething )
				{
					retireSenderThread( this );
					break;
				}
			}
			
			// now no new ones can be added, but ensure any pending are sent
			sendAll();
		}
	}

}