Methods Summary |
---|
public void | activate()
|
public void | addAllContentProperties(java.util.Map properties)
this.contentProps.putAll(properties);
|
private int | calcPayloadSizeToSend(int readyBytesToSend)
int payloadLength = outputBuffer.remaining();
if (readyBytesToSend > frameSize) {
payloadLength -= (readyBytesToSend - frameSize);
}
return payloadLength;
|
public void | close()
|
private void | flushBuffer()
final int payloadLength = outputBuffer.remaining();
if (!isDirectMode) {
headerBuffer.clear();
// Write channel-id
int frameMessageIdHighValue = DataInOutUtils.writeInt4(headerBuffer, channelId, 0, false);
int frameMessageIdPosition = headerBuffer.position();
boolean isFrameWithParameters = FrameType.isFrameContainsParams(messageId) && frameNumber == 0;
// Write message-id without counting with possible chunking
int highValue = DataInOutUtils.writeInt4(headerBuffer, messageId, frameMessageIdHighValue, !isFrameWithParameters);
if (isFrameWithParameters) {
// If required - serialize frame content-id, content-parameters
// Write content-id
highValue = DataInOutUtils.writeInt4(headerBuffer, contentId, highValue, false);
final int propsCount = contentProps.size();
// Write number-of-parameters
highValue = DataInOutUtils.writeInt4(headerBuffer, propsCount, highValue, propsCount == 0);
for(Map.Entry<Integer, String> entry : contentProps.entrySet()) {
final String value = entry.getValue();
byte[] valueBytes = value.getBytes(TCPConstants.UTF8);
// Write parameter-id
highValue = DataInOutUtils.writeInt4(headerBuffer, entry.getKey(), highValue, false);
// Write parameter-value buffer length
DataInOutUtils.writeInt4(headerBuffer, valueBytes.length, highValue, true);
// Write parameter-value
headerBuffer.put(valueBytes);
highValue = 0;
}
}
int readyBytesToSend = headerBuffer.position() + payloadlengthLength + payloadLength;
if (messageId == FrameType.MESSAGE) {
// If message will be chunked - update message-id
updateMessageIdIfRequired(frameMessageIdPosition,
frameMessageIdHighValue,
isFlushLast && readyBytesToSend <= frameSize);
}
final int sendingPayloadLength = calcPayloadSizeToSend(readyBytesToSend);
// Write payload-length
DataInOutUtils.writeInt8(headerBuffer, sendingPayloadLength);
headerBuffer.flip();
final int payloadLimit = outputBuffer.limit();
if (sendingPayloadLength < payloadLength) {
// check to change for outputBuffer.limit(sendingPayloadLength);
outputBuffer.limit(outputBuffer.limit() - (payloadLength - sendingPayloadLength));
}
OutputWriter.flushChannel(socketChannel, frame);
outputBuffer.limit(payloadLimit);
sentMessageLength += sendingPayloadLength;
frameNumber++;
} else {
OutputWriter.flushChannel(socketChannel, outputBuffer);
}
|
private void | flushFrame()
outputBuffer.flip();
flushBuffer();
outputBuffer.compact();
|
public void | flushLast()
if (!isFlushLast) {
outputBuffer.flip();
isFlushLast = true;
do {
flushBuffer();
} while(outputBuffer.hasRemaining());
outputBuffer.clear();
}
|
private void | formFrameBufferArray()
frame[0] = headerBuffer;
frame[1] = outputBuffer;
|
public boolean | isDirectMode()
return isDirectMode;
|
public void | passivate()
reset();
socketChannel = null;
|
public void | reset()
outputBuffer.clear();
headerBuffer.clear();
messageId = -1;
contentId = -1;
contentProps.clear();
frameNumber = 0;
isFlushLast = false;
sentMessageLength = 0;
|
public void | setChannelId(int channelId)
this.channelId = channelId;
|
public void | setContentId(int contentId)
this.contentId = contentId;
|
public void | setContentProperty(int key, java.lang.String value)
this.contentProps.put(key, value);
|
public void | setDirectMode(boolean isDirectMode)
reset();
this.isDirectMode = isDirectMode;
|
public void | setFrameSize(int frameSize)
this.frameSize = frameSize;
payloadlengthLength = (int) Math.ceil(Math.log(frameSize) / Math.log(2));
outputBuffer = ByteBufferFactory.allocateView(frameSize, useDirectBuffer);
formFrameBufferArray();
|
public void | setMessageId(int messageId)
this.messageId = messageId;
|
public void | setSocketChannel(java.nio.channels.SocketChannel socketChannel)
this.socketChannel = socketChannel;
|
private void | updateMessageIdIfRequired(int frameMessageIdPosition, int frameMessageIdHighValue, boolean isLastFrame)
int frameMessageId;
if (isLastFrame) {
if (frameNumber != 0) {
frameMessageId = FrameType.MESSAGE_END_CHUNK;
} else {
// Serialized message-id is correct
return;
}
} else if (frameNumber == 0) {
frameMessageId = FrameType.MESSAGE_START_CHUNK;
} else {
frameMessageId = FrameType.MESSAGE_CHUNK;
}
// merge message-id Integer4 data with next value
if (frameMessageIdHighValue != 0) {
// merge message-id as lower octet nibble
headerBuffer.put(frameMessageIdPosition, (byte) ((frameMessageIdHighValue & 0x70) | frameMessageId));
} else {
// merge message-id as higher octet nibble
int value = headerBuffer.get(frameMessageIdPosition);
headerBuffer.put(frameMessageIdPosition, (byte) ((frameMessageId << 4) | (value & 0xF)));
}
|
public void | write(int data)
if (!outputBuffer.hasRemaining()) {
flushFrame();
}
outputBuffer.put((byte) data);
|
public void | write(byte[] data, int offset, int size)
while(size > 0) {
final int bytesToWrite = Math.min(size, outputBuffer.remaining());
outputBuffer.put(data, offset, bytesToWrite);
size -= bytesToWrite;
offset += bytesToWrite;
if (!outputBuffer.hasRemaining() && size > 0) {
flushFrame();
}
}
|