FileDocCategorySizeDatePackage
CachedPullSourceStream.javaAPI DocJMF 2.1.1e24282Mon May 12 12:20:54 BST 2003com.sun.media.protocol

CachedPullSourceStream

public class CachedPullSourceStream extends Object implements Seekable, PullSourceStream, Runnable, CachedStream

Fields Summary
private InputStream
stream
private RandomAccessFile
readRAF
private RandomAccessFile
writeRAF
private String
fileName
private int
bufferSize
private byte[]
buffer
private boolean
eosReached
private boolean
ioException
private long
length
private File
file
private String
protocol
private boolean
readAborted
private boolean
paused
private boolean
abort
private MediaThread
downloadThread
private long
contentLength
private int
highMarkFactor
private boolean
blockRead
private static int
MAX_HIGH_MARK
private static int
DEFAULT_HIGH_MARK
private static int
MIN_HIGH_MARK
private int
highMark
private int
lowMark
private boolean
enabled
private boolean
jitterEnabled
private DownloadProgressListener
listener
private int
numKiloBytesUpdateIncrement
private boolean
closed
private static JMFSecurity
jmfSecurity
private static boolean
securityPrivelege
private Method[]
m
private Class[]
cl
private Object[]
args
private int
maxCacheSize
Constructors Summary
public CachedPullSourceStream(InputStream stream, String fileName, long contentLength, String protocol)


     
	try {
	    jmfSecurity = JMFSecurityManager.getJMFSecurity();
	    securityPrivelege = true;
	} catch (SecurityException e) {
	}
    
	this.stream = stream;
	this.contentLength = contentLength;
	this.fileName = fileName;
	this.protocol = protocol;

	/**
	 * We don't want to do caching unless we have read, write and
	 * delete privileges. We don't need thread and thread group
	 * permissions for caching
	 */
	if ( /* securityPrivelege &&*/ (jmfSecurity != null) ) {
	    String permission = null;
	    int permissionid = 0;
	    try {
		if (jmfSecurity.getName().startsWith("jmf-security")) {
		    try {
			// Note: the thread and thread_group permissions would have
			// been asked by MediaThread static initializer
			permission = "thread";
			jmfSecurity.requestPermission(m, cl, args, JMFSecurity.THREAD);
			m[0].invoke(cl[0], args[0]);
			
			permission = "thread group";
			jmfSecurity.requestPermission(m, cl, args, JMFSecurity.THREAD_GROUP);
			m[0].invoke(cl[0], args[0]);
		    } catch (Throwable t) {
			// Ignore Security Errors/Exceptions.
		    }

		    permission = "read file";
		    permissionid = JMFSecurity.READ_FILE;
		    jmfSecurity.requestPermission(m, cl, args, JMFSecurity.READ_FILE);
		    m[0].invoke(cl[0], args[0]);
		    
		    
		    permission = "write file";
		    permissionid = JMFSecurity.WRITE_FILE;
		    jmfSecurity.requestPermission(m, cl, args, JMFSecurity.WRITE_FILE);
		    m[0].invoke(cl[0], args[0]);
		    
		    permission = "delete file";
		    permissionid = JMFSecurity.DELETE_FILE;
		    jmfSecurity.requestPermission(m, cl, args, JMFSecurity.DELETE_FILE);
		    m[0].invoke(cl[0], args[0]);
		    
		} else if (jmfSecurity.getName().startsWith("internet")) {
		    PolicyEngine.checkPermission(PermissionID.FILEIO);
		    PolicyEngine.assertPermission(PermissionID.FILEIO);
		    try {
			PolicyEngine.checkPermission(PermissionID.THREAD);
			PolicyEngine.assertPermission(PermissionID.THREAD);
		    } catch (Throwable t) {
		    }
		}
	    } catch (Exception e) {
		if (JMFSecurityManager.DEBUG) {
		    System.err.println("Unable to get " + permission +
				       " privilege  " + e);
		}
		if (permissionid > 0)
		    jmfSecurity.permissionFailureNotification(permissionid);
		securityPrivelege = false;
		// TODO: Do the right thing if permissions cannot be obtained.
		// User should be notified via an event
	    }
	}


         if (!securityPrivelege) {
           //System.out.println("JMF Fatal Error: don't have security privelege to use file caching");
            throw new IOException("No security privilege for caching");
         }

	 createFilesAndThread(fileName);

	 Object cdir = com.sun.media.util.Registry.get("secure.maxCacheSizeMB");

	 if ( (cdir != null) && (cdir instanceof Integer) ) {
	     int size = ((Integer) cdir).intValue();
	     if (size < 1)
		 size = 1;
	     maxCacheSize = size * 1000000; //bytes;
	 }

	 highMark = getHighMark(contentLength);
	 closed = false;
    
Methods Summary
public voidabortDownload()

	// System.out.println("abortDownload");
	abort = true;
    
public voidabortRead()

	synchronized (this) {
	    readAborted = true;
	}
    
voidaddDownloadProgressListener(javax.media.DownloadProgressListener l, int numKiloBytes)

	listener = l;
	if (numKiloBytes <= 0)
	    numKiloBytes = 1024;
	numKiloBytesUpdateIncrement = numKiloBytes * 1024;
    
voidclose()

	if (!abort)
	    abortDownload();
	// Note: downloadThread is never set to null.
	// It is null only when it is not initialized
	if (downloadThread != null) {
	    // Wait for download thread to die
	    for (int i = 0; i < 20; i++) {
		if ( !downloadThread.isAlive() ) {
		    // System.out.println("downloadThread is dead");
		    break;
		}
		try {
		    Thread.currentThread().sleep(100);
		} catch (InterruptedException e) {
		}
	    }
	    // The thread didn't quit
	    // in 2 seconds when abort is true.
	    // This generally won't happen if the contentLength
	    // is known; in this case stream.read will never block
	    // as we call available() first.
	    // This may happen if the contentLength is not known
	    // and if the stream.read() call blocks for more than
	    // 2 seconds. In this case we have have no choice but
	    // to kill the download thread

	    // Commenting out code below as stopping a Thread is
	    // deprecated
// 	    if (downloadThread.isAlive()) {
// 		// System.out.println("Killing downloadThread");
// 		downloadThread.stop();
// 	    }
	}
	doClose();
    
private voidcreateFilesAndThread(java.lang.String fileName)

	try {
	    file = new File(fileName);
	    // TODO: throw IOException if file exists
// 		// Highly unlikely as we use random numbers
// 	    if (file.exists()) {
// 		throw new IOException("Cache file " + fileName + " exists");
// 	    }

	    String parent = file.getParent();
	    File parentFile = null;
	    if (parent != null) {
		parentFile = new File(parent);
	    }


	    if (securityPrivelege && 
		(jmfSecurity != null) && (jmfSecurity.getName().startsWith("jdk12"))) {
		Constructor cons;
		if (parentFile != null) {
		    cons = jdk12MakeDirectoryAction.cons;
		    Boolean success;
		    success = (Boolean) jdk12.doPrivM.invoke(
					 jdk12.ac,
					 new Object[] {
			        cons.newInstance(
					 new Object[] {
				           parentFile
                                         })
				});
		    if ((success == null) || !success.booleanValue()) {
			throw new IOException("Unable to create directory " + parentFile);
		    }
		}

		cons = jdk12RandomAccessFileAction.cons;

		writeRAF = (RandomAccessFile) jdk12.doPrivM.invoke(
					 jdk12.ac,
					 new Object[] {
			        cons.newInstance(
					 new Object[] {
				           file.getPath(), "rw"
                                         })
				});

		if (writeRAF == null) {
		    throw new IOException("Cannot create cache file");
		}

		readRAF = (RandomAccessFile) jdk12.doPrivM.invoke(
					 jdk12.ac,
					 new Object[] {
			        cons.newInstance(
					 new Object[] {
				           file.getPath(), "r"
                                         })
				});

		if (readRAF == null) {
		    throw new IOException("Cannot create cache file");
		}

		cons = jdk12CreateThreadRunnableAction.cons;
		
		downloadThread = (MediaThread) jdk12.doPrivM.invoke(
                                           jdk12.ac,
					   new Object[] {
 					  cons.newInstance(
 					   new Object[] {
                                               MediaThread.class,
                                               this
                                           })});

		downloadThread.setName("download");
		cons = jdk12PriorityAction.cons;
		jdk12.doPrivM.invoke(
				     jdk12.ac,
				     new Object[] {
 					  cons.newInstance(
 					   new Object[] {
                                               downloadThread,
                                               new Integer(downloadThread.getVideoPriority())
                                           })});

	    } else {
		if (parentFile != null) {
		    if (!parentFile.exists() && !parentFile.mkdirs()) {
 			throw new IOException("Unable to create directory " + parentFile);
		    }
		}
		writeRAF = new RandomAccessFile(file, "rw");
		readRAF = new RandomAccessFile(file, "r");
		
		downloadThread = new MediaThread(this, "download");
		downloadThread.useVideoPriority();
	    }

	} catch (Throwable e) {
	    throw new IOException(e.getMessage());
	}
    
private booleandeleteFile(java.io.File file)


	boolean fileDeleted=false;
	try {
	    if ( /*securityPrivelege &&*/ (jmfSecurity != null) ) {
		try {
		    if (jmfSecurity.getName().startsWith("jmf-security")) {
			jmfSecurity.requestPermission(m, cl, args, JMFSecurity.DELETE_FILE);
			m[0].invoke(cl[0], args[0]);
		    } else if (jmfSecurity.getName().startsWith("internet")) {
			PolicyEngine.checkPermission(PermissionID.FILEIO);
			PolicyEngine.assertPermission(PermissionID.FILEIO);
		    }
		} catch (Exception e) {
		    if (JMFSecurityManager.DEBUG) {
			System.err.println("Unable to get DELETE_FILE " +
					   " privilege  " + e);
		    }
		    securityPrivelege = false;
		    // TODO: Do the right thing if permissions cannot be obtained.
		    // User should be notified via an event, if applicable
		}
	    }
 	    if ( (jmfSecurity != null) && (jmfSecurity.getName().startsWith("jdk12"))) {
		Constructor cons = jdk12DeleteFileAction.cons;
		Boolean success;
		success = (Boolean) jdk12.doPrivM.invoke(
					 jdk12.ac,
					 new Object[] {
			        cons.newInstance(
					 new Object[] {
				           file
                                         })
				});
		fileDeleted = success.booleanValue();
	    } else {
		fileDeleted = file.delete();
	    }
	} catch (Throwable e) {
	}
        return fileDeleted;
    
private synchronized voiddoClose()

	try {
	    closed = true;
	    if (readRAF != null) {
		readRAF.close();
	    }
	    if (writeRAF != null) {
		writeRAF.close();
	    }
	    if (file == null)
		return;
	    
	    deleteFile(file);
	    file = null;
	    
	} catch (IOException e) {
	}
    
public synchronized intdoRead(byte[] buffer, int offset, int length)
need to synchronize because a close (datasource.disconnect) can come anytime


	if (closed)
	    return -1;
	try {
	    // 	    System.out.println("before reading " + length);
	    // 	    System.out.println("to read " + length + " from " + tell() +
	    // 			       " getLength() is " + getLength());
	    int actual = readRAF.read(buffer, offset, length);
	    // 	    System.out.println("actual read is " + actual);
	    return actual;
	    // return readRAF.read(buffer, offset, length);
	} catch (ArrayIndexOutOfBoundsException e) {
	    // TODO: remove this catch block
	    // System.out.println("warning:aioubexception: reas: buffer_length, offset, length " + buffer.length + " : " + offset + " : " + length);
	    e.printStackTrace();
	    return com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD;
	}

    
private synchronized longdoSeek(long where)
need to synchronize because a close (datasource.disconnect) can come anytime

	if (closed)
	    return -1;
	try {
	    readRAF.seek(where);
	    return readRAF.getFilePointer();
	} catch (IOException e) {
	    return -1;
	}
    
private synchronized booleandrainCondition()

	return drainCondition(tell());
    
private synchronized booleandrainCondition(long offset)

	offset = getLength() - offset;
	if ( eosReached ) {
	    if (blockRead) {
		blockRead = false;
		notify();
	    }
	    return false;
	}

	if (blockRead) {
	    if (offset < highMark) {
		return true;
	    } else {
		blockRead = false;
		notify();
		return false;
	    }
	} else {
	    if (offset < lowMark) {
		blockRead = true;
		return true;
	    } else {
		return false;
	    }
	}
    
public booleanendOfStream()

	return false; // TODO
    
public javax.media.protocol.ContentDescriptorgetContentDescriptor()

	return null;
    
public longgetContentLength()
Get the total number of bytes in the media being downloaded. Returns LENGTH_UNKNOWN if this information is not available.

return
The media length in bytes, or LENGTH_UNKNOWN.

 	return contentLength;
     
longgetContentProgress()
Get the total number of bytes of media data that have been downloaded so far.

return
The number of bytes downloaded.

	return length;
    
public java.lang.ObjectgetControl(java.lang.String controlType)

	    return null;
    
public java.lang.Object[]getControls()

	return new Object[0];
    
public booleangetEnabledBuffering()

	return jitterEnabled;
    
longgetEndOffset()

	return length;
    
private intgetHighMark(long contentLength)

	if (contentLength <= 0)
	    return DEFAULT_HIGH_MARK;

	long tryHighMark = contentLength / highMarkFactor;

	if (tryHighMark < MIN_HIGH_MARK)
	    tryHighMark = MIN_HIGH_MARK;
	else if (tryHighMark > MAX_HIGH_MARK)
	    tryHighMark = MAX_HIGH_MARK;
	return (int) tryHighMark;
    
private synchronized longgetLength()

	return length;
    
longgetStartOffset()

	return 0L;
    
private longgetWriteReadPtrOffset()

	// System.out.println("getLength, tell is " + getLength() + " : " + tell());
	return getLength() - tell();
    
booleanisDownloading()

	if (eosReached)
	    return false;
	return (length != LENGTH_UNKNOWN);
    
public booleanisRandomAccess()

	if (enabled)
	    return true;
	try {
	    Seekable s = (Seekable) stream;
	    return s.isRandomAccess();
	} catch (ClassCastException e) {
	    return false;
	}
    
private synchronized voidloadUpdate()

// 	System.out.println("loadUpdate: " + blockRead + " : " +
// 			   getWriteReadPtrOffset() + " : " + highMark);
	if (blockRead) {
	    if ( ( eosReached || ( getWriteReadPtrOffset() >= highMark) ) ) {
// 		System.out.println("loadUpdate: setting blockRead to false " +
// 				   getLength() + " : " + tell());
		blockRead = false;
		synchronized(this) {
		    notify();
		}
	    }
	}
    
voidpauseDownload()

	if ( (downloadThread != null) && !downloadThread.isAlive() )
	    return;
	if (enabled) {
	    synchronized(this) {
		if (!paused) {
		    paused = true;
		    // System.out.println("setting paused to true and notify");
		    notify();
		}
	    }
	}
    
public intread(byte[] buffer, int offset, int length)


	try {
	    int result = waitUntilSeekWillSucceed(tell() + length);
	    if (result == -1)
		return -1; // EOS
	    if (result != com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD) {
		return doRead(buffer, offset, length);
	    } else {
		// System.out.println(" read returns " + result);
		return result;
	    }
	} finally {
	    if (jitterEnabled)  // TODO: remove after test
		drainCondition(); // to update blockRead
	}
    
voidremoveDownloadProgressListener(javax.media.DownloadProgressListener l)

	listener = null;
    
voidresumeDownload()

	if ( (downloadThread != null) && !downloadThread.isAlive() )
	    return;
	if (enabled) {
	    synchronized(this) {
		if (paused) {
		    paused = false;
		    // System.out.println("setting paused to false and notify");
		    notify();
		}
	    }
	}
    
public voidrun()

	int totalBytesRead = 0;
	int nextUpdate = numKiloBytesUpdateIncrement;
	int debugIndex = 1;

	if (ioException)
	    return;

	while (!eosReached) {

	    if (abort) {
		// System.out.println("run: abort true: download thread exit1");
		return;
	    }

	    try {
		// System.out.println("available is " + stream.available());

		/**
		 * The JDK designers could have made the available()
		 * more useful by returning -1 if End of Media (EOM) has
		 * been reached. The only way to check for EOM is
		 * to call read which is a blocking call
		 * Because of this limitation, if the contentLength is
		 * not known, then the available() call is not made
		 * (because it will always return 0 on EOM) prior to
		 * calling read.
		 */

		// For some reason https streams return available() always as 0
		// So don't call available() for https streams
		if ( (contentLength > 0) && (!protocol.equals("https")) ) {
		    while (stream.available() == 0) {
			synchronized(this) {
			    try {
				wait(25);
			    } catch (InterruptedException e) {
			    }
			}
			if (abort) {
			    // System.out.println("abort true: download thread exit2");
			    return;
			}
		    }
		}

		while (paused) {
		    // System.out.println("DOWNLOAD THREAD PAUSED");
		    synchronized(this) {
			// pause is triggered by user, so wait a longer time
			// to see if the user wants to resume download
			try {
			    wait(1000);
			} catch (InterruptedException e) {
			}
			if (abort) {
			    // System.out.println("abort true: download thread exit3");
			    return;
			}
		    }
		}
		// This read will never block if content length is known
		int bytesRead = stream.read(buffer, 0, buffer.length);
		if (bytesRead != -1) {
		    if ( (getLength() + bytesRead) > maxCacheSize ) {
			Log.warning("MAX CACHESIZE of " +
					   maxCacheSize + " reached ");
			contentLength = totalBytesRead;
			eosReached = true; // Is this correct?
		    }
		    writeRAF.write(buffer, 0, bytesRead);
		    totalBytesRead += bytesRead;
		    // 			long length = writeRAF.length();
		    long length = totalBytesRead;
		    setLength(length);

		    if (length == contentLength) {
			eosReached = true;
			// System.out.println("TOTAL DOWNLOAD: " + totalBytesRead);
		    }
		    // TODO: support multiple listeners??
		    if (listener != null) {
			if (totalBytesRead >= nextUpdate) {
			    listener.downloadUpdate();
			    nextUpdate += numKiloBytesUpdateIncrement;
			}
		    }

		} else {
		    //setLength(writeRAF.length());
		    setLength(totalBytesRead);
		    contentLength = totalBytesRead;
		    eosReached = true;
		}
		loadUpdate();
	    } catch (IOException e) {
		Log.warning(e + " : Check if you have enough space in the cache directory");
		ioException = true;
		eosReached = true;
		blockRead = false;
		break;
	    }
	}

	if (listener != null) {
	    listener.downloadUpdate();
	}

	if (writeRAF != null) {
	    try {
		writeRAF.close();
		writeRAF = null;
	    } catch (IOException e) {
	    }
	    writeRAF = null;
	}

    
public longseek(long where)

	int debugTime = 0;
	// TODO: IMPORTANT: check for race condition. try better solution
	synchronized(this) {
	    readAborted = false;
	}
	try {
// 	    drainCondition(where);
// 	    if ( !jitterEnabled  || !blockRead) {
	    if (!jitterEnabled || !drainCondition(where)) {
		if (where <= getLength()) {  // <
		    return doSeek(where);
		}
	    }
		
	    while (true) {
		if (eosReached) {
		    if ( where <= getLength() ) { // <
			return doSeek(where);
		    } else {
			return -1; // Attempt to seek past EOF
		    }
		}
		    
		if (jitterEnabled) {
		    synchronized(this) {
			while (blockRead) {
			    try {
				wait(100);
			    } catch (InterruptedException e) {
			    }
			    if (readAborted) {
// 				System.out.println("seek changed. discard: " +
// 						   Thread.currentThread().getName());
				readAborted = false;
				return com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD;
			    }
			}
		    }
		}

		if (readAborted) {
// 		    System.out.println("seek changed. discard: " +
// 				       Thread.currentThread().getName());
		    readAborted = false;
		    return com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD;
		}

		if (where <= getLength()) { // <
		    return doSeek(where);
		}
		    
		try {
		    Thread.currentThread().sleep(250);
		} catch (InterruptedException e) {
		}
	    }
	} finally {
	    // System.out.println("seek to " + where + " success");
	    if (jitterEnabled)  // TODO: remove after test
		drainCondition(where);
	}
    
public voidsetEnabledBuffering(boolean b)

	jitterEnabled = b;
    
private synchronized voidsetLength(long length)

	this.length = length;
    
voidstartDownload()

// 	synchronized(this) {
// 	    startDownloadCalled = true;
// 	}
	if (enabled) {
	    if (downloadThread != null) {
		downloadThread.start();
	    }
	}
    
public longtell()
need to synchronize because a close (datasource.disconnect) can come anytime

	synchronized(this) {
	    if (closed)
		return -1;
	    try {
		return readRAF.getFilePointer();
	    } catch (IOException e) {
		// System.out.println(Thread.currentThread() + " tell: " + e);
		// e.printStackTrace();
		return -1;
	    }
	}
    
private intwaitUntilSeekWillSucceed(long where)

	boolean debugPrint = true;

	// TODO: check for race condition. try better solution
 	if (!jitterEnabled || !drainCondition(where)) {

	    if (where <= getLength()) { // <
		return 0;
	    }
	}

	while (true) {
	    if (eosReached) {
		if ( where <= getLength() ) { // <
		    return 0;
		} else {
		    // throw new IOException("Attempt to read past EOM");
		    return -1;
		}
	    }


	    if (jitterEnabled) {
		synchronized(this) {
		    while (blockRead) {
			try {
			    wait(100);
			} catch (InterruptedException e) {
			}
			if (readAborted) {
			    return com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD;
			}
		    }
		}
	    }


	    if (readAborted) {
		return com.sun.media.protocol.BasicSourceStream.LENGTH_DISCARD;
	    }

	    if (where <= getLength()) { // <
		return 0;
	    }
	    // TODO: replace sleep with wait/notify
	    try {
//		if (debugPrint) {
// 		    System.out.println("waitUntilSeekWillSucceed: " + where + " : " +
// 				       getLength() + " eos? " + eosReached);
//		    debugPrint = false;
//		}
		Thread.currentThread().sleep(250);
	    } catch (InterruptedException e) {
	    }
	}
    
public booleanwillReadBlock()

	return false;
    
public booleanwillReadBytesBlock(long offset, int numBytes)

	if (jitterEnabled && drainCondition(offset)) {
	    return true;
	}
	return ( (offset + numBytes) >  getLength());
    
public booleanwillReadBytesBlock(int numBytes)

	return willReadBytesBlock(tell(), numBytes);