FileDocCategorySizeDatePackage
FramedMessageInputStream.javaAPI DocExample16131Tue May 29 16:57:08 BST 2007com.sun.xml.ws.transport.tcp.io

FramedMessageInputStream.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */

package com.sun.xml.ws.transport.tcp.io;

import com.sun.xml.ws.transport.tcp.pool.LifeCycle;
import com.sun.xml.ws.transport.tcp.resources.MessagesMessages;
import com.sun.xml.ws.transport.tcp.util.FrameType;
import com.sun.xml.ws.transport.tcp.util.SelectorFactory;
import com.sun.xml.ws.transport.tcp.util.TCPConstants;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Stream wrapper around a <code>ByteBuffer</code>
 */
public final class FramedMessageInputStream extends InputStream implements LifeCycle {
    private static final Logger logger = Logger.getLogger(
            com.sun.xml.ws.transport.tcp.util.TCPConstants.LoggingDomain + ".streams");
    
    private ByteBuffer byteBuffer;
    
    private SocketChannel socketChannel;
    
    private int bufferClearLimit;  // clear buffer before read from socket if its size already more than this value
    /**
     * The time to wait before timing out when reading bytes
     */
    final private static int READ_TIMEOUT = 1000;
    
    /**
     * Number of times to retry before return EOF
     */
    final static int READ_TRY = 10;
    
    private final int[] headerTmpArray = new int[2];
    
    /** is message framed or direct mode is used */
    private boolean isDirectMode;
    
    private int frameSize;
    private int frameBytesRead;
    private boolean isLastFrame;
    private int currentFrameDataSize;    // for last frame actual data size could be smaller than frame size
    
    private int channelId;
    private int contentId;
    private int messageId;
    private final Map<Integer, String> contentProps = new HashMap<Integer, String>(8);
    
    private boolean isReadingHeader;
    
    /**
     * could be useful for debug reasons
     */
    private long receivedMessageLength;
    
    // ------------------------------------------------- Constructor -------//
    
    
    public FramedMessageInputStream() {
        this(TCPConstants.DEFAULT_FRAME_SIZE);
    }
    
    public FramedMessageInputStream(int frameSize) {
        setFrameSize(frameSize);
    }
    // ---------------------------------------------------------------------//
    
    
    public void setSocketChannel(final SocketChannel socketChannel){
        this.socketChannel = socketChannel;
    }
    
    public int getChannelId() {
        return channelId;
    }
    
    public int getMessageId() {
        return messageId;
    }
    
    public int getContentId() {
        return contentId;
    }
    
    public Map<Integer, String> getContentProperties() {
        return contentProps;
    }
    
    public boolean isDirectMode() {
        return isDirectMode;
    }
    
    public void setDirectMode(final boolean isDirectMode) {
        reset();
        this.isDirectMode = isDirectMode;
    }
    
    public void setFrameSize(final int frameSize) {
        this.frameSize = frameSize;
    }
    
    public void setByteBuffer(final ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        if (byteBuffer != null) {
            bufferClearLimit = byteBuffer.capacity() * 3 / 4;
        }
    }
    
    /**
     * Return the available bytes
     * @return the wrapped byteBuffer.remaining()
     */
    public int available() {
        return remaining();
    }
    
    
    /**
     * Close this stream.
     */
    public void close() {
    }
    
    /**
     * Return true if mark is supported.
     */
    public boolean markSupported() {
        return false;
    }
    
    /**
     * Read the first byte from the wrapped <code>ByteBuffer</code>.
     */
    public int read() {
        int eof = 0;
        if (!byteBuffer.hasRemaining()){
            eof = readFromChannel();
        } else if (remaining() == 0 && isLastFrame) {   // if in buffer there is last frame's tale only
            return -1;
        }
        
        if (eof == -1 || readFrameHeaderIfRequired() == -1) return -1;
        
        if (byteBuffer.hasRemaining()) {
            frameBytesRead++;
            receivedMessageLength ++;
            return byteBuffer.get() & 0xff;
        }
        
        return read();
    }
    
    
    /**
     * Read the bytes from the wrapped <code>ByteBuffer</code>.
     */
    public int read(final byte[] b) {
        return (read(b, 0, b.length));
    }
    
    
    /**
     * Read the first byte of the wrapped <code>ByteBuffer</code>.
     */
    public int read(final byte[] b, final int offset, int length) {
        if (!byteBuffer.hasRemaining()) {
            final int eof = readFromChannel();
            if (eof <= 0){
                return -1;
            }
        } else if (remaining() == 0 && isLastFrame) {   // if in buffer there is last frame's tale only
            return -1;
        }
        
        if (readFrameHeaderIfRequired() == -1) return -1;
        
        //@TODO add logic for reading from several frames if required
        final int remaining = remaining();
        if (length > remaining) {
            length = remaining;
        }
        
        byteBuffer.get(b, offset, length);
        frameBytesRead += length;
        receivedMessageLength += length;
        
        return length;
    }
    
    
    public void forceHeaderRead() throws IOException {
        readHeader();
    }
    
    private int readFrameHeaderIfRequired() {
        if (!isDirectMode && !isLastFrame && !isReadingHeader && (frameBytesRead == 0 || frameBytesRead == currentFrameDataSize)) {
            try {
                readHeader();
            } catch (IOException ex) {
                return -1;
            }
        }
        
        return 0;
    }
    
    private void readHeader() throws IOException {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, MessagesMessages.WSTCP_1060_FRAMED_MESSAGE_IS_READ_HEADER_ENTER());
        }
        frameBytesRead = 0;
        isReadingHeader = true;
        // Read channel-id and message-id
        int lowNeebleValue = DataInOutUtils.readInts4(this, headerTmpArray, 2, 0);
        channelId = headerTmpArray[0];
        messageId = headerTmpArray[1];

        if (FrameType.isFrameContainsParams(messageId)) {  //message types have description
            // Read content-id and number-of-parameters
            lowNeebleValue = DataInOutUtils.readInts4(this, headerTmpArray, 2, lowNeebleValue);
            contentId = headerTmpArray[0];
            final int paramNumber = headerTmpArray[1];
            for(int i=0; i<paramNumber; i++) {
                // Read parameter-id and length of parameter-value buffer
                DataInOutUtils.readInts4(this, headerTmpArray, 2, lowNeebleValue);
                final int paramId = headerTmpArray[0];
                final int paramValueLen = headerTmpArray[1];
                byte[] paramValueBytes = new byte[paramValueLen];
                // Read parameter-value
                DataInOutUtils.readFully(this, paramValueBytes);
                final String paramValue = new String(paramValueBytes, TCPConstants.UTF8);
                contentProps.put(paramId, paramValue);
                lowNeebleValue = 0;
            }
        }
        
        // Read payload-size
        currentFrameDataSize = DataInOutUtils.readInt8(this);
        isLastFrame = FrameType.isLastFrame(messageId);
        currentFrameDataSize += frameBytesRead;
        isReadingHeader = false;
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, MessagesMessages.WSTCP_1061_FRAMED_MESSAGE_IS_READ_HEADER_DONE(channelId, messageId, contentId, contentProps, currentFrameDataSize, isLastFrame));
        }
    }
    
    private int readFromChannel() {
        int eof = 0;
        for (int i=0; i < READ_TRY; i++) {
            eof = doRead();
            
            if (eof != 0) {
                break;
            }
        }
        
        return eof;
    }
    
    public void skipToEndOfMessage() throws EOFException {
        do {
            readFrameHeaderIfRequired();
            skipToEndOfFrame();
            frameBytesRead = 0;
        } while(!isLastFrame);
    }
    
    private void skipToEndOfFrame() throws EOFException {
        if (currentFrameDataSize > 0) {
            if (byteBuffer.hasRemaining()) {
                final int remainFrameBytes = currentFrameDataSize - frameBytesRead;
                if (remainFrameBytes <= byteBuffer.remaining()) {
                    byteBuffer.position(byteBuffer.position() + remainFrameBytes);
                    return;
                }
                
                frameBytesRead += byteBuffer.remaining();
                byteBuffer.position(byteBuffer.limit());
            }
            
            while(frameBytesRead < currentFrameDataSize) {
                final int eof = readFromChannel();
                if (eof == -1) {
                    String errorMessage = MessagesMessages.WSTCP_1062_FRAMED_MESSAGE_IS_READ_UNEXPECTED_EOF(isLastFrame, frameBytesRead, frameSize, currentFrameDataSize);
                    logger.log(Level.SEVERE, errorMessage);
                    throw new EOFException(errorMessage);
                }
                frameBytesRead += eof;
                byteBuffer.position(byteBuffer.position() + eof);
            }
            
            // if extra frame bytes were read - move position backwards
            byteBuffer.position(byteBuffer.position() - (frameBytesRead - currentFrameDataSize));
        }
    }
    
    /**
     * Read bytes using the read <code>ReadSelector</code>
     */
    private int doRead(){
        if ( socketChannel == null ) return -1;
        if (isEOF()) {
            return -1;
        }
        
        if (!byteBuffer.hasRemaining() && byteBuffer.position() >= bufferClearLimit) {
            byteBuffer.clear();
        }
        
        final int bbPosition = byteBuffer.position();
        byteBuffer.limit(byteBuffer.capacity());
        
        int count;
        int byteRead = 0;
        Selector readSelector = null;
        SelectionKey tmpKey = null;
        
        try{
            do {
                count = socketChannel.read(byteBuffer);
                byteRead += count;
            } while (count > 0);
            
            if (count == -1 && byteRead >= 0) byteRead++;
            
            if ( byteRead == 0 ){
                readSelector = SelectorFactory.getSelector();
                
                if ( readSelector == null ){
                    return 0;
                }
                tmpKey = socketChannel
                        .register(readSelector,SelectionKey.OP_READ);
                tmpKey.interestOps(tmpKey.interestOps() | SelectionKey.OP_READ);
                final int code = readSelector.select(READ_TIMEOUT);
                
                //Nothing so return.
                tmpKey.interestOps(tmpKey.interestOps() & (~SelectionKey.OP_READ));
                if ( code == 0 ){
                    return 0;
                }
                
                do {
                    count = socketChannel.read(byteBuffer);
                    byteRead += count;
                } while (count > 0);
                if (count == -1 && byteRead >= 0) byteRead++;
            }
        } catch (Exception e){
            logger.log(Level.SEVERE, MessagesMessages.WSTCP_0018_ERROR_READING_FROM_SOCKET(), e);
            return -1;
        } finally {
            if (tmpKey != null)
                tmpKey.cancel();
            
            if ( readSelector != null){
                try{
                    readSelector.selectNow();
                } catch (IOException ex){
                }
                SelectorFactory.returnSelector(readSelector);
            }
            
            byteBuffer.flip();
            byteBuffer.position(bbPosition);
        }
        
        return byteRead;
    }
    
    public boolean isMessageInProcess() {
        if (currentFrameDataSize == 0 || isEOF()) return false;
        
        return true;
    }
    
    private boolean isEOF() {
        return isLastFrame && frameBytesRead >= currentFrameDataSize - 1;
    }
    
    private int remaining() {
        if (isReadingHeader || isDirectMode) {
            return byteBuffer.remaining();
        }
        
        return Math.min(currentFrameDataSize - frameBytesRead, byteBuffer.remaining());
    }
    
    public void reset() {
        frameBytesRead = 0;
        currentFrameDataSize = 0;
        isLastFrame = false;
        isReadingHeader = false;
        contentId = -1;
        messageId = -1;
        contentProps.clear();
        receivedMessageLength = 0;
    }
    
    public void activate() {
    }
    
    public void passivate() {
        reset();
        setSocketChannel(null);
        setByteBuffer(null);
    }
    
    public String toString() {
        final StringBuffer buffer = new StringBuffer(100);
        buffer.append("ByteBuffer: ");
        buffer.append(byteBuffer);
        buffer.append(" FrameBytesRead: ");
        buffer.append(frameBytesRead);
        buffer.append(" CurrentFrameDataSize: ");
        buffer.append(currentFrameDataSize);
        buffer.append(" isLastFrame: ");
        buffer.append(isLastFrame);
        buffer.append(" isDirectMode: ");
        buffer.append(isDirectMode);
        buffer.append(" isReadingHeader: ");
        buffer.append(isReadingHeader);
        
        return buffer.toString();
    }
}