/*
*
* Copyright (c) 2000 Scott Oaks and Henry Wong. All Rights Reserved.
*
* Permission to use, copy, modify, and distribute this software
* and its documentation for NON-COMMERCIAL purposes and
* without fee is hereby granted.
*
* This sample source code is provided for example only,
* on an unsupported, as-is basis.
*
* AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF
* THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
* TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR
* ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR
* DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
*
* THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE
* CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE
* PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT
* NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE
* SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE
* SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE
* PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES"). AUTHOR
* SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR
* HIGH RISK ACTIVITIES.
*/
import java.util.*;
import java.io.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.activation.*;
import net.jini.discovery.*;
import net.jini.lookup.*;
import net.jini.lease.*;
import net.jini.core.lookup.*;
import net.jini.core.lease.*;
import net.jini.core.event.*;
import com.sun.jini.constants.*;
import com.sun.jini.lookup.*;
import com.sun.jini.reliableLog.*;
public class ConvertServiceImpl extends Activatable
implements ConvertServiceProxy,
RemoteEventListener {
// Grants and tracks the client session leases (used by getInstance,
// convert and the wait thread).
private ServerLandlord lord;
// Delivers conversion events to listeners (used by convert and
// trackConversions).
private ServerDelivery sender;
// Callback object the reliable log uses to snapshot, recover and
// apply updates to this service's state.
private ConvertServiceLogHandler handler;
// Persistent log, opened in the constructor on the directory named by
// the activation data.
private ReliableLog log;
// Sequence number of the most recent conversion event; incremented and
// logged by getNextSeq().
private long seqNo;
// Service ID taken from the first lookup registration or restored from
// the log; null until one of those happens.
private ServiceID serviceID;
// Activation ID passed to the constructor; later handed to
// Activatable.inactive() by the wait thread.
private ActivationID activeID;
// Our registration with the lookup discovery service; null until
// findLookupDiscoveryService() runs or the log restores it.
private LookupDiscoveryRegistration lookupReg;
// Renewal set that keeps our leases alive; null until renewLease()
// creates one or the log restores it.
private LeaseRenewalSet leaseRenewal;
// Groups passed to the lookup discovery service when registering.
private String[] groups = new String[] { "" };
// Maps each ServiceRegistrar we have registered with to the Lease for
// that registration.
private Hashtable leases;
// Thread created by startWaitThread(); non-null once it has been started.
private Thread waitThread;
// Sequence number of the last lookup discovery event that was processed.
private long lastLookupSeqNo;
// The item registered with lookup services; created lazily in register().
private ServiceItem serviceItem;
// This class holds the information necessary for an update to the
// sequence number. getNextSeq() writes one of these to the reliable
// log after every increment, and ConvertServiceLogHandler.applyUpdate()
// copies the value back into the service when an update is read.
private static class ConvertServiceUpdateRecord implements Serializable {
// The service's conversion sequence number at the time the record
// was written.
long seqNo;
}
/**
 * Log record carrying the table of lookup-service leases together with
 * the lookup discovery sequence number it corresponds to.
 *
 * The table itself is transient; it is written by hand as two parallel
 * arrays of MarshalledObjects (see writeObject / readObject).
 */
private static class ConvertServiceUpdateLeases implements Serializable {
    transient Hashtable leases;
    long seqNo;

    ConvertServiceUpdateLeases(Hashtable l, long seqNo) {
        this.leases = l;
        this.seqNo = seqNo;
    }

    // The keys of the table are service registrar objects whose class
    // definitions were downloaded from the lookup service. To be able
    // to load those classes again on recovery, the codebase they came
    // from has to be kept; wrapping every key and value in a
    // MarshalledObject preserves that annotation.
    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.defaultWriteObject();
        int count = leases.size();
        MarshalledObject[] wrappedKeys = new MarshalledObject[count];
        MarshalledObject[] wrappedValues = new MarshalledObject[count];
        Enumeration registrars = leases.keys();
        for (int slot = 0; registrars.hasMoreElements(); slot++) {
            Object registrar = registrars.nextElement();
            wrappedKeys[slot] = new MarshalledObject(registrar);
            wrappedValues[slot] = new MarshalledObject(leases.get(registrar));
        }
        oos.writeObject(wrappedKeys);
        oos.writeObject(wrappedValues);
    }

    // Reverses writeObject: reads the two parallel arrays and rebuilds
    // the table by unwrapping each key/value pair.
    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        ois.defaultReadObject();
        MarshalledObject[] wrappedKeys = (MarshalledObject[]) ois.readObject();
        MarshalledObject[] wrappedValues = (MarshalledObject[]) ois.readObject();
        Hashtable rebuilt = new Hashtable(wrappedKeys.length);
        leases = rebuilt;
        int slot = 0;
        while (slot < wrappedKeys.length) {
            Object registrar = wrappedKeys[slot].get();
            Object lease = wrappedValues[slot].get();
            rebuilt.put(registrar, lease);
            slot++;
        }
    }
}
// Reliable-log callbacks for the enclosing service: writes a full
// snapshot of its persistent state, reads one back, and applies the
// incremental update records written between snapshots.
class ConvertServiceLogHandler extends LogHandler {
    // Snapshot layout, in order: service ID, conversion sequence
    // number, lease renewal set, lookup discovery registration, lease
    // table with its sequence number. Any of the three object slots
    // that is still null is written as an empty String placeholder.
    public void snapshot(OutputStream os) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(os);
        out.writeObject(serviceID == null
                ? (Object) new String()
                : (Object) serviceID);
        out.writeLong(seqNo);
        out.writeObject(leaseRenewal == null
                ? (Object) new String()
                : (Object) new MarshalledObject(leaseRenewal));
        out.writeObject(lookupReg == null
                ? (Object) new String()
                : (Object) new MarshalledObject(lookupReg));
        out.writeObject(new ConvertServiceUpdateLeases(leases, lastLookupSeqNo));
        out.flush(); // Make sure it all gets to disk
    }

    // Reads the state back in the order snapshot() wrote it. A slot
    // holding the String placeholder leaves the matching field alone.
    public void recover(InputStream is) throws Exception {
        ObjectInputStream in = new ObjectInputStream(is);
        Object idSlot;
        try {
            idSlot = in.readObject();
        } catch (Exception e) {
            // no log
            return;
        }
        if (idSlot instanceof ServiceID) {
            serviceID = (ServiceID) idSlot;
        }
        seqNo = in.readLong();

        // A placeholder here means there was no renewal set; one is
        // located the first time it is needed.
        Object renewalSlot = in.readObject();
        if (renewalSlot instanceof MarshalledObject) {
            leaseRenewal = (LeaseRenewalSet) ((MarshalledObject) renewalSlot).get();
        }

        // A placeholder here means there was no discovery
        // registration, so go and get one now.
        Object registrationSlot = in.readObject();
        if (registrationSlot instanceof MarshalledObject) {
            lookupReg = (LookupDiscoveryRegistration) ((MarshalledObject) registrationSlot).get();
        } else {
            findLookupDiscoveryService();
        }

        ConvertServiceUpdateLeases saved = (ConvertServiceUpdateLeases) in.readObject();
        leases = saved.leases;
        lastLookupSeqNo = saved.seqNo;
    }

    // Called when reading an update. An update is either a
    // ConvertServiceUpdateRecord (new conversion sequence number) or a
    // ConvertServiceUpdateLeases (new lease table and lookup sequence
    // number); anything else is rejected.
    public void applyUpdate(Object o) {
        if (o instanceof ConvertServiceUpdateRecord) {
            ConvertServiceUpdateRecord record = (ConvertServiceUpdateRecord) o;
            seqNo = record.seqNo;
            return;
        }
        if (o instanceof ConvertServiceUpdateLeases) {
            ConvertServiceUpdateLeases update = (ConvertServiceUpdateLeases) o;
            leases = update.leases;
            lastLookupSeqNo = update.seqNo;
            return;
        }
        throw new IllegalArgumentException("Unexpected update");
    }
}
/**
 * Activation constructor. Opens the reliable log, restores any state
 * saved by an earlier activation, creates the landlord and the event
 * delivery helper, and takes a fresh snapshot of the log.
 *
 * @param id the activation ID; passed to the Activatable superclass
 * (with port 0) and kept for the later Activatable.inactive() call
 * @param data marshalled String naming the reliable log directory
 * @throws RemoteException if any step fails; the original exception is
 * attached as the cause
 */
public ConvertServiceImpl(ActivationID id, MarshalledObject data) throws RemoteException {
super(id, 0);
// Diagnostic output of the environment this activation runs in
System.out.println("classpath " + System.getProperty("java.class.path"));
System.out.println("codebase " + System.getProperty("java.rmi.server.codebase"));
activeID = id;
try {
String logDir = (String) data.get();
handler = new ConvertServiceLogHandler();
log = new ReliableLog(logDir, handler);
// recover() calls back into the handler, which fills in serviceID,
// seqNo, leaseRenewal, lookupReg, leases and lastLookupSeqNo when
// a previous activation left state behind.
log.recover();
lord = new ServerLandlord();
sender = new ServerDelivery(this, lord);
if (lookupReg == null) {
// First time we've run, so there is no
// lookup discovery service yet
leases = new Hashtable();
findLookupDiscoveryService();
}
// Write the current state out as a new snapshot
log.snapshot();
} catch (Exception e) {
throw new RemoteException("Can't construct service", e);
}
}
/**
 * Opens a new client session: asks the landlord for a lease whose
 * session data is an empty result cache, makes sure the wait thread
 * is running, and returns a registration wrapping that lease.
 *
 * @param duration the lease duration requested by the client
 * @return a registration object bound to this service and the new lease
 */
public ConvertServiceRegistration getInstance(long duration) {
    Lease granted = lord.newLease(new Hashtable(13), duration);
    startWaitThread();
    return new ConvertServiceRegistrationImpl(this, granted);
}
/**
 * Converts an integer to its String form on behalf of a leased session.
 *
 * The session's cache is consulted first. A missing cache means the
 * landlord has expired the lease. A missing entry means the value has
 * to be calculated and stored. Every successful call also delivers an
 * event carrying the next sequence number.
 *
 * @param l the session lease obtained through getInstance
 * @param i the value to convert
 * @return the String form of i
 * @throws LeaseDeniedException if there is no session data for the lease
 */
public String convert(Lease l, int i) throws LeaseDeniedException {
    Hashtable results = (Hashtable) lord.getSessionData(l);
    if (results == null) {
        throw new LeaseDeniedException("Lease expired");
    }
    Integer boxed = new Integer(i);
    String converted = (String) results.get(boxed);
    if (converted == null) {
        converted = boxed.toString();
        results.put(boxed, converted);
    }
    sender.deliver(i, getNextSeq());
    return converted;
}
/**
 * Registers a listener for conversion events by delegating to the
 * delivery helper, passing along the current sequence number.
 *
 * @param duration the requested duration of the event registration
 * @param rel the listener to register
 * @param key the handback object supplied by the caller
 * @return the event registration produced by the delivery helper
 */
public synchronized EventRegistration
trackConversions(long duration,
        RemoteEventListener rel, MarshalledObject key) {
    EventRegistration registration =
        sender.addListener(rel, duration, key, seqNo);
    return registration;
}
/**
 * Advances the conversion sequence number and records the new value
 * in the reliable log. A logging failure is reported with a stack
 * trace but does not prevent the new number from being returned.
 *
 * @return the sequence number after the increment
 */
private synchronized long getNextSeq() {
    seqNo += 1;
    try {
        // The log only accepts objects, so the number travels inside
        // an update record.
        ConvertServiceUpdateRecord record = new ConvertServiceUpdateRecord();
        record.seqNo = seqNo;
        log.update(record, true);
    } catch (Exception logFailure) {
        logFailure.printStackTrace();
    }
    return seqNo;
}
// This must only be called from a synchronized method
/**
 * Forgets the registration held with one lookup service. The lease is
 * taken out of the renewal set so that it simply expires, and the
 * entry is dropped from the lease table. If the lookup service comes
 * back later we re-register at that time.
 *
 * @param sr the lookup service whose registration is being dropped
 */
private void unregister(ServiceRegistrar sr) {
    try {
        Lease held = (Lease) leases.get(sr);
        leaseRenewal.remove(held);
    } catch (Exception ignored) {
    }
    leases.remove(sr);
}
// This must only be called from a synchronized method
/**
 * Registers this service with one lookup service and hands the
 * resulting lease to the lease renewal set.
 *
 * The first successful registration supplies the service ID. That ID
 * is stored in the cached ServiceItem so that every later registration
 * asks for the same ID, and the log is snapshotted so the ID survives
 * re-activation.
 *
 * If the registration call fails with a RemoteException, the lookup
 * service is discarded from the lookup discovery registration.
 *
 * @param sr the lookup service to register with
 */
private void register(ServiceRegistrar sr) {
    try {
        if (serviceItem == null)
            serviceItem = new ServiceItem(serviceID, this, null);
        ServiceRegistration ret = sr.register(serviceItem, Lease.FOREVER);
        if (serviceID == null) {
            serviceID = ret.getServiceID();
            // The cached item was built before an ID existed. Without
            // this assignment every further lookup service would be
            // asked to assign a new ID to the same service.
            serviceItem.serviceID = serviceID;
            try {
                log.snapshot();
            } catch (Exception e) {
                System.out.println("Can't take snapshot" + e);
            }
        }
        // Save this registration
        Lease lease = ret.getLease();
        try {
            leases.put(sr, lease);
            renewLease(lease);
        } catch (IOException ioe) {
            // Not expected, since the renewal service has normally been
            // found already. Report it rather than hiding it: the
            // registration stays in the table but is not being renewed.
            System.out.println("Can't renew lease " + ioe);
        }
    } catch (RemoteException rex) {
        System.out.println("Can't register with service " + rex);
        try {
            lookupReg.discard(sr);
        } catch (RemoteException rExp) {
            // Best effort only; nothing more to do if the discard
            // request itself cannot be delivered.
        }
    }
}
// We may have dropped an event.
// We must retrieve the entire set and compare it to what
// we have, cancelling and adding registrars as necessary.
// Getting the entire list is an expensive operation, so we
// don't do it unless we have to.
/**
 * Brings the lease table back in line with the full set of lookup
 * services reported by the lookup discovery registration.
 *
 * Registrars that are reported but unknown are registered. Registrars
 * that are known but no longer reported are unregistered. The
 * resulting table and sequence number are written to the log.
 *
 * @param seq sequence number of the event that triggered the resync;
 * the call is a no-op when a later event has already been processed
 */
private synchronized void resyncHashtable(long seq) {
    // Because this runs in another thread, it's conceivable that
    // another event has superseded this one.
    if (seq <= lastLookupSeqNo)
        return;
    ServiceRegistrar[] current;
    try {
        current = lookupReg.getRegistrars();
    } catch (Exception exp) {
        // There's no way in the LookupDiscoveryService interface
        // to find just the successful ones -- best we can do
        // is to drop the event and try again when the next
        // event comes.
        return;
    }
    Hashtable newLeases = new Hashtable();
    for (int i = 0; i < current.length; i++) {
        if (!leases.containsKey(current[i])) {
            // We haven't registered this one yet. On success,
            // register() stores the new lease in the old table.
            register(current[i]);
        }
        // Move the registration, whether it was already there or was
        // just created, into the new table. Leaving a fresh one
        // behind would get it unregistered by the sweep below.
        Object lease = leases.remove(current[i]);
        if (lease != null)
            newLeases.put(current[i], lease);
    }
    // Whatever is still in the old table is no longer reported by the
    // discovery service. Work from a copy of the keys because
    // unregister() removes entries from the table.
    Object[] stale = leases.keySet().toArray();
    for (int i = 0; i < stale.length; i++) {
        unregister((ServiceRegistrar) stale[i]);
    }
    leases = newLeases;
    lastLookupSeqNo = seq;
    try {
        log.update(new ConvertServiceUpdateLeases(leases, seq), true);
    } catch (IOException ioe) {
        System.out.println("Update failed " + ioe);
    }
    startWaitThread();
}
/**
 * Remote event callback. Two kinds of event are handled.
 *
 * An ExpirationWarningEvent causes the lease it carries to be renewed
 * for TimeConstants.DAYS.
 *
 * A RemoteDiscoveryEvent is handled according to its sequence number.
 * An old event is ignored. A gap in the sequence starts a full resync
 * on a separate thread. The expected next event is applied directly by
 * registering with, or unregistering from, the lookup services it
 * names, after which the lease table and sequence number are logged.
 *
 * @param re the event delivered to this listener
 * @throws UnknownEventException if the event is of neither kind
 */
public synchronized void notify(RemoteEvent re) throws UnknownEventException {
    if (re instanceof ExpirationWarningEvent) {
        try {
            ((ExpirationWarningEvent) re).getLease().renew(TimeConstants.DAYS);
        } catch (Exception e) {
            // We should take over lease renewal ourselves, using a
            // lease renewal manager.
        }
        startWaitThread();
        return;
    }

    if (!(re instanceof RemoteDiscoveryEvent)) {
        System.out.println("Unexpected event " + re);
        throw new UnknownEventException("ConvertServiceImpl");
    }

    RemoteDiscoveryEvent discovery = (RemoteDiscoveryEvent) re;
    final long seq = discovery.getSequenceNumber();

    // This event was delivered out of order. We noticed at the time
    // that we had dropped it, so we can just ignore it.
    if (seq <= lastLookupSeqNo) {
        return;
    }

    // We may have dropped an event. In order to tell, we have to check
    // with the lookup discovery service. Since we're in a callback
    // from that service, we can't call it in this thread or deadlock
    // will occur.
    if (seq != lastLookupSeqNo + 1) {
        Runnable resync = new Runnable() {
            public void run() {
                resyncHashtable(seq);
            }
        };
        new Thread(resync).start();
        return;
    }

    // The sequence number was as expected; get the registrars and
    // process them.
    ServiceRegistrar[] found;
    try {
        found = discovery.getRegistrars();
    } catch (LookupUnmarshalException lue) {
        // Get the ones that we can; ignore the rest
        found = lue.getUnmarshalledRegs();
    }
    boolean discarded = discovery.isDiscarded();
    for (int i = 0; i < found.length; i++) {
        boolean known = leases.containsKey(found[i]);
        if (discarded) {
            if (known) {
                unregister(found[i]);
            }
        } else if (!known) {
            register(found[i]);
        }
        // else we were already registered in this service
    }

    // All done -- update the log with the new leases and seq no
    lastLookupSeqNo = seq;
    try {
        log.update(new ConvertServiceUpdateLeases(leases, lastLookupSeqNo), true);
    } catch (Exception e) {
        System.out.println("Update failed " + e);
    }
    startWaitThread();
}
/**
 * Locates a lookup discovery service, registers this object with it
 * as the listener for the configured groups, records the starting
 * event sequence number, and arranges for the registration's lease
 * to be renewed.
 *
 * @throws IOException if locating the service or marshalling the
 * handback fails
 * @throws RemoteException if a remote call to the service fails
 */
private void findLookupDiscoveryService() throws IOException, RemoteException {
    ServiceFinder finder = new ServiceFinder(LookupDiscoveryService.class);
    LookupDiscoveryService discovery = (LookupDiscoveryService) finder.getObject();
    MarshalledObject handback = new MarshalledObject("dummy");
    lookupReg = discovery.register(groups, null, this, handback, Lease.FOREVER);
    lastLookupSeqNo = lookupReg.getEventRegistration().getSequenceNumber();
    renewLease(lookupReg.getLease());
}
/**
 * Hands a lease to the lease renewal set to be renewed for
 * Lease.FOREVER, first obtaining a renewal set from a lease renewal
 * service when none is available.
 *
 * The method loops until renewFor() succeeds. A RemoteException from
 * renewFor() discards the current renewal set and the next pass
 * creates a replacement.
 * NOTE(review): nothing here bounds the number of retries.
 *
 * @param l the lease to be kept renewed
 * @throws IOException declared by the method; presumably raised while
 * locating the renewal service -- TODO confirm against ServiceFinder
 * @throws RemoteException if creating or configuring a new renewal set
 * fails (those calls are outside the retry try-block)
 */
private void renewLease(Lease l) throws IOException, RemoteException {
ServiceFinder sf = null;
LeaseRenewalService lrs = null;
while (true) {
// If leaseRenewal is null, we have to find one. It will be
// null the first time we run the code; if we get an error
// talking to a leaseRenewal object, we set it to null so
// that we find another one here.
// If we've just been re-activated, then leaseRenewal will be set
// from the log.
if (leaseRenewal == null) {
// Only set sf the first time we need it.
if (sf == null)
sf = new ServiceFinder(LeaseRenewalService.class);
lrs = (LeaseRenewalService) sf.getObject();
leaseRenewal = lrs.createLeaseRenewalSet(TimeConstants.DAYS);
// Ask to be warned (through notify()) before the renewal set's
// own lease runs out.
leaseRenewal.setExpirationWarningListener(this, TimeConstants.HOURS, null);
}
try {
leaseRenewal.renewFor(l, Lease.FOREVER);
break;
} catch (RemoteException re) {
// If we were talking to a leaseRenewal from a previous
// activation, sf will be null. We'll create the sf the
// next time through the loop.
if (sf != null)
sf.errored(lrs);
leaseRenewal = null;
}
}
}
/**
 * Starts, at most once per activation, the background thread that
 * shuts this service down when it is idle.
 *
 * The thread repeatedly sleeps for a minute, waits until the landlord
 * reports no outstanding leases, and then asks the activation system
 * to mark the object inactive. If that succeeds, or the request
 * throws, the JVM exits; if inactive() returns false the loop repeats.
 */
private synchronized void startWaitThread() {
// Already started -- nothing to do
if (waitThread != null)
return;
Runnable doWait = new Runnable() {
public void run() {
while (true) {
// There's a possibility that the client is connecting,
// but has not yet been granted a lease (or isn't even
// getting a lease if we became active as a result of
// a lookup discovery service event).
// Wait a bit to let that settle down, and
// then wait for the lease to be empty again.
try {
Thread.sleep(60 * 1000);
} catch (Exception e) {}
// Wait until there are no clients with an active lease
// and then exit when that happens
lord.waitForEmpty();
try {
if (Activatable.inactive(activeID)) {
// Save the final state before exiting; a failure here is
// ignored and the exit still happens.
try {
log.snapshot();
} catch (Exception e) {
}
System.exit(0);
}
} catch (Exception e) {
// inactive() threw an exception -- simply exit,
// because rmid is confused, but it will restart
// and recover (and then restart us if needed)
System.exit(0);
}
}
}
};
waitThread = new Thread(doWait);
waitThread.start();
}
/**
 * Setup program: registers the service with the activation system and
 * makes one call on it so that it is activated immediately.
 *
 * Expected arguments:
 *   args[0] -- codebase, used for the java.rmi.server.codebase
 *              property and as the activation descriptor's location
 *   args[1] -- security policy file for the activation group
 *   args[2] -- reliable log directory, handed to the service
 *              constructor as its activation data
 *
 * @param args the three command-line arguments described above
 * @throws Exception if registration with the activation system or the
 * initial call fails
 */
public static void main(String[] args) throws Exception {
    // Fail with a usable message instead of an
    // ArrayIndexOutOfBoundsException part-way through setup.
    if (args == null || args.length < 3) {
        System.err.println("Usage: java ConvertServiceImpl "
            + "<codebase> <policy-file> <log-directory>");
        System.exit(1);
        return;
    }
    System.setSecurityManager(new RMISecurityManager());
    Properties props = new Properties();
    props.put("java.security.policy", args[1]);
    props.put("java.class.path", System.getProperty("java.class.path"));
    props.put("java.rmi.server.codebase", args[0]);
    ActivationGroupDesc agd = new ActivationGroupDesc(props, null);
    ActivationGroupID agi = ActivationGroup.getSystem().registerGroup(agd);
    ActivationGroup.createGroup(agi, agd, 0);
    ActivationDesc ad = new ActivationDesc("ConvertServiceImpl",
                            args[0], new MarshalledObject(args[2]));
    ConvertService cs = (ConvertService) Activatable.register(ad);
    // The first remote call is what causes the activation system to
    // start the service.
    cs.getInstance(0);
    System.exit(0);
}
}
|