HttpDownloadManager public class HttpDownloadManager extends Thread. This class manages asynchronous HTTP GET downloads and demonstrates
non-blocking I/O with SocketChannel and Selector and also demonstrates
logging with the java.util.logging package. This example uses a number
of inner classes and interfaces.
Call download() for each HTTP GET request you want to issue. You may
optionally pass a Listener object that will be notified when the download
terminates or encounters an exception. download() returns a Download object
which holds the downloaded bytes (including HTTP headers) and which allows
you to poll the Status of the download. Call release() when there are
no more downloads. |
Fields Summary |
---|
Selector | selector | ByteBuffer | buffer | List | pendingDownloads | boolean | released | Logger | log | static final Charset | LATIN1 |
Constructors Summary |
---|
public HttpDownloadManager(Logger log)
// Creates the manager and immediately starts its background thread.
// When no Logger is supplied, one named after this object's class is used.
this.log = (log != null)
    ? log
    : Logger.getLogger(this.getClass().getName());
// The Selector that the thread's run() loop blocks on
selector = Selector.open();
// A 64 KB direct buffer used for reading from channels
buffer = ByteBuffer.allocateDirect(64*1024);
// Downloads requested through download() wait here until the thread picks them up
pendingDownloads = Collections.synchronizedList(new ArrayList());
// This class extends Thread; begin running it now
this.start();
|
Methods Summary |
---|
public je3.nio.HttpDownloadManager$Download | download(java.net.URI uri, je3.nio.HttpDownloadManager$Listener l)
// Queues an HTTP GET request for the given URI and returns immediately.
// uri: must use the http scheme (matched case-insensitively) and name a host.
// l:   passed through to the Download object; the error path checks it
//      against null before notifying it, so null is accepted there.
// Returns the Download object that was added to the pending list.
// Throws IllegalStateException if release() has already been called, and
// IllegalArgumentException if the URI is not an http URI or has no host.
if (released)
    throw new IllegalStateException("Can't download() after release()");
// Get info from the URI
String scheme = uri.getScheme();
// URI schemes are case-insensitive, so "HTTP://host/" must be accepted too
if (scheme == null || !scheme.equalsIgnoreCase("http"))
    throw new IllegalArgumentException("Must use 'http:' protocol");
String hostname = uri.getHost();
// getHost() returns null when the URI has no (server-based) host component;
// reject it here rather than handing a null host to the download thread.
if (hostname == null)
    throw new IllegalArgumentException("URI has no host: " + uri);
int port = uri.getPort();
if (port == -1) port = 80; // Use default port if none specified
String path = uri.getRawPath();
if (path == null || path.length() == 0) path = "/";
String query = uri.getRawQuery();
if (query != null) path += "?" + query;
// Create a Download object with the pieces of the URL
Download download = new DownloadImpl(hostname, port, path, l);
// Add it to the list of pending downloads. This is a synchronized list
pendingDownloads.add(download);
// And ask the thread to stop blocking in the select() call so that
// it will notice and process this new pending Download object.
selector.wakeup();
// Return the Download so that the caller can monitor it if desired.
return download;
| void | handleError(je3.nio.HttpDownloadManager$DownloadImpl download, java.nio.channels.SocketChannel channel, java.nio.channels.SelectionKey key, java.lang.Throwable throwable)
// Marks the download as failed, releases its channel and key (either may be
// null), logs the failure and notifies the download's listener if it has one.
download.status = Status.ERROR;
if (channel != null) {
    try {
        channel.close();
    }
    catch(IOException ignored) {
        // Best-effort close: the download has already failed, so a
        // secondary failure while closing is deliberately ignored.
    }
}
if (key != null) {
    key.cancel();
}
// Identify the remote endpoint in the log message
String endpoint = download.host + ":" + download.port;
log.log(Level.WARNING,
        "Error connecting to or downloading from " + endpoint,
        throwable);
if (download.listener != null) {
    download.listener.error(download, throwable);
}
| public void | release()
// Shuts the manager down: sets the flag that run() checks on every pass
// and closes the selector so that a blocked select() call returns.
released = true;
try {
    selector.close();
}
catch(IOException e) {
    // A failure to close is logged, not propagated to the caller
    log.log(Level.SEVERE, "Error closing selector", e);
}
| public void | run()
// Body of the download thread. Each pass of the loop:
//   1. blocks in select() until a channel is ready or wakeup()/close() is called,
//   2. starts a non-blocking connect for every pending Download,
//   3. handles each selected key: completes the connection and sends the
//      HTTP request, and/or reads available bytes into the Download.
// The loop ends when the released flag is seen, when the selector is found
// to be closed, or (without the final log message) when select() fails
// with an IOException.
log.info("HttpDownloadManager thread starting.");
// The download thread runs until release() is called
while(!released) {
    // The thread blocks here waiting for something to happen
    try { selector.select(); }
    catch(java.nio.channels.ClosedSelectorException e) {
        // release() closed the selector after we tested the flag but
        // before we reached select(). This unchecked exception is the
        // shutdown signal, not an error.
        break;
    }
    catch(IOException e) {
        // This should never happen.
        log.log(Level.SEVERE, "Error in select()", e);
        return;
    }
    // If release() was called, the thread should exit.
    if (released) break;
    // If any new Download objects are pending, deal with them first
    if (!pendingDownloads.isEmpty()) {
        // Although pendingDownloads is a synchronized list, we still
        // need to use a synchronized block to iterate through its
        // elements to prevent a concurrent call to download().
        synchronized(pendingDownloads) {
            Iterator iter = pendingDownloads.iterator();
            while(iter.hasNext()) {
                // Get the pending download object from the list
                DownloadImpl download = (DownloadImpl)iter.next();
                iter.remove(); // And remove it.
                // Now begin an asynchronous connection to the
                // specified host and port. We don't block while
                // waiting to connect.
                SelectionKey key = null;
                SocketChannel channel = null;
                try {
                    // Open an unconnected channel
                    channel = SocketChannel.open();
                    // Put it in non-blocking mode
                    channel.configureBlocking(false);
                    // Register it with the selector, specifying that
                    // we want to know when it is ready to connect
                    // and when it is ready to read.
                    key = channel.register(selector,
                                           SelectionKey.OP_READ |
                                           SelectionKey.OP_CONNECT,
                                           download);
                    // Create the web server address
                    SocketAddress address =
                        new InetSocketAddress(download.host,
                                              download.port);
                    // Ask the channel to start connecting
                    // Note that we don't send the HTTP request yet.
                    // We'll do that when the connection completes.
                    channel.connect(address);
                }
                catch(Exception e) {
                    handleError(download, channel, key, e);
                }
            }
        }
    }
    // Now get the set of keys that are ready for connecting or reading
    Set keys;
    try { keys = selector.selectedKeys(); }
    catch(java.nio.channels.ClosedSelectorException e) {
        // The selector was closed by release() while we were working
        break;
    }
    if (keys == null) continue; // bug workaround; should not be needed
    // Loop through the keys in the set
    for(Iterator i = keys.iterator(); i.hasNext(); ) {
        SelectionKey key = (SelectionKey)i.next();
        i.remove(); // Remove the key from the set before handling
        // Get the Download object we attached to the key
        DownloadImpl download = (DownloadImpl) key.attachment();
        // Get the channel associated with the key.
        SocketChannel channel = (SocketChannel)key.channel();
        try {
            if (key.isConnectable()) {
                // If the channel is ready to connect, complete the
                // connection and then send the HTTP GET request to it.
                if (channel.finishConnect()) {
                    download.status = Status.CONNECTED;
                    // The connection is established, so stop asking the
                    // selector about connect-readiness. Leaving OP_CONNECT
                    // in the interest set can make select() return
                    // immediately over and over for this channel.
                    key.interestOps(key.interestOps() &
                                    ~SelectionKey.OP_CONNECT);
                    // This is the HTTP request we send
                    String request =
                        "GET " + download.path + " HTTP/1.1\r\n" +
                        "Host: " + download.host + "\r\n" +
                        "Connection: close\r\n" +
                        "\r\n";
                    // Wrap in a CharBuffer and encode to a ByteBuffer
                    ByteBuffer requestBytes =
                        LATIN1.encode(CharBuffer.wrap(request));
                    // Send the request to the server. If the bytes
                    // aren't all written in one call, we busy loop!
                    while(requestBytes.hasRemaining())
                        channel.write(requestBytes);
                    log.info("Sent HTTP request: " + download.host +
                             ":" + download.port + ": " + request);
                }
            }
            if (key.isReadable()) {
                // If the key indicates that there is data to be read,
                // then read it and store it in the Download object.
                int numbytes = channel.read(buffer);
                // If we read some bytes, store them, otherwise
                // the download is complete and we need to note this
                if (numbytes != -1) {
                    buffer.flip(); // Prepare to drain the buffer
                    download.addData(buffer); // Store the data
                    buffer.clear(); // Prepare for another read
                    log.info("Read " + numbytes + " bytes from " +
                             download.host + ":" + download.port);
                }
                else {
                    // If there are no more bytes to read
                    key.cancel(); // We're done with the key
                    channel.close(); // And with the channel.
                    download.status = Status.DONE;
                    if (download.listener != null) // notify listener
                        download.listener.done(download);
                    log.info("Download complete from " +
                             download.host + ":" + download.port);
                }
            }
        }
        catch (Exception e) {
            // Any failure for this key (including a key that has been
            // cancelled) is reported against its download only; the
            // loop then continues with the remaining keys.
            handleError(download, channel, key, e);
        }
    }
}
log.info("HttpDownloadManager thread exiting.");
|
|