FileDocCategorySizeDatePackage
ConnPoolByRoute.javaAPI DocAndroid 1.5 API23327Wed May 06 22:41:10 BST 2009org.apache.http.impl.conn.tsccm

ConnPoolByRoute

public class ConnPoolByRoute extends AbstractConnPool
A connection pool that maintains connections by route. This class is derived from MultiThreadedHttpConnectionManager in HttpClient 3.x, see there for original authors. It implements the same algorithm for connection re-use and connection-per-host enforcement:
  • connections are re-used only for the exact same route
  • connection limits are enforced per route rather than per host
Note that access to the pool data structures is synchronized via the {@link AbstractConnPool#poolLock poolLock} in the base class, not via synchronized methods.
author
Roland Weber
author
Michael Becke
author
and others

Fields Summary
private final Log
log
protected final ClientConnectionOperator
operator
Connection operator for this pool
protected Queue
freeConnections
The list of free connections
protected Queue
waitingThreads
The list of WaitingThreads waiting for a connection
protected final Map
routeToPool
A map of route-specific pools. Keys are of class {@link HttpRoute}, values of class {@link RouteSpecificPool}.
protected final int
maxTotalConnections
private final ConnPerRoute
connPerRoute
Constructors Summary
public ConnPoolByRoute(ClientConnectionOperator operator, HttpParams params)
Creates a new connection pool, managed by route.

    
                 
           
        super();

        // Reject a missing operator before any state is set up.
        if (null == operator) {
            throw new IllegalArgumentException("Connection operator may not be null");
        }
        this.operator = operator;

        // The pool's containers come from the protected factory methods.
        this.freeConnections = createFreeConnQueue();
        this.waitingThreads = createWaitingThreadQueue();
        this.routeToPool = createRouteToPoolMap();

        // Connection limits are looked up from the supplied parameters.
        this.maxTotalConnections = ConnManagerParams.getMaxTotalConnections(params);
        this.connPerRoute = ConnManagerParams.getMaxConnectionsPerRoute(params);
    
Methods Summary
protected org.apache.http.impl.conn.tsccm.BasicPoolEntrycreateEntry(org.apache.http.impl.conn.tsccm.RouteSpecificPool rospl, org.apache.http.conn.ClientConnectionOperator op)
Creates a new pool entry. This method assumes that the new connection will be handed out immediately.

param
rospl the route-specific pool for which to create the entry
param
op the operator for creating a connection
return
the new pool entry for a new connection


        if (log.isDebugEnabled()) {
            log.debug("Creating new connection [" + rospl.getRoute() + "]");
        }

        // the entry will create the connection when needed
        BasicPoolEntry entry =
            new BasicPoolEntry(op, rospl.getRoute(), refQueue);

        poolLock.lock();
        try {

            rospl.createdEntry(entry);
            numConnections++;

            issuedConnections.add(entry.getWeakRef());

        } finally {
            poolLock.unlock();
        }

        return entry;
    
protected java.util.QueuecreateFreeConnQueue()
Creates the queue for {@link #freeConnections}. Called once by the constructor.

return
a queue

        return new LinkedList<BasicPoolEntry>();
    
protected java.util.MapcreateRouteToPoolMap()
Creates the map for {@link #routeToPool}. Called once by the constructor.

return
a map

        return new HashMap<HttpRoute, RouteSpecificPool>();
    
protected java.util.QueuecreateWaitingThreadQueue()
Creates the queue for {@link #waitingThreads}. Called once by the constructor.

return
a queue

        return new LinkedList<WaitingThread>();
    
public voiddeleteClosedConnections()


        poolLock.lock();
        try {

            Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
            while (iter.hasNext()) {
                BasicPoolEntry entry = iter.next();
                if (!entry.getConnection().isOpen()) {
                    iter.remove();
                    deleteEntry(entry);
                }
            }

        } finally {
            poolLock.unlock();
        }
    
protected voiddeleteEntry(org.apache.http.impl.conn.tsccm.BasicPoolEntry entry)
Deletes a given pool entry. This closes the pooled connection and removes all references, so that it can be GCed.

Note: Does not remove the entry from the freeConnections list. It is assumed that the caller has already handled this step.

param
entry the pool entry for the connection to delete


        final HttpRoute plannedRoute = entry.getPlannedRoute();

        if (log.isDebugEnabled()) {
            log.debug("Deleting connection"
                    + " [" + plannedRoute + "][" + entry.getState() + "]");
        }

        poolLock.lock();
        try {
            // Release the underlying connection first.
            closeConnection(entry.getConnection());

            // Update the per-route and global book-keeping, discarding the
            // route pool once it reports itself unused.
            final RouteSpecificPool routePool = getRoutePool(plannedRoute, true);
            routePool.deleteEntry(entry);
            numConnections--;
            if (routePool.isUnused()) {
                routeToPool.remove(plannedRoute);
            }

            // The connection is dead rather than idle; stop tracking it.
            idleConnHandler.remove(entry.getConnection());
        } finally {
            poolLock.unlock();
        }
    
protected voiddeleteLeastUsedEntry()
Delete an old, free pool entry to make room for a new one. Used to replace pool entries with ones for a different route.


        // Take the lock BEFORE entering the try block. If lock() were inside
        // the try and failed, the finally clause would call unlock() on a
        // lock this thread does not hold.
        poolLock.lock();
        try {

            //@@@ with peek() instead of removing here, we could
            //@@@ leave the removing to deleteEntry()
            // poll() returns null when the queue is empty. The previous
            // remove() call throws NoSuchElementException in that case, which
            // made the "nothing to delete" branch below unreachable.
            BasicPoolEntry entry = freeConnections.poll();

            if (entry != null) {
                // Head of the free queue: close it and drop all references.
                deleteEntry(entry);
            } else if (log.isDebugEnabled()) {
                log.debug("No free connection to delete.");
            }

        } finally {
            poolLock.unlock();
        }
    
public voidfreeEntry(org.apache.http.impl.conn.tsccm.BasicPoolEntry entry, boolean reusable, long validDuration, java.util.concurrent.TimeUnit timeUnit)


        final HttpRoute plannedRoute = entry.getPlannedRoute();
        if (log.isDebugEnabled()) {
            log.debug("Freeing connection" +
                    " [" + plannedRoute + "][" + entry.getState() + "]");
        }

        poolLock.lock();
        try {
            // After shutdown nothing is pooled any more: just close the
            // connection and leave.
            if (isShutDown) {
                closeConnection(entry.getConnection());
                return;
            }

            // The entry is no longer handed out; from here on it is either
            // held by a hard reference in the free list or dropped.
            issuedConnections.remove(entry.getWeakRef());

            final RouteSpecificPool routePool = getRoutePool(plannedRoute, true);

            if (!reusable) {
                // Not reusable: forget the entry and give back its slot.
                routePool.dropEntry();
                numConnections--;
            } else {
                // Reusable: park it in the route pool and the global free
                // list, and register it with the idle-connection handler.
                routePool.freeEntry(entry);
                freeConnections.add(entry);
                idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
            }

            // Either way a waiter may now be able to proceed.
            notifyWaitingThread(routePool);
        } finally {
            poolLock.unlock();
        }

    
public intgetConnectionsInPool(org.apache.http.conn.routing.HttpRoute route)


        poolLock.lock();
        try {
            // don't allow a pool to be created here!
            RouteSpecificPool rospl = getRoutePool(route, false);
            return (rospl != null) ? rospl.getEntryCount() : 0;

        } finally {
            poolLock.unlock();
        }
    
protected org.apache.http.impl.conn.tsccm.BasicPoolEntrygetEntryBlocking(org.apache.http.conn.routing.HttpRoute route, java.lang.Object state, long timeout, java.util.concurrent.TimeUnit tunit, org.apache.http.impl.conn.tsccm.WaitingThreadAborter aborter)
Obtains a pool entry with a connection within the given timeout. If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} must be called before blocking, to allow the thread to be interrupted.

param
route the route for which to get the connection
param
timeout the timeout, 0 or negative for no timeout
param
tunit the unit for the timeout, may be null only if there is no timeout
param
aborter an object which can abort a {@link WaitingThread}.
return
pool entry holding a connection for the route
throws
ConnectionPoolTimeoutException if the timeout expired
throws
InterruptedException if the calling thread was interrupted


        // A positive timeout is converted into an absolute deadline. For a
        // zero or negative timeout the deadline stays null and is passed to
        // await() as such. Note that tunit is dereferenced only when
        // timeout > 0, so it may be null otherwise.
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }

        BasicPoolEntry entry = null;
        poolLock.lock();
        try {

            // The route pool is looked up (and created if missing) once,
            // before the loop; the same instance is used on every iteration.
            RouteSpecificPool rospl = getRoutePool(route, true);
            // Created lazily on the first iteration that has to wait and
            // re-used for subsequent waits.
            WaitingThread waitingThread = null;

            while (entry == null) {

                // Checked at the top of every iteration, i.e. also after
                // each wakeup from await().
                if (isShutDown) {
                    throw new IllegalStateException
                        ("Connection pool shut down.");
                }

                if (log.isDebugEnabled()) {
                    log.debug("Total connections kept alive: " + freeConnections.size()); 
                    log.debug("Total issued connections: " + issuedConnections.size()); 
                    log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
                }
                
                // the cases to check for:
                // - have a free connection for that route
                // - allowed to create a free connection for that route
                // - can delete and replace a free connection for another route
                // - need to wait for one of the things above to come true

                entry = getFreeEntry(rospl, state);
                if (entry != null) {
                    break;
                }
                
                // Whether the route pool itself still reports spare capacity.
                boolean hasCapacity = rospl.getCapacity() > 0; 
                
                if (log.isDebugEnabled()) {
                    log.debug("Available capacity: " + rospl.getCapacity() 
                            + " out of " + rospl.getMaxEntries()
                            + " [" + route + "][" + state + "]");
                }
                
                if (hasCapacity && numConnections < maxTotalConnections) {

                    // Below both the per-route and the total limit.
                    entry = createEntry(rospl, operator);

                } else if (hasCapacity && !freeConnections.isEmpty()) {

                    // Total limit reached, but a free entry exists that can
                    // be deleted to make room. getFreeEntry() above found
                    // none for this route pool.
                    deleteLeastUsedEntry();
                    entry = createEntry(rospl, operator);

                } else {

                    if (log.isDebugEnabled()) {
                        log.debug("Need to wait for connection" +
                                " [" + route + "][" + state + "]");
                    }

                    if (waitingThread == null) {
                        waitingThread =
                            newWaitingThread(poolLock.newCondition(), rospl);
                        // Hand the waiter to the aborter before blocking.
                        aborter.setWaitingThread(waitingThread);
                    }

                    boolean success = false;
                    try {
                        // Register in both the route-specific and the global
                        // wait queue, then block until woken or the deadline.
                        rospl.queueThread(waitingThread);
                        waitingThreads.add(waitingThread);
                        success = waitingThread.await(deadline);

                    } finally {
                        // In case of 'success', we were woken up by the
                        // connection pool and should now have a connection
                        // waiting for us, or else we're shutting down.
                        // Just continue in the loop, both cases are checked.
                        // The removal also runs when await() throws.
                        rospl.removeThread(waitingThread);
                        waitingThreads.remove(waitingThread);
                    }

                    // check for spurious wakeup vs. timeout
                    // Only give up when a deadline exists and has actually
                    // passed; otherwise loop and try again.
                    if (!success && (deadline != null) &&
                        (deadline.getTime() <= System.currentTimeMillis())) {
                        throw new ConnectionPoolTimeoutException
                            ("Timeout waiting for connection");
                    }
                }
            } // while no entry

        } finally {
            poolLock.unlock();
        }

        return entry;

    
protected org.apache.http.impl.conn.tsccm.BasicPoolEntrygetFreeEntry(org.apache.http.impl.conn.tsccm.RouteSpecificPool rospl, java.lang.Object state)
If available, get a free pool entry for a route.

param
rospl the route-specific pool from which to get an entry
return
an available pool entry for the given route, or null if none is available


        poolLock.lock();
        try {
            // Keep allocating from the route pool until either a still-valid
            // entry turns up or the pool has nothing more to offer.
            while (true) {
                final BasicPoolEntry candidate = rospl.allocEntry(state);

                if (candidate == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("No free connections"
                                + " [" + rospl.getRoute() + "][" + state + "]");
                    }
                    return null;
                }

                if (log.isDebugEnabled()) {
                    log.debug("Getting free connection"
                            + " [" + rospl.getRoute() + "][" + state + "]");
                }
                freeConnections.remove(candidate);

                final boolean stillValid = idleConnHandler.remove(candidate.getConnection());
                if (stillValid) {
                    // Hand the entry out: track it as issued and stop looking.
                    issuedConnections.add(candidate.getWeakRef());
                    return candidate;
                }

                // The free entry is no longer valid: discard it and go round
                // again to look for another one that might be.
                if (log.isDebugEnabled()) {
                    log.debug("Closing expired free connection"
                            + " [" + rospl.getRoute() + "][" + state + "]");
                }
                closeConnection(candidate.getConnection());
                // dropEntry rather than deleteEntry: the entry was just
                // allocated, so it is no longer "free", and deleteEntry can
                // only be used to delete free entries.
                rospl.dropEntry();
                numConnections--;
            }
        } finally {
            poolLock.unlock();
        }
    
protected org.apache.http.impl.conn.tsccm.RouteSpecificPoolgetRoutePool(org.apache.http.conn.routing.HttpRoute route, boolean create)
Get a route-specific pool of available connections.

param
route the route
param
create whether to create the pool if it doesn't exist
return
the pool for the argument route, never null if create is true

        poolLock.lock();
        try {
            RouteSpecificPool routePool = routeToPool.get(route);
            if (create && (routePool == null)) {
                // No pool for this route yet (or not any more): build one
                // and register it under the route.
                routePool = newRouteSpecificPool(route);
                routeToPool.put(route, routePool);
            }
            return routePool;
        } finally {
            poolLock.unlock();
        }
    
protected voidhandleLostEntry(org.apache.http.conn.routing.HttpRoute route)


        poolLock.lock();
        try {

            RouteSpecificPool rospl = getRoutePool(route, true);
            rospl.dropEntry();
            if (rospl.isUnused()) {
                routeToPool.remove(route);
            }

            numConnections--;
            notifyWaitingThread(rospl);

        } finally {
            poolLock.unlock();
        }
    
protected org.apache.http.impl.conn.tsccm.RouteSpecificPoolnewRouteSpecificPool(org.apache.http.conn.routing.HttpRoute route)
Creates a new route-specific pool. Called by {@link #getRoutePool} when necessary.

param
route the route
return
the new pool

        return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
    
protected org.apache.http.impl.conn.tsccm.WaitingThreadnewWaitingThread(java.util.concurrent.locks.Condition cond, org.apache.http.impl.conn.tsccm.RouteSpecificPool rospl)
Creates a new waiting thread. Called by {@link #getEntryBlocking} when necessary.

param
cond the condition to wait for
param
rospl the route specific pool, or null
return
a waiting thread representation

        return new WaitingThread(cond, rospl);
    
protected voidnotifyWaitingThread(org.apache.http.impl.conn.tsccm.RouteSpecificPool rospl)
Notifies a waiting thread that a connection is available. This will wake a thread waiting in the specific route pool, if there is one. Otherwise, a thread in the connection pool will be notified.

param
rospl the pool in which to notify, or null


        //@@@ while this strategy provides for best connection re-use,
        //@@@ is it fair? only do this if the connection is open?
        poolLock.lock();
        try {
            // Pick at most one waiter to wake: a thread queued on the given
            // route pool takes precedence, otherwise the head of the global
            // wait queue is taken.
            WaitingThread toWake = null;

            final boolean routeHasWaiter = (rospl != null) && rospl.hasThread();
            if (routeHasWaiter) {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying thread waiting on pool" +
                            " [" + rospl.getRoute() + "]");
                }
                toWake = rospl.nextThread();
            } else if (!waitingThreads.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying thread waiting on any pool");
                }
                toWake = waitingThreads.remove();
            } else if (log.isDebugEnabled()) {
                log.debug("Notifying no-one, there are no waiting threads");
            }

            if (toWake != null) {
                toWake.wakeup();
            }
        } finally {
            poolLock.unlock();
        }
    
public org.apache.http.impl.conn.tsccm.PoolEntryRequestrequestPoolEntry(org.apache.http.conn.routing.HttpRoute route, java.lang.Object state)

        
        // Shared between the two callbacks below, so that abortRequest()
        // can reach the waiter used by a getPoolEntry() call.
        final WaitingThreadAborter waitAborter = new WaitingThreadAborter();

        final PoolEntryRequest request = new PoolEntryRequest() {

            public BasicPoolEntry getPoolEntry(long timeout, TimeUnit tunit)
                    throws InterruptedException, ConnectionPoolTimeoutException {
                return getEntryBlocking(route, state, timeout, tunit, waitAborter);
            }

            public void abortRequest() {
                // The abort is performed while holding the pool lock.
                poolLock.lock();
                try {
                    waitAborter.abort();
                } finally {
                    poolLock.unlock();
                }
            }

        };
        return request;
    
public voidshutdown()


        poolLock.lock();
        try {
            super.shutdown();

            // Close every free connection, emptying the free list as we go.
            //@@@ move this to base class?
            for (Iterator<BasicPoolEntry> freeIt = freeConnections.iterator(); freeIt.hasNext(); ) {
                final BasicPoolEntry idle = freeIt.next();
                freeIt.remove();
                closeConnection(idle.getConnection());
            }

            // Dequeue and wake up every thread still waiting for a connection.
            for (Iterator<WaitingThread> waitIt = waitingThreads.iterator(); waitIt.hasNext(); ) {
                final WaitingThread sleeper = waitIt.next();
                waitIt.remove();
                sleeper.wakeup();
            }

            routeToPool.clear();
        } finally {
            poolLock.unlock();
        }