package dcj.utils.Thread;
import java.lang.*;
import java.util.*;
import java.net.*;
import java.io.*;
/**
* Source code from "Java Distributed Computing", by Jim Farley.
*
* Class: DistThreadGroup
* Example: 4-4
* Description: A container for a set of threads distributed across the
* network.
*/
public class DistThreadGroup extends Thread {
// Protected instance variables
protected ThreadGroup localGroup;
protected Hashtable remoteGroups;
protected ServerSocket incoming;
protected int localPort;
// Class variables
static final int hostIdx = 0;
static final int portIdx = 1;
// Public constructors
public DistThreadGroup(ThreadGroup g, int port) {
localGroup = g;
localPort = port;
}
public DistThreadGroup(int port) {
localGroup = new ThreadGroup("local:" + port);
localPort = port;
}
public DistThreadGroup(String rHost, int rPort, String gname, int port) {
localGroup = new ThreadGroup("local:" + port);
localPort = port;
Add(gname, rHost, rPort);
}
// Add a remote thread group to this group
public void Add(String gname, String host, int port) {
RmtThreadGroup rg = new RmtThreadGroup(host, port);
remoteGroups.put(gname, rg);
}
// Remove a thread group from this group
public void Remove(String gname) {
remoteGroups.remove(gname);
}
// Get the local thread group belonging to this distributed group
public ThreadGroup GetLocalGroup() {
return localGroup;
}
// Implementation of Thread::run - checks its port on the current machine
// waiting for messages from remote members of this group.
public void run() {
try {
incoming = new ServerSocket(localPort);
}
catch (IOException ioe) {
System.out.println("Failed to bind to port " + localPort);
return;
}
while (true) {
try {
Socket peer = incoming.accept();
DataInputStream is = new DataInputStream(peer.getInputStream());
String input = is.readUTF();
if (input.compareTo("suspend") == 0)
suspend();
else if (input.compareTo("resume") == 0)
resume();
//
// Check for other messages here ("stop", "start", etc.)
// .
// .
// .
else {
System.out.println("DistThreadGroup: Received unknown command \""
+ input + "\"");
}
}
catch (IOException ioe) {
System.out.println("Network exception reading message");
}
}
}
// Suspend the group of threads. If requested, the suspend
// command is sent to the remote threads first, then the local group
// suspended.
public synchronized void suspend(boolean bcast) {
if (bcast)
broadcastCmd("suspend");
if (localGroup != null)
localGroup.suspend();
}
// Resume the group of threads. If requested, the resume
// command is sent to the remote threads first, then the
// local group is resumed.
public synchronized void resume(boolean bcast) {
if (bcast)
broadcastCmd("resume");
if (localGroup != null)
localGroup.resume();
}
//
// Implement other methods corresponding to ThreadGroup methods here
// (e.g. resume(), stop())
// .
// .
// .
//
// Broadcast the given message to the remote thread groups.
protected void broadcastCmd(String cmd) {
Enumeration e = remoteGroups.elements();
while (e.hasMoreElements()) {
RmtThreadGroup rg = (RmtThreadGroup)e.nextElement();
try {
Socket s = new Socket(rg.getHost(), rg.getPort());
DataOutputStream os = new DataOutputStream(s.getOutputStream());
os.writeUTF(cmd);
}
catch (Exception ex) {
System.out.println("DistThreadGroup: Failed to " + cmd +
" group at \"" + rg.getHost() + ":"
+ rg.getPort());
}
}
}
} |