Methods Summary |
---|
public void | activate()
// No statements are listed for this method: its body is empty.
|
public int | available()Return the available bytes
return remaining();
|
public void | close()Close this stream.
// No statements are listed for this method: its body is empty, so neither the
// socket channel nor the byte buffer is touched here.
|
private int | doRead()Read bytes using the read ReadSelector
// Performs one read attempt from socketChannel into byteBuffer.
// Returns the number of bytes appended to the buffer; 0 when no bytes were read
// (no selector could be obtained, the select timed out, or the channel still
// had nothing); -1 when there is no channel, isEOF() already holds, the channel
// reports end of stream before any byte was read, or an exception occurred.
if ( socketChannel == null ) return -1;
if (isEOF()) {
return -1;
}
// Recycle the buffer only when it is fully consumed AND its position has
// reached bufferClearLimit; otherwise new data is appended after the old data.
if (!byteBuffer.hasRemaining() && byteBuffer.position() >= bufferClearLimit) {
byteBuffer.clear();
}
// Remember where the unread data will start, then open the buffer up to its
// full capacity so the channel can write behind the current position.
final int bbPosition = byteBuffer.position();
byteBuffer.limit(byteBuffer.capacity());
int count;
int byteRead = 0;
Selector readSelector = null;
SelectionKey tmpKey = null;
try{
// First pass: drain whatever the channel has without waiting.
do {
count = socketChannel.read(byteBuffer);
byteRead += count;
} while (count > 0);
// The loop also added the terminating -1 to byteRead. Undo that when at
// least one byte was read first; if nothing was read, byteRead stays -1
// and is returned as the end-of-stream marker.
if (count == -1 && byteRead >= 0) byteRead++;
if ( byteRead == 0 ){
// Nothing available yet: wait for readability on a selector taken from SelectorFactory.
readSelector = SelectorFactory.getSelector();
if ( readSelector == null ){
return 0;
}
tmpKey = socketChannel
.register(readSelector,SelectionKey.OP_READ);
// NOTE(review): the key was just registered with OP_READ, so this OR looks redundant.
tmpKey.interestOps(tmpKey.interestOps() | SelectionKey.OP_READ);
final int code = readSelector.select(READ_TIMEOUT);
// Drop read interest again; a select result of 0 means no key became ready, so report "no data".
tmpKey.interestOps(tmpKey.interestOps() & (~SelectionKey.OP_READ));
if ( code == 0 ){
return 0;
}
// Second pass: same drain loop and -1 compensation as above.
do {
count = socketChannel.read(byteBuffer);
byteRead += count;
} while (count > 0);
if (count == -1 && byteRead >= 0) byteRead++;
}
} catch (Exception e){
// Any failure is logged and reported to the caller as -1; bytes already
// placed in the buffer are still exposed by the finally block below.
logger.log(Level.SEVERE, MessagesMessages.WSTCP_0018_ERROR_READING_FROM_SOCKET(), e);
return -1;
} finally {
// Runs on every exit from the try block, including the early "return 0" paths.
if (tmpKey != null)
tmpKey.cancel();
if ( readSelector != null){
try{
// Presumably lets the selector process the cancelled key before it goes back to the factory — confirm.
readSelector.selectNow();
} catch (IOException ex){
// Ignored: the selector is handed back to the factory regardless.
}
SelectorFactory.returnSelector(readSelector);
}
// flip() sets the limit to the end of the written data; restoring the saved
// position makes the bytes from bbPosition up to that limit readable.
byteBuffer.flip();
byteBuffer.position(bbPosition);
}
return byteRead;
|
public void | forceHeaderRead()
    // Reads a frame header unconditionally, bypassing the checks that
    // readFrameHeaderIfRequired() applies before calling readHeader().
    this.readHeader();
|
public int | getChannelId()
    // Channel id taken from the most recently read frame header (set in readHeader()).
    return this.channelId;
|
public int | getContentId()
    // Content id from the last header that carried parameters; reset() sets it to -1.
    return this.contentId;
|
public java.util.Map | getContentProperties()
    // Hands out the live map that readHeader() fills with parameter-id -> value
    // entries; it is not copied, so reset() clears the caller's view as well.
    return this.contentProps;
|
public int | getMessageId()
    // Message id from the most recently read frame header; reset() sets it to -1.
    return this.messageId;
|
public boolean | isDirectMode()
    // In direct mode no frame headers are read and remaining() ignores frame limits.
    return this.isDirectMode;
|
private boolean | isEOF()
    // End of message: the current frame is flagged as the last one and the
    // number of bytes counted for it has reached currentFrameDataSize - 1.
    // NOTE(review): because of the "- 1" this becomes true while one byte of the
    // frame is still uncounted — confirm that this threshold is intentional.
    if (!isLastFrame) {
        return false;
    }
    return frameBytesRead >= currentFrameDataSize - 1;
|
public boolean | isMessageInProcess()
if (currentFrameDataSize == 0 || isEOF()) return false;
return true;
|
public boolean | markSupported()Return true if mark is supported.
// Always false: this stream does not support mark.
return false;
|
public void | passivate()
    // Clears all per-message state, then detaches the stream from its channel
    // and buffer by setting both references to null.
    this.reset();
    this.setSocketChannel(null);
    this.setByteBuffer(null);
|
public int | read()Read the first byte from the wrapped ByteBuffer .
    // Returns the next byte as an unsigned value (0-255), or -1 when the channel
    // read reports -1, the frame header cannot be read, or only the tail of the
    // last frame is left in the buffer.
    //
    // Retries in a loop rather than by calling read() recursively, so a channel
    // that keeps yielding no data cannot exhaust the call stack.
    while (true) {
        int eof = 0;
        if (!byteBuffer.hasRemaining()) {
            eof = readFromChannel();
        } else if (remaining() == 0 && isLastFrame) { // only the last frame's tail is left in the buffer
            return -1;
        }
        if (eof == -1 || readFrameHeaderIfRequired() == -1) {
            return -1;
        }
        if (byteBuffer.hasRemaining()) {
            frameBytesRead++;
            receivedMessageLength++;
            return byteBuffer.get() & 0xff; // mask so the byte is returned unsigned
        }
        // Still nothing to hand out: go round again.
    }
|
public int | read(byte[] b)Read the bytes from the wrapped ByteBuffer .
return (read(b, 0, b.length));
|
public int | read(byte[] b, int offset, int length)Read up to length bytes from the wrapped ByteBuffer .
    // Copies at most 'length' bytes of the current frame into b, starting at
    // 'offset', and returns the number of bytes copied. The count is capped by
    // remaining(), so a single call never crosses a frame boundary.
    // Returns -1 when the buffer is empty and the channel read yields no data
    // (result <= 0, which includes a timeout), when only the tail of the last
    // frame is left, or when the frame header cannot be read.

    // A zero-length request returns 0 immediately (java.io.InputStream
    // convention) instead of possibly waiting on the channel or reporting -1.
    if (length == 0) {
        return 0;
    }
    if (!byteBuffer.hasRemaining()) {
        final int eof = readFromChannel();
        if (eof <= 0) {
            return -1;
        }
    } else if (remaining() == 0 && isLastFrame) { // only the last frame's tail is left in the buffer
        return -1;
    }
    if (readFrameHeaderIfRequired() == -1) {
        return -1;
    }
    //@TODO add logic for reading from several frames if required
    final int available = remaining();
    if (length > available) {
        length = available;
    }
    byteBuffer.get(b, offset, length);
    frameBytesRead += length;
    receivedMessageLength += length;
    return length;
|
private int | readFrameHeaderIfRequired()
if (!isDirectMode && !isLastFrame && !isReadingHeader && (frameBytesRead == 0 || frameBytesRead == currentFrameDataSize)) {
try {
readHeader();
} catch (IOException ex) {
return -1;
}
}
return 0;
|
private int | readFromChannel()
int eof = 0;
for (int i=0; i < READ_TRY; i++) {
eof = doRead();
if (eof != 0) {
break;
}
}
return eof;
|
private void | readHeader()
    // Parses one frame header from this stream and stores its fields:
    // channelId, messageId, optionally contentId plus content properties,
    // the frame size and the last-frame flag. isReadingHeader is true while
    // parsing; it is only cleared on the normal path, not when a read throws.
    if (logger.isLoggable(Level.FINEST)) {
        logger.log(Level.FINEST, MessagesMessages.WSTCP_1060_FRAMED_MESSAGE_IS_READ_HEADER_ENTER());
    }

    frameBytesRead = 0;
    isReadingHeader = true;

    // channel-id and message-id; the returned value is carried into the next
    // readInts4 call (presumably a left-over half byte — see DataInOutUtils).
    int carriedNibble = DataInOutUtils.readInts4(this, headerTmpArray, 2, 0);
    channelId = headerTmpArray[0];
    messageId = headerTmpArray[1];

    if (FrameType.isFrameContainsParams(messageId)) {
        // content-id and number-of-parameters
        carriedNibble = DataInOutUtils.readInts4(this, headerTmpArray, 2, carriedNibble);
        contentId = headerTmpArray[0];
        // Copy the count out now: headerTmpArray is overwritten inside the loop.
        final int paramCount = headerTmpArray[1];

        for (int paramIndex = 0; paramIndex < paramCount; paramIndex++) {
            // parameter-id and byte length of the parameter value
            DataInOutUtils.readInts4(this, headerTmpArray, 2, carriedNibble);
            final int paramId = headerTmpArray[0];
            final byte[] valueBytes = new byte[headerTmpArray[1]];
            // parameter value, stored as a string decoded with TCPConstants.UTF8
            DataInOutUtils.readFully(this, valueBytes);
            contentProps.put(paramId, new String(valueBytes, TCPConstants.UTF8));
            carriedNibble = 0;
        }
    }

    // payload-size; frameBytesRead is then added on top (presumably the header
    // bytes consumed so far, as read() counts every byte it returns).
    currentFrameDataSize = DataInOutUtils.readInt8(this);
    isLastFrame = FrameType.isLastFrame(messageId);
    currentFrameDataSize += frameBytesRead;
    isReadingHeader = false;

    if (logger.isLoggable(Level.FINEST)) {
        logger.log(Level.FINEST, MessagesMessages.WSTCP_1061_FRAMED_MESSAGE_IS_READ_HEADER_DONE(channelId, messageId, contentId, contentProps, currentFrameDataSize, isLastFrame));
    }
|
private int | remaining()
if (isReadingHeader || isDirectMode) {
return byteBuffer.remaining();
}
return Math.min(currentFrameDataSize - frameBytesRead, byteBuffer.remaining());
|
public void | reset()
    // Returns the stream to its "no message yet" state. channelId, the buffer,
    // the channel and the direct-mode flag are left as they are.

    // Frame bookkeeping.
    this.frameBytesRead = 0;
    this.currentFrameDataSize = 0;
    this.isLastFrame = false;
    this.isReadingHeader = false;

    // Header fields of the previous message.
    this.contentId = -1;
    this.messageId = -1;
    this.contentProps.clear();

    // Running byte count of the message.
    this.receivedMessageLength = 0;
|
public void | setByteBuffer(java.nio.ByteBuffer byteBuffer)
this.byteBuffer = byteBuffer;
if (byteBuffer != null) {
bufferClearLimit = byteBuffer.capacity() * 3 / 4;
}
|
public void | setDirectMode(boolean isDirectMode)
    // Switching the mode always starts from a clean slate: all per-message
    // state is reset first, then the new mode flag is stored.
    this.reset();
    this.isDirectMode = isDirectMode;
|
public void | setFrameSize(int frameSize)
// Stores the frame size; in this listing the value is only read when
// skipToEndOfFrame() builds its unexpected-EOF error message.
this.frameSize = frameSize;
|
public void | setSocketChannel(java.nio.channels.SocketChannel socketChannel)
// Sets the channel doRead() reads from; while it is null, doRead() returns -1.
this.socketChannel = socketChannel;
|
private void | skipToEndOfFrame()
// Discards the unread rest of the current frame, reading more data from the
// channel when the buffer does not hold all of it. Does nothing when no frame
// size is known (currentFrameDataSize == 0).
// Throws EOFException when the channel read reports -1 before the frame ends.
if (currentFrameDataSize > 0) {
if (byteBuffer.hasRemaining()) {
final int remainFrameBytes = currentFrameDataSize - frameBytesRead;
// Fast path: the rest of the frame is already buffered, so just step over it.
// frameBytesRead is not updated here; the visible caller, skipToEndOfMessage(),
// sets it to 0 right after this method returns.
if (remainFrameBytes <= byteBuffer.remaining()) {
byteBuffer.position(byteBuffer.position() + remainFrameBytes);
return;
}
// Otherwise consume everything that is buffered and keep reading below.
frameBytesRead += byteBuffer.remaining();
byteBuffer.position(byteBuffer.limit());
}
// NOTE(review): a result of 0 from readFromChannel() (no data) makes no progress
// here, so this loop keeps retrying until data arrives or -1 is reported.
while(frameBytesRead < currentFrameDataSize) {
final int eof = readFromChannel();
if (eof == -1) {
String errorMessage = MessagesMessages.WSTCP_1062_FRAMED_MESSAGE_IS_READ_UNEXPECTED_EOF(isLastFrame, frameBytesRead, frameSize, currentFrameDataSize);
logger.log(Level.SEVERE, errorMessage);
throw new EOFException(errorMessage);
}
// Count the freshly read bytes as skipped and step over them in the buffer.
frameBytesRead += eof;
byteBuffer.position(byteBuffer.position() + eof);
}
// if extra frame bytes were read - move position backwards
byteBuffer.position(byteBuffer.position() - (frameBytesRead - currentFrameDataSize));
}
|
public void | skipToEndOfMessage()
do {
readFrameHeaderIfRequired();
skipToEndOfFrame();
frameBytesRead = 0;
} while(!isLastFrame);
|
public java.lang.String | toString()
final StringBuffer buffer = new StringBuffer(100);
buffer.append("ByteBuffer: ");
buffer.append(byteBuffer);
buffer.append(" FrameBytesRead: ");
buffer.append(frameBytesRead);
buffer.append(" CurrentFrameDataSize: ");
buffer.append(currentFrameDataSize);
buffer.append(" isLastFrame: ");
buffer.append(isLastFrame);
buffer.append(" isDirectMode: ");
buffer.append(isDirectMode);
buffer.append(" isReadingHeader: ");
buffer.append(isReadingHeader);
return buffer.toString();
|