// This example is from the book _Java Threads_ by Scott Oaks and Henry Wong.
// Written by Scott Oaks and Henry Wong.
// Copyright (c) 1997 O'Reilly & Associates.
// You may study, use, modify, and distribute this example for any purpose.
// This example is provided WITHOUT WARRANTY either expressed or implied.
// Sample AsyncInputStream -- Chapter 5, p. 101.
import java.net.*;
import java.io.*;
public class AsyncInputStream extends FilterInputStream
implements Runnable {
private Thread runner; // Async Reader Thread
private byte result[]; // Buffer
private int reslen; // Buffer Length
private boolean EOF; // End-of-File Indicator
private IOException IOError; // IOExceptions
protected AsyncInputStream(InputStream in, int bufsize) {
super(in);
result = new byte[bufsize]; // Allocate Storage Area
reslen = 0; // and initialize variables
EOF = false;
IOError = null;
runner = new Thread(this); // Start Reader Thread
runner.start();
}
protected AsyncInputStream(InputStream in) {
this(in, 1024);
}
public synchronized int read() throws IOException {
while (reslen == 0) {
try {
if (EOF) return(-1);
if (IOError != null) throw IOError;
wait();
} catch (InterruptedException e) {}
}
return (int) getChar();
}
public synchronized int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
public synchronized int read(byte b[], int off, int len)
throws IOException {
while (reslen == 0) {
try {
if (EOF) return(-1);
if (IOError != null) throw IOError;
wait();
} catch (InterruptedException e) {}
}
int sizeread = Math.min(reslen, len);
byte c[] = getChars(sizeread);
System.arraycopy(b, off, c, 0, sizeread);
return(sizeread);
}
public synchronized long skip(long n) throws IOException {
int sizeskip = Math.min(reslen, (int) n);
if (sizeskip > 0) {
byte c[] = getChars(sizeskip);
}
return((long)sizeskip);
}
public synchronized int available() throws IOException {
return reslen;
}
public synchronized void close() throws IOException {
reslen = 0; // Clear Buffer
EOF = true; // Mark End Of File
notifyAll(); // Alert all Threads
}
public synchronized void mark(int readlimit) {
}
public synchronized void reset() throws IOException {
}
public boolean markSupported() {
return false;
}
public void run() {
try {
while (true) {
int c = in.read();
synchronized (this) {
if ((c == -1) || (EOF)) {
EOF = true; // Mark End Of File
in.close(); // Close Input Source
return; // End IO Thread
} else {
putChar((byte)c); // Store the byte read
}
}
}
} catch (IOException e) {
synchronized (this) {
IOError = e; // Store Exception
}
return;
} finally {
synchronized (this) {
notifyAll(); // Alert all Threads
}
}
}
private synchronized void putChar(byte c) {
if (reslen < result.length) {
result[reslen++] = c;
notify();
}
}
private synchronized byte getChar() {
byte c = result[0];
System.arraycopy(result, 1, result, 0, --reslen);
return c;
}
private synchronized byte[] getChars(int chars) {
byte c[] = new byte[chars];
System.arraycopy(c, 0, result, 0, chars);
reslen -= chars;
System.arraycopy(result, chars, result, 0, reslen);
return c;
}
}
|