// File: MessageHandler.java | Doc: API Doc | Category: Example | Size: 5852 | Date: Tue Jan 20 22:20:44 GMT 1998 | Package: dcj.util.Collaborative
//
// MessageHandler.java

package dcj.util.Collaborative;

import java.util.Vector;
import java.util.Hashtable;
import java.util.Enumeration;
import java.net.SocketException;
import java.io.*;

/**
 * Source code from "Java Distributed Computing", by Jim Farley.
 *
 * Classes: MessageHandler, AgentConnection, AgentHandler
 * Example: 9-1
 * Description: A new version of message handler that handles each remote client
 *      with a separate thread.
 */

/**
 * Plain holder that pairs the input stream and the output stream used to
 * exchange data with a single agent.
 */
class AgentConnection {
  /** Stream the agent's data is read from. */
  public InputStream in;
  /** Stream data for the agent is written to. */
  public OutputStream out;

  /**
   * Stores the two streams as given; nothing is wrapped, copied or opened.
   *
   * @param i stream to expose as {@link #in}
   * @param o stream to expose as {@link #out}
   */
  public AgentConnection(InputStream i, OutputStream o) {
    this.in = i;
    this.out = o;
  }
}

/**
 * Reader loop for one agent: repeatedly asks the owning MessageHandler for
 * the agent's next message and executes it.
 *
 * The loop ends when the running thread is interrupted or when the agent is
 * no longer registered with the handler, which is how
 * MessageHandler.removeAgent() shuts a handler down.
 */
class AgentHandler implements Runnable {
  int peerId;
  MessageHandler handler;

  /**
   * @param id id of the agent this handler reads from
   * @param h  MessageHandler that owns the agent's connection
   */
  public AgentHandler(int id, MessageHandler h) {
    peerId = id;
    handler = h;
  }

  /**
   * Reads and executes messages until the agent is removed, the thread is
   * interrupted, or reading fails with an IOException.
   */
  public void run() {
    System.out.println("ph: Starting peer handler for peer " + peerId);
    while (isActive()) {
      try {
        Message m = handler.readMsg(peerId);
        // Re-check after the (possibly long) blocking read: the agent may
        // have been removed while this thread was waiting for data.
        if (m != null && isActive()) {
          System.out.println("ph: Got a message from peer " + peerId);
          m.Do();
        }
      }
      catch (IOException e) {
        // Previously swallowed, which retried a failing stream forever.
        System.out.println("ph: I/O error reading from peer " + peerId
                           + ": " + e);
        handler.removeAgent(peerId);
        break;
      }
    }
  }

  // True while this thread has not been interrupted and the agent is
  // still registered with the handler.
  private boolean isActive() {
    return !Thread.currentThread().isInterrupted()
        && handler.getAgent(peerId) != null;
  }
}

/**
 * Keeps a table of agent connections, each served by its own AgentHandler
 * thread, plus a list of message prototypes used to build incoming messages.
 *
 * Agent bookkeeping methods synchronize on this object; reads synchronize on
 * the agent's input stream and writes on its output stream.
 */
public class MessageHandler implements Runnable
{
  // A global MessageHandler, for applications where one central
  // handler is used.
  public static MessageHandler current = null;

  // Integer agent id -> AgentConnection
  Hashtable connections = new Hashtable();
  // Integer agent id -> Thread running that agent's AgentHandler
  Hashtable handlers = new Hashtable();
  // Message prototypes consulted by buildMessage(), in registration order
  Vector msgPrototypes = new Vector();

  /** Creates a handler with no agents. */
  public MessageHandler() {}

  /**
   * Creates a handler and registers the given streams as agent 0.
   *
   * @param in  stream to read agent 0's messages from
   * @param out stream to write messages for agent 0 to
   */
  public MessageHandler(InputStream in, OutputStream out) {
    addAgent(0, in, out);
  }

  /**
   * Returns an id that is not currently in use.
   *
   * Starts at the number of registered agents (the value previously
   * returned unconditionally) and skips ids that are taken, so an id freed
   * by removeAgent() can no longer cause a collision with a live agent.
   *
   * @return an unused agent id
   */
  synchronized public int nextAgentId() {
    int id = connections.size();
    while (connections.containsKey(new Integer(id))) {
      id++;
    }
    return id;
  }

  /**
   * @return a new Vector holding the Integer ids of all registered agents
   */
  synchronized public Vector getAgentIds() {
    Vector ids = new Vector();
    Enumeration e = connections.keys();
    while (e.hasMoreElements()) {
      ids.addElement((Integer)e.nextElement());
    }
    return ids;
  }

  /**
   * Registers an agent under the id chosen by nextAgentId().
   *
   * @param i stream to read the agent's messages from
   * @param o stream to write messages for the agent to
   * @return the id assigned to the agent
   */
  synchronized public int addAgent(InputStream i, OutputStream o) {
    int nextId = nextAgentId();
    addAgent(nextId, i, o);
    return nextId;
  }

  /**
   * Registers an agent under an explicit id and starts its handler thread.
   * If the id is already in use, the previous handler thread is interrupted
   * and its connection is replaced.
   *
   * @param id id to register the agent under
   * @param i  stream to read the agent's messages from
   * @param o  stream to write messages for the agent to
   */
  synchronized public void addAgent(int id, InputStream i, OutputStream o) {
    Integer key = new Integer(id);

    // Retire a handler already serving this id; otherwise two threads
    // would end up reading from the same (new) connection.
    Thread previous = (Thread)handlers.remove(key);
    if (previous != null) {
      previous.interrupt();
    }

    connections.put(key, new AgentConnection(i, o));
    Thread phThread = new Thread(new AgentHandler(id, this));
    handlers.put(key, phThread);
    phThread.start();
  }

  /**
   * Unregisters an agent and asks its handler thread to finish.
   *
   * The thread is interrupted rather than stopped. A handler blocked in a
   * stream read notices the removal only once that read returns. The
   * agent's streams are not closed here.
   *
   * @param id id of the agent to remove
   * @return true if both a handler thread and a connection were registered
   *         under the id
   */
  synchronized public boolean removeAgent(int id) {
    Integer key = new Integer(id);
    Thread hthread = (Thread)handlers.remove(key);
    Object conn = connections.remove(key);
    if (hthread != null) {
      hthread.interrupt();
    }
    return hthread != null && conn != null;
  }

  /**
   * @param id agent id to look up
   * @return the agent's connection, or null if no agent has that id
   */
  synchronized protected AgentConnection getAgent(int id) {
    return (AgentConnection)connections.get(new Integer(id));
  }

  /**
   * Appends a prototype to the list searched by buildMessage().
   *
   * @param prototype message prototype to register
   */
  public void addMessageType(Message prototype) {
    synchronized (msgPrototypes) {
      msgPrototypes.addElement(prototype);
    }
  }

  /**
   * Reads the next message from an agent: a UTF message id followed by the
   * arguments that the matching prototype reads for itself.
   *
   * A SocketException or EOFException is treated as a lost connection and
   * the agent is removed. Any other exception is logged.
   *
   * @param id id of the agent to read from
   * @return the message read, or null if the agent is unknown, no prototype
   *         handles the message id, or reading failed
   * @throws IOException declared for callers and overriding classes; this
   *         implementation handles read failures itself
   */
  public Message readMsg(int id) throws IOException {
    Message msg = null;

    AgentConnection conn = getAgent(id);
    if (conn != null) {
      try {
        synchronized (conn.in) {
          DataInputStream din = new DataInputStream(conn.in);
          String msgId = din.readUTF();
          System.out.println("mh: Got message id " + msgId);
          msg = buildMessage(msgId);
          if (msg != null) {
            msg.readArgs(conn.in);
          }
          System.out.println("mh: Received complete message" + msg + ".");
        }
      }
      catch (SocketException s) {
        System.out.println("mm: Lost connection to peer " + id);
        removeAgent(id);
        msg = null;
      }
      catch (EOFException eof) {
        // End of stream means the peer is gone. Without this the handler
        // thread would hit EOF again on every iteration.
        System.out.println("mm: Lost connection to peer " + id);
        removeAgent(id);
        msg = null;
      }
      catch (Exception e) {
        System.out.println("mm: Error reading message:");
        e.printStackTrace();
        msg = null;
      }
    }

    return msg;
  }

  /**
   * Sends a message to a specific agent: the message id as UTF, then the
   * arguments written by the message itself, then a flush of the stream.
   *
   * @param msg message to send
   * @param id  id of the receiving agent
   * @return true if the message was written and flushed; false if the agent
   *         is unknown or writing failed
   * @throws IOException declared for callers and overriding classes; this
   *         implementation handles write failures itself
   */
  public boolean sendMsg(Message msg, int id) throws IOException {
    boolean success = false;
    AgentConnection conn = getAgent(id);
    if (conn != null) {
      System.out.println("mh: Trying to lock on peer " + id);
      try {
        synchronized (conn.out) {
          System.out.println("mh: Got lock on peer " + id);
          DataOutputStream dout = new DataOutputStream(conn.out);
          System.out.println("mh: Printing message id...");
          dout.writeUTF(msg.messageID());
          System.out.println("mh: Printing message args...");
          msg.writeArgs(conn.out);
          // Push the message out in case the agent's stream is buffered.
          conn.out.flush();
          success = true;
        }
      }
      catch (SocketException s) {
        System.out.println("mh: Lost connection to peer " + id);
        removeAgent(id);
        success = false;
      }
      catch (Exception e) {
        // Report the failure the same way readMsg() does instead of
        // discarding it.
        System.out.println("mh: Error sending message:");
        e.printStackTrace();
        success = false;
      }
    }
    return success;
  }

  /**
   * Broadcasts a message to all connected agents.
   *
   * @param msg message to send
   * @return true only if the send to every agent succeeded
   * @throws IOException if a per-agent send throws it
   */
  public boolean sendMsg(Message msg) throws IOException {
    // Iterate over a snapshot: a failed send may call removeAgent(), which
    // must not modify the table while it is being enumerated.
    Enumeration ids = getAgentIds().elements();
    boolean success = true;
    while (ids.hasMoreElements()) {
      Integer id = (Integer)ids.nextElement();
      System.out.println("mh: Attempting send to peer " + id.intValue());
      if (!sendMsg(msg, id.intValue()))
        success = false;
      else
        System.out.println("mh: Sent message to peer " + id.intValue());
    }
    return success;
  }

  // Default run() method does nothing...
  public void run() {}

  /**
   * Builds a message for the given id from the first registered prototype
   * that reports it handles that id.
   *
   * @param msgId id read from the incoming stream
   * @return a copy of the matching prototype with its id set, or null if no
   *         prototype handles the id
   */
  protected Message buildMessage(String msgId) {
    Message msg = null;
    int numMTypes = msgPrototypes.size();
    for (int i = 0; i < numMTypes; i++) {
      Message m = null;
      // Lock only for the element fetch so prototype code runs unlocked.
      synchronized (msgPrototypes) {
        m = (Message)msgPrototypes.elementAt(i);
      }
      if (m.handles(msgId)) {
        msg = m.newCopy();
        msg.setId(msgId);
        break;
      }
    }
    return msg;
  }
}