File | Doc | Category | Size | Date | Package
JmsManagedConnection.java | API Doc | JBoss 4.2.1 | 24083 | Fri Jul 13 21:01:16 BST 2007 | org.jboss.resource.adapter.jms

JmsManagedConnection

public class JmsManagedConnection extends Object implements javax.resource.spi.ManagedConnection, javax.jms.ExceptionListener
Managed Connection, manages one or more JMS sessions.

Every ManagedConnection will have a physical JMS Connection under the hood. This may hand out several sessions, as specified in 5.5.4 Multiple Connection Handles. Thread-safe semantics are provided.

Hm. If we are to follow the example in 6.11 this will not work. We would have to use the SAME session. This means we will have to guard against concurrent access. We use a stack, and only allow the handle at the top of the stack to do things.
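
The stack guard mentioned above could look roughly like the following. This is only a sketch under assumptions: HandleStack and its method names are hypothetical and do not appear in the adapter source, and handles are represented by a plain type parameter rather than JmsSession.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: handles share one session, and only the handle
// on top of the stack is allowed to use it.
final class HandleStack<H> {
    private final Deque<H> stack = new ArrayDeque<H>();

    // Opening a handle makes it the active one.
    synchronized void push(H handle) {
        stack.push(handle);
    }

    // Closing a handle removes it; if it was on top, the one below becomes active.
    synchronized void remove(H handle) {
        stack.remove(handle);
    }

    // Called before every session operation; rejects handles that are not on top.
    synchronized void checkActive(H handle) {
        if (stack.peek() != handle) {
            throw new IllegalStateException("Handle is not at the top of the stack");
        }
    }
}
```

Every method is synchronized, so the check and the stack update cannot interleave between threads.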

As to transactions, we have some fairly hairy alternatives to handle:

  • XA - we get an XASession. We may now only do transactions through the XAResource, since an XASession MUST throw exceptions in commit etc. But since XA support implies LocalTransaction support, we will have to use the XAResource in the LocalTransaction class.
  • LocalTx - we get a normal session. The LocalTransaction will then work against the normal session API.

An invocation of JMS MAY BE DONE in a non-transacted context. What do we do then? How much should we leave to the user???

One possible solution is to use transactions anyway, but under the hood. If neither a LocalTransaction nor an XAResource has been acquired by the container, we have to do the commit in send and publish. (CHECK: is the container required to get an XAResource every time it uses a managed connection? No, it is not, only at creation!)
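
A rough sketch of the "commit under the hood" idea follows. Everything in it is an assumption made for illustration: TransactedSession is a hypothetical stand-in and not the javax.jms API, and AutoCommitSender and its flag are invented names for whatever would track whether the container has acquired a transaction.

```java
// Hypothetical stand-in for a transacted JMS session (not the javax.jms API).
interface TransactedSession {
    void send(String message);
    void commit();
}

// Sketch: commit inside send() unless the container has started a transaction.
final class AutoCommitSender {
    private final TransactedSession session;
    private boolean containerTransactionActive;

    AutoCommitSender(TransactedSession session) {
        this.session = session;
    }

    // Assumed to be flipped when the container acquires a LocalTransaction or XAResource.
    void setContainerTransactionActive(boolean active) {
        this.containerTransactionActive = active;
    }

    void send(String message) {
        session.send(message);
        if (!containerTransactionActive) {
            session.commit(); // nobody else will commit, so do it here
        }
    }
}
```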

Does this mean that a session may be used in a transacted environment one time, and in a non-transacted one another time?

Maybe we could have this simple rule:

If a user is going to use non-transacted sessions:

  • Mark that in the RA deployment descriptor
  • Use a JmsProviderAdapter with non-XA factories
  • Mark the session as non-transacted (this defeats the purpose of specifying transaction attributes in the deployment descriptor, NOT GOOD)

From the JMS tutorial: "When you create a session in an enterprise bean, the container ignores the arguments you specify, because it manages all transactional properties for enterprise beans."

And further: "You do not specify a message acknowledgment mode when you create a message-driven bean that uses container-managed transactions. The container handles acknowledgment automatically."

On Session or Connection:

From Tutorial: "A JMS API resource is a JMS API connection or a JMS API session." But in the J2EE spec only connection is considered a resource.

Not resolved: connectionErrorOccurred: it is very hard to know from the exceptions thrown if it is a connection error. Should we register an ExceptionListener and mark all handles as erroneous? And then let them send the event and throw an exception?

author
Peter Antman.
author
Jason Dillon
author
Adrian Brock
version
$Revision: 57189 $

Fields Summary
private static final Logger
log
private JmsManagedConnectionFactory
mcf
private JmsConnectionRequestInfo
info
private String
user
private String
pwd
private boolean
isDestroyed
private javax.jms.Connection
con
private javax.jms.Session
session
private javax.jms.TopicSession
topicSession
private javax.jms.QueueSession
queueSession
private javax.jms.XASession
xaSession
private javax.jms.XATopicSession
xaTopicSession
private javax.jms.XAQueueSession
xaQueueSession
private XAResource
xaResource
private boolean
xaTransacted
private Set
handles
Holds all current JmsSession handles.
private Vector
listeners
The event listeners
Constructors Summary
public JmsManagedConnection(JmsManagedConnectionFactory mcf, javax.resource.spi.ConnectionRequestInfo info, String user, String pwd)
Create a JmsManagedConnection.

param
mcf
param
info
param
user
param
pwd
throws
ResourceException

      this.mcf = mcf;

      // seems like it's asking for trouble here
      this.info = (JmsConnectionRequestInfo)info;
      this.user = user;
      this.pwd = pwd;

      setup();
   
Methods Summary
public void addConnectionEventListener(javax.resource.spi.ConnectionEventListener l)
Add a connection event listener.

param
l The connection event listener to be added.

      listeners.addElement(l);

      if (log.isTraceEnabled())
         log.trace("ConnectionEvent listener added: " + l);
   
public void associateConnection(java.lang.Object obj)
Move a handle from another managed connection to this one.

param
obj An object of type JmsSession.
throws
ResourceException Failed to associate connection.
throws
IllegalStateException ManagedConnection in an illegal state.

      //
      // Should we check auth, ie user and pwd? FIXME
      //

      if (!isDestroyed && obj instanceof JmsSession)
      {
         JmsSession h = (JmsSession)obj;
         h.setManagedConnection(this);
         handles.add(h);
      }
      else
         throw new IllegalStateException
            ("ManagedConnection in an illegal state");
   
public void cleanup()
Cleans up the managed connection. From the spec: the cleanup of a ManagedConnection instance resets its client-specific state. Does that mean that authentication should be redone? FIXME

      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyed");

      // destroy handles
      destroyHandles();
   
public void destroy()
Destroy the physical connection.

throws
ResourceException Could not properly close the session and connection.

      if (isDestroyed) return;

      isDestroyed = true;

      try
      {
         con.setExceptionListener(null);
      }
      catch (JMSException e)
      {
         log.debug("Error unsetting the exception listener " + this, e);
      }
      
      // destroy handles
      destroyHandles();
      
      try
      {
         // Close session and connection
         try
         {
            if (info.getType() == JmsConnectionFactory.TOPIC)
            {
               topicSession.close();
               if (xaTransacted) {
                  xaTopicSession.close();
               }
            }
            else if (info.getType() == JmsConnectionFactory.QUEUE)
            {
               queueSession.close();
               if (xaTransacted)
                  xaQueueSession.close();
            }
            else
            {
               session.close();
               if (xaTransacted)
                  xaSession.close();
            }
         }
         catch (JMSException e)
         {
            log.debug("Error closing session " + this, e);
         }
         con.close();
      }
      catch (JMSException e)
      {
         throw new JBossResourceException
            ("Could not properly close the session and connection", e);
      }
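
The close ordering in destroy() (report session failures but carry on, then let a connection failure propagate) can be shown in isolation. This is a minimal sketch, assuming plain AutoCloseable objects in place of the JMS sessions and connection; BestEffortClose and closeAll are hypothetical names.

```java
import java.util.List;

final class BestEffortClose {
    private BestEffortClose() {}

    // Closes every session, reporting failures, then closes the connection.
    static void closeAll(List<? extends AutoCloseable> sessions, AutoCloseable connection)
            throws Exception {
        for (AutoCloseable session : sessions) {
            try {
                session.close();
            } catch (Exception e) {
                // Report and continue so the connection still gets closed.
                System.err.println("Error closing session: " + e);
            }
        }
        connection.close(); // a failure here propagates to the caller
    }
}
```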
   
private void destroyHandles()
Destroy all handles.

throws
ResourceException Failed to close one or more handles.

      try
      {
         if (con != null)
            con.stop();  
      }
      catch (Throwable t)
      {
         log.trace("Ignored error stopping connection", t);
      }
      
      Iterator iter = handles.iterator();
      while (iter.hasNext())
         ((JmsSession)iter.next()).destroy();

      // clear the handles set
      handles.clear();
   
public java.lang.Object getConnection(javax.security.auth.Subject subject, javax.resource.spi.ConnectionRequestInfo info)
Get a connection handle for the physical connection.

This bummer will be called in two situations:

  1. When a new mc has been created and a connection is needed
  2. When an mc has been fetched from the pool (returned in match*)

It may also be called multiple times without a cleanup, to support connection sharing.

param
subject
param
info
return
A new connection object.
throws
ResourceException

      // Check user first
      JmsCred cred = JmsCred.getJmsCred(mcf,subject,info);

      // Null users are allowed!
      if (user != null && !user.equals(cred.name))
         throw new SecurityException
            ("Password credentials not the same, reauthentication not allowed");
      if (cred.name != null && user == null) {
         throw new SecurityException
            ("Password credentials not the same, reauthentication not allowed");
      }

      user = cred.name; // Basically meaningless

      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyed");

      // Create a handle
      JmsSession handle = new JmsSession(this, (JmsConnectionRequestInfo) info);
      handles.add(handle);
      return handle;
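
The two user checks at the top of getConnection together amount to a null-safe equality test. A minimal sketch of that reading, assuming Java 7+ for java.util.Objects; ReauthCheck and requireSameUser are hypothetical names, not part of the adapter:

```java
import java.util.Objects;

final class ReauthCheck {
    private ReauthCheck() {}

    // Null user names are allowed; the check fails only when the two names differ.
    static void requireSameUser(String connectionUser, String requestedUser) {
        if (!Objects.equals(connectionUser, requestedUser)) {
            throw new SecurityException(
                "Password credentials not the same, reauthentication not allowed");
        }
    }
}
```

Both names null passes, equal names pass, and any mismatch (including exactly one null) is rejected, which matches the two original conditions.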
   
protected javax.resource.spi.ConnectionRequestInfo getInfo()
Get the request info for this connection.

return
The request info for this connection.

      return info;
   
public javax.resource.spi.LocalTransaction getLocalTransaction()
Get the local transaction for the connection.

return
The local transaction for the connection.
throws
ResourceException

      LocalTransaction tx = new JmsLocalTransaction(this);
      if (log.isTraceEnabled())
         log.trace("LocalTransaction=" + tx);
      return tx;
   
public java.io.PrintWriter getLogWriter()
Get the log writer for this connection.

return
Always null

      //
      // jason: screw the logWriter stuff for now it sucks ass
      //

      return null;
   
protected JmsManagedConnectionFactory getManagedConnectionFactory()
Get the connection factory for this connection.

return
The connection factory for this connection.

      return mcf;
   
public javax.resource.spi.ManagedConnectionMetaData getMetaData()
Get the meta data for the connection.

return
The meta data for the connection.
throws
ResourceException
throws
IllegalStateException ManagedConnection already destroyed.

      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyed");

      return new JmsMetaData(this);
   
private org.jboss.jms.jndi.JMSProviderAdapter getProviderAdapter()
Get the JMS provider adapter that will be used to create JMS resources.

return
A JMS provider adapter.
throws
NamingException Failed to lookup provider adapter.

      JMSProviderAdapter adapter;

      if (mcf.getJmsProviderAdapterJNDI() != null)
      {
         // lookup the adapter from JNDI
         Context ctx = new InitialContext();
         try
         {
            adapter = (JMSProviderAdapter)
               ctx.lookup(mcf.getJmsProviderAdapterJNDI());
         }
         finally
         {
            ctx.close();
         }
      }
      else
         adapter = mcf.getJmsProviderAdapter();

      return adapter;
   
protected javax.jms.Session getSession()
Get the session for this connection.

return
The topic session, queue session, or generic session, depending on the connection type.

      if (info.getType() == JmsConnectionFactory.TOPIC)
         return topicSession;
      else if (info.getType() == JmsConnectionFactory.QUEUE)
         return queueSession;
      else
         return session;
   
protected java.lang.String getUserName()
Get the user name for this connection.

return
The user name for this connection.

      return user;
   
public javax.transaction.xa.XAResource getXAResource()
Get the XAResource for the connection.

return
The XAResource for the connection.
throws
ResourceException XA transaction not supported

      //
      // Spec says an mc must always return the same XA resource,
      // so we cache it.
      //
      if (!xaTransacted)
         throw new NotSupportedException("XA transaction not supported");

      if (xaResource == null)
      {
         if (info.getType() == JmsConnectionFactory.TOPIC)
            xaResource = xaTopicSession.getXAResource();
         else if (info.getType() == JmsConnectionFactory.QUEUE)
            xaResource = xaQueueSession.getXAResource();
         else
            xaResource = xaSession.getXAResource();
      }

      if (log.isTraceEnabled())
         log.trace("XAResource=" + xaResource);

      return xaResource;
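
The "always return the same XA resource" rule is a create-once-then-cache pattern. A generic sketch follows, assuming Java 8+ for Supplier; CachedResource is a hypothetical helper, and the synchronized accessor is an addition that the original method does not have.

```java
import java.util.function.Supplier;

// Hypothetical helper: creates the value on first use and returns the same instance afterwards.
final class CachedResource<T> {
    private final Supplier<T> factory;
    private T cached;

    CachedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (cached == null) {
            cached = factory.get(); // runs at most once, provided the factory never returns null
        }
        return cached;
    }
}
```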
   
public void onException(javax.jms.JMSException exception)

      if (isDestroyed)
      {
         if (log.isTraceEnabled())
            log.trace("Ignoring error on already destroyed connection " + this, exception);
         return;
      }

      log.warn("Handling jms exception failure: " + this, exception);

      try
      {
         con.setExceptionListener(null);
      }
      catch (JMSException e)
      {
         log.debug("Unable to unset exception listener", e);
      }
      
      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
      sendEvent(event);
   
public void removeConnectionEventListener(javax.resource.spi.ConnectionEventListener l)
Remove a connection event listener.

param
l The connection event listener to be removed.

      listeners.removeElement(l);
   
protected void removeHandle(JmsSession handle)
Remove a handle from the handle set.

param
handle The handle to remove.

      handles.remove(handle);
   
protected void sendEvent(javax.resource.spi.ConnectionEvent event)
Send an event.

param
event The event to send.

      int type = event.getId();

      if (log.isTraceEnabled())
         log.trace("Sending connection event: " + type);

      // convert to an array to avoid concurrent modification exceptions
      ConnectionEventListener[] list =
         (ConnectionEventListener[])listeners.toArray(new ConnectionEventListener[listeners.size()]);

      for (int i=0; i<list.length; i++)
      {
         switch (type) {
            case ConnectionEvent.CONNECTION_CLOSED:
               list[i].connectionClosed(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
               list[i].localTransactionStarted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
               list[i].localTransactionCommitted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
               list[i].localTransactionRolledback(event);
               break;

            case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
               list[i].connectionErrorOccurred(event);
               break;

            default:
               throw new IllegalArgumentException("Illegal eventType: " + type);
         }
      }
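
Why the listeners are copied to an array before dispatch: a listener may add or remove listeners while it is being notified, and iterating the live Vector would then fail. A small standalone sketch of the same snapshot idea, assuming Java 8+; SnapshotDispatch is a hypothetical name and Consumer stands in for ConnectionEventListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.Consumer;

final class SnapshotDispatch {
    private SnapshotDispatch() {}

    static <E> void dispatch(Vector<Consumer<E>> listeners, E event) {
        // Copy first: the loop then runs over a private snapshot,
        // so listeners are free to modify the Vector during the callback.
        List<Consumer<E>> snapshot = new ArrayList<Consumer<E>>(listeners);
        for (Consumer<E> listener : snapshot) {
            listener.accept(event);
        }
    }
}
```

A listener that unregisters itself during the callback is still notified for the current event and simply drops out of later ones.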
   
public void setLogWriter(java.io.PrintWriter out)
Set the log writer for this connection.

param
out The log writer for this connection.
throws
ResourceException

      //
      // jason: screw the logWriter stuff for now it sucks ass
      //
   
private void setup()
Setup the connection.

throws
ResourceException

      boolean trace = log.isTraceEnabled();

      try
      {
         JMSProviderAdapter adapter = getProviderAdapter();
         Context context = adapter.getInitialContext();
         Object factory;
         boolean transacted = info.isTransacted();
         int ack = Session.AUTO_ACKNOWLEDGE;

         if (info.getType() == JmsConnectionFactory.TOPIC)
         {
            String jndi = adapter.getTopicFactoryRef();
            if (jndi == null)
               throw new IllegalStateException("No configured 'TopicFactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI());
            factory = context.lookup(jndi);
            con = ConnectionFactoryHelper.createTopicConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (trace)
               log.trace("created connection: " + con);

            if (con instanceof XATopicConnection)
            {
               xaTopicSession = ((XATopicConnection)con).createXATopicSession();
               topicSession = xaTopicSession.getTopicSession();
               xaTransacted = true;
            }
            else if (con instanceof TopicConnection)
            {
               topicSession =
                  ((TopicConnection)con).createTopicSession(transacted, ack);
               if (trace)
                  log.trace("Using a non-XA TopicConnection.  " +
                            "It will not be able to participate in a Global UOW");
            }
            else
               throw new JBossResourceException("Connection was not recognizable: " + con);

            if (trace)
               log.trace("xaTopicSession=" + xaTopicSession + ", topicSession=" + topicSession);
         }
         else if (info.getType() == JmsConnectionFactory.QUEUE)
         {
            String jndi = adapter.getQueueFactoryRef();
            if (jndi == null)
               throw new IllegalStateException("No configured 'QueueFactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI());
            factory = context.lookup(jndi);
            con = ConnectionFactoryHelper.createQueueConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (trace)
               log.trace("created connection: " + con);

            if (con instanceof XAQueueConnection)
            {
               xaQueueSession =
                  ((XAQueueConnection)con).createXAQueueSession();
               queueSession = xaQueueSession.getQueueSession();
               xaTransacted = true;
            }
            else if (con instanceof QueueConnection)
            {
               queueSession =
                  ((QueueConnection)con).createQueueSession(transacted, ack);
               if (trace)
                  log.trace("Using a non-XA QueueConnection.  " +
                            "It will not be able to participate in a Global UOW");
            }
            else
               throw new JBossResourceException("Connection was not recognizable: " + con);

            if (trace)
               log.trace("xaQueueSession=" + xaQueueSession + ", queueSession=" + queueSession);
         }
         else
         {
            String jndi = adapter.getFactoryRef();
            if (jndi == null)
               throw new IllegalStateException("No configured 'FactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI());
            factory = context.lookup(jndi);
            con = ConnectionFactoryHelper.createConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (trace) 
               log.trace("created connection: " + con);

            if (con instanceof XAConnection)
            {
               xaSession =
                  ((XAConnection)con).createXASession();
               session = xaSession.getSession();
               xaTransacted = true;
            }
            else
            {
               session = con.createSession(transacted, ack);
               if (trace)
                  log.trace("Using a non-XA Connection.  " +
                            "It will not be able to participate in a Global UOW");
            }

            if (trace)
               log.trace("xaSession=" + xaSession + ", Session=" + session);
         }

         if (trace)
            log.trace("transacted=" + transacted + ", ack=" + ack);
      }
      catch (NamingException e)
      {
         throw new JBossResourceException("Unable to setup connection", e);
      }
      catch (JMSException e)
      {
         throw new JBossResourceException("Unable to setup connection", e);
      }
   
void start()

      con.start();
   
void stop()

      con.stop();