Methods Summary |
---|
private void | completeConnect()
//we connected, register ourselves for writing
setConnected(true);
connecting = false;
setRequestCount(0);
setConnectTime(System.currentTimeMillis());
socketChannel.socket().setSendBufferSize(getTxBufSize());
socketChannel.socket().setReceiveBufferSize(getRxBufSize());
socketChannel.socket().setSoTimeout((int)getTimeout());
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
socketChannel.socket().setKeepAlive(getSoKeepAlive());
socketChannel.socket().setReuseAddress(getSoReuseAddress());
socketChannel.socket().setOOBInline(getOoBInline());
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
socketChannel.socket().setTrafficClass(getSoTrafficClass());
|
public synchronized void | connect()connect - blocking in this operation
if ( connecting ) return;
connecting = true;
if ( isConnected() ) throw new IOException("NioSender is already in connected state.");
if ( readbuf == null ) {
readbuf = getReadBuffer();
} else {
readbuf.clear();
}
if ( writebuf == null ) {
writebuf = getWriteBuffer();
} else {
writebuf.clear();
}
InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress.");
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
if ( socketChannel.connect(addr) ) {
completeConnect();
socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
} else {
socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this);
}
|
public void | disconnect()disconnect
try {
connecting = false;
setConnected(false);
if ( socketChannel != null ) {
try {
try {socketChannel.socket().close();}catch ( Exception x){}
//error free close, all the way
//try {socket.shutdownOutput();}catch ( Exception x){}
//try {socket.shutdownInput();}catch ( Exception x){}
//try {socket.close();}catch ( Exception x){}
try {socketChannel.close();}catch ( Exception x){}
}finally {
socketChannel = null;
}
}
} catch ( Exception x ) {
log.error("Unable to disconnect NioSender. msg="+x.getMessage());
if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x);
} finally {
}
|
private java.nio.ByteBuffer | getBuffer(int size)
return (getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size));
|
public byte[] | getMessage()
return current;
|
private java.nio.ByteBuffer | getReadBuffer()
return getBuffer(getRxBufSize());
|
public java.nio.channels.Selector | getSelector()
return selector;
|
private java.nio.ByteBuffer | getWriteBuffer()
return getBuffer(getTxBufSize());
|
public boolean | isComplete()
return complete;
|
public boolean | process(java.nio.channels.SelectionKey key, boolean waitForAck)State machine to send data
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
//in case disconnect has been called
if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
if ( key.isConnectable() ) {
if ( socketChannel.finishConnect() ) {
completeConnect();
if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return false;
} else {
//wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
}//end if
} else if ( key.isWritable() ) {
boolean writecomplete = write(key);
if ( writecomplete ) {
//we are completed, should we read an ack?
if ( waitForAck ) {
//register to read the ack
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
//if not, we are ready, setMessage will reregister us for another write interest
//do a health check, we have no way of verify a disconnected
//socket since we don't register for OP_READ on waitForAck=false
read(key);//this causes overhead
setRequestCount(getRequestCount()+1);
return true;
}
} else {
//we are not complete, lets write some more
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
}//end if
} else if ( key.isReadable() ) {
boolean readcomplete = read(key);
if ( readcomplete ) {
setRequestCount(getRequestCount()+1);
return true;
} else {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
}//end if
} else {
//unknown state, should never happen
log.warn("Data is in unknown state. readyOps="+ops);
throw new IOException("Data is in unknown state. readyOps="+ops);
}//end if
return false;
|
protected boolean | read(java.nio.channels.SelectionKey key)
//if there is no message here, we are done
if ( current == null ) return true;
int read = socketChannel.read(readbuf);
//end of stream
if ( read == -1 ) throw new IOException("Unable to receive an ack message. EOF on socket channel has been reached.");
//no data read
else if ( read == 0 ) return false;
readbuf.flip();
ackbuf.append(readbuf,read);
readbuf.clear();
if (ackbuf.doesPackageExist() ) {
byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
if ( fack && getThrowOnFailedAck() ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
return ack || fack;
} else {
return false;
}
|
public void | reset()
if ( isConnected() && readbuf == null) {
readbuf = getReadBuffer();
}
if ( readbuf != null ) readbuf.clear();
if ( writebuf != null ) writebuf.clear();
current = null;
ackbuf.clear();
remaining = 0;
complete = false;
setAttempt(0);
setRequestCount(0);
setConnectTime(-1);
|
public void | setComplete(boolean complete)
this.complete = complete;
|
public synchronized void | setMessage(byte[] data)sendMessage
setMessage(data,0,data.length);
|
public synchronized void | setMessage(byte[] data, int offset, int length)
if ( data != null ) {
current = data;
remaining = length;
ackbuf.clear();
if ( writebuf != null ) writebuf.clear();
else writebuf = getBuffer(length);
if ( writebuf.capacity() < length ) writebuf = getBuffer(length);
writebuf.put(data,offset,length);
//writebuf.rewind();
//set the limit so that we don't write non wanted data
//writebuf.limit(length);
writebuf.flip();
if (isConnected()) {
socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
}
}
|
public void | setSelector(java.nio.channels.Selector selector)
this.selector = selector;
|
protected boolean | write(java.nio.channels.SelectionKey key)
if ( (!isConnected()) || (this.socketChannel==null)) {
throw new IOException("NioSender is not connected, this should not occur.");
}
if ( current != null ) {
if ( remaining > 0 ) {
//weve written everything, or we are starting a new package
//protect against buffer overwrite
int byteswritten = socketChannel.write(writebuf);
if (byteswritten == -1 ) throw new EOFException();
remaining -= byteswritten;
//if the entire message was written from the buffer
//reset the position counter
if ( remaining < 0 ) {
remaining = 0;
}
}
return (remaining==0);
}
//no message to send, we can consider that complete
return true;
|