PipedInputStream
public class PipedInputStream extends InputStream
A piped input stream should be connected
to a piped output stream; the piped input
stream then provides whatever data bytes
are written to the piped output stream.
Typically, data is read from a PipedInputStream
object by one thread and data is written
to the corresponding PipedOutputStream
by some other thread. Attempting to use
both objects from a single thread is not
recommended, as it may deadlock the thread.
The piped input stream contains a buffer,
decoupling read operations from write operations,
within limits.
A pipe is said to be broken if a
thread that was providing data bytes to the connected
piped output stream is no longer alive. |
Fields Summary |
---|
boolean | closedByWriter | volatile boolean | closedByReader | boolean | connected | Thread | readSide | Thread | writeSide | private static final int | DEFAULT_PIPE_SIZE | protected static final int | PIPE_SIZE - The default size of the pipe's circular input buffer. | protected byte[] | buffer - The circular buffer into which incoming data is placed. | protected int | in - The index of the position in the circular buffer at which the
next byte of data will be stored when received from the connected
piped output stream. in<0 implies the buffer is empty,
in==out implies the buffer is full. | protected int | out - The index of the position in the circular buffer at which the next
byte of data will be read by this piped input stream. |
Constructors Summary |
---|
public PipedInputStream(PipedOutputStream src) Creates a PipedInputStream so
that it is connected to the piped output
stream src. Data bytes written
to src will then be available
as input from this stream.
// Delegate to the (src, pipeSize) constructor using the default pipe size.
this(src, DEFAULT_PIPE_SIZE);
| public PipedInputStream(PipedOutputStream src, int pipeSize) Creates a PipedInputStream so that it is
connected to the piped output stream
src and uses the specified pipe size for
the pipe's buffer.
Data bytes written to src will then
be available as input from this stream.
// Allocate the circular buffer first (initPipe rejects pipeSize <= 0 with an
// IllegalArgumentException), then connect this stream to src.
initPipe(pipeSize);
connect(src);
| public PipedInputStream() Creates a PipedInputStream so
that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
connected}.
It must be {@linkplain java.io.PipedOutputStream#connect(
java.io.PipedInputStream) connected} to a
PipedOutputStream before being used.
// Allocate the circular buffer with the default size; no connection is made here.
initPipe(DEFAULT_PIPE_SIZE);
| public PipedInputStream(int pipeSize) Creates a PipedInputStream so that it is not yet
{@linkplain #connect(java.io.PipedOutputStream) connected} and
uses the specified pipe size for the pipe's buffer.
It must be {@linkplain java.io.PipedOutputStream#connect(
java.io.PipedInputStream)
connected} to a PipedOutputStream before being used.
// Allocate the circular buffer with the requested size; initPipe throws
// IllegalArgumentException if pipeSize <= 0. No connection is made here.
initPipe(pipeSize);
|
Methods Summary |
---|
public synchronized int | available() Returns the number of bytes that can be read from this input
stream without blocking.
// A negative write index marks an empty buffer: nothing is readable.
if (in < 0) {
    return 0;
}
// Equal indices (with in >= 0) mark a completely full buffer.
if (in == out) {
    return buffer.length;
}
// Otherwise count the bytes between out and in; when in is behind out the
// unread data wraps around the end of the circular buffer.
return (in > out) ? (in - out) : (in + buffer.length - out);
| private void | awaitSpace()
// Blocks the writer while the circular buffer is full (in == out).
// The callers visible here are the synchronized receive methods, so
// wait()/notifyAll() act on this stream's monitor.
// NOTE(review): if close() sets in to -1 during the wait, the loop exits
// without re-running checkStateForReceive() — confirm this is intended.
while (in == out) {
// Throws if the pipe is unconnected, closed, or the reader thread is dead.
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
// Time-limited wait so the state check above is repeated at least every second.
wait(1000);
} catch (InterruptedException ex) {
// Surface the interruption to the caller as an I/O exception.
throw new java.io.InterruptedIOException();
}
}
| private void | checkStateForReceive()
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
| public void | close() Closes this piped input stream and releases any system resources
associated with the stream.
// Flag the reader-side close; closedByReader is declared volatile and is
// written here before the monitor is taken.
closedByReader = true;
synchronized (this) {
// in < 0 marks the buffer as empty, so any unread bytes are dropped.
in = -1;
}
| public void | connect(java.io.PipedOutputStream src) Causes this piped input stream to be connected
to the piped output stream src.
If this object is already connected to some
other piped output stream, an IOException
is thrown.
If src is an
unconnected piped output stream and snk
is an unconnected piped input stream, they
may be connected by either the call:
snk.connect(src)
or the call:
src.connect(snk)
The two
calls have the same effect.
// Delegate to the output stream's connect method, passing this input stream.
src.connect(this);
| private void | initPipe(int pipeSize)
// A pipe needs at least one byte of capacity; reject anything smaller.
if (pipeSize < 1) {
    throw new IllegalArgumentException("Pipe Size <= 0");
}
// Allocate the circular buffer that backs this pipe.
buffer = new byte[pipeSize];
| public synchronized int | read() Reads the next byte of data from this piped input stream. The
value byte is returned as an int in the range
0 to 255.
This method blocks until input data is available, the end of the
stream is detected, or an exception is thrown.
// Reject the read up front if the pipe is unusable.
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
// The recorded writer thread died without closing its end and the buffer is empty.
throw new IOException("Write end dead");
}
// Record the calling thread as the reader; checkStateForReceive() tests its liveness.
readSide = Thread.currentThread();
// Retry budget: "Pipe broken" is thrown on the third loop pass that finds
// the writer thread dead (2 -> 1 -> 0 -> -1).
int trials = 2;
// in < 0 means the buffer is empty: wait for data, end of stream, or a broken pipe.
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
// Time-limited wait so the checks above are repeated at least every second.
wait(1000);
} catch (InterruptedException ex) {
// Surface the interruption to the caller as an I/O exception.
throw new java.io.InterruptedIOException();
}
}
// Take the next byte as an unsigned value (0-255) and advance the read index.
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
// Wrap around the end of the circular buffer.
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
| public synchronized int | read(byte[] b, int off, int len) Reads up to len bytes of data from this piped input
stream into an array of bytes. Fewer than len bytes
will be read if the end of the data stream is reached or if
len exceeds the pipe's buffer size.
If len is zero, then no bytes are read and 0 is returned;
otherwise, the method blocks until at least 1 byte of input is
available, end of the stream has been detected, or an exception is
thrown.
// Validate the arguments; a zero-length request returns 0 without touching the pipe.
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
// End of stream was reached before any byte could be read.
return -1;
}
b[off] = (byte) c;
// Number of bytes stored into b so far.
int rlen = 1;
// Copy further bytes only while the buffer is non-empty; this loop never waits.
// len stays one greater than the number of bytes still wanted.
while ((in >= 0) && (len > 1)) {
// Length of the contiguous run of unread bytes starting at out.
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
// Unread data reaches the end of the array (wrapped or full buffer).
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
// Wrap around the end of the circular buffer.
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
| protected synchronized void | receive(int b) Receives a byte of data. This method will block while the
circular buffer is full, until space becomes available to store the byte.
checkStateForReceive();
writeSide = Thread.currentThread();
if (in == out)
awaitSpace();
if (in < 0) {
in = 0;
out = 0;
}
buffer[in++] = (byte)(b & 0xFF);
if (in >= buffer.length) {
in = 0;
}
| synchronized void | receive(byte[] b, int off, int len) Receives data from an array of bytes into the circular buffer.
This method will block while the buffer is full, until all len bytes have been stored.
// Refuse the data if the pipe is unconnected, closed, or its reader is dead,
// then record the calling thread as the writer.
checkStateForReceive();
writeSide = Thread.currentThread();
// Bytes from b that still have to be copied into the circular buffer.
int bytesToTransfer = len;
while (bytesToTransfer > 0) {
// in == out means the buffer is full: block in awaitSpace() until that changes.
if (in == out)
awaitSpace();
// Size of the contiguous free region starting at in.
int nextTransferAmount = 0;
if (out < in) {
// Free space runs from in to the end of the array.
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
// Buffer is empty: restart at index 0 and use the whole array.
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
// Free space runs from in up to the unread data at out.
nextTransferAmount = out - in;
}
}
// Never copy more than the caller still has to deliver.
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
// Wrap around the end of the circular buffer.
in = 0;
}
}
| synchronized void | receivedLast() Notifies all waiting threads that the last byte of data has been
received.
// Mark the writer's end as closed, then wake every thread waiting on this
// stream's monitor; read() checks closedByWriter inside its wait loop.
closedByWriter = true;
notifyAll();
|
|