package jdc.patterns.peer;
import jdc.util.*;
import java.util.Vector;
import java.util.Enumeration;
import java.util.Properties;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
/**
* A peer which connects to other peers using IP sockets. The connection
* properties required are a host name and port number. Since these uniquely
* identify a peer on an IP network, they are used as the name and id
* properties on the peer's identity.
*
*/
public class SocketPeer extends Peer implements Runnable {
/**
* Standard connection property names, stored in the remote Peer's Identity
*/
public static final String HOST_PROP = "HOST";
public static final String PORT_PROP = "PORT";
public static final String SOCKET_PROP = "SOCKET";
public static final String PEER_IN = "PEER_IN";
public static final String PEER_OUT = "PEER_OUT";
public static final String POLLER = "POLLER";
/**
* A helper class that serves to poll a socket connected to a remote peer,
* listening for messages, and passing them to the local Peer's handle()
* method.
*/
class MsgPoller extends Thread {
private Socket mSocket = null;
private Peer mPeer = null;
public MsgPoller(Peer p) throws IllegalArgumentException {
mSocket = (Socket)p.getIdentity().getProperty(SOCKET_PROP);
mPeer = p;
}
public void run() {
if (mSocket != null && mPeer != null) {
ObjectInputStream oin = null;
try {
InputStream in = mSocket.getInputStream();
oin = new ObjectInputStream(in);
}
catch (Exception e) {
System.out.println("Failed to init poller.");
return;
}
while (true) {
try {
Object obj = oin.readObject();
Message msg = (Message)obj;
mPeer.handle(msg);
}
catch (Exception e) {
}
}
}
}
}
/**
* Server socket used to listen for remote peers attempting to connect.
*/
protected ServerSocket mSocket = null;
/**
* Create a local socket peer listening on the given port. Giving a port
* number of zero causes a random free port to be chosen.
*/
public SocketPeer(int port) throws IOException {
mSocket = new ServerSocket(port);
int assnPort = mSocket.getLocalPort();
mIdentity = new Identity(assnPort);
mIdentity.setName(InetAddress.getLocalHost().getHostName());
}
/**
* Internal constructor, used to create a "degenerate" SocketPeer, used
* locally to represent remote peers and to hold their connection
* properties within their identities.
*/
protected SocketPeer() {}
/**
* Connect to a peer given the connection properties. The SocketPeer looks
* for a "host" and "port" property in the property list, and attempts a
* Socket connection to the remote agent.
*/
public boolean connect(Properties connProps) {
boolean success = false;
String host = (String)connProps.get(HOST_PROP);
Integer port = (Integer)connProps.get(PORT_PROP);
if (host != null && port != null) {
try {
Socket peerConn = new Socket(host, port.intValue());
SocketPeer peer = newPeer(peerConn);
addPeer(peer);
success = true;
}
catch (Exception e) {
System.out.println("Error connecting to peer " + host + ":" + port);
}
}
return success;
}
/**
* Connect to a peer given the connection properties and identity of the
* peer. Subclasses can choose to implement this using both arguments, or
* ignoring one in favor of the other (connection properties vs. properties
* pulled from the remote peer identifier).
*/
public boolean connect(Properties connProps, Identity peerID) {
// If necessary, copy the critical properties from the identity of the
// remote peer to the connection properties.
if (peerID.getName() != null && connProps.getProperty(HOST_PROP) == null) {
connProps.put(HOST_PROP, peerID.getName());
}
if (peerID.getId() > 0 && connProps.getProperty(PORT_PROP) == null) {
connProps.put(PORT_PROP, new Integer(peerID.getId()));
}
return connect(connProps);
}
/**
* Send the message to the identified peer.
*/
public boolean send(Message msg, Identity peerID) {
boolean success = false;
try {
OutputStream out = (OutputStream)peerID.getProperty(PEER_OUT);
ObjectOutputStream oout = new ObjectOutputStream(out);
oout.writeObject(msg);
success = true;
}
catch (Exception e) {
System.out.println("Failed to send message to: " + peerID.getName() +
", " + peerID.getId());
e.printStackTrace();
}
return success;
}
/**
* Handle incoming message.
*/
public void handle(Message m) {
System.out.println(getIdentity().getName() + "(" + getIdentity().getId() +
") received message " + m);
}
/**
* Add a new Peer. In addition to the default Peer behavior, also create
* a MsgPoller for the new Peer, to listen for incoming messages.
*/
public void addPeer(Peer p) {
super.addPeer(p);
try {
MsgPoller poller = new MsgPoller(p);
p.getIdentity().setProperty(POLLER, poller);
poller.start();
}
catch (Exception e) {
System.out.println("Failed to attach poller to " +
p.getIdentity().getName() + " " +
p.getIdentity().getId());
e.printStackTrace();
}
}
/**
* Initialize a new SocketPeer with the expected properties.
*/
static protected SocketPeer newPeer(Socket s) {
SocketPeer peer = null;
try {
// Make an identity for the new peer, using the remote port as the ID
Identity i = new Identity(s.getPort());
// Use the remote host name as the peer's name
i.setName(s.getInetAddress().getHostName());
// Get the in and out streams from the socket
InputStream in = s.getInputStream();
OutputStream out = s.getOutputStream();
// Store the socket and the streams in the identity's property list
i.setProperty(SOCKET_PROP, s);
i.setProperty(PEER_IN, in);
i.setProperty(PEER_OUT, out);
// Make the new peer and set its identity
peer = new SocketPeer();
peer.setIdentity(i);
}
catch (Exception e) {
System.out.println("Failed to initialize peer:");
e.printStackTrace();
}
return peer;
}
/**
* When run within a thread, just listen to server socket for remote peer
* connect attempts, and make new peers out of them.
*/
public void run() {
while (true) {
try {
Socket s = mSocket.accept();
addPeer(newPeer(s));
}
catch (Exception e) {
System.out.println("Error accepting socket connection.");
e.printStackTrace();
}
}
}
}
|