FileDocCategorySizeDatePackage
InboundConnectionCacheBlockingImpl.javaAPI DocExample8813Tue May 29 16:57:04 BST 2007com.sun.xml.ws.transport.tcp.connectioncache.impl.transport

InboundConnectionCacheBlockingImpl.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */

package com.sun.xml.ws.transport.tcp.connectioncache.impl.transport;

import com.sun.xml.ws.transport.tcp.connectioncache.spi.concurrent.ConcurrentQueue;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.Connection;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.InboundConnectionCache;
import java.io.IOException ;

import java.util.logging.Logger ;

import java.util.Map ;
import java.util.HashMap ;

/** Manage connections that are initiated from another VM. 
 *
 * @author Ken Cavanaugh
 */
public final class InboundConnectionCacheBlockingImpl<C extends Connection> 
    extends ConnectionCacheBlockingBase<C> 
    implements InboundConnectionCache<C> {

    private final Map<C,ConnectionState<C>> connectionMap ;

    protected String thisClassName() {
	return "InboundConnectionCacheBlockingImpl" ;
    }

    private static final class ConnectionState<C extends Connection> {
	final C connection ;		// Connection of the 
					// ConnectionState
	int busyCount ;			// Number of calls to 
					// get without release
	int expectedResponseCount ;	// Number of expected 
					// responses not yet 
					// received

	ConcurrentQueue.Handle<C> reclaimableHandle ;  // non-null iff connection 
						    // is not in use and has no
						    // outstanding requests

	ConnectionState( final C conn ) {
	    this.connection = conn ;

	    busyCount = 0 ;
	    expectedResponseCount = 0 ;
	    reclaimableHandle = null ;
	}
    }

    public InboundConnectionCacheBlockingImpl( final String cacheType, 
	final int highWaterMark, final int numberToReclaim, 
	Logger logger ) {

	super( cacheType, highWaterMark, numberToReclaim, logger ) ;

	this.connectionMap = new HashMap<C,ConnectionState<C>>() ;

	if (debug()) {
	    dprint(".constructor completed: " + getCacheType() );
	}
    }

    // We do not need to define equals or hashCode for this class.

    public synchronized void requestReceived( final C conn ) {
	if (debug())
	    dprint( "->requestReceived: connection " + conn ) ;

	try {
	    ConnectionState<C> cs = getConnectionState( conn ) ;

	    final int totalConnections = totalBusy + totalIdle ;
	    if (totalConnections > highWaterMark())
		reclaim() ;

	    ConcurrentQueue.Handle<C> reclaimHandle = cs.reclaimableHandle ;
	    if (reclaimHandle != null) {
		if (debug())
		    dprint( ".requestReceived: " + conn 
			+ " removed from reclaimableQueue" ) ;
		reclaimHandle.remove() ;
	    }

	    int count = cs.busyCount++ ;
	    if (count == 0) {
		if (debug())
		    dprint( ".requestReceived: " + conn 
			+ " transition from idle to busy" ) ;

		totalIdle-- ;
		totalBusy++ ;
	    }
	} finally {
	    if (debug())
		dprint( "<-requestReceived: connection " + conn ) ;
	}
    }

    public synchronized void requestProcessed( final C conn, 
	final int numResponsesExpected ) {

	if (debug())
	    dprint( "->requestProcessed: connection " + conn 
		+ " expecting " + numResponsesExpected + " responses" ) ;

	try {
	    final ConnectionState<C> cs = connectionMap.get( conn ) ;

	    if (cs == null) {
		if (debug())
		    dprint( ".release: connection " + conn + " was closed" ) ;

		return ; 
	    } else {
		cs.expectedResponseCount += numResponsesExpected ;
		int numResp = cs.expectedResponseCount ;
		int numBusy = --cs.busyCount ;

		if (debug()) {
		    dprint( ".release: " + numResp + " responses expected" ) ;
		    dprint( ".release: " + numBusy + 
			" busy count for connection" ) ;
		}

		if (numBusy == 0) {
		    totalBusy-- ;
		    totalIdle++ ;

		    if (numResp == 0) {
			if (debug())
			    dprint( ".release: "
				+ "queuing reclaimable connection "
				+ conn ) ;

			if ((totalBusy+totalIdle) > highWaterMark()) {
			    close( conn ) ;
			} else {
			    cs.reclaimableHandle = 
				reclaimableConnections.offer( conn ) ;
			}
		    }
		}
	    }
	} finally {
	    if (debug())
		dprint( "<-requestProcessed" ) ;
	}
    }

    /** Decrement the number of expected responses.  When a connection is idle 
     * and has no expected responses, it can be reclaimed.
     */
    public synchronized void responseSent( final C conn ) {
	if (debug())
	    dprint( "->responseSent: " + conn ) ;

	try {
	    final ConnectionState<C> cs = connectionMap.get( conn ) ;
	    final int waitCount = --cs.expectedResponseCount ;
	    if (waitCount == 0) {
		if (debug())
		    dprint( ".responseSent: " + conn + " is now reclaimable" ) ;

		if ((totalBusy+totalIdle) > highWaterMark()) {
		    if (debug()) {
			dprint( ".responseSent: " + conn 
			    + " closing connection" ) ; 
		    }

		    close( conn ) ;
		} else {
		    cs.reclaimableHandle = 
			reclaimableConnections.offer( conn ) ;

		    if (debug()) {
			dprint( ".responseSent: " + conn 
			    + " is now reclaimable" ) ; 
		    }
		}
	    } else {
		if (debug()) {
		    dprint( ".responseSent: " + conn + " waitCount=" 
			+ waitCount ) ;
		}
	    }
	} finally {
	    if (debug()) {
		dprint( "<-responseSent: " + conn ) ;
	    }
	}
    }

    /** Close a connection, regardless of whether the connection is busy
     * or not.
     */
    public synchronized void close( final C conn ) {
	if (debug()) 
	    dprint( "->close: " + conn ) ;
	
	try {
	    final ConnectionState<C> cs = connectionMap.remove( conn ) ;
	    int count = cs.busyCount ;
	    if (debug())
		dprint( ".close: " + conn + " count = " + count ) ;

	    if (count == 0)
		totalIdle-- ;
	    else
		totalBusy-- ;

	    final ConcurrentQueue.Handle rh = cs.reclaimableHandle ;
	    if (rh != null) {
		if (debug())
		    dprint( ".close: " + conn + " connection was reclaimable" ) ;

		rh.remove() ;
	    }

	    try {
		conn.close() ;
	    } catch (IOException exc) {
		if (debug())
		    dprint( ".close: " + conn + " close threw "  + exc ) ;
	    }
	} finally {
	    if (debug())
		dprint( "<-close: " + conn ) ;
	}
    }

    // Atomically either get the ConnectionState for conn OR 
    // create a new one AND put it in the cache
    private ConnectionState<C> getConnectionState( C conn ) {
	// This should be the only place a CacheEntry is constructed.
	if (debug())
	    dprint( "->getConnectionState: " + conn ) ;

	try {
	    ConnectionState<C> result = connectionMap.get( conn ) ;
	    if (result == null) {
		if (debug())
		    dprint( ".getConnectionState: " + conn + 
			" creating new ConnectionState instance" ) ;
		result = new ConnectionState<C>( conn ) ;
		connectionMap.put( conn, result ) ;
		totalIdle++ ;
	    }
	    
	    return result ;
	} finally {
	    if (debug())
		dprint( "<-getConnectionState: " + conn ) ;
	}
    }
}

// End of file.