package dcj.util.Collaborative;
import java.util.Vector;
import java.util.Hashtable;
import java.util.Enumeration;
import java.net.SocketException;
import java.io.*;
/**
* Source code from "Java Distributed Computing", by Jim Farley.
*
* Classes: MessageHandler, AgentConnection, AgentHandler
* Example: 9-1
* Description: A new version of message handler that handles each remote client
* with a separate thread.
*/
/**
 * Simple holder pairing the input and output streams used to talk to
 * one remote agent. The streams are stored exactly as given; this class
 * neither wraps, buffers nor closes them.
 */
class AgentConnection {
    /** Stream from which data sent by the agent is read. */
    public InputStream in;

    /** Stream to which data destined for the agent is written. */
    public OutputStream out;

    /**
     * Creates a connection record for the given stream pair.
     *
     * @param i stream to read from the agent
     * @param o stream to write to the agent
     */
    public AgentConnection(InputStream i, OutputStream o) {
        this.in = i;
        this.out = o;
    }
}
/**
 * Services a single remote agent on its own thread: repeatedly asks the
 * owning MessageHandler for the agent's next message and executes it.
 *
 * The loop ends cooperatively, either when the running thread has been
 * interrupted (which is how MessageHandler.removeAgent() signals shutdown)
 * or when the agent is no longer registered with the handler.
 */
class AgentHandler implements Runnable {
    int peerId;
    MessageHandler handler;

    /**
     * @param id id of the agent this handler reads from
     * @param h  message handler that owns the agent's connection
     */
    public AgentHandler(int id, MessageHandler h) {
        peerId = id;
        handler = h;
    }

    /**
     * Reads and executes messages from the agent until the agent is removed
     * from the handler or this thread is interrupted.
     */
    public void run() {
        System.out.println("ph: Starting peer handler for peer " + peerId);
        Thread self = Thread.currentThread();
        // Re-check registration on every pass: once the agent is gone,
        // readMsg() returns null immediately and an unconditional loop
        // would spin forever.
        while (!self.isInterrupted() && handler.getAgent(peerId) != null) {
            try {
                Message m = handler.readMsg(peerId);
                if (m == null) {
                    // Nothing usable was read; the loop condition decides
                    // whether another read is attempted.
                    continue;
                }
                System.out.println("ph: Got a message from peer " + peerId);
                if (self.isInterrupted()) {
                    // The agent was removed while the read was in progress;
                    // do not act on a message from a removed agent.
                    break;
                }
                m.Do();
            }
            catch (IOException e) {
                // Do not swallow silently: report the failure and drop the
                // agent so the loop terminates instead of retrying forever.
                System.out.println("ph: I/O error on peer " + peerId + ": " + e);
                handler.removeAgent(peerId);
            }
        }
    }
}

/**
 * Message handler that serves each remote agent with a separate thread.
 *
 * Every registered agent has an AgentConnection (its stream pair) in
 * {@code connections} and the Thread running its AgentHandler in
 * {@code handlers}, both keyed by the agent id as an Integer. Methods that
 * touch these tables are synchronized on this handler. Reads and writes on
 * one connection are serialized by locking the connection's stream.
 */
public class MessageHandler implements Runnable
{
    // A global MessageHandler, for applications where one central
    // handler is used.
    public static MessageHandler current = null;

    // Agent id (Integer) -> AgentConnection.
    Hashtable connections = new Hashtable();
    // Agent id (Integer) -> Thread running that agent's AgentHandler.
    Hashtable handlers = new Hashtable();
    // Message prototypes consulted by buildMessage().
    Vector msgPrototypes = new Vector();

    /** Creates a handler with no agents registered. */
    public MessageHandler() {}

    /**
     * Creates a handler and registers the given stream pair as agent 0.
     * Note that this starts the agent's handler thread from within the
     * constructor.
     *
     * @param in  stream to read from the agent
     * @param out stream to write to the agent
     */
    public MessageHandler(InputStream in, OutputStream out) {
        addAgent(0, in, out);
    }

    /**
     * Returns an agent id that is not currently in use.
     *
     * The search starts at the number of registered agents, so densely
     * numbered agents 0..n-1 still yield n. It then skips any id that is
     * already taken, which happens once agents have been removed or were
     * registered with explicit ids.
     *
     * @return an unused agent id
     */
    synchronized public int nextAgentId() {
        int candidate = connections.size();
        while (connections.containsKey(new Integer(candidate))) {
            candidate++;
        }
        return candidate;
    }

    /**
     * Returns a snapshot of the ids of all registered agents.
     *
     * @return a new Vector of Integer ids; changes to it do not affect
     *         this handler
     */
    synchronized public Vector getAgentIds() {
        Vector ids = new Vector();
        Enumeration e = connections.keys();
        while (e.hasMoreElements()) {
            ids.addElement((Integer)e.nextElement());
        }
        return ids;
    }

    /**
     * Registers a new agent under an automatically chosen id.
     *
     * @param i stream to read from the agent
     * @param o stream to write to the agent
     * @return the id assigned to the agent
     */
    synchronized public int addAgent(InputStream i, OutputStream o) {
        int nextId = nextAgentId();
        addAgent(nextId, i, o);
        return nextId;
    }

    /**
     * Registers an agent under the given id and starts a thread that reads
     * its messages. If the id is already in use, the previous agent is
     * removed first so that its handler thread is shut down rather than
     * left running with no way to reach it.
     *
     * @param id id to register the agent under
     * @param i  stream to read from the agent
     * @param o  stream to write to the agent
     */
    synchronized public void addAgent(int id, InputStream i, OutputStream o) {
        removeAgent(id);
        Integer key = new Integer(id);
        connections.put(key, new AgentConnection(i, o));
        Thread phThread = new Thread(new AgentHandler(id, this));
        // Record the thread before starting it so both tables are complete
        // by the time the new thread can observe them.
        handlers.put(key, phThread);
        phThread.start();
    }

    /**
     * Unregisters an agent and asks its handler thread to stop.
     *
     * The thread is interrupted rather than stopped: Thread.stop() is
     * deprecated and unsupported on recent JDKs, and this method may be
     * called by the handler thread itself (from readMsg). AgentHandler
     * checks the interrupt flag and the registration on every pass, so the
     * thread ends once its current read returns. The agent's streams are
     * not closed here.
     *
     * @param id id of the agent to remove
     * @return true if both a connection and a handler thread were
     *         registered under the id
     */
    synchronized public boolean removeAgent(int id) {
        Integer key = new Integer(id);
        // Always clear both tables so they cannot drift out of step.
        Thread hthread = (Thread)handlers.remove(key);
        Object conn = connections.remove(key);
        if (hthread != null) {
            hthread.interrupt();
        }
        return hthread != null && conn != null;
    }

    /**
     * Looks up the connection registered under an id.
     *
     * @param id agent id
     * @return the agent's connection, or null if the id is not registered
     */
    synchronized protected AgentConnection getAgent(int id) {
        return (AgentConnection)connections.get(new Integer(id));
    }

    /**
     * Adds a prototype that buildMessage() may copy when a matching
     * message id is received.
     *
     * @param prototype message prototype to register
     */
    public void addMessageType(Message prototype) {
        synchronized (msgPrototypes) {
            msgPrototypes.addElement(prototype);
        }
    }

    /**
     * Reads the next message from the given agent.
     *
     * The message id is read as a UTF string, a message is built from the
     * matching prototype, and the message then reads its own arguments from
     * the stream. The agent's input stream is locked for the whole read.
     *
     * A SocketException or EOFException is treated as a lost connection and
     * the agent is removed. Any other failure is reported and leaves the
     * agent registered.
     *
     * @param id id of the agent to read from
     * @return the message read, or null if the id is not registered, no
     *         prototype handles the message id, or the read failed
     * @throws IOException declared for callers and overriding classes; this
     *         implementation reports read failures by returning null
     */
    public Message readMsg(int id) throws IOException {
        Message msg = null;
        AgentConnection conn = getAgent(id);
        if (conn != null) {
            try {
                synchronized (conn.in) {
                    DataInputStream din = new DataInputStream(conn.in);
                    String msgId = din.readUTF();
                    System.out.println("mh: Got message id " + msgId);
                    msg = buildMessage(msgId);
                    // NOTE(review): when no prototype matches, the arguments
                    // of the unknown message are left unread on the stream.
                    if (msg != null) {
                        msg.readArgs(conn.in);
                    }
                    System.out.println("mh: Received complete message" + msg + ".");
                }
            }
            catch (SocketException s) {
                System.out.println("mm: Lost connection to peer " + id);
                dropLostAgent(id, conn);
                msg = null;
            }
            catch (EOFException eof) {
                // End of stream means the peer closed the connection.
                // Leaving the agent registered would make its handler
                // thread hit the same EOF again on every retry.
                System.out.println("mm: Lost connection to peer " + id);
                dropLostAgent(id, conn);
                msg = null;
            }
            catch (Exception e) {
                System.out.println("mm: Error reading message:");
                e.printStackTrace();
                msg = null;
            }
        }
        return msg;
    }

    /**
     * Sends a message to a specific agent: writes the message id as a UTF
     * string, then lets the message write its own arguments. The agent's
     * output stream is locked for the whole write. A SocketException is
     * treated as a lost connection and the agent is removed.
     *
     * @param msg message to send
     * @param id  id of the receiving agent
     * @return true if the message was written; false if the id is not
     *         registered or the write failed
     * @throws IOException declared for callers and overriding classes; this
     *         implementation reports write failures by returning false
     */
    public boolean sendMsg(Message msg, int id) throws IOException {
        boolean success = false;
        AgentConnection conn = getAgent(id);
        if (conn != null) {
            System.out.println("mh: Trying to lock on peer " + id);
            try {
                synchronized (conn.out) {
                    System.out.println("mh: Got lock on peer " + id);
                    DataOutputStream dout = new DataOutputStream(conn.out);
                    System.out.println("mh: Printing message id...");
                    dout.writeUTF(msg.messageID());
                    System.out.println("mh: Printing message args...");
                    msg.writeArgs(conn.out);
                    success = true;
                }
            }
            catch (SocketException s) {
                System.out.println("mh: Lost connection to peer " + id);
                dropLostAgent(id, conn);
                success = false;
            }
            catch (Exception e) {
                // Report the failure instead of hiding it; the caller still
                // only sees a false return value.
                System.out.println("mh: Error sending message:");
                e.printStackTrace();
                success = false;
            }
        }
        return success;
    }

    /**
     * Broadcasts a message to all registered agents.
     *
     * Works on a snapshot of the agent ids, because sending to one agent
     * may remove it from the connection table while the broadcast is still
     * in progress.
     *
     * @param msg message to send
     * @return true if the message was sent to every agent in the snapshot
     * @throws IOException declared for callers and overriding classes
     */
    public boolean sendMsg(Message msg) throws IOException {
        Vector ids = getAgentIds();
        boolean success = true;
        for (int i = 0; i < ids.size(); i++) {
            int id = ((Integer)ids.elementAt(i)).intValue();
            System.out.println("mh: Attempting send to peer " + id);
            if (!sendMsg(msg, id))
                success = false;
            else
                System.out.println("mh: Sent message to peer " + id);
        }
        return success;
    }

    // Default run() method does nothing...
    public void run() {}

    /**
     * Builds a new message for the given message id by copying the first
     * registered prototype that reports it handles the id.
     *
     * @param msgId id of the message to build
     * @return a copy of the matching prototype with its id set to msgId,
     *         or null if no prototype handles the id
     */
    protected Message buildMessage(String msgId) {
        Message msg = null;
        int numMTypes = msgPrototypes.size();
        for (int i = 0; i < numMTypes; i++) {
            Message m = null;
            synchronized (msgPrototypes) {
                m = (Message)msgPrototypes.elementAt(i);
            }
            // Skip a null prototype rather than failing the whole lookup.
            if (m != null && m.handles(msgId)) {
                msg = m.newCopy();
                msg.setId(msgId);
                break;
            }
        }
        return msg;
    }

    // Removes the agent after a lost connection, but only if the failed
    // connection is still the one registered under the id, so that a
    // replacement registered in the meantime is left alone.
    private synchronized void dropLostAgent(int id, AgentConnection conn) {
        if (connections.get(new Integer(id)) == conn) {
            removeAgent(id);
        }
    }
}
|