File | Doc | Category | Size | Date | Package
DistThreadGroup.java | API Doc | Example | 4254 | Sat Jan 31 22:56:22 GMT 1998 | dcj.utils.Thread

DistThreadGroup.java

package dcj.utils.Thread;

import java.lang.*;
import java.util.*;
import java.net.*;
import java.io.*;

/**
 * Source code from "Java Distributed Computing", by Jim Farley.
 *
 * Class: DistThreadGroup
 * Example: 4-4
 * Description: A container for a set of threads distributed across the
 *      network.
 *
 * The group consists of one local ThreadGroup plus a table of remote
 * groups, each identified by a name and reachable at a host/port pair.
 * When started as a thread, this object listens on its local port for
 * UTF-encoded command strings ("suspend", "resume") and applies them to
 * the local ThreadGroup.  Calling suspend(true) or resume(true) sends the
 * same command string to every registered remote group before acting
 * locally.
 *
 * NOTE: the local operations delegate to ThreadGroup.suspend() and
 * ThreadGroup.resume(), which are deprecated in later JDK releases.
 */

public class DistThreadGroup extends Thread {
  // Protected instance variables

  // Threads managed on this machine.
  protected ThreadGroup  localGroup;
  // Maps a group name (String) to its RmtThreadGroup descriptor.  Created
  // eagerly so that Add/Remove/broadcastCmd never see a null table.
  protected Hashtable    remoteGroups = new Hashtable();
  // Listening socket; bound when run() starts.
  protected ServerSocket incoming;
  // Port on which run() listens for commands from remote members.
  protected int          localPort;

  // Class variables
  static final int       hostIdx = 0;
  static final int       portIdx = 1;

  // Public constructors

  /**
   * Creates a distributed group around an existing local thread group.
   *
   * @param g    the local thread group to control
   * @param port the local port on which to listen for remote commands
   */
  public DistThreadGroup(ThreadGroup g, int port) {
    localGroup = g;
    localPort = port;
  }

  /**
   * Creates a distributed group with a new local thread group named
   * "local:" followed by the port number.
   *
   * @param port the local port on which to listen for remote commands
   */
  public DistThreadGroup(int port) {
    localGroup = new ThreadGroup("local:" + port);
    localPort = port;
  }

  /**
   * Creates a distributed group with a new local thread group and one
   * initial remote member.
   *
   * @param rHost host of the remote group
   * @param rPort port of the remote group
   * @param gname name under which the remote group is registered
   * @param port  the local port on which to listen for remote commands
   */
  public DistThreadGroup(String rHost, int rPort, String gname, int port) {
    localGroup = new ThreadGroup("local:" + port);
    localPort = port;
    Add(gname, rHost, rPort);
  }

  /**
   * Adds a remote thread group to this group.  A group previously
   * registered under the same name is replaced.
   *
   * @param gname name under which the remote group is registered
   * @param host  host of the remote group
   * @param port  port of the remote group
   */
  public void Add(String gname, String host, int port) {
    RmtThreadGroup rg = new RmtThreadGroup(host, port);
    remoteGroups.put(gname, rg);
  }

  /**
   * Removes a remote thread group from this group.
   *
   * @param gname name under which the remote group was registered
   */
  public void Remove(String gname) {
    remoteGroups.remove(gname);
  }

  /**
   * Returns the local thread group belonging to this distributed group.
   *
   * @return the local thread group (may be null if null was passed to the
   *         constructor)
   */
  public ThreadGroup GetLocalGroup() {
    return localGroup;
  }

  /**
   * Implementation of Thread.run: binds the local port, then loops forever
   * accepting one connection per command, reading a single UTF string from
   * it and applying the command to the local group.  Returns immediately
   * if the port cannot be bound.
   */
  public void run() {
    try {
      incoming = new ServerSocket(localPort);
    }
    catch (IOException ioe) {
      System.out.println("Failed to bind to port " + localPort);
      return;
    }
    while (true) {
      Socket peer = null;
      try {
        peer = incoming.accept();
        DataInputStream is = new DataInputStream(peer.getInputStream());
        dispatch(is.readUTF());
      }
      catch (IOException ioe) {
        System.out.println("Network exception reading message");
      }
      finally {
        // One connection carries exactly one command; release it so that
        // descriptors do not accumulate over the life of the listener.
        closeQuietly(peer);
      }
    }
  }

  // Applies a single command received from a remote member.
  private void dispatch(String input) {
    // Commands received from the network are applied to the local group
    // only (bcast == false): the sender is responsible for notifying the
    // other members.  The boolean overloads must be used here; the
    // inherited no-argument Thread.suspend()/resume() would act on this
    // listener thread itself instead of on the local group.
    if ("suspend".equals(input))
      suspend(false);
    else if ("resume".equals(input))
      resume(false);
    //
    // Check for other messages here ("stop", "start", etc.)
    //                            .
    //                            .
    //                            .
    else {
      System.out.println("DistThreadGroup: Received unknown command \""
                         + input + "\"");
    }
  }

  /**
   * Suspends the group of threads.  If requested, the suspend command is
   * sent to the remote groups first, then the local group is suspended.
   *
   * @param bcast true to forward the command to all remote groups
   */
  public synchronized void suspend(boolean bcast) {
    if (bcast)
      broadcastCmd("suspend");

    if (localGroup != null)
      localGroup.suspend();
  }

  /**
   * Resumes the group of threads.  If requested, the resume command is
   * sent to the remote groups first, then the local group is resumed.
   *
   * @param bcast true to forward the command to all remote groups
   */
  public synchronized void resume(boolean bcast) {
    if (bcast)
      broadcastCmd("resume");

    if (localGroup != null)
      localGroup.resume();
  }

  //
  //  Implement other methods corresponding to ThreadGroup methods here
  //  (e.g. stop())
  //                               .
  //                               .
  //                               .
  //

  /**
   * Broadcasts the given command to every registered remote thread group.
   * Delivery is best-effort: a failure to reach one group is reported on
   * standard output and the remaining groups are still contacted.
   *
   * @param cmd the command string to send
   */
  protected void broadcastCmd(String cmd) {
    Enumeration e = remoteGroups.elements();
    while (e.hasMoreElements()) {
      RmtThreadGroup rg = (RmtThreadGroup)e.nextElement();
      Socket s = null;
      try {
        s = new Socket(rg.getHost(), rg.getPort());
        DataOutputStream os = new DataOutputStream(s.getOutputStream());
        os.writeUTF(cmd);
        os.flush();
      }
      // Deliberately broad: any failure for one remote group must not
      // prevent the command from reaching the others.
      catch (Exception ex) {
        System.out.println("DistThreadGroup: Failed to " + cmd +
                           " group at \"" + rg.getHost() + ":"
                           + rg.getPort() + "\"");
      }
      finally {
        closeQuietly(s);
      }
    }
  }

  // Closes the socket if it was opened, ignoring any error on close: the
  // command has already been handled (or reported as failed) by then.
  private static void closeQuietly(Socket s) {
    if (s == null)
      return;
    try {
      s.close();
    }
    catch (IOException ignored) {
      // Nothing useful can be done about a failed close.
    }
  }
}