File | Doc | Category | Size | Date | Package
JmsServerSessionPool.java | API Doc | JBoss 4.2.1 | 7483 | Fri Jul 13 21:01:16 BST 2007 | org.jboss.resource.adapter.jms.inflow

JmsServerSessionPool

public class JmsServerSessionPool extends Object implements javax.jms.ServerSessionPool
A generic jms session pool.
author
Adrian Brock
version
$Revision: 60552 $

Fields Summary
private static final Logger
log
The logger
JmsActivation
activation
The activation
javax.jms.ConnectionConsumer
consumer
The consumer
ArrayList
serverSessions
The server sessions
boolean
stopped
Whether the pool is stopped
int
sessionCount
The number of sessions
Constructors Summary
/**
 * Create a new session pool.
 *
 * @param activation the jms activation; stored as-is and later returned by
 *        {@code getActivation()}
 */
public JmsServerSessionPool(JmsActivation activation)
{
   this.activation = activation;
}
Methods Summary
public JmsActivationgetActivation()

return
the activation

      return activation;
   
public javax.jms.ServerSessiongetServerSession()

      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("getServerSession");

      ServerSession result = null;
      
      try
      {
         synchronized (serverSessions)
         {
            while (true)
            {
               int sessionsSize = serverSessions.size();
               
               if (stopped)
                  throw new Exception("Cannot get a server session after the pool is stopped");
               
               else if (sessionsSize > 0)
               {
                  result = (ServerSession) serverSessions.remove(sessionsSize-1);
                  break;
               }
               
               else
               {
                  try
                  {
                     serverSessions.wait();
                  }
                  catch (InterruptedException ignored)
                  {
                  }
               }
            }
         }
      }
      catch (Throwable t)
      {
         log.error("Unable to get a server session", t);
         throw new JMSException("Unable to get a server session " + t);
      }
      
      if (trace)
         log.trace("Returning server session " + result);
      
      return result;
   
protected voidreturnServerSession(JmsServerSession session)
Return the server session

param
session the session

      synchronized (serverSessions)
      {
         if (stopped)
         {
            session.teardown();
            --sessionCount;
         }
         else
            serverSessions.add(session);
         serverSessions.notifyAll();
      }
   
protected voidsetupConsumer()
Setup the connection consumer

throws
Exeption for any error

      Connection connection = activation.getConnection();
      JmsActivationSpec spec = activation.getActivationSpec();
      String selector = spec.getMessageSelector();
      int maxMessages = spec.getMaxMessagesInt();
      if (spec.isTopic())
      {
         Topic topic = (Topic) activation.getDestination();
         String subscriptionName = spec.getSubscriptionName();
         if (spec.isDurable())
            consumer = connection.createDurableConnectionConsumer(topic, subscriptionName, selector, this, maxMessages);
         else
            consumer = connection.createConnectionConsumer(topic, selector, this, maxMessages);
      }
      else
      {
         Queue queue = (Queue) activation.getDestination();
         consumer = connection.createConnectionConsumer(queue, selector, this, maxMessages);
      }
      log.debug("Created consumer " + consumer);
   
protected voidsetupSessions()
Setup the sessions

throws
Exeption for any error

      JmsActivationSpec spec = activation.getActivationSpec();
      ArrayList clonedSessions = null;

      // Create the sessions
      synchronized (serverSessions)
      {
         for (int i = 0; i < spec.getMaxSessionInt(); ++i)
         {
            JmsServerSession session = new JmsServerSession(this);
            serverSessions.add(session);
         }
         sessionCount = serverSessions.size();
         clonedSessions = (ArrayList) serverSessions.clone();

      }
      
      // Start the sessions
      for (int i = 0; i < clonedSessions.size(); ++ i)
      {
         JmsServerSession session = (JmsServerSession) clonedSessions.get(i);
         session.setup();
      }
   
public voidstart()
Start the server session pool

throws
Exeption for any error

      setupSessions();
      setupConsumer();
   
public voidstop()
Stop the server session pool

      teardownConsumer();
      teardownSessions();
   
protected voidteardownConsumer()
Stop the connection consumer

      try
      {
         if (consumer != null)
         {
            log.debug("Closing the " + consumer);
            consumer.close();
         }
      }
      catch (Throwable t)
      {
         log.debug("Error closing the consumer " + consumer, t);
      }
   
protected voidteardownSessions()
Stop the sessions

      synchronized (serverSessions)
      {
         // Disallow any new sessions
         stopped = true;
         serverSessions.notifyAll();
         
         // Stop inactive sessions
         for (int i = 0; i < serverSessions.size(); ++i)
         {
            JmsServerSession session = (JmsServerSession) serverSessions.get(i);
            session.teardown();
         }

         sessionCount -= serverSessions.size();
         serverSessions.clear();

         // Wait for inuse sessions
         while (sessionCount > 0)
         {
            try
            {
               serverSessions.wait();
            }
            catch (InterruptedException ignore)
            {
            }
         }
      }