FileDocCategorySizeDatePackage
ManagedMemoryDataSource.javaAPI DocApache Axis 1.427603Sat Apr 22 18:57:28 BST 2006org.apache.axis.attachments

ManagedMemoryDataSource.java

/*
 * Copyright 2001-2004 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.axis.attachments;

import org.apache.axis.InternalException;
import org.apache.axis.MessageContext;
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.utils.Messages;
import org.apache.commons.logging.Log;

import java.io.File;
import java.io.BufferedInputStream;

/**
 * This class allows small attachments to be cached in memory, while large ones are
 * cached out.  It implements a Java Activiation Data source interface.
 *
 * @author Rick Rineholt
 */
public class ManagedMemoryDataSource implements javax.activation.DataSource {

    /** Field log           */
    protected static Log log =
         LogFactory.getLog(ManagedMemoryDataSource.class.getName());

    /**
     * The content type. This defaults to
     * <code>application/octet-stream</code>.
     */
    protected String contentType = "application/octet-stream";

    /** The incoming source stream. */
    java.io.InputStream ss = null;

    /** Field MIN_MEMORY_DISK_CACHED           */
    public static final int MIN_MEMORY_DISK_CACHED = -1;

    /** Field MAX_MEMORY_DISK_CACHED           */
    public static final int MAX_MEMORY_DISK_CACHED = 16 * 1024;

    /** Field maxCached           */
    protected int maxCached = MAX_MEMORY_DISK_CACHED;       // max in memory cached. Default.

    // If set the file the disk is cached to.

    /** Field diskCacheFile           */
    protected java.io.File diskCacheFile = null;

    // A list of open input Streams.

    /** Field readers           */
    protected java.util.WeakHashMap readers = new java.util.WeakHashMap();

    /**
     * Flag to show if the resources behind this have been deleted.
     */
    protected boolean deleted =
            false;

    // Memory is allocated in these size chunks.

    /** Field READ_CHUNK_SZ           */
    public static final int READ_CHUNK_SZ = 32 * 1024;

    /** Field debugEnabled           */
    protected boolean debugEnabled = false;    // Log debugging if true.

    // Should not be called;

    /**
     * Constructor ManagedMemoryDataSource.
     */
    protected ManagedMemoryDataSource() {
    }

    /**
     * Create a new boundary stream.
     *
     * @param ss is the source input stream that is used to create this data source.
     * @param maxCached  This is the max memory that is to be used to cache the data.
     * @param contentType the mime type for this data stream.
     *   by buffering you can some effiency in searching.
     *
     * @throws java.io.IOException
     */
    public ManagedMemoryDataSource(
            java.io.InputStream ss, int maxCached, String contentType)
            throws java.io.IOException {
        this(ss, maxCached, contentType, false);
    }

    /**
     * Create a new boundary stream.
     *
     * @param ss is the source input stream that is used to create this data source.
     * @param maxCached  This is the max memory that is to be used to cache the data.
     * @param contentType the mime type for this data stream.
     *   by buffering you can some effiency in searching.
     * @param readall if true will read in the whole source.
     *
     * @throws java.io.IOException
     */
    public ManagedMemoryDataSource(
            java.io.InputStream ss, int maxCached, String contentType, boolean readall)
            throws java.io.IOException {

        if(ss instanceof BufferedInputStream) {
            this.ss = ss;
        } else {
            this.ss = new BufferedInputStream(ss);
        }
        this.maxCached = maxCached;

        if ((null != contentType) && (contentType.length() != 0)) {
            this.contentType = contentType;
        }

        if (maxCached < MIN_MEMORY_DISK_CACHED) {
            throw new IllegalArgumentException(
                    Messages.getMessage("badMaxCached", "" + maxCached));
        }

        if (log.isDebugEnabled()) {
            debugEnabled = true;    // Logging should be initialized by time;
        }

        // for now read all in to disk.
        if (readall) {
            byte[] readbuffer = new byte[READ_CHUNK_SZ];
            int read = 0;

            do {
                read = ss.read(readbuffer);

                if (read > 0) {
                    write(readbuffer, read);
                }
            } while (read > -1);

            close();
        }
    }

    /* javax.activation.Interface DataSource implementation */

    /**
     * This method returns the MIME type of the data in the form of a string.
     * @return The mime type.
     */
    public java.lang.String getContentType() {
        return contentType;
    }

    /**
     * This method returns an InputStream representing the the data and throws the appropriate exception if it can not do so.
     * @return the java.io.InputStream for the data source.
     *
     * @throws java.io.IOException
     */
    public synchronized java.io.InputStream getInputStream()
            throws java.io.IOException {

        /*
         * if (memorybuflist == null) {
         *   return  new java.io.FileInputStream(diskCacheFile);
         * }
         * else
         */
        return new Instream();    // Return the memory held stream.
    }

    /**
     * This will flush any memory source to disk and
     * provide the name of the file if desired.
     *
     * @return the name of the file of the stream
     */
    public java.lang.String getName() {

        String ret = null;

        try {
            flushToDisk();

            if (diskCacheFile != null) {
                ret = diskCacheFile.getAbsolutePath();
            }
        } catch (Exception e) {
            diskCacheFile = null;
        }

        return ret;
    }

    /**
     * This method returns an OutputStream where the data can be written and
     * throws the appropriate exception if it can not do so.
     * NOT SUPPORTED, not need for axis, data sources are create by constructors.
     *
     *
     * @return always <code>null</code>
     *
     * @throws java.io.IOException
     */
    public java.io.OutputStream getOutputStream() throws java.io.IOException {
        return null;
    }

    /** The linked list to hold the in memory buffers. */
    protected java.util.LinkedList memorybuflist =
            new java.util.LinkedList();

    /** Hold the last memory buffer. */
    protected byte[] currentMemoryBuf = null;

    /** The number of bytes written to the above buffer. */
    protected int currentMemoryBufSz =
            0;

    /** The total size in bytes in this data source. */
    protected long totalsz = 0;

    /** This is the cached disk stream. */
    protected java.io.BufferedOutputStream cachediskstream =
            null;

    /** If true the source input stream is now closed. */
    protected boolean closed = false;

    /**
     * Write bytes to the stream.
     *
     * @param data all bytes of this array are written to the stream
     * @throws java.io.IOException if there was a problem writing the data
     */
    protected void write(byte[] data) throws java.io.IOException {
        write(data, data.length);
    }

    /**
     * This method is a low level write.
     * Note it is designed to in the future to allow streaming to both memory
     *  AND to disk simultaneously.
     *
     * @param data
     * @param length
     *
     * @throws java.io.IOException
     */
    protected synchronized void write(byte[] data, int length)
            throws java.io.IOException {

        if (closed) {
            throw new java.io.IOException(Messages.getMessage("streamClosed"));
        }

        int writesz = length;
        int byteswritten = 0;

        if ((null != memorybuflist)
                && (totalsz + writesz > maxCached)) {    // Cache to disk.
            if (null == cachediskstream) {               // Need to create a disk cache
                flushToDisk();
            }
        }

        if (memorybuflist != null) {    // Can write to memory.
            do {
                if (null == currentMemoryBuf) {
                    currentMemoryBuf = new byte[READ_CHUNK_SZ];
                    currentMemoryBufSz = 0;

                    memorybuflist.add(currentMemoryBuf);
                }

                // bytes to write is the min. between the remaining bytes and what is left in this buffer.
                int bytes2write = Math.min((writesz - byteswritten),
                        (currentMemoryBuf.length
                        - currentMemoryBufSz));

                // copy the data.
                System.arraycopy(data, byteswritten, currentMemoryBuf,
                        currentMemoryBufSz, bytes2write);

                byteswritten += bytes2write;
                currentMemoryBufSz += bytes2write;

                if (byteswritten
                        < writesz) {    // only get more if we really need it.
                    currentMemoryBuf = new byte[READ_CHUNK_SZ];
                    currentMemoryBufSz = 0;

                    memorybuflist.add(currentMemoryBuf);    // add it to the chain.
                }
            } while (byteswritten < writesz);
        }

        if (null != cachediskstream) {    // Write to the out going stream.
            cachediskstream.write(data, 0, length);
        }

        totalsz += writesz;

        return;
    }

    /**
     * This method is a low level write.
     * Close the stream.
     *
     * @throws java.io.IOException
     */
    protected synchronized void close() throws java.io.IOException {

        if (!closed) {
            closed = true;                    // Markit as closed.

            if (null != cachediskstream) {    // close the disk cache.
                cachediskstream.close();

                cachediskstream = null;
            }

            if (null != memorybuflist) {      // There is a memory buffer.
                if (currentMemoryBufSz > 0) {
                    byte[] tmp =
                            new byte[currentMemoryBufSz];    // Get the last buffer and make it the sizeof the actual data.

                    System.arraycopy(currentMemoryBuf, 0, tmp, 0,
                            currentMemoryBufSz);
                    memorybuflist.set(
                            memorybuflist.size() - 1,
                            tmp);                 // Now replace the last buffer with this size.
                }

                currentMemoryBuf = null;      // No need for this anymore.
            }
        }
    }

    protected void finalize() throws Throwable {

        if (null != cachediskstream) {    // close the disk cache.
            cachediskstream.close();

            cachediskstream = null;
        }
    }

    /**
     * Routine to flush data to disk if is in memory.
     *
     * @throws java.io.IOException
     * @throws java.io.FileNotFoundException
     */
    protected void flushToDisk()
            throws java.io.IOException, java.io.FileNotFoundException {

        java.util.LinkedList ml = memorybuflist;

        log.debug(Messages.getMessage("maxCached", "" + maxCached,
                "" + totalsz));

        if (ml != null) {
            if (null == cachediskstream) {    // Need to create a disk cache
                try {
                    MessageContext mc = MessageContext.getCurrentContext();
                    String attdir = (mc == null)
                            ? null
                            : mc.getStrProp(
                                    MessageContext.ATTACHMENTS_DIR);

                    diskCacheFile = java.io.File.createTempFile("Axis", ".att",
                            (attdir == null)
                            ? null
                            : new File(
                                    attdir));

                    if(log.isDebugEnabled()) {
                        log.debug(
                            Messages.getMessage(
                                    "diskCache", diskCacheFile.getAbsolutePath()));
                     }

                    cachediskstream = new java.io.BufferedOutputStream(
                            new java.io.FileOutputStream(diskCacheFile));

                    int listsz = ml.size();

                    // Write out the entire memory held store to disk.
                    for (java.util.Iterator it = ml.iterator();
                         it.hasNext();) {
                        byte[] rbuf = (byte[]) it.next();
                        int bwrite = (listsz-- == 0)
                                ? currentMemoryBufSz
                                : rbuf.length;

                        cachediskstream.write(rbuf, 0, bwrite);

                        if (closed) {
                            cachediskstream.close();

                            cachediskstream = null;
                        }
                    }

                    memorybuflist = null;
                } catch (java.lang.SecurityException se) {
                    diskCacheFile = null;
                    cachediskstream = null;
                    maxCached = java.lang.Integer.MAX_VALUE;

                    log.info(Messages.getMessage("nodisk00"), se);
                }
            }
        }
    }

    public synchronized boolean delete() {

        boolean ret = false;

        deleted = true;

        memorybuflist = null;

        if (diskCacheFile != null) {
            if (cachediskstream != null) {
                try {
                    cachediskstream.close();
                } catch (Exception e) {
                }

                cachediskstream = null;
            }

            Object[] array = readers.keySet().toArray();
            for (int i = 0; i < array.length; i++) {
                Instream stream = (Instream) array[i];
                if (null != stream) {
                    try {
                        stream.close();
                    } catch (Exception e) {
                    }
                }
            }
            readers.clear();

            try {
                diskCacheFile.delete();

                ret = true;
            } catch (Exception e) {

                // Give it our best shot.
                diskCacheFile.deleteOnExit();
            }
        }


        return ret;
    }

    // inner classes cannot have static declarations...

    /** Field is_log           */
    protected static Log is_log =
        LogFactory.getLog(Instream.class.getName());

    /**
     * Inner class to handle getting an input stream to this data source
     *  Handles creating an input stream to the source.
     */
    private class Instream extends java.io.InputStream {

        /** bytes read. */
        protected long bread = 0;

        /** The real stream. */
        java.io.FileInputStream fin = null;

        /** The position in the list were we are reading from. */
        int currentIndex =
                0;

        /** the buffer we are currently reading from. */
        byte[] currentBuf = null;

        /** The current position in there. */
        int currentBufPos = 0;

        /** The read stream has been closed. */
        boolean readClosed = false;

        /**
         * Constructor Instream.
         *
         * @throws java.io.IOException if the Instream could not be created or
         *              if the data source has been deleted
         */
        protected Instream() throws java.io.IOException {

            if (deleted) {
                throw new java.io.IOException(
                        Messages.getMessage("resourceDeleted"));
            }

            readers.put(this, null);
        }

        /**
         * Query for the number of bytes available for reading.
         *
         * @return the number of bytes left
         *
         * @throws java.io.IOException if this stream is not in a state that
         *              supports reading
         */
        public int available() throws java.io.IOException {

            if (deleted) {
                throw new java.io.IOException(
                        Messages.getMessage("resourceDeleted"));
            }

            if (readClosed) {
                throw new java.io.IOException(
                        Messages.getMessage("streamClosed"));
            }

            int ret = new Long(Math.min(Integer.MAX_VALUE, totalsz - bread)).intValue();

            if (debugEnabled) {
                is_log.debug("available() = " + ret + ".");
            }

            return ret;
        }

        /**
         * Read a byte from the stream.
         *
         * @return byte read or -1 if no more data.
         *
         * @throws java.io.IOException
         */
        public int read() throws java.io.IOException {

            synchronized (ManagedMemoryDataSource.this) {
                byte[] retb = new byte[1];
                int br = read(retb, 0, 1);

                if (br == -1) {
                    return -1;
                }
                return 0xFF & retb[0];
            }
        }

        /**
         * Not supported.
         *
         * @return
         */
        public boolean markSupported() {

            if (debugEnabled) {
                is_log.debug("markSupported() = " + false + ".");
            }

            return false;
        }

        /**
         * Not supported.
         *
         * @param readlimit
         */
        public void mark(int readlimit) {

            if (debugEnabled) {
                is_log.debug("mark()");
            }
        }

        /**
         * Not supported.
         *
         * @throws java.io.IOException
         */
        public void reset() throws java.io.IOException {

            if (debugEnabled) {
                is_log.debug("reset()");
            }

            throw new java.io.IOException(Messages.getMessage("noResetMark"));
        }

        public long skip(long skipped) throws java.io.IOException {

            if (debugEnabled) {
                is_log.debug("skip(" + skipped + ").");
            }

            if (deleted) {
                throw new java.io.IOException(
                        Messages.getMessage("resourceDeleted"));
            }

            if (readClosed) {
                throw new java.io.IOException(
                        Messages.getMessage("streamClosed"));
            }

            if (skipped < 1) {
                return 0;    // nothing to skip.
            }

            synchronized (ManagedMemoryDataSource.this) {
                skipped = Math.min(skipped,
                        totalsz
                        - bread);    // only skip what we've read.

                if (skipped == 0) {
                    return 0;
                }

                java.util.List ml = memorybuflist;    // hold the memory list.
                int bwritten = 0;

                if (ml != null) {
                    if (null == currentBuf) {    // get the buffer we need to read from.
                        currentBuf = (byte[]) ml.get(currentIndex);
                        currentBufPos = 0;    // start reading from the begining.
                    }

                    do {
                        long bcopy = Math.min(currentBuf.length
                                - currentBufPos,
                                skipped - bwritten);

                        bwritten += bcopy;
                        currentBufPos += bcopy;

                        if (bwritten < skipped) {
                            currentBuf = (byte[]) ml.get(++currentIndex);
                            currentBufPos = 0;
                        }
                    } while (bwritten < skipped);
                }

                if (null != fin) {
                    fin.skip(skipped);
                }

                bread += skipped;
            }

            if (debugEnabled) {
                is_log.debug("skipped " + skipped + ".");
            }

            return skipped;
        }

        public int read(byte[] b, int off, int len) throws java.io.IOException {

            if (debugEnabled) {
                is_log.debug(this.hashCode() + " read(" + off + ", " + len
                        + ")");
            }

            if (deleted) {
                throw new java.io.IOException(
                        Messages.getMessage("resourceDeleted"));
            }

            if (readClosed) {
                throw new java.io.IOException(
                        Messages.getMessage("streamClosed"));
            }

            if (b == null) {
                throw new InternalException(Messages.getMessage("nullInput"));
            }

            if (off < 0) {
                throw new IndexOutOfBoundsException(
                        Messages.getMessage("negOffset", "" + off));
            }

            if (len < 0) {
                throw new IndexOutOfBoundsException(
                        Messages.getMessage("length", "" + len));
            }

            if (len + off > b.length) {
                throw new IndexOutOfBoundsException(
                        Messages.getMessage("writeBeyond"));
            }

            if (len == 0) {
                return 0;
            }

            int bwritten = 0;

            synchronized (ManagedMemoryDataSource.this) {
                if (bread == totalsz) {
                    return -1;
                }

                java.util.List ml = memorybuflist;
                
                long longlen = len;
                longlen = Math.min(
                        longlen,
                        totalsz
                        - bread);    // Only return the number of bytes in the data store that is left.
                len = new Long(longlen).intValue();
                
                if (debugEnabled) {
                    is_log.debug("len = " + len);
                }

                if (ml != null) {
                    if (null == currentBuf) {    // Get the buffer we need to read from.
                        currentBuf = (byte[]) ml.get(currentIndex);
                        currentBufPos = 0;    // New buffer start from the begining.
                    }

                    do {

                        // The bytes to copy, the minimum of the bytes left in this buffer or bytes remaining.
                        int bcopy = Math.min(currentBuf.length - currentBufPos,
                                len - bwritten);

                        // Copy the data.
                        System.arraycopy(currentBuf, currentBufPos, b,
                                off + bwritten, bcopy);

                        bwritten += bcopy;
                        currentBufPos += bcopy;

                        if (bwritten < len) {    // Get the next buffer.
                            currentBuf = (byte[]) ml.get(++currentIndex);
                            currentBufPos = 0;
                        }
                    } while (bwritten < len);
                }

                if ((bwritten == 0) && (null != diskCacheFile)) {
                    if (debugEnabled) {
                        is_log.debug(Messages.getMessage("reading", "" + len));
                    }

                    if (null == fin) {           // we are now reading from disk.
                        if (debugEnabled) {
                            is_log.debug(
                                    Messages.getMessage(
                                            "openBread",
                                            diskCacheFile.getCanonicalPath()));
                        }

                        if (debugEnabled) {
                            is_log.debug(Messages.getMessage("openBread",
                                    "" + bread));
                        }

                        fin = new java.io.FileInputStream(diskCacheFile);

                        if (bread > 0) {
                            fin.skip(bread);     // Skip what we've read so far.
                        }
                    }

                    if (cachediskstream != null) {
                        if (debugEnabled) {
                            is_log.debug(Messages.getMessage("flushing"));
                        }

                        cachediskstream.flush();
                    }

                    if (debugEnabled) {
                        is_log.debug(Messages.getMessage("flushing"));
                        is_log.debug("len=" + len);
                        is_log.debug("off=" + off);
                        is_log.debug("b.length=" + b.length);
                    }

                    bwritten = fin.read(b, off, len);
                }

                if (bwritten > 0) {
                    bread += bwritten;
                }
            }

            if (debugEnabled) {
                is_log.debug(this.hashCode()
                        + Messages.getMessage("read", "" + bwritten));
            }

            return bwritten;
        }

        /**
         * close the stream.
         *
         * @throws java.io.IOException
         */
        public synchronized void close() throws java.io.IOException {

            if (debugEnabled) {
                is_log.debug("close()");
            }

            if (!readClosed) {
                readers.remove(this);

                readClosed = true;

                if (fin != null) {
                    fin.close();
                }

                fin = null;
            }
        }

        protected void finalize() throws Throwable {
            close();
        }
    }                                          // endof innerclass Instream

    // Used to test.

    /**
     * Method main
     *
     * @param arg
     */
    public static void main(String arg[]) {    // test

        try {
            String readFile = arg[0];
            String writeFile = arg[1];
            java.io.FileInputStream ss =
                    new java.io.FileInputStream(readFile);
            ManagedMemoryDataSource ms =
                    new ManagedMemoryDataSource(ss, 1024 * 1024, "foo/data", true);
            javax.activation.DataHandler dh =
                    new javax.activation.DataHandler(ms);
            java.io.InputStream is = dh.getInputStream();
            java.io.FileOutputStream fo =
                    new java.io.FileOutputStream(writeFile);
            byte[] buf = new byte[512];
            int read = 0;

            do {
                read = is.read(buf);

                if (read > 0) {
                    fo.write(buf, 0, read);
                }
            } while (read > -1);

            fo.close();
            is.close();
        } catch (java.lang.Exception e) {
            log.error(Messages.getMessage("exception00"), e);
        }
    }

    /**
     * get the filename of the content if it is cached to disk.
     * @return file object pointing to file, or null for memory-stored content
     */
    public File getDiskCacheFile() {
        return diskCacheFile;
    }
}