File | Doc | Category | Size | Date | Package
NioReceiver.java | API Doc | Apache Tomcat 6.0.14 | 15718 | Fri Jul 20 04:20:32 BST 2007 | org.apache.catalina.tribes.transport.nio

NioReceiver

public class NioReceiver extends org.apache.catalina.tribes.transport.ReceiverBase implements org.apache.catalina.tribes.ChannelReceiver, Runnable, org.apache.catalina.tribes.io.ListenCallback
author
Filip Hanik
version
$Revision: 538977 $ $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $

Fields Summary
protected static org.apache.juli.logging.Log
log
protected org.apache.catalina.tribes.util.StringManager
sm
The string manager for this package.
private static final String
info
The descriptive information about this implementation.
private Selector
selector
private ServerSocketChannel
serverChannel
protected LinkedList
events
protected long
lastCheck
Constructors Summary
public NioReceiver()

//    private Object interestOpsMutex = new Object();

      
    
Methods Summary
public voidaddEvent(java.lang.Runnable event)

        // Queues an event for later execution by events() and wakes the selector
        // so that the listen loop gets a chance to run it. The event is silently
        // dropped when there is no selector.
        //
        // The selector field is read exactly once: stopListening() resets it to
        // null, so checking the field and then dereferencing the field again could
        // fail with a NullPointerException if the receiver is stopped in between.
        Selector tmpsel = selector;
        if ( tmpsel == null ) return;
        synchronized (events) {
            events.add(event);
        }
        if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
        if ( isListening() ) tmpsel.wakeup();
    
protected voidbind()

        // Opens the listening channel and the selector, binds the server socket
        // and registers the channel with the selector for accept events.
        serverChannel = ServerSocketChannel.open();
        // the ServerSocket view of the channel is what actually gets bound
        final ServerSocket listenSocket = serverChannel.socket();
        selector = Selector.open();
        // delegate to the overloaded bind(...) with the configured listen port
        // and auto-bind setting
        bind(listenSocket, getTcpListenPort(), getAutoBind());
        // a channel has to be in non-blocking mode to be registered with a selector
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
    
public static voidcancelledKey(java.nio.channels.SelectionKey key)

        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) {
            reader.setCancelled(true);
            reader.finish();
        }
        key.cancel(); 
        key.attach(null);
        try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
        try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
        
    
public org.apache.catalina.tribes.transport.AbstractRxTaskcreateRxTask()

        NioReplicationTask thread = new NioReplicationTask(this,this);
        thread.setUseBufferPool(this.getUseBufferPool());
        thread.setRxBufSize(getRxBufSize());
        thread.setOptions(getWorkerThreadOptions());
        return thread;
    
public voidevents()

        if ( events.size() == 0 ) return;
        synchronized (events) {
            Runnable r = null;
            while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
                try {
                    if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
                    r.run();
                } catch ( Exception x ) {
                    log.error("",x);
                }
            }
            events.clear();
        }
    
public java.lang.StringgetInfo()
Return descriptive information about this implementation and the corresponding version number, in the format <description>/<version>.

        // hands back the static descriptive string of this implementation
        return info;
    
protected voidlisten()
get data from channel and store in byte array send it to cluster

throws
IOException
throws
java.nio.channels.ClosedChannelException

        // Selector loop of the receiver. Each iteration runs the queued events,
        // performs the idle-key check, waits in select() and then handles the
        // ready keys: new connections are accepted, configured and registered
        // for OP_READ; readable keys are handed to readDataFromSocket().
        // Returns immediately (with a warning) if the receiver is already
        // listening. When the loop ends, the server channel and the selector
        // are closed.
        if (doListen()) {
            log.warn("ServerSocketChannel already started");
            return;
        }

        setListen(true);

        while (doListen() && selector != null) {
            try {
                // run queued events and the idle-key check before blocking
                events();
                socketTimeouts();
                // this may block for up to the selector timeout; upon return the
                // selected set contains the keys of the ready channels
                int n = selector.select(getTcpSelectorTimeout());
                if (n == 0) {
                    // timed out, or the selector was woken up (addEvent() and
                    // stopListening() both call wakeup()); loop around so the
                    // queued events get processed
                    continue; // nothing to do
                }
                // get an iterator over the set of selected keys
                Iterator it = selector.selectedKeys().iterator();
                // look at each key in the selected set
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
                    // Is a new connection coming in?
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        // A non-blocking accept() returns null when there is no
                        // pending connection left; configuring the socket of a
                        // null channel would throw a NullPointerException and
                        // abort handling of the remaining selected keys.
                        if (channel != null) {
                            channel.socket().setReceiveBufferSize(getRxBufSize());
                            channel.socket().setSendBufferSize(getTxBufSize());
                            channel.socket().setTcpNoDelay(getTcpNoDelay());
                            channel.socket().setKeepAlive(getSoKeepAlive());
                            channel.socket().setOOBInline(getOoBInline());
                            channel.socket().setReuseAddress(getSoReuseAddress());
                            channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
                            channel.socket().setTrafficClass(getSoTrafficClass());
                            channel.socket().setSoTimeout(getTimeout());
                            Object attach = new ObjectReader(channel);
                            registerChannel(selector,
                                            channel,
                                            SelectionKey.OP_READ,
                                            attach);
                        }
                    }
                    // is there data to read on this channel?
                    if (key.isReadable()) {
                        readDataFromSocket(key);
                    } else {
                        // not readable: make sure OP_WRITE is not in the interest set
                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                    }

                    // remove key from selected set, it's been handled
                    it.remove();
                }
            } catch (java.nio.channels.ClosedSelectorException cse) {
                // ignore is normal at shutdown or stop listen socket
            } catch (java.nio.channels.CancelledKeyException nx) {
                log.warn("Replication client disconnected, error when polling key. Ignoring client.");
            } catch (Throwable x) {
                try {
                    log.error("Unable to process request in NioReceiver", x);
                }catch ( Throwable tx ) {
                    //in case an out of memory error, will affect the logging framework as well
                    tx.printStackTrace();
                }
            }

        }
        // Close the selector even when closing the server channel fails. The
        // field is read once because stopListening() resets it to null.
        try {
            serverChannel.close();
        } finally {
            Selector tmpsel = selector;
            if (tmpsel != null)
                tmpsel.close();
        }
    
protected voidreadDataFromSocket(java.nio.channels.SelectionKey key)
Sample data handler method for a channel with data ready to read.

param
key A SelectionKey object associated with a channel determined by the selector to be ready for reading. If the channel returns an EOF condition, it is closed here, which automatically invalidates the associated key. The selector will then de-register the channel on the next select call.

        NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
        if (task == null) {
            // No threads/tasks available, do nothing, the selection
            // loop will keep calling this method until a
            // thread becomes available, the thread pool itself has a waiting mechanism
            // so we will not wait here.
            if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
        } else {
            // invoking this wakes up the worker thread then returns
            //add task to thread pool
            task.serviceChannel(key);
            getExecutor().execute(task);
        }
    
protected voidregisterChannel(java.nio.channels.Selector selector, java.nio.channels.SelectableChannel channel, int ops, java.lang.Object attach)
Register the given channel with the given selector for the given operations of interest

        // A null channel is tolerated and simply ignored.
        if (channel != null) {
            // switch the channel to non-blocking mode, then register it together
            // with its attachment
            channel.configureBlocking(false);
            channel.register(selector, ops, attach);
        }
    
public voidrun()
Start thread and listen

        // Thread entry point: runs the listen loop and logs any Exception that
        // escapes it instead of letting it propagate out of the thread.
        try {
            this.listen();
        } catch (Exception failure) {
            log.error("Unable to run replication listener.", failure);
        }
    
protected voidsocketTimeouts()

        // Periodic check of the registered keys, run at most once per selector
        // timeout. Keys with an empty interest set are inspected: a key without
        // an attachment is cancelled; a key whose reader has been idle longer
        // than the timeout and is not marked as accessed is reported with a
        // warning and gets its last-access time reset.
        final long now = System.currentTimeMillis();
        if ( now - lastCheck < getSelectorTimeout() ) {
            return;
        }
        // work on a snapshot of the field for the rest of the method
        final Selector tmpsel = selector;
        if ( !isListening() || tmpsel == null ) {
            return;
        }
        final Set keys = tmpsel.keys();
        if ( keys == null ) {
            return;
        }
        final Iterator iter = keys.iterator();
        while ( iter.hasNext() ) {
            final SelectionKey key = (SelectionKey) iter.next();
            try {
                // only keys with no interest ops are examined
                if ( key.interestOps() != 0 ) {
                    continue;
                }
                final ObjectReader reader = (ObjectReader) key.attachment();
                if ( reader == null ) {
                    // no reader attached: drop the key
                    cancelledKey(key);
                    continue;
                }
                final long delta = now - reader.getLastAccess();
                if ( delta > (long) getTimeout() && !reader.isAccessed() ) {
                    log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+reader.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(reader.getLastAccess()));
                    // reset the clock so the warning is not repeated on every pass
                    reader.setLastAccess(now);
                }
            } catch ( CancelledKeyException ckx ) {
                // the key was cancelled underneath us: clean it up
                cancelledKey(key);
            }
        }
        lastCheck = System.currentTimeMillis();
    
public voidstart()
start cluster receiver

throws
Exception
see
org.apache.catalina.tribes.ClusterReceiver#start()

        // Starts the receiver: calls super.start(), creates the task pool,
        // binds the listening socket and launches the daemon thread
        // ("NioReceiver") that runs the selector loop via run().
        // Any failure is logged as fatal and surfaces as an IOException;
        // non-IO exceptions are wrapped with the original kept as the cause.
        super.start();
        try {
            setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
        } catch (Exception x) {
            log.fatal("ThreadPool could not be initialized. Listener not started", x);
            if ( x instanceof IOException ) throw (IOException)x;
            // keep the original exception as the cause so its stack trace is not lost
            IOException iox = new IOException(x.getMessage());
            iox.initCause(x);
            throw iox;
        }
        try {
            // NOTE(review): return value is unused; presumably called for the side
            // effect of resolving the bind address - confirm against ReceiverBase
            getBind();
            bind();
            Thread t = new Thread(this, "NioReceiver");
            t.setDaemon(true);
            t.start();
        } catch (Exception x) {
            log.fatal("Unable to start cluster receiver", x);
            if ( x instanceof IOException ) throw (IOException)x;
            // keep the original exception as the cause so its stack trace is not lost
            IOException iox = new IOException(x.getMessage());
            iox.initCause(x);
            throw iox;
        }
    
public voidstop()

        // Stop the selector side first, then let the superclass stop.
        stopListening();
        super.stop();
    
protected voidstopListening()
Close Selector.

see
org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()

        // Clears the listen flag, then wakes up and closes the selector. The
        // selector field is reset to null whether or not closing succeeded;
        // a failure to close is logged and not rethrown.
        setListen(false);
        if (selector == null) {
            return;
        }
        try {
            // wake the selector before closing it
            selector.wakeup();
            selector.close();
        } catch (Exception x) {
            log.error("Unable to close cluster receiver selector.", x);
        } finally {
            selector = null;
        }