File | Doc | Category | Size | Date | Package
NioSender.java | API Doc | Apache Tomcat 6.0.14 | 12971 | Fri Jul 20 04:20:36 BST 2007 | org.apache.catalina.tribes.transport.nio

NioSender

public class NioSender extends org.apache.catalina.tribes.transport.AbstractSender implements org.apache.catalina.tribes.transport.DataSender
This class is NOT thread safe and should never be used with more than one thread at a time. This is a state machine, handled by the process method. States are: - NOT_CONNECTED -> connect() -> CONNECTED - CONNECTED -> setMessage() -> READY_TO_WRITE - READY_TO_WRITE -> write() -> READY_TO_WRITE | READY_TO_READ - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE - TRANSFER_COMPLETE -> CONNECTED
author
Filip Hanik
version
1.0

Fields Summary
protected static org.apache.juli.logging.Log
log
protected Selector
selector
protected SocketChannel
socketChannel
protected ByteBuffer
readbuf
protected ByteBuffer
writebuf
protected byte[]
current
protected org.apache.catalina.tribes.io.XByteBuffer
ackbuf
protected int
remaining
protected boolean
complete
protected boolean
connecting
Constructors Summary
public NioSender()
Creates a sender; all initialization is delegated to the superclass constructor.

        super();
    
Methods Summary
private voidcompleteConnect()

        //we connected, register ourselves for writing
        setConnected(true);
        connecting = false;
        setRequestCount(0);
        setConnectTime(System.currentTimeMillis());
        socketChannel.socket().setSendBufferSize(getTxBufSize());
        socketChannel.socket().setReceiveBufferSize(getRxBufSize());
        socketChannel.socket().setSoTimeout((int)getTimeout());
        socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
        socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
        socketChannel.socket().setKeepAlive(getSoKeepAlive());
        socketChannel.socket().setReuseAddress(getSoReuseAddress());
        socketChannel.socket().setOOBInline(getOoBInline());
        socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
        socketChannel.socket().setTrafficClass(getSoTrafficClass());
    
public synchronized voidconnect()
connect - blocking in this operation

throws
IOException
todo
Implement this org.apache.catalina.tribes.transport.IDataSender method

        if ( connecting ) return;
        connecting = true;
        if ( isConnected() ) throw new IOException("NioSender is already in connected state.");
        if ( readbuf == null ) {
            readbuf = getReadBuffer();
        } else {
            readbuf.clear();
        }
        if ( writebuf == null ) {
            writebuf = getWriteBuffer();
        } else {
            writebuf.clear();
        }
        
        InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
        if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress.");
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        if ( socketChannel.connect(addr) ) {
            completeConnect();
            socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
        } else {
            socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this);
        }
    
public voiddisconnect()
disconnect

todo
Implement this org.apache.catalina.tribes.transport.IDataSender method

        try {
            connecting = false;
            setConnected(false);
            if ( socketChannel != null ) {
                try {
                    try {socketChannel.socket().close();}catch ( Exception x){}
                    //error free close, all the way
                    //try {socket.shutdownOutput();}catch ( Exception x){}
                    //try {socket.shutdownInput();}catch ( Exception x){}
                    //try {socket.close();}catch ( Exception x){}
                    try {socketChannel.close();}catch ( Exception x){}
                }finally {
                    socketChannel = null;
                }
            }
        } catch ( Exception x ) {
            log.error("Unable to disconnect NioSender. msg="+x.getMessage());
            if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x);
        } finally {
        }

    
private java.nio.ByteBuffergetBuffer(int size)

        return (getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size));
    
public byte[]getMessage()

       return current;
   
private java.nio.ByteBuffergetReadBuffer()

 
        return getBuffer(getRxBufSize());
    
public java.nio.channels.SelectorgetSelector()

        return selector;
    
private java.nio.ByteBuffergetWriteBuffer()

        return getBuffer(getTxBufSize());
    
public booleanisComplete()

        return complete;
    
public booleanprocess(java.nio.channels.SelectionKey key, boolean waitForAck)
State machine to send data

param
key SelectionKey
return
boolean
throws
IOException

        int ops = key.readyOps();
        key.interestOps(key.interestOps() & ~ops);
        //in case disconnect has been called
        if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
        if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
        if ( key.isConnectable() ) {
            if ( socketChannel.finishConnect() ) {
                completeConnect();
                if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                return false;
            } else  { 
                //wait for the connection to finish
                key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
                return false;
            }//end if
        } else if ( key.isWritable() ) {
            boolean writecomplete = write(key);
            if ( writecomplete ) {
                //we are completed, should we read an ack?
                if ( waitForAck ) {
                    //register to read the ack
                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                } else {
                    //if not, we are ready, setMessage will reregister us for another write interest
                    //do a health check, we have no way of verify a disconnected
                    //socket since we don't register for OP_READ on waitForAck=false
                    read(key);//this causes overhead
                    setRequestCount(getRequestCount()+1);
                    return true;
                }
            } else {
                //we are not complete, lets write some more
                key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
            }//end if
        } else if ( key.isReadable() ) {
            boolean readcomplete = read(key);
            if ( readcomplete ) {
                setRequestCount(getRequestCount()+1);
                return true;
            } else {
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            }//end if
        } else {
            //unknown state, should never happen
            log.warn("Data is in unknown state. readyOps="+ops);
            throw new IOException("Data is in unknown state. readyOps="+ops);
        }//end if
        return false;
    
protected booleanread(java.nio.channels.SelectionKey key)

        //if there is no message here, we are done
        if ( current == null ) return true;
        int read = socketChannel.read(readbuf);
        //end of stream
        if ( read == -1 ) throw new IOException("Unable to receive an ack message. EOF on socket channel has been reached.");
        //no data read
        else if ( read == 0 ) return false;
        readbuf.flip();
        ackbuf.append(readbuf,read);
        readbuf.clear();
        if (ackbuf.doesPackageExist() ) {
            byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
            boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
            boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
            if ( fack && getThrowOnFailedAck() ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
            return ack || fack;
        } else {
            return false;
        }
    
public voidreset()

        if ( isConnected() && readbuf == null) {
            readbuf = getReadBuffer();
        }
        if ( readbuf != null ) readbuf.clear();
        if ( writebuf != null ) writebuf.clear();
        current = null;
        ackbuf.clear();
        remaining = 0;
        complete = false;
        setAttempt(0);
        setRequestCount(0);
        setConnectTime(-1);
    
public voidsetComplete(boolean complete)

        this.complete = complete;
    
public synchronized voidsetMessage(byte[] data)
sendMessage

param
data ChannelMessage
throws
IOException
todo
Implement this org.apache.catalina.tribes.transport.IDataSender method

       setMessage(data,0,data.length);
   
public synchronized voidsetMessage(byte[] data, int offset, int length)

       if ( data != null ) {
           current = data;
           remaining = length;
           ackbuf.clear();
           if ( writebuf != null ) writebuf.clear();
           else writebuf = getBuffer(length);
           if ( writebuf.capacity() < length ) writebuf = getBuffer(length);
           writebuf.put(data,offset,length);
           //writebuf.rewind();
           //set the limit so that we don't write non wanted data
           //writebuf.limit(length);
           writebuf.flip();
           if (isConnected()) {
               socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
           }
       } 
   
public voidsetSelector(java.nio.channels.Selector selector)

        this.selector = selector;
    
protected booleanwrite(java.nio.channels.SelectionKey key)

        if ( (!isConnected()) || (this.socketChannel==null)) {
            throw new IOException("NioSender is not connected, this should not occur.");
        }
        if ( current != null ) {
            if ( remaining > 0 ) {
                //weve written everything, or we are starting a new package
                //protect against buffer overwrite
                int byteswritten = socketChannel.write(writebuf);
                if (byteswritten == -1 ) throw new EOFException();
                remaining -= byteswritten;
                //if the entire message was written from the buffer
                //reset the position counter
                if ( remaining < 0 ) {
                    remaining = 0;
                }
            }
            return (remaining==0);
        }
        //no message to send, we can consider that complete
        return true;