HungryPeer
public class HungryPeer extends Object implements net.jxta.discovery.DiscoveryListener
Fields Summary |
---|
private net.jxta.peergroup.PeerGroup | netpg | private net.jxta.peergroup.PeerGroup | restoNet | private net.jxta.discovery.DiscoveryService | disco | private net.jxta.pipe.PipeService | pipes | private net.jxta.protocol.PipeAdvertisement | myAdv | private net.jxta.pipe.InputPipe | myPipe | private net.jxta.document.MimeMediaType | mimeType | private int | timeout | private int | rtimeout | private Vector | restoPeerAdvs | private Vector | restoPeerPipes | private String | myIdentity | private String | friesRequest |
Methods Summary |
---|
private void | connectAndSend(net.jxta.protocol.PipeAdvertisement padv)
// Sends one fries auction request to the RestoPeer described by padv.
// The message carries two elements: "HungryPeerPipe" (our own pipe
// advertisement, used as the return address) and "Request" (the
// "RestoNet:Request" document holding our name and the fries request).
System.out.println("Attempting to connect to discovered RestoPeer");
// Open an output pipe to the advertised RestoPeer pipe. rtimeout is
// handed to createOutputPipe as its second argument (presumably the
// connection timeout -- confirm against the PipeService API).
OutputPipe outPipe = pipes.createOutputPipe(padv, rtimeout);
if (outPipe != null) {
    // Build the request document: who we are and what we want
    StructuredDocument friesDoc =
        StructuredDocumentFactory.newStructuredDocument(mimeType,
            "RestoNet:Request");
    Element nameElem = friesDoc.createElement("Name", myIdentity);
    friesDoc.appendChild(nameElem);
    Element friesElem = friesDoc.createElement("Fries", friesRequest);
    friesDoc.appendChild(friesElem);
    // Assemble the pipe message
    Message auctionMsg = pipes.createMessage();
    // First element: our pipe advertisement, so that the RestoPeer
    // knows where to send its bid
    auctionMsg.addElement(auctionMsg.newMessageElement("HungryPeerPipe",
        mimeType, myAdv.getDocument(mimeType).getStream()));
    // Second element: the fries request document itself
    auctionMsg.addElement(auctionMsg.newMessageElement("Request",
        mimeType, friesDoc.getStream()));
    // Ship the auction message to the RestoPeer
    outPipe.send(auctionMsg);
    System.out.println("Sent Fries Auction Request ("
        + friesRequest + ") to connected peers");
} else {
    // No pipe could be established; give up on this RestoPeer
    System.out.println("Failure to connect to RestoPeer Pipe:" +
        padv.getName());
}
| public void | discoveryEvent(net.jxta.discovery.DiscoveryEvent ev)
// Discovery callback: every response in the event is treated as the XML
// text of a pipe advertisement, parsed, and handed to connectAndSend().
// A failure on one response is reported and does not stop the others.
System.out.println("Processing discovery event");
// Walk over all the advertisements carried by the discovery response
for (Enumeration found = ev.getResponse().getResponses();
        found.hasMoreElements(); ) {
    try {
        String advXml = (String) found.nextElement();
        // Rebuild the advertisement object from its XML text
        MimeMediaType xmlType = new MimeMediaType("text/xml");
        ByteArrayInputStream advBytes =
            new ByteArrayInputStream(advXml.getBytes());
        PipeAdvertisement restoAdv = (PipeAdvertisement)
            AdvertisementFactory.newAdvertisement(xmlType, advBytes);
        connectAndSend(restoAdv);
    } catch (Exception ex) {
        // Report the problem and carry on with the next response
        System.out.println("Can't connect to peer " + ex);
    }
}
| private boolean | joinRestoNet()
// Looks up the "RestoNet" peergroup advertisement (local cache first,
// then remote discovery, up to three attempts), instantiates the group
// through the NetPeerGroup and fetches its discovery and pipe services
// into disco and pipes.
// Returns true when the group was joined; false when no advertisement
// was found, the wait was interrupted, or the group could not be created.
int count = 3; // maximum number of attempts to discover
System.out.println("Attempting to discover the RestoNet Peergroup");
// Get the Discovery service handle from the NetPeerGroup
DiscoveryService hdisco = netpg.getDiscoveryService();
// All discovered RestoNet Peers
Enumeration ae = null;
// Loop until we find the "RestoNet" Peergroup advertisement
// or we've exhausted the desired number of attempts
while (count-- > 0) {
    try {
        // Check if we have the advertisement in the local
        // peer cache
        ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP,
            "Name", "RestoNet");
        // If we found the RestoNet advertisement, we are done
        if ((ae != null) && ae.hasMoreElements()) {
            break;
        }
        // The RestoNet advertisement is not in the local
        // cache. Send a discovery request to search for it.
        hdisco.getRemoteAdvertisements(null,
            DiscoveryService.GROUP, "Name", "RestoNet", 1, null);
        // Wait to give peers a chance to respond
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException ie) {
            // Somebody asked us to stop: keep the interrupt flag set
            // for the callers and stop retrying instead of silently
            // swallowing the request.
            Thread.currentThread().interrupt();
            break;
        }
    } catch (IOException ignored) {
        // Found nothing! Move on. (Deliberate best effort: a failed
        // lookup simply counts as one unsuccessful attempt.)
    }
}
// The last attempt above sends a remote query and waits, but the loop
// ends before the cache is read again. Look one final time so that
// answers which arrived during that last wait are not thrown away.
if ((ae == null || !ae.hasMoreElements())
        && !Thread.currentThread().isInterrupted()) {
    try {
        ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP,
            "Name", "RestoNet");
    } catch (IOException ignored) {
        // Same best-effort policy as inside the loop
    }
}
// Check if we found the RestoNet advertisement
if (ae == null || !ae.hasMoreElements()) {
    return false;
}
System.out.println("Found the RestoNet PeerGroup Advertisement");
// Get the advertisement
PeerGroupAdvertisement adv =
    (PeerGroupAdvertisement) ae.nextElement();
try {
    // Call the PeerGroup Factory to instantiate a new
    // peergroup instance
    restoNet = netpg.newGroup(adv);
    // Get the Discovery and Pipe services to
    // be used within the RestoNet Peergroup
    disco = restoNet.getDiscoveryService();
    pipes = restoNet.getPipeService();
} catch (Exception e) {
    System.out.println("Could not create RestoPeerGroup");
    return false;
}
System.out.println("The HungryPeer joined the restoNet PeerGroup");
return true;
| public static void | main(java.lang.String[] args) // Fries Auction request
// Program entry point: build a HungryPeer, run it, then terminate the
// JVM with status 0 once startJxta() returns. args is not used.
new HungryPeer().startJxta();
System.exit(0);
| private void | receiveFriesBids()
// Receives bid messages on myPipe and prints each one, forever.
// Returns only after closing myPipe, in two cases: waitForMessage()
// yielded null while the thread was interrupted, or receiving threw.
// A message whose "Bid" element cannot be parsed is reported and skipped.
// Continue until we get all answers
while (true) {
    Message msg; // Pipe message received
    try {
        // Wait for a bid message to arrive from a RestoPeer
        // Will block until a message arrives
        msg = myPipe.waitForMessage();
        // Check if the message is valid
        if (msg == null) {
            if (Thread.interrupted()) {
                // We have been asked to stop
                System.out.println(
                    "Abort Receiving bid loop interrupted");
                myPipe.close(); // Close the Pipe
                return;
            }
            // No message and no stop request: there is nothing to
            // parse, so wait again instead of dereferencing null below.
            continue;
        }
    } catch (Exception ex) {
        // Error in receiving message
        myPipe.close();
        System.out.println("Abort Receiving Error receiving bids");
        return;
    }
    // We got a message from a RestoPeer.
    // Extract and display information about the bid received.
    String price = null; // Fries price bid
    String brand = null; // RestoPeer name which offers the bid
    String specials = null; // Specials offer bid
    try {
        // Extract the Bid document from the message
        InputStream ip = msg.getElement("Bid").getStream();
        StructuredDocument bid =
            StructuredDocumentFactory.newStructuredDocument(
                mimeType, ip);
        // Parse the document to extract bid information.
        // (The local is not called "enum": that is a reserved word
        // since Java 5 and no longer compiles as an identifier.)
        Enumeration children = bid.getChildren();
        while (children.hasMoreElements()) {
            Element element = (Element) children.nextElement();
            String attr = (String) element.getKey();
            String value = (String) element.getValue();
            if (attr.equals("Price")) {
                price = value;
            } else if (attr.equals("Brand")) {
                brand = value;
            } else if (attr.equals("Specials")) {
                specials = value;
            }
        }
        // The bid parsed without error. Print it.
        System.out.println("Received Fries Bid from RestoPeers (" +
            brand + ") at a Price ($" + price +
            ") \nRestoPeers Special (" + specials + ")");
    } catch (Exception e) {
        // Broken content: report it and go back to waiting
        System.out.println("Error extracting bid from the message");
    }
}
| private boolean | setHungryPeerPipe()
// Builds the pipe advertisement for this HungryPeer (stored in myAdv)
// and opens the matching input pipe (stored in myPipe). RestoPeers use
// this pipe inside the RestoNet peergroup to answer us.
// Returns true on success, false if any step threw.
boolean pipeReady;
try {
    // Start from an empty pipe advertisement
    myAdv = (PipeAdvertisement)
        AdvertisementFactory.newAdvertisement(
            PipeAdvertisement.getAdvertisementType());
    // Give it a fresh pipe ID inside the RestoNet group and a name
    // derived from our identity
    myAdv.setPipeID(IDFactory.newPipeID(restoNet.getPeerGroupID()));
    myAdv.setName("restoNet:HungryPipe:" + myIdentity);
    // Use the PipeService.UnicastType pipe type
    myAdv.setType(PipeService.UnicastType);
    // Open the input pipe described by the advertisement
    myPipe = pipes.createInputPipe(myAdv);
    pipeReady = true;
} catch (Exception e) {
    System.out.println("Could not create the HungryPeer pipe");
    pipeReady = false;
}
return pipeReady;
| private void | startJxta()
// Runs the HungryPeer: creates the NetPeerGroup, joins RestoNet, opens
// our input pipe, registers for pipe-advertisement discovery events,
// then receives bids until the receive loop aborts.
// Exits the JVM with status 1 or 2 when one of the setup steps fails.
try {
    // Discover (or create) and join the default jxta NetPeerGroup
    netpg = PeerGroupFactory.newNetPeerGroup();
} catch (PeerGroupException e) {
    // Couldn't initialize; can't continue
    System.out.println("Fatal error : creating the NetPeerGroup");
    System.exit(1);
}
// Discover and join the RestoNet Peergroup
try {
    if (!joinRestoNet()) {
        System.out.println("Sorry could not find the RestoNet Peergroup");
        System.exit(2);
    }
} catch (Exception e) {
    System.out.println("Can't join RestoNet group");
    System.exit(1);
}
// Set our HungryPeer communication pipe so RestoPeers
// can talk to us
if (!setHungryPeerPipe()) {
    System.out.println(
        "Aborting due to failure to create our HungryPeer pipe");
    System.exit(1);
}
// Register for discovery events for pipe advertisements
disco.addDiscoveryListener(this);
// NOTE(review): the attribute is spelled "name" here but "Name" in
// joinRestoNet() -- confirm which spelling the discovery service expects.
disco.getRemoteAdvertisements(null,
    DiscoveryService.ADV,
    "name", "RestoNet:RestoPipe:*", 5, null);
// Wait; processing events as they happen.
// receiveFriesBids() loops internally and only returns after it has
// closed myPipe (stop request or receive error). Calling it again in an
// endless loop would just spin on the closed pipe, so call it once and
// let the caller finish.
receiveFriesBids();
|
|