File | Doc | Category | Size | Date | Package
OutboundConnectionCacheBlockingImpl.java | API Doc | Example | 25447 | Tue May 29 16:57:04 BST 2007 | com.sun.xml.ws.transport.tcp.connectioncache.impl.transport

OutboundConnectionCacheBlockingImpl

public final class OutboundConnectionCacheBlockingImpl extends ConnectionCacheBlockingBase implements com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.OutboundConnectionCache

Fields Summary
private final int
maxParallelConnections
private Map
entryMap
private Map
connectionMap
Constructors Summary
public OutboundConnectionCacheBlockingImpl(String cacheType, int highWaterMark, int numberToReclaim, int maxParallelConnections, Logger logger)

        
        super( cacheType, highWaterMark, numberToReclaim, logger ) ;
        
        if (maxParallelConnections < 1)
            throw new IllegalArgumentException(
                    "maxParallelConnections must be > 0" ) ;
        
        this.maxParallelConnections = maxParallelConnections ;
        
        this.entryMap = new HashMap<ContactInfo<C>,CacheEntry<C>>() ;
        this.connectionMap = new HashMap<C,ConnectionState<C>>() ;
        
        if (debug()) {
            dprint(".constructor completed: " + cacheType );
        }
    
Methods Summary
public booleancanCreateNewConnection(com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo)

        CacheEntry<C> entry = entryMap.get( cinfo ) ;
        if (entry == null)
            return true ;
        
        return internalCanCreateNewConnection( entry ) ;
    
public synchronized voidclose(C conn)
Close a connection, regardless of whether the connection is busy or not.

        if (debug()) {
            dprint( "->close: " + conn ) ;
        }
        
        try {
            // Drop the bookkeeping record first; if there is none, the
            // connection is not tracked any more and nothing else is done.
            final ConnectionState<C> state = connectionMap.remove( conn ) ;
            if (state == null) {
                if (debug()) {
                    dprint( ".close: " + conn + " was already closed" ) ;
                }
                
                return ;
            }
            
            if (debug()) {
                dprint( ".close: " + conn
                        + "Connection state=" + state ) ;
            }
            
            // Take the connection off the reclaimable queue when it holds
            // a handle for it.
            final ConcurrentQueue.Handle reclaimHandle = state.reclaimableHandle ;
            if (reclaimHandle != null) {
                final boolean removed = reclaimHandle.remove() ;
                if (debug()) {
                    dprint( ".close: " + conn
                            + "reclaimableHandle .remove = " + removed ) ;
                }
            }
            
            // The totals are only adjusted for a queue the connection was
            // actually found on.
            final boolean wasBusy = state.entry.busyConnections.remove( conn ) ;
            if (wasBusy) {
                if (debug()) {
                    dprint( ".close: " + conn
                            + " removed from busyConnections" ) ;
                }
                
                decrementTotalBusy() ;
            }
            
            final boolean wasIdle = state.entry.idleConnections.remove( conn ) ;
            if (wasIdle) {
                if (debug()) {
                    dprint( ".close: " + conn
                            + " removed from idleConnections" ) ;
                }
                
                decrementTotalIdle() ;
            }
            
            // Closing is best effort: an IOException is traced, not rethrown.
            try {
                conn.close() ;
            } catch (IOException ioe) {
                if (debug()) {
                    dprint( ".close: " + conn + ": Caught IOException on close:"
                            + ioe ) ;
                }
            }
        } finally {
            if (debug()) {
                dprintStatistics() ;
                dprint( "<-close: " + conn ) ;
            }
        }
    
private voiddecrementTotalBusy()

        // Lowers the cache-wide busy connection count by one without ever
        // letting it drop below zero. Entry, exit and the underflow case
        // are traced when debugging is enabled.
        if (debug()) {
            dprint( "->decrementTotalBusy: totalBusy = "
                    + totalBusy ) ;
        }
        
        try {
            if (totalBusy > 0) {
                totalBusy-- ;
            } else {
                if (debug()) {
                    // This trace previously said "idle count" (the text of
                    // decrementTotalIdle), naming the wrong counter.
                    dprint( ".decrementTotalBusy: "
                            + "incorrect busy count: was already 0" ) ;
                }
            }
        } finally {
            if (debug()) {
                dprint( "<-decrementTotalBusy: totalBusy = "
                        + totalBusy ) ;
            }
        }
    
private voiddecrementTotalIdle()

        // Lowers the cache-wide idle connection count by one; a count that
        // is already zero is left alone and only traced.
        if (debug()) {
            dprint( "->decrementTotalIdle: totalIdle = "
                    + totalIdle ) ;
        }
        
        try {
            if (totalIdle <= 0) {
                if (debug()) {
                    dprint( ".decrementTotalIdle: "
                            + "incorrect idle count: was already 0" ) ;
                }
            } else {
                totalIdle-- ;
            }
        } finally {
            if (debug()) {
                dprint( "<-decrementTotalIdle: totalIdle = "
                        + totalIdle ) ;
            }
        }
    
public synchronized Cget(com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo)

        
        // Convenience overload: delegates to get(cinfo, finder) with a null
        // finder, so that call skips its finder step.
        return get( cinfo, null ) ;
    
public synchronized Cget(com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo, com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ConnectionFinder finder)

        
        // Obtains a connection for cinfo and marks it busy. Candidates are
        // tried in this order: the finder's choice (when a finder is given
        // and returns non-null), an idle connection of the entry, a newly
        // created connection, and finally an already busy connection.
        if (debug()) {
            dprint( "->get: " + cinfo ) ;
        }
        
        // Declared outside the try block so the finally block can trace it.
        ConnectionState<C> cs = null ;
        
        try {
            // getEntry creates the cache entry for cinfo if none exists yet.
            final CacheEntry<C> entry = getEntry( cinfo ) ;
            C result = null ;
            
            if (numberOfConnections() >= highWaterMark()) {
                // This reclaim probably does nothing, because
                // connections are reclaimed on release in the
                // overflow state.
                reclaim() ;
            }
            
            if (finder != null) {
                // Try the finder if present.
                if (debug()) {
                    dprint( ".get: " + cinfo +
                            " Calling the finder to get a connection" ) ;
                }
                
                // The finder is handed the entry's idle and busy views.
                result = finder.find( cinfo, entry.idleConnectionsView,
                        entry.busyConnectionsView ) ;
                
                if (result != null) {
                    cs = getConnectionState( cinfo, entry, result ) ;
                    
                    // Dequeue from cache entry if not NEW
                    if (cs.csv == ConnectionStateValue.BUSY)
                        entry.busyConnections.remove( result ) ;
                    else if (cs.csv == ConnectionStateValue.IDLE)
                        entry.idleConnections.remove( result ) ;
                }
            }
            
            // Each fallback below runs only while no connection was found.
            if (result == null) {
                result = tryIdleConnections( entry ) ;
            }
            
            if (result == null) {
                result = tryNewConnection( entry, cinfo ) ;
            }
            
            if (result == null) {
                // Throws a RuntimeException if the busy queue is empty too.
                result = tryBusyConnections( entry ) ;
            }
            
            // cs was only set above when the finder supplied the connection.
            if (cs == null)
                cs = getConnectionState( cinfo, entry, result ) ;
            
            // Updates the totals and the state, and queues result as busy.
            makeResultBusy( result, cs, entry ) ;
            return result ;
        } finally {
            if (debug()) {
                dprint( ".get " + cinfo
                        + " totalIdle=" + totalIdle
                        + " totalBusy=" + totalBusy ) ;
                
                dprint( "<-get " + cinfo + " ConnectionState=" + cs ) ;
            }
        }
    
public synchronized com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$ConnectionStategetConnectionState(com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo, com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry, C conn)

        
        // Returns the bookkeeping record kept for conn in connectionMap,
        // creating and registering one when conn is not known yet.
        if (debug()) {
            dprint( "->getConnectionState: " + conn ) ;
        }
        
        try {
            ConnectionState<C> state = connectionMap.get( conn ) ;
            if (state != null) {
                if (debug()) {
                    dprint( ".getConnectionState: " + conn
                            + " found ConnectionState" + state ) ;
                }
            } else {
                if (debug()) {
                    dprint( ".getConnectionState: " + conn
                            + " creating new ConnectionState" + state ) ;
                }
                
                state = new ConnectionState<C>( cinfo, entry, conn ) ;
                connectionMap.put( conn, state ) ;
            }
            
            return state ;
        } finally {
            if (debug()) {
                dprint( "<-getConnectionState: " + conn ) ;
            }
        }
    
private com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntrygetEntry(com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo)

        
        // Looks up the cache entry for cinfo in entryMap, adding a fresh
        // entry when the contact info has not been seen before.
        if (debug()) {
            dprint( "->getEntry: " + cinfo ) ;
        }
        
        try {
            CacheEntry<C> found = entryMap.get( cinfo ) ;
            if (found != null) {
                if (debug()) {
                    dprint( ".getEntry: " + cinfo +
                            " re-using existing CacheEntry" ) ;
                }
            } else {
                if (debug()) {
                    dprint( ".getEntry: " + cinfo
                            + " creating new CacheEntry" ) ;
                }
                
                // This should be the only place a CacheEntry is constructed.
                found = new CacheEntry<C>() ;
                entryMap.put( cinfo, found ) ;
            }
            
            return found ;
        } finally {
            if (debug()) {
                dprint( "<-getEntry: " + cinfo ) ;
            }
        }
    
private booleaninternalCanCreateNewConnection(com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry)

        // Decides whether entry may get another connection.
        final int connectionsInEntry = entry.totalConnections() ;
        
        // An entry without any connection may always create one.
        if (connectionsInEntry == 0) {
            return true ;
        }
        
        // Otherwise the cache must be below its high-water mark and the
        // entry below its parallel-connection limit.
        return (numberOfConnections() < highWaterMark())
                && (connectionsInEntry < maxParallelConnections) ;
    
private voidmakeResultBusy(C result, com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$ConnectionState cs, com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry)

        
        // Moves result into the BUSY state: adjusts totalBusy/totalIdle
        // according to the state recorded in cs, queues result on the
        // entry's busy queue and increments the connection's busy count.
        if (debug())
            dprint( "->makeResultBusy: " + result
                    + " was previously " + cs.csv ) ;
        
        try {
            switch (cs.csv) {
                case NEW :
                    // Not counted anywhere yet: only the busy total grows.
                    totalBusy++ ;
                    break ;
                    
                case IDLE :
                    // Moves from the idle total to the busy total.
                    totalBusy++ ;
                    decrementTotalIdle() ;
                    
                    final ConcurrentQueue.Handle<C> handle =
                            cs.reclaimableHandle ;
                    
                    // A non-null handle refers to the reclaimable queue;
                    // take the connection off that queue and drop the handle.
                    if (handle != null) {
                        if (!handle.remove()) {
                            if (debug()) {
                                dprint( ".makeResultBusy: " + cs.cinfo
                                        + " result was not on reclaimable Q" ) ;
                            }
                        }
                        cs.reclaimableHandle = null ;
                    }
                    break ;
                    
                case BUSY :
                    // Nothing to do here
                    break ;
            }
            
            // Runs for every previous state. The visible caller (get) has
            // already taken result off the idle or busy queue it was on
            // before calling this method.
            entry.busyConnections.offer( result ) ;
            cs.csv = ConnectionStateValue.BUSY ;
            cs.busyCount++ ;
        } finally {
            if (debug())
                dprint( "<-makeResultBusy: " + result ) ;
        }
    
public intmaxParallelConnections()

        // Accessor for the limit that was validated in the constructor.
        return this.maxParallelConnections ;
    
private booleanreclaimOrClose(com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$ConnectionState cs, C conn)

        // Either closes conn (when the cache holds more connections than
        // the high-water mark) or queues it as reclaimable. Returns true
        // exactly when the connection was closed.
        if (debug()) {
            dprint( "->reclaimOrClose: " + conn ) ;
        }
        
        try {
            final boolean overflow = numberOfConnections() >
                    highWaterMark() ;
            
            if (!overflow) {
                if (debug()) {
                    dprint( ".reclaimOrClose: queuing reclaimable connection "
                            + conn ) ;
                }
                
                // Keep the handle so the connection can be dequeued later.
                cs.reclaimableHandle =
                        reclaimableConnections.offer( conn ) ;
            } else {
                if (debug()) {
                    dprint( ".reclaimOrClose: closing overflow connection "
                            + conn ) ;
                }
                
                close( conn ) ;
            }
            
            return overflow ;
        } finally {
            if (debug()) {
                dprint( "<-reclaimOrClose: " + conn ) ;
            }
        }
    
public synchronized voidrelease(C conn, int numResponsesExpected)

        
        // Releases one use of conn. numResponsesExpected is added to the
        // connection's outstanding response count. When the busy count
        // reaches zero the connection leaves the busy queue and, unless
        // reclaimOrClose closed it, is queued as idle.
        if (debug()) {
            dprint( "->release: " + conn
                    + " expecting " + numResponsesExpected + " responses" ) ;
        }
        
        // Looked up before the try block so the finally block can trace it.
        final ConnectionState<C> cs = connectionMap.get( conn ) ;
        
        try {
            if (cs == null) {
                // close() removes the connection from connectionMap, so an
                // unknown connection is reported as closed.
                if (debug()) {
                    dprint( ".release: " + conn + " was closed" ) ;
                }
                
                return ;
            } else {
                cs.expectedResponseCount += numResponsesExpected ;
                int numResp = cs.expectedResponseCount ;
                int numBusy = --cs.busyCount ;
                if (numBusy < 0) {
                    // The busy count was already zero: clamp it and stop.
                    // The response count updated above is left as is.
                    if (debug()) {
                        dprint( ".release: " + conn + " numBusy=" +
                                numBusy + " is < 0: error" ) ;
                    }
                    
                    cs.busyCount = 0 ;
                    return ;
                }
                
                if (debug()) {
                    dprint( ".release: " + numResp + " responses expected" ) ;
                    dprint( ".release: " + numBusy + " busy count" ) ;
                }
                
                // Nothing more happens while the busy count is still positive.
                if (numBusy == 0) {
                    final CacheEntry<C> entry = cs.entry ;
                    boolean wasOnBusy = entry.busyConnections.remove( conn ) ;
                    if (!wasOnBusy)
                        if (debug())
                            dprint( ".release: " + conn
                                    + " was NOT on busy queue, "
                                    + "but should have been" ) ;
                    
                    // With no responses outstanding the connection is either
                    // queued as reclaimable or, in overflow, closed;
                    // reclaimOrClose returns true when it closed it.
                    boolean connectionClosed = false ;
                    if (numResp == 0) {
                        connectionClosed = reclaimOrClose( cs, conn ) ;
                    }
                    
                    // conn is no longer on the busy queue at this point, so
                    // close() (if it ran) did not decrement totalBusy itself.
                    decrementTotalBusy() ;
                    
                    if (!connectionClosed) {
                        if (debug()) {
                            dprint( ".release: queuing idle connection "
                                    + conn ) ;
                        }
                        
                        totalIdle++ ;
                        entry.idleConnections.offer( conn ) ;
                        cs.csv = ConnectionStateValue.IDLE ;
                    }
                }
            }
        } finally {
            if (debug()) {
                dprint( ".release " + conn
                        + " cs=" + cs
                        + " totalIdle=" + totalIdle
                        + " totalBusy=" + totalBusy ) ;
                
                dprint( "<-release" + conn ) ;
            }
        }
    
public synchronized voidresponseReceived(C conn)
Decrement the number of expected responses. When a connection is idle and has no expected responses, it can be reclaimed.

        if (debug()) {
            dprint( "->responseReceived: " + conn ) ;
        }
        
        try {
            final ConnectionState<C> state = connectionMap.get( conn ) ;
            if (state == null) {
                // The connection is not tracked any more; nothing to update.
                if (debug()) {
                    dprint(
                            ".responseReceived: "
                            + "received response on closed connection "
                            + conn ) ;
                }
                
                return ;
            }
            
            state.expectedResponseCount -= 1 ;
            final int outstanding = state.expectedResponseCount ;
            
            if (debug())  {
                dprint( ".responseReceived: " + conn
                        + " waitCount=" + outstanding ) ;
            }
            
            // More responses than were announced: clamp the count and stop.
            if (outstanding < 0) {
                if (debug())  {
                    dprint( ".responseReceived: " + conn
                            + " incorrect call: error" ) ;
                }
                state.expectedResponseCount = 0 ;
                return ;
            }
            
            // Only a connection with no pending responses and no current
            // users is handed to reclaimOrClose.
            final boolean nothingPending =
                    (outstanding == 0) && (state.busyCount == 0) ;
            if (nothingPending) {
                reclaimOrClose( state, conn ) ;
            }
        } finally {
            if (debug()) {
                dprint( "<-responseReceived: " + conn ) ;
            }
        }
    
protected java.lang.StringthisClassName()

        return "OutboundConnectionCacheBlockingImpl" ;
    
private CtryBusyConnections(com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry)

        // Use a busy connection.  Note that there MUST be a busy
        // connection available at this point, because
        // tryNewConnection did not create a new connection.
        if (debug()) {
            dprint( "->tryBusyConnections" ) ;
        }
        
        try {
            C result = entry.busyConnections.poll() ;
            
            if (result == null) {
                throw new RuntimeException(
                        "INTERNAL ERROR: no busy connection available" ) ;
            }
            
            return result ;
        } finally {
            if (debug()) {
                dprint( "<-tryBusyConnections" ) ;
            }
        }
    
private CtryIdleConnections(com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry)

        if (debug()) {
            dprint( "->tryIdleConnections" ) ;
        }
        
        try {
            return entry.idleConnections.poll() ;
        } finally {
            if (debug()) {
                dprint( "<-tryIdleConnections" ) ;
            }
        }
    
private CtryNewConnection(com.sun.xml.ws.transport.tcp.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl$CacheEntry entry, com.sun.xml.ws.transport.tcp.connectioncache.spi.transport.ContactInfo cinfo)

        
        if (debug())
            dprint( "->tryNewConnection: " + cinfo ) ;
        
        try {
            C conn = null ;
            
            if (internalCanCreateNewConnection(entry)) {
                // If this throws an exception just let it
                // propagate: let a higher layer handle a
                // connection creation failure.
                conn = cinfo.createConnection() ;
                
                if (debug()) {
                    dprint( ".tryNewConnection: " + cinfo
                            + " created connection " + conn ) ;
                }
            }
            
            return conn ;
        } finally {
            if (debug())
                dprint( "<-tryNewConnection: " + cinfo ) ;
        }