FileDocCategorySizeDatePackage
OutboundConnectionCacheBlockingImpl.javaAPI DocExample25447Tue May 29 16:57:04 BST 2007com.sun.xml.ws.transport.tcp.connectioncache.impl.transport

OutboundConnectionCacheBlockingImpl.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */

package com.sun.xml.ws.transport.tcp.connectioncache.impl.transport;

import com.sun.xml.ws.transport.tcp.connectioncache.spi.concurrent.ConcurrentQueue;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.Connection;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ConnectionFinder;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo;
import com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.OutboundConnectionCache;
import java.io.IOException ;

import java.util.Map ;
import java.util.HashMap ;
import java.util.Queue ;
import java.util.Collection ;
import java.util.Collections ;

import java.util.concurrent.LinkedBlockingQueue ;

import java.util.logging.Logger ;

public final class OutboundConnectionCacheBlockingImpl<C extends Connection>
        extends ConnectionCacheBlockingBase<C>
        implements OutboundConnectionCache<C> {
    
    // Configuration data
    // XXX we may want this data to be dynamically re-configurable
    private final int maxParallelConnections ;	// Maximum number of
    // connections we will open
    // to the same endpoint
    
    private Map<ContactInfo<C>,CacheEntry<C>> entryMap ;
    private Map<C,ConnectionState<C>> connectionMap ;
    
    public int maxParallelConnections() {
        return maxParallelConnections ;
    }
    
    protected String thisClassName() {
        return "OutboundConnectionCacheBlockingImpl" ;
    }
    
    // NEW: connection was just created; currently not queued
    // BUSY: connection queued on busyConnections queue
    // IDLE: connection queued on idleConnections queue
    private enum ConnectionStateValue { NEW, BUSY, IDLE }
    
    private static final class ConnectionState<C extends Connection> {
        ConnectionStateValue csv ;			// Indicates state of
        // connection
        final ContactInfo<C> cinfo ;			// ContactInfo used to
        // create this
        // Connection
        final C connection ;				// Connection of the
        // ConnectionState
        final CacheEntry<C> entry ;			// This Connection's
        // CacheEntry
        
        int busyCount ;					// Number of calls to
        // get without release
        int expectedResponseCount ;			// Number of expected
        // responses not yet
        // received
        
        // At all times, a connection is either on the busy or idle queue in
        // its ConnectionEntry.  If the connection is on the idle queue,
        // reclaimableHandle may also be non-null if the Connection is also on
        // the reclaimableConnections queue.
        ConcurrentQueue.Handle<C> reclaimableHandle ;	// non-null iff
        // connection is not
        // in use and has no
        // outstanding requests
        
        ConnectionState( final ContactInfo<C> cinfo, final CacheEntry<C> entry,
                final C conn ) {
            
            this.csv = ConnectionStateValue.NEW ;
            this.cinfo = cinfo ;
            this.connection = conn ;
            this.entry = entry ;
            
            busyCount = 0 ;
            expectedResponseCount = 0 ;
            reclaimableHandle = null ;
        }
        
        public String toString() {
            return "ConnectionState["
                    + "cinfo=" + cinfo
                    + " connection=" + connection
                    + " busyCount=" + busyCount
                    + " expectedResponseCount=" + expectedResponseCount
                    + "]" ;
        }
    }
    
    // Represents an entry in the outbound connection cache.
    // This version handles normal shareable ContactInfo
    // (we also need to handle no share).
    private static final class CacheEntry<C extends Connection> {
        final Queue<C> idleConnections = new LinkedBlockingQueue<C>() ;
        final Collection<C> idleConnectionsView =
                Collections.unmodifiableCollection( idleConnections ) ;
        
        final Queue<C> busyConnections = new LinkedBlockingQueue<C>() ;
        final Collection<C> busyConnectionsView =
                Collections.unmodifiableCollection( busyConnections ) ;
        
        public int totalConnections() {
            return idleConnections.size() + busyConnections.size() ;
        }
    }
    
    public OutboundConnectionCacheBlockingImpl( final String cacheType,
            final int highWaterMark, final int numberToReclaim,
            final int maxParallelConnections, Logger logger ) {
        
        super( cacheType, highWaterMark, numberToReclaim, logger ) ;
        
        if (maxParallelConnections < 1)
            throw new IllegalArgumentException(
                    "maxParallelConnections must be > 0" ) ;
        
        this.maxParallelConnections = maxParallelConnections ;
        
        this.entryMap = new HashMap<ContactInfo<C>,CacheEntry<C>>() ;
        this.connectionMap = new HashMap<C,ConnectionState<C>>() ;
        
        if (debug()) {
            dprint(".constructor completed: " + cacheType );
        }
    }
    
    public boolean canCreateNewConnection( ContactInfo<C> cinfo ) {
        CacheEntry<C> entry = entryMap.get( cinfo ) ;
        if (entry == null)
            return true ;
        
        return internalCanCreateNewConnection( entry ) ;
    }
    
    private boolean internalCanCreateNewConnection( final CacheEntry<C> entry ) {
        final int totalConnectionsInEntry = entry.totalConnections() ;
        
        final boolean createNewConnection =
                (totalConnectionsInEntry == 0) ||
                ((numberOfConnections() < highWaterMark()) &&
                (totalConnectionsInEntry < maxParallelConnections)) ;
        
        return createNewConnection ;
    }
    
    private CacheEntry<C> getEntry( final ContactInfo<C> cinfo
            ) throws IOException {
        
        if (debug()) {
            dprint( "->getEntry: " + cinfo ) ;
        }
        
        try {
            // This should be the only place a CacheEntry is constructed.
            CacheEntry<C> result = entryMap.get( cinfo ) ;
            if (result == null) {
                if (debug()) {
                    dprint( ".getEntry: " + cinfo
                            + " creating new CacheEntry" ) ;
                }
                
                result = new CacheEntry<C>() ;
                entryMap.put( cinfo, result ) ;
            } else {
                if (debug()) {
                    dprint( ".getEntry: " + cinfo +
                            " re-using existing CacheEntry" ) ;
                }
            }
            
            return result ;
        } finally {
            if (debug()) {
                dprint( "<-getEntry: " + cinfo ) ;
            }
        }
    }
    
    // Note that tryNewConnection will ALWAYS create a new connection if
    // no connection currently exists.
    private C tryNewConnection( final CacheEntry<C> entry,
            final ContactInfo<C> cinfo ) throws IOException {
        
        if (debug())
            dprint( "->tryNewConnection: " + cinfo ) ;
        
        try {
            C conn = null ;
            
            if (internalCanCreateNewConnection(entry)) {
                // If this throws an exception just let it
                // propagate: let a higher layer handle a
                // connection creation failure.
                conn = cinfo.createConnection() ;
                
                if (debug()) {
                    dprint( ".tryNewConnection: " + cinfo
                            + " created connection " + conn ) ;
                }
            }
            
            return conn ;
        } finally {
            if (debug())
                dprint( "<-tryNewConnection: " + cinfo ) ;
        }
    }
    
    private void decrementTotalIdle() {
        if (debug())
            dprint( "->decrementTotalIdle: totalIdle = "
                    + totalIdle ) ;
        
        try {
            if (totalIdle > 0) {
                totalIdle-- ;
            } else {
                if (debug()) {
                    dprint( ".decrementTotalIdle: "
                            + "incorrect idle count: was already 0" ) ;
                }
            }
        } finally {
            if (debug()) {
                dprint( "<-decrementTotalIdle: totalIdle = "
                        + totalIdle ) ;
            }
        }
    }
    
    private void decrementTotalBusy() {
        if (debug())
            dprint( "->decrementTotalBusy: totalBusy = "
                    + totalBusy ) ;
        
        try {
            if (totalBusy > 0) {
                totalBusy-- ;
            } else {
                if (debug()) {
                    dprint( ".decrementTotalBusy: "
                            + "incorrect idle count: was already 0" ) ;
                }
            }
        } finally {
            if (debug()) {
                dprint( "<-decrementTotalBusy: totalBusy = "
                        + totalBusy ) ;
            }
        }
    }
    
    // Update queues and counts to make the result busy.
    private void makeResultBusy( C result, ConnectionState<C> cs,
            CacheEntry<C> entry ) {
        
        if (debug())
            dprint( "->makeResultBusy: " + result
                    + " was previously " + cs.csv ) ;
        
        try {
            switch (cs.csv) {
                case NEW :
                    totalBusy++ ;
                    break ;
                    
                case IDLE :
                    totalBusy++ ;
                    decrementTotalIdle() ;
                    
                    final ConcurrentQueue.Handle<C> handle =
                            cs.reclaimableHandle ;
                    
                    if (handle != null) {
                        if (!handle.remove()) {
                            if (debug()) {
                                dprint( ".makeResultBusy: " + cs.cinfo
                                        + " result was not on reclaimable Q" ) ;
                            }
                        }
                        cs.reclaimableHandle = null ;
                    }
                    break ;
                    
                case BUSY :
                    // Nothing to do here
                    break ;
            }
            
            entry.busyConnections.offer( result ) ;
            cs.csv = ConnectionStateValue.BUSY ;
            cs.busyCount++ ;
        } finally {
            if (debug())
                dprint( "<-makeResultBusy: " + result ) ;
        }
    }
    
    private C tryIdleConnections( CacheEntry<C> entry ) {
        if (debug()) {
            dprint( "->tryIdleConnections" ) ;
        }
        
        try {
            return entry.idleConnections.poll() ;
        } finally {
            if (debug()) {
                dprint( "<-tryIdleConnections" ) ;
            }
        }
    }
    
    private C tryBusyConnections( CacheEntry<C> entry ) {
        // Use a busy connection.  Note that there MUST be a busy
        // connection available at this point, because
        // tryNewConnection did not create a new connection.
        if (debug()) {
            dprint( "->tryBusyConnections" ) ;
        }
        
        try {
            C result = entry.busyConnections.poll() ;
            
            if (result == null) {
                throw new RuntimeException(
                        "INTERNAL ERROR: no busy connection available" ) ;
            }
            
            return result ;
        } finally {
            if (debug()) {
                dprint( "<-tryBusyConnections" ) ;
            }
        }
    }
    
    public synchronized C get( final ContactInfo<C> cinfo
            ) throws IOException {
        
        return get( cinfo, null ) ;
    }
    
    public synchronized ConnectionState<C> getConnectionState(
            ContactInfo<C> cinfo, CacheEntry<C> entry, C conn ) {
        
        if (debug())
            dprint( "->getConnectionState: " + conn ) ;
        
        try {
            ConnectionState<C> cs = connectionMap.get( conn ) ;
            if (cs == null) {
                if (debug())
                    dprint( ".getConnectionState: " + conn
                            + " creating new ConnectionState" + cs ) ;
                
                cs = new ConnectionState<C>( cinfo, entry, conn ) ;
                connectionMap.put( conn, cs ) ;
            } else {
                if (debug())
                    dprint( ".getConnectionState: " + conn
                            + " found ConnectionState" + cs ) ;
            }
            
            return cs ;
        } finally {
            if (debug())
                dprint( "<-getConnectionState: " + conn ) ;
        }
    }
    
    public synchronized C get( final ContactInfo<C> cinfo,
            final ConnectionFinder<C> finder ) throws IOException {
        
        if (debug()) {
            dprint( "->get: " + cinfo ) ;
        }
        
        ConnectionState<C> cs = null ;
        
        try {
            final CacheEntry<C> entry = getEntry( cinfo ) ;
            C result = null ;
            
            if (numberOfConnections() >= highWaterMark()) {
                // This reclaim probably does nothing, because
                // connections are reclaimed on release in the
                // overflow state.
                reclaim() ;
            }
            
            if (finder != null) {
                // Try the finder if present.
                if (debug()) {
                    dprint( ".get: " + cinfo +
                            " Calling the finder to get a connection" ) ;
                }
                
                result = finder.find( cinfo, entry.idleConnectionsView,
                        entry.busyConnectionsView ) ;
                
                if (result != null) {
                    cs = getConnectionState( cinfo, entry, result ) ;
                    
                    // Dequeue from cache entry if not NEW
                    if (cs.csv == ConnectionStateValue.BUSY)
                        entry.busyConnections.remove( result ) ;
                    else if (cs.csv == ConnectionStateValue.IDLE)
                        entry.idleConnections.remove( result ) ;
                }
            }
            
            if (result == null) {
                result = tryIdleConnections( entry ) ;
            }
            
            if (result == null) {
                result = tryNewConnection( entry, cinfo ) ;
            }
            
            if (result == null) {
                result = tryBusyConnections( entry ) ;
            }
            
            if (cs == null)
                cs = getConnectionState( cinfo, entry, result ) ;
            
            makeResultBusy( result, cs, entry ) ;
            return result ;
        } finally {
            if (debug()) {
                dprint( ".get " + cinfo
                        + " totalIdle=" + totalIdle
                        + " totalBusy=" + totalBusy ) ;
                
                dprint( "<-get " + cinfo + " ConnectionState=" + cs ) ;
            }
        }
    }
    
    // If overflow, close conn and return true,
    // otherwise enqueue on reclaimable queue and return false.
    private boolean reclaimOrClose( ConnectionState<C> cs, final C conn ) {
        if (debug())
            dprint( "->reclaimOrClose: " + conn ) ;
        
        try {
            final boolean isOverflow = numberOfConnections() >
                    highWaterMark() ;
            
            if (isOverflow) {
                if (debug()) {
                    dprint( ".reclaimOrClose: closing overflow connection "
                            + conn ) ;
                }
                
                close( conn ) ;
            } else {
                if (debug()) {
                    dprint( ".reclaimOrClose: queuing reclaimable connection "
                            + conn ) ;
                }
                
                cs.reclaimableHandle =
                        reclaimableConnections.offer( conn ) ;
            }
            
            return isOverflow ;
        } finally {
            if (debug())
                dprint( "<-reclaimOrClose: " + conn ) ;
        }
    }
    
    public synchronized void release( final C conn,
            final int numResponsesExpected ) {
        
        if (debug()) {
            dprint( "->release: " + conn
                    + " expecting " + numResponsesExpected + " responses" ) ;
        }
        
        final ConnectionState<C> cs = connectionMap.get( conn ) ;
        
        try {
            if (cs == null) {
                if (debug()) {
                    dprint( ".release: " + conn + " was closed" ) ;
                }
                
                return ;
            } else {
                cs.expectedResponseCount += numResponsesExpected ;
                int numResp = cs.expectedResponseCount ;
                int numBusy = --cs.busyCount ;
                if (numBusy < 0) {
                    if (debug()) {
                        dprint( ".release: " + conn + " numBusy=" +
                                numBusy + " is < 0: error" ) ;
                    }
                    
                    cs.busyCount = 0 ;
                    return ;
                }
                
                if (debug()) {
                    dprint( ".release: " + numResp + " responses expected" ) ;
                    dprint( ".release: " + numBusy + " busy count" ) ;
                }
                
                if (numBusy == 0) {
                    final CacheEntry<C> entry = cs.entry ;
                    boolean wasOnBusy = entry.busyConnections.remove( conn ) ;
                    if (!wasOnBusy)
                        if (debug())
                            dprint( ".release: " + conn
                                    + " was NOT on busy queue, "
                                    + "but should have been" ) ;
                    
                    boolean connectionClosed = false ;
                    if (numResp == 0) {
                        connectionClosed = reclaimOrClose( cs, conn ) ;
                    }
                    
                    decrementTotalBusy() ;
                    
                    if (!connectionClosed) {
                        if (debug()) {
                            dprint( ".release: queuing idle connection "
                                    + conn ) ;
                        }
                        
                        totalIdle++ ;
                        entry.idleConnections.offer( conn ) ;
                        cs.csv = ConnectionStateValue.IDLE ;
                    }
                }
            }
        } finally {
            if (debug()) {
                dprint( ".release " + conn
                        + " cs=" + cs
                        + " totalIdle=" + totalIdle
                        + " totalBusy=" + totalBusy ) ;
                
                dprint( "<-release" + conn ) ;
            }
        }
    }
    
    /** Decrement the number of expected responses.  When a connection is idle
     * and has no expected responses, it can be reclaimed.
     */
    public synchronized void responseReceived( final C conn ) {
        if (debug()) {
            dprint( "->responseReceived: " + conn ) ;
        }
        
        try {
            final ConnectionState<C> cs = connectionMap.get( conn ) ;
            if (cs == null) {
                if (debug()) {
                    dprint(
                            ".responseReceived: "
                            + "received response on closed connection "
                            + conn ) ;
                }
                
                return ;
            }
            
            final int waitCount = --cs.expectedResponseCount ;
            
            if (debug())  {
                dprint( ".responseReceived: " + conn
                        + " waitCount=" + waitCount ) ;
            }
            
            if (waitCount < 0) {
                if (debug())  {
                    dprint( ".responseReceived: " + conn
                            + " incorrect call: error" ) ;
                }
                cs.expectedResponseCount = 0 ;
                return ;
            }
            
            if ((waitCount == 0) && (cs.busyCount == 0)) {
                reclaimOrClose( cs, conn ) ;
            }
        } finally {
            if (debug()) {
                dprint( "<-responseReceived: " + conn ) ;
            }
        }
    }
    
    /** Close a connection, regardless of whether the connection is busy
     * or not.
     */
    public synchronized void close( final C conn ) {
        if (debug()) {
            dprint( "->close: " + conn ) ;
        }
        
        try {
            final ConnectionState<C> cs = connectionMap.remove( conn ) ;
            if (cs == null) {
                if (debug()) {
                    dprint( ".close: " + conn + " was already closed" ) ;
                }
                
                return ;
            }
            
            if (debug()) {
                dprint( ".close: " + conn
                        + "Connection state=" + cs ) ;
            }
            
            final ConcurrentQueue.Handle rh = cs.reclaimableHandle ;
            if (rh != null) {
                boolean result = rh.remove() ;
                if (debug()) {
                    dprint( ".close: " + conn
                            + "reclaimableHandle .remove = " + result ) ;
                }
            }
            
            if (cs.entry.busyConnections.remove( conn )) {
                if (debug()) {
                    dprint( ".close: " + conn
                            + " removed from busyConnections" ) ;
                }
                
                decrementTotalBusy() ;
            }
            
            if (cs.entry.idleConnections.remove( conn )) {
                if (debug()) {
                    dprint( ".close: " + conn
                            + " removed from idleConnections" ) ;
                }
                
                decrementTotalIdle() ;
            }
            
            try {
                conn.close() ;
            } catch (IOException exc) {
                if (debug())
                    dprint( ".close: " + conn + ": Caught IOException on close:"
                            + exc ) ;
            }
        } finally {
            if (debug()) {
                dprintStatistics() ;
                dprint( "<-close: " + conn ) ;
            }
        }
    }
}

// End of file.