FileDocCategorySizeDatePackage
FramedMessageInputStream.javaAPI DocExample16131Tue May 29 16:57:08 BST 2007com.sun.xml.ws.transport.tcp.io

FramedMessageInputStream

public final class FramedMessageInputStream extends InputStream implements com.sun.xml.ws.transport.tcp.pool.LifeCycle
Stream wrapper around a ByteBuffer

Fields Summary
private static final Logger
logger
private ByteBuffer
byteBuffer
private SocketChannel
socketChannel
private int
bufferClearLimit
private static final int
READ_TIMEOUT
The time to wait before timing out when reading bytes
static final int
READ_TRY
Number of times to retry before return EOF
private final int[]
headerTmpArray
private boolean
isDirectMode
is message framed or direct mode is used
private int
frameSize
private int
frameBytesRead
private boolean
isLastFrame
private int
currentFrameDataSize
private int
channelId
private int
contentId
private int
messageId
private final Map
contentProps
private boolean
isReadingHeader
private long
receivedMessageLength
could be useful for debug reasons
Constructors Summary
public FramedMessageInputStream()

    
    // ------------------------------------------------- Constructor -------//
    
    
      
        this(TCPConstants.DEFAULT_FRAME_SIZE);
    
public FramedMessageInputStream(int frameSize)

        setFrameSize(frameSize);
    
Methods Summary
public voidactivate()

    
public intavailable()
Return the available bytes

return
the wrapped byteBuffer.remaining()

        return remaining();
    
public voidclose()
Close this stream.

    
private intdoRead()
Read bytes using the read ReadSelector

        if ( socketChannel == null ) return -1;
        if (isEOF()) {
            return -1;
        }
        
        if (!byteBuffer.hasRemaining() && byteBuffer.position() >= bufferClearLimit) {
            byteBuffer.clear();
        }
        
        final int bbPosition = byteBuffer.position();
        byteBuffer.limit(byteBuffer.capacity());
        
        int count;
        int byteRead = 0;
        Selector readSelector = null;
        SelectionKey tmpKey = null;
        
        try{
            do {
                count = socketChannel.read(byteBuffer);
                byteRead += count;
            } while (count > 0);
            
            if (count == -1 && byteRead >= 0) byteRead++;
            
            if ( byteRead == 0 ){
                readSelector = SelectorFactory.getSelector();
                
                if ( readSelector == null ){
                    return 0;
                }
                tmpKey = socketChannel
                        .register(readSelector,SelectionKey.OP_READ);
                tmpKey.interestOps(tmpKey.interestOps() | SelectionKey.OP_READ);
                final int code = readSelector.select(READ_TIMEOUT);
                
                //Nothing so return.
                tmpKey.interestOps(tmpKey.interestOps() & (~SelectionKey.OP_READ));
                if ( code == 0 ){
                    return 0;
                }
                
                do {
                    count = socketChannel.read(byteBuffer);
                    byteRead += count;
                } while (count > 0);
                if (count == -1 && byteRead >= 0) byteRead++;
            }
        } catch (Exception e){
            logger.log(Level.SEVERE, MessagesMessages.WSTCP_0018_ERROR_READING_FROM_SOCKET(), e);
            return -1;
        } finally {
            if (tmpKey != null)
                tmpKey.cancel();
            
            if ( readSelector != null){
                try{
                    readSelector.selectNow();
                } catch (IOException ex){
                }
                SelectorFactory.returnSelector(readSelector);
            }
            
            byteBuffer.flip();
            byteBuffer.position(bbPosition);
        }
        
        return byteRead;
    
public voidforceHeaderRead()

        readHeader();
    
public intgetChannelId()

        return channelId;
    
public intgetContentId()

        return contentId;
    
public java.util.MapgetContentProperties()

        return contentProps;
    
public intgetMessageId()

        return messageId;
    
public booleanisDirectMode()

        return isDirectMode;
    
private booleanisEOF()

        return isLastFrame && frameBytesRead >= currentFrameDataSize - 1;
    
public booleanisMessageInProcess()

        if (currentFrameDataSize == 0 || isEOF()) return false;
        
        return true;
    
public booleanmarkSupported()
Return true if mark is supported.

        return false;
    
public voidpassivate()

        reset();
        setSocketChannel(null);
        setByteBuffer(null);
    
public intread()
Read the first byte from the wrapped ByteBuffer.

        int eof = 0;
        if (!byteBuffer.hasRemaining()){
            eof = readFromChannel();
        } else if (remaining() == 0 && isLastFrame) {   // if in buffer there is last frame's tale only
            return -1;
        }
        
        if (eof == -1 || readFrameHeaderIfRequired() == -1) return -1;
        
        if (byteBuffer.hasRemaining()) {
            frameBytesRead++;
            receivedMessageLength ++;
            return byteBuffer.get() & 0xff;
        }
        
        return read();
    
public intread(byte[] b)
Read the bytes from the wrapped ByteBuffer.

        return (read(b, 0, b.length));
    
public intread(byte[] b, int offset, int length)
Read the first byte of the wrapped ByteBuffer.

        if (!byteBuffer.hasRemaining()) {
            final int eof = readFromChannel();
            if (eof <= 0){
                return -1;
            }
        } else if (remaining() == 0 && isLastFrame) {   // if in buffer there is last frame's tale only
            return -1;
        }
        
        if (readFrameHeaderIfRequired() == -1) return -1;
        
        //@TODO add logic for reading from several frames if required
        final int remaining = remaining();
        if (length > remaining) {
            length = remaining;
        }
        
        byteBuffer.get(b, offset, length);
        frameBytesRead += length;
        receivedMessageLength += length;
        
        return length;
    
private intreadFrameHeaderIfRequired()

        if (!isDirectMode && !isLastFrame && !isReadingHeader && (frameBytesRead == 0 || frameBytesRead == currentFrameDataSize)) {
            try {
                readHeader();
            } catch (IOException ex) {
                return -1;
            }
        }
        
        return 0;
    
private intreadFromChannel()

        int eof = 0;
        for (int i=0; i < READ_TRY; i++) {
            eof = doRead();
            
            if (eof != 0) {
                break;
            }
        }
        
        return eof;
    
private voidreadHeader()

        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, MessagesMessages.WSTCP_1060_FRAMED_MESSAGE_IS_READ_HEADER_ENTER());
        }
        frameBytesRead = 0;
        isReadingHeader = true;
        // Read channel-id and message-id
        int lowNeebleValue = DataInOutUtils.readInts4(this, headerTmpArray, 2, 0);
        channelId = headerTmpArray[0];
        messageId = headerTmpArray[1];

        if (FrameType.isFrameContainsParams(messageId)) {  //message types have description
            // Read content-id and number-of-parameters
            lowNeebleValue = DataInOutUtils.readInts4(this, headerTmpArray, 2, lowNeebleValue);
            contentId = headerTmpArray[0];
            final int paramNumber = headerTmpArray[1];
            for(int i=0; i<paramNumber; i++) {
                // Read parameter-id and length of parameter-value buffer
                DataInOutUtils.readInts4(this, headerTmpArray, 2, lowNeebleValue);
                final int paramId = headerTmpArray[0];
                final int paramValueLen = headerTmpArray[1];
                byte[] paramValueBytes = new byte[paramValueLen];
                // Read parameter-value
                DataInOutUtils.readFully(this, paramValueBytes);
                final String paramValue = new String(paramValueBytes, TCPConstants.UTF8);
                contentProps.put(paramId, paramValue);
                lowNeebleValue = 0;
            }
        }
        
        // Read payload-size
        currentFrameDataSize = DataInOutUtils.readInt8(this);
        isLastFrame = FrameType.isLastFrame(messageId);
        currentFrameDataSize += frameBytesRead;
        isReadingHeader = false;
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, MessagesMessages.WSTCP_1061_FRAMED_MESSAGE_IS_READ_HEADER_DONE(channelId, messageId, contentId, contentProps, currentFrameDataSize, isLastFrame));
        }
    
private intremaining()

        if (isReadingHeader || isDirectMode) {
            return byteBuffer.remaining();
        }
        
        return Math.min(currentFrameDataSize - frameBytesRead, byteBuffer.remaining());
    
public voidreset()

        frameBytesRead = 0;
        currentFrameDataSize = 0;
        isLastFrame = false;
        isReadingHeader = false;
        contentId = -1;
        messageId = -1;
        contentProps.clear();
        receivedMessageLength = 0;
    
public voidsetByteBuffer(java.nio.ByteBuffer byteBuffer)

        this.byteBuffer = byteBuffer;
        if (byteBuffer != null) {
            bufferClearLimit = byteBuffer.capacity() * 3 / 4;
        }
    
public voidsetDirectMode(boolean isDirectMode)

        reset();
        this.isDirectMode = isDirectMode;
    
public voidsetFrameSize(int frameSize)

        this.frameSize = frameSize;
    
public voidsetSocketChannel(java.nio.channels.SocketChannel socketChannel)

        this.socketChannel = socketChannel;
    
private voidskipToEndOfFrame()

        if (currentFrameDataSize > 0) {
            if (byteBuffer.hasRemaining()) {
                final int remainFrameBytes = currentFrameDataSize - frameBytesRead;
                if (remainFrameBytes <= byteBuffer.remaining()) {
                    byteBuffer.position(byteBuffer.position() + remainFrameBytes);
                    return;
                }
                
                frameBytesRead += byteBuffer.remaining();
                byteBuffer.position(byteBuffer.limit());
            }
            
            while(frameBytesRead < currentFrameDataSize) {
                final int eof = readFromChannel();
                if (eof == -1) {
                    String errorMessage = MessagesMessages.WSTCP_1062_FRAMED_MESSAGE_IS_READ_UNEXPECTED_EOF(isLastFrame, frameBytesRead, frameSize, currentFrameDataSize);
                    logger.log(Level.SEVERE, errorMessage);
                    throw new EOFException(errorMessage);
                }
                frameBytesRead += eof;
                byteBuffer.position(byteBuffer.position() + eof);
            }
            
            // if extra frame bytes were read - move position backwards
            byteBuffer.position(byteBuffer.position() - (frameBytesRead - currentFrameDataSize));
        }
    
public voidskipToEndOfMessage()

        do {
            readFrameHeaderIfRequired();
            skipToEndOfFrame();
            frameBytesRead = 0;
        } while(!isLastFrame);
    
public java.lang.StringtoString()

        final StringBuffer buffer = new StringBuffer(100);
        buffer.append("ByteBuffer: ");
        buffer.append(byteBuffer);
        buffer.append(" FrameBytesRead: ");
        buffer.append(frameBytesRead);
        buffer.append(" CurrentFrameDataSize: ");
        buffer.append(currentFrameDataSize);
        buffer.append(" isLastFrame: ");
        buffer.append(isLastFrame);
        buffer.append(" isDirectMode: ");
        buffer.append(isDirectMode);
        buffer.append(" isReadingHeader: ");
        buffer.append(isReadingHeader);
        
        return buffer.toString();