FileDocCategorySizeDatePackage
AbstractReplicatedMap.javaAPI DocApache Tomcat 6.0.1453506Fri Jul 20 04:20:36 BST 2007org.apache.catalina.tribes.tipis

AbstractReplicatedMap

public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements org.apache.catalina.tribes.ChannelListener, org.apache.catalina.tribes.Heartbeat, org.apache.catalina.tribes.group.RpcCallback, org.apache.catalina.tribes.MembershipListener
author
Filip Hanik
version
1.0

Fields Summary
protected static org.apache.juli.logging.Log
log
public static final int
DEFAULT_INITIAL_CAPACITY
The default initial capacity - MUST be a power of two.
public static final float
DEFAULT_LOAD_FACTOR
The load factor used when none specified in constructor.
final String
chset
Used to identify the map
protected transient long
rpcTimeout
Timeout for RPC messages, how long we will wait for a reply
protected transient org.apache.catalina.tribes.Channel
channel
Reference to the channel for sending messages
protected transient org.apache.catalina.tribes.group.RpcChannel
rpcChannel
The RpcChannel to send RPC messages through
protected transient byte[]
mapContextName
The Map context name makes this map unique, this allows us to have more than one map shared through one channel
protected transient boolean
stateTransferred
Has the state been transferred
protected transient Object
stateMutex
Simple lock object for transfers
protected transient HashMap
mapMembers
A list of members in our map
protected transient int
channelSendOptions
Our default send options
protected transient Object
mapOwner
The owner of this map, ala a SessionManager for example
protected transient ClassLoader[]
externalLoaders
External class loaders if serialization and deserialization is to be performed successfully.
protected transient int
currentNode
The node we are currently backing up data to, this index will rotate on a round robin basis
protected transient long
accessTimeout
Since the map keeps internal membership this is the timeout for a ping message to be responded to If a remote map doesn't respond within this timeframe, its considered dead.
protected transient String
mapname
Readable string of the mapContextName value
Constructors Summary
public AbstractReplicatedMap(Object owner, org.apache.catalina.tribes.Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, int channelSendOptions, ClassLoader[] cls)
Creates a new map

param
channel The channel to use for communication
param
timeout long - timeout for RPC messags
param
mapContextName String - unique name for this map, to allow multiple maps per channel
param
initialCapacity int - the size of this map, see HashMap
param
loadFactor float - load factor, see HashMap
param
cls - a list of classloaders to be used for deserialization of objects.

    

//------------------------------------------------------------------------------
//              CONSTRUCTORS
//------------------------------------------------------------------------------

                                                                             
      
                                   
                                   
                                   
                                  
                                  
                                  
                                   
        super(initialCapacity, loadFactor, 15);
        init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
        
    
Methods Summary
public booleanaccept(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)

        boolean result = false;
        if (msg instanceof MapMessage) {
            if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
            result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
            if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
        }
        return result;
    
public voidbreakdown()

        finalize();
    
protected voidbroadcast(int msgtype, boolean rpc)
Helper method to broadcast a message to all members in a channel

param
msgtype int
param
rpc boolean
throws
ChannelException

        //send out a map membership message, only wait for the first reply
        MapMessage msg = new MapMessage(this.mapContextName, msgtype,
                                        false, null, null, null, wrap(channel.getLocalMember(false)));
        if ( rpc) {
            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
            for (int i = 0; i < resp.length; i++) {
                mapMemberAdded(resp[i].getSource());
                messageReceived(resp[i].getMessage(), resp[i].getSource());
            }
        } else {
            channel.send(channel.getMembers(),msg,channelSendOptions);
        }
    
public voidclear()

            clear(true);
        
public voidclear(boolean notify)

            if ( notify ) {
                //only delete active keys
                Iterator keys = keySet().iterator();
                while (keys.hasNext())
                    remove(keys.next());
            } else {
                super.clear();
            }
        
public java.lang.Objectclone()

            throw new UnsupportedOperationException("This operation is not valid on a replicated map");
        
public booleancontainsKey(java.lang.Object key)
Returns true if the key has an entry in the map. The entry can be a proxy or a backup entry, invoking get(key) will make this entry primary for the group

param
key Object
return
boolean

            return super.containsKey(key);
        
public booleancontainsValue(java.lang.Object value)

            if ( value == null ) {
                return super.containsValue(value);
            } else {
                Iterator i = super.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    MapEntry entry = (MapEntry) super.get(e.getKey());
                    if (entry.isPrimary() && value.equals(entry.getValue())) return true;
                }//while
                return false;
            }//end if
        
public java.util.SetentrySet()

            LinkedHashSet set = new LinkedHashSet(super.size());
            Iterator i = super.entrySet().iterator();
            while ( i.hasNext() ) {
                Map.Entry e = (Map.Entry)i.next();
                Object key = e.getKey();
                MapEntry entry = (MapEntry)super.get(key);
                if ( entry.isPrimary() ) set.add(entry.getValue());
            }
            return Collections.unmodifiableSet(set);
        
public java.util.SetentrySetFull()
Returns the entire contents of the map Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information about the object.

return
Set

            return super.entrySet();
        
public booleanequals(java.lang.Object o)

        if ( o == null ) return false;
        if ( !(o instanceof AbstractReplicatedMap)) return false;
        if ( !(o.getClass().equals(this.getClass())) ) return false;
        AbstractReplicatedMap other = (AbstractReplicatedMap)o;
        return Arrays.equals(mapContextName,other.mapContextName);
    
public org.apache.catalina.tribes.Member[]excludeFromSet(org.apache.catalina.tribes.Member[] mbrs, org.apache.catalina.tribes.Member[] set)

        ArrayList result = new ArrayList();
        for (int i=0; i<set.length; i++ ) {
            boolean include = true;
            for (int j=0; j<mbrs.length; j++ ) 
                if ( mbrs[j].equals(set[i]) ) include = false;
            if ( include ) result.add(set[i]);
        }
        return (Member[])result.toArray(new Member[result.size()]);
    
public voidfinalize()

        try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
        //cleanup
        if (this.rpcChannel != null) {
            this.rpcChannel.breakdown();
        }
        if (this.channel != null) {
            this.channel.removeChannelListener(this);
            this.channel.removeMembershipListener(this);
        }
        this.rpcChannel = null;
        this.channel = null;
        this.mapMembers.clear();
        super.clear();
        this.stateTransferred = false;
        this.externalLoaders = null;
    
public java.lang.Objectget(java.lang.Object key)

        MapEntry entry = (MapEntry)super.get(key);
        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
        if ( entry == null ) return null;
        if ( !entry.isPrimary() ) {
            //if the message is not primary, we need to retrieve the latest value
            try {
                Member[] backup = null;
                MapMessage msg = null;
                if ( !entry.isBackup() ) {
                    //make sure we don't retrieve from ourselves
                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
                                         (Serializable) key, null, null, null);
                    Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
                    if (resp == null || resp.length == 0) {
                        //no responses
                        log.warn("Unable to retrieve remote object for key:" + key);
                        return null;
                    }
                    msg = (MapMessage) resp[0].getMessage();
                    msg.deserialize(getExternalLoaders());
                    backup = entry.getBackupNodes();
                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
                        ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
                        val.setOwner(getMapOwner());
                    }
                    if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
                }
                if (entry.isBackup()) {
                    //select a new backup node
                    backup = publishEntryInfo(key, entry.getValue());
                } else if ( entry.isProxy() ) {
                    //invalidate the previous primary
                    msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
                    Member[] dest = getMapMembersExcl(backup);
                    if ( dest!=null && dest.length >0) {
                        getChannel().send(dest, msg, getChannelSendOptions());
                    }
                }

                entry.setBackupNodes(backup);
                entry.setBackup(false);
                entry.setProxy(false);


            } catch (Exception x) {
                log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
                return null;
            }
        }
        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
        if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
            ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
            //hack, somehow this is not being set above
            val.setOwner(getMapOwner());
            
        }
        return entry.getValue();
    
public longgetAccessTimeout()

        return accessTimeout;
    
public org.apache.catalina.tribes.ChannelgetChannel()

        return channel;
    
public intgetChannelSendOptions()

        return channelSendOptions;
    
public java.lang.ClassLoader[]getExternalLoaders()

        return externalLoaders;
    
public org.apache.catalina.tribes.tipis.AbstractReplicatedMap$MapEntrygetInternal(java.lang.Object key)

        return (MapEntry)super.get(key);
    
public byte[]getMapContextName()

        return mapContextName;
    
public org.apache.catalina.tribes.Member[]getMapMembers(java.util.HashMap members)

        synchronized (members) {
            Member[] result = new Member[members.size()];
            members.keySet().toArray(result);
            return result;
        }
    
public org.apache.catalina.tribes.Member[]getMapMembers()

        return getMapMembers(this.mapMembers);
    
public org.apache.catalina.tribes.Member[]getMapMembersExcl(org.apache.catalina.tribes.Member[] exclude)

        synchronized (mapMembers) {
            HashMap list = (HashMap)mapMembers.clone();
            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
            return getMapMembers(list);
        }
    
public java.lang.ObjectgetMapOwner()

        return mapOwner;
    
public intgetNextBackupIndex()

        int size = mapMembers.size();
        if (mapMembers.size() == 0)return -1;
        int node = currentNode++;
        if (node >= size) {
            node = 0;
            currentNode = 0;
        }
        return node;
    
public org.apache.catalina.tribes.MembergetNextBackupNode()

        Member[] members = getMapMembers();
        int node = getNextBackupIndex();
        if ( members.length == 0 || node==-1) return null;
        if ( node >= members.length ) node = 0;
        return members[node];
    
public org.apache.catalina.tribes.group.RpcChannelgetRpcChannel()

        return rpcChannel;
    
public longgetRpcTimeout()

        return rpcTimeout;
    
protected abstract intgetStateMessageType()

public java.lang.ObjectgetStateMutex()

        return stateMutex;
    
public inthashCode()

        return Arrays.hashCode(this.mapContextName);
    
public voidheartbeat()

        try {
            ping(accessTimeout);
        }catch ( Exception x ) {
            log.error("Unable to send AbstractReplicatedMap.ping message",x);
        }
    
public booleaninSet(org.apache.catalina.tribes.Member m, org.apache.catalina.tribes.Member[] set)

        if ( set == null ) return false;
        boolean result = false;
        for (int i=0; i<set.length && (!result); i++ )
            if ( m.equals(set[i]) ) result = true;
        return result;
    
protected voidinit(java.lang.Object owner, org.apache.catalina.tribes.Channel channel, java.lang.String mapContextName, long timeout, int channelSendOptions, java.lang.ClassLoader[] cls)
Initializes the map by creating the RPC channel, registering itself as a channel listener This method is also responsible for initiating the state transfer

param
owner Object
param
channel Channel
param
mapContextName String
param
timeout long
param
channelSendOptions int
param
cls ClassLoader[]

        log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
        this.mapOwner = owner;
        this.externalLoaders = cls;
        this.channelSendOptions = channelSendOptions;
        this.channel = channel;
        this.rpcTimeout = timeout;

        try {
            this.mapname = mapContextName;
            //unique context is more efficient if it is stored as bytes
            this.mapContextName = mapContextName.getBytes(chset);
        } catch (UnsupportedEncodingException x) {
            log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
            this.mapContextName = mapContextName.getBytes();
        }
        if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));

        //create an rpc channel and add the map as a listener
        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
        //add this map as a message listener
        this.channel.addChannelListener(this);
        //listen for membership notifications
        this.channel.addMembershipListener(this);
        
        
        try {
            //broadcast our map, this just notifies other members of our existence
            broadcast(MapMessage.MSG_INIT, true);
            //transfer state from another map
            transferState();
            //state is transferred, we are ready for messaging
            broadcast(MapMessage.MSG_START, true);
        } catch (ChannelException x) {
            log.warn("Unable to send map start message.");
            throw new RuntimeException("Unable to start replicated map.",x);
        }
    
public booleanisEmpty()

            return size()==0;
        
public booleanisStateTransferred()

        return stateTransferred;
    
public java.util.SetkeySet()

            //todo implement
            //should only return keys where this is active.
            LinkedHashSet set = new LinkedHashSet(super.size());
            Iterator i = super.entrySet().iterator();
            while ( i.hasNext() ) {
                Map.Entry e = (Map.Entry)i.next();
                Object key = e.getKey();
                MapEntry entry = (MapEntry)super.get(key);
                if ( entry.isPrimary() ) set.add(key);
            }
            return Collections.unmodifiableSet(set);

        
public java.util.SetkeySetFull()

            return super.keySet();
        
public voidleftOver(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
If the reply has already been sent to the requesting thread, the rpc callback can handle any data that comes in after the fact.

param
msg Serializable
param
sender Member

        //left over membership messages
        if (! (msg instanceof MapMessage))return;

        MapMessage mapmsg = (MapMessage) msg;
        try {
            mapmsg.deserialize(getExternalLoaders());
            if (mapmsg.getMsgType() == MapMessage.MSG_START) {
                mapMemberAdded(mapmsg.getBackupNodes()[0]);
            } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
                memberAlive(mapmsg.getBackupNodes()[0]);
            }
        } catch (IOException x ) {
            log.error("Unable to deserialize MapMessage.",x);
        } catch (ClassNotFoundException x ) {
            log.error("Unable to deserialize MapMessage.",x);
        }
    
public voidmapMemberAdded(org.apache.catalina.tribes.Member member)

        if ( member.equals(getChannel().getLocalMember(false)) ) return;
        boolean memberAdded = false;
        //select a backup node if we don't have one
        synchronized (mapMembers) {
            if (!mapMembers.containsKey(member) ) {
                mapMembers.put(member, new Long(System.currentTimeMillis()));
                memberAdded = true;
            }
        }
        if ( memberAdded ) {
            synchronized (stateMutex) {
                Iterator i = super.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    MapEntry entry = (MapEntry) super.get(e.getKey());
                    if ( entry == null ) continue;
                    if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
                        try {
                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                            entry.setBackupNodes(backup);
                        } catch (ChannelException x) {
                            log.error("Unable to select backup node.", x);
                        } //catch
                    } //end if
                } //while
            } //synchronized
        }//end if
    
public voidmemberAdded(org.apache.catalina.tribes.Member member)

        //do nothing
    
protected voidmemberAlive(org.apache.catalina.tribes.Member member)
We have received a member alive notification

param
member Member

        synchronized (mapMembers) {
            if (!mapMembers.containsKey(member)) {
                mapMemberAdded(member);
            } //end if
            mapMembers.put(member, new Long(System.currentTimeMillis()));
        }
    
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)

        boolean removed = false;
        synchronized (mapMembers) {
            removed = (mapMembers.remove(member) != null );
        }
        Iterator i = super.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry e = (Map.Entry) i.next();
            MapEntry entry = (MapEntry) super.get(e.getKey());
            if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
                try {
                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                    entry.setBackupNodes(backup);
                } catch (ChannelException x) {
                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
                }
            } //end if
        } //while
    
public voidmessageReceived(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)

        if (! (msg instanceof MapMessage)) return;

        MapMessage mapmsg = (MapMessage) msg;
        if ( log.isTraceEnabled() ) {
            log.trace("Map["+mapname+"] received message:"+mapmsg);
        }
        
        try {
            mapmsg.deserialize(getExternalLoaders());
        } catch (IOException x) {
            log.error("Unable to deserialize MapMessage.", x);
            return;
        } catch (ClassNotFoundException x) {
            log.error("Unable to deserialize MapMessage.", x);
            return;
        }
        if ( log.isTraceEnabled() ) 
            log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
            mapMemberAdded(mapmsg.getBackupNodes()[0]);
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
            memberDisappeared(mapmsg.getBackupNodes()[0]);
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if ( entry==null ) {
                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
                entry.setBackup(false);
                entry.setProxy(true);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                super.put(entry.getKey(), entry);
            } else {
                entry.setProxy(true);
                entry.setBackup(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
            }
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
            super.remove(mapmsg.getKey());
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if (entry == null) {
                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
                entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                entry.setProxy(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
                    ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                }
            } else {
                entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                entry.setProxy(false);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                if (entry.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
                    if (mapmsg.isDiff()) {
                        try {
                            diff.lock();
                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
                        } catch (Exception x) {
                            log.error("Unable to apply diff to key:" + entry.getKey(), x);
                        } finally {
                            diff.unlock();
                        }
                    } else {
                        if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
                        ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
                    } //end if
                } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
                    re.setOwner(getMapOwner());
                    entry.setValue(re);
                } else {
                    if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
                } //end if
            } //end if
            super.put(entry.getKey(), entry);
        } //end if
    
protected voidping(long timeout)
Sends a ping out to all the members in the cluster, not just map members that this map is alive.

param
timeout long
throws
ChannelException

        //send out a map membership message, only wait for the first reply
        MapMessage msg = new MapMessage(this.mapContextName, 
                                        MapMessage.MSG_INIT,
                                        false, 
                                        null, 
                                        null, 
                                        null, 
                                        wrap(channel.getLocalMember(false)));
        if ( channel.getMembers().length > 0 ) {
            //send a ping, wait for all nodes to reply
            Response[] resp = rpcChannel.send(channel.getMembers(), 
                                              msg, rpcChannel.ALL_REPLY, 
                                              (channelSendOptions),
                                              (int) accessTimeout);
            for (int i = 0; i < resp.length; i++) {
                memberAlive(resp[i].getSource());
            } //for
        }
        //update our map of members, expire some if we didn't receive a ping back
        synchronized (mapMembers) {
            Iterator it = mapMembers.entrySet().iterator();
            long now = System.currentTimeMillis();
            while ( it.hasNext() ) {
                Map.Entry entry = (Map.Entry)it.next();
                long access = ((Long)entry.getValue()).longValue(); 
                if ( (now - access) > timeout ) {
                    it.remove();
                    memberDisappeared( (Member) entry.getKey());
                }
            }
        }//synch
    
protected voidprintMap(java.lang.String header)

        try {
            System.out.println("\nDEBUG MAP:"+header);
            System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
            Member[] mbrs = getMapMembers();
            for ( int i=0; i<mbrs.length;i++ ) {
                System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
            }
            Iterator i = super.entrySet().iterator();
            int cnt = 0;

            while (i.hasNext()) {
                Map.Entry e = (Map.Entry) i.next();
                System.out.println( (++cnt) + ". " + super.get(e.getKey()));
            }
            System.out.println("EndMap]\n\n");
        }catch ( Exception ignore) {
            ignore.printStackTrace();
        }
    
protected abstract org.apache.catalina.tribes.Member[]publishEntryInfo(java.lang.Object key, java.lang.Object value)

public java.lang.Objectput(java.lang.Object key, java.lang.Object value)

            return put(key,value,true);
        
public java.lang.Objectput(java.lang.Object key, java.lang.Object value, boolean notify)

            MapEntry entry = new MapEntry(key,value);
            entry.setBackup(false);
            entry.setProxy(false);
    
            Object old = null;
    
            //make sure that any old values get removed
            if ( containsKey(key) ) old = remove(key);
            try {
                if ( notify ) {
                    Member[] backup = publishEntryInfo(key, value);
                    entry.setBackupNodes(backup);
                }
            } catch (ChannelException x) {
                log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
            }
            super.put(key,entry);
            return old;
        
public voidputAll(java.util.Map m)
Copies all values from one map to this instance

param
m Map

            Iterator i = m.entrySet().iterator();
            while ( i.hasNext() ) {
                Map.Entry entry = (Map.Entry)i.next();
                put(entry.getKey(),entry.getValue());
            }
        
public java.lang.Objectremove(java.lang.Object key)
Removes an object from this map, it will also remove it from

param
key Object
return
Object

        return remove(key,true);
    
public java.lang.Objectremove(java.lang.Object key, boolean notify)

        MapEntry entry = (MapEntry)super.remove(key);

        try {
            if (getMapMembers().length > 0 && notify) {
                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
                getChannel().send(getMapMembers(), msg, getChannelSendOptions());
            }
        } catch ( ChannelException x ) {
            log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
        }
        return entry!=null?entry.getValue():null;
    
protected booleanremoveEldestEntry(java.util.Map$Entry eldest)

            return false;
        
public voidreplicate(java.lang.Object key, boolean complete)
Replicates any changes to the object since the last time The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated

param
complete - if set to true, the object is replicated to its backup if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will be replicated

        if ( log.isTraceEnabled() )
            log.trace("Replicate invoked on key:"+key);
        MapEntry entry = (MapEntry)super.get(key);
        if ( entry == null ) return;
        if ( !entry.isSerializable() ) return;
        if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
            Object value = entry.getValue();
            //check to see if we need to replicate this object isDirty()||complete
            boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
            
            if (!repl) {
                if ( log.isTraceEnabled() )
                    log.trace("Not replicating:"+key+", no change made");
                
                return;
            }
            //check to see if the message is diffable
            boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
            MapMessage msg = null;
            if (diff) {
                ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
                try {
                    rentry.lock();
                    //construct a diff message
                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                         true, (Serializable) entry.getKey(), null,
                                         rentry.getDiff(),
                                         entry.getBackupNodes());
                } catch (IOException x) {
                    log.error("Unable to diff object. Will replicate the entire object instead.", x);
                } finally {
                    rentry.unlock();
                }
                
            }
            if (msg == null) {
                //construct a complete
                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                     false, (Serializable) entry.getKey(),
                                     (Serializable) entry.getValue(),
                                     null, entry.getBackupNodes());

            }
            try {
                if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);
                }
            } catch (ChannelException x) {
                log.error("Unable to replicate data.", x);
            }
        } //end if

    
public voidreplicate(boolean complete)
This can be invoked by a periodic thread to replicate out any changes. For maps that don't store objects that implement ReplicatedMapEntry, this method should be used infrequently to avoid large amounts of data transfer

param
complete boolean

        Iterator i = super.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry e = (Map.Entry) i.next();
            replicate(e.getKey(), complete);
        } //while

    
public java.io.SerializablereplyRequest(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)

todo
implement state transfer
param
msg Serializable
return
Serializable - null if no reply should be sent

        if (! (msg instanceof MapMessage))return null;
        MapMessage mapmsg = (MapMessage) msg;

        //map init request
        if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
            return mapmsg;
        }
        
        //map start request
        if (mapmsg.getMsgType() == mapmsg.MSG_START) {
            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
            mapMemberAdded(sender);
            return mapmsg;
        }

        //backup request
        if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
            if (entry == null || (!entry.isSerializable()) )return null;
            mapmsg.setValue( (Serializable) entry.getValue());
            return mapmsg;
        }

        //state transfer request
        if (mapmsg.getMsgType() == mapmsg.MSG_STATE || mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY) {
            synchronized (stateMutex) { //make sure we dont do two things at the same time
                ArrayList list = new ArrayList();
                Iterator i = super.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    MapEntry entry = (MapEntry) super.get(e.getKey());
                    if ( entry.isSerializable() ) {
                        boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
                        MapMessage me = new MapMessage(mapContextName, 
                                                       copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
                            false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
                        list.add(me);
                    }
                }
                mapmsg.setValue(list);
                return mapmsg;
                
            } //synchronized
        }

        return null;

    
public voidsetAccessTimeout(long accessTimeout)

        this.accessTimeout = accessTimeout;
    
public voidsetChannelSendOptions(int channelSendOptions)

        this.channelSendOptions = channelSendOptions;
    
public voidsetExternalLoaders(java.lang.ClassLoader[] externalLoaders)

        this.externalLoaders = externalLoaders;
    
public voidsetMapOwner(java.lang.Object mapOwner)

        this.mapOwner = mapOwner;
    
public intsize()

            //todo, implement a counter variable instead
            //only count active members in this node
            int counter = 0;
            Iterator it = super.entrySet().iterator();
            while (it!=null && it.hasNext() ) {
                Map.Entry e = (Map.Entry) it.next();
                if ( e != null ) {
                    MapEntry entry = (MapEntry) super.get(e.getKey());
                    if (entry!=null && entry.isPrimary() && entry.getValue() != null) counter++;
                }
            }
            return counter;
        
public intsizeFull()

            return super.size();
        
public voidtransferState()

        try {
            Member[] members = getMapMembers();
            Member backup = members.length > 0 ? (Member) members[0] : null;
            if (backup != null) {
                MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false,
                                                null, null, null, null);
                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                if (resp.length > 0) {
                    synchronized (stateMutex) {
                        msg = (MapMessage) resp[0].getMessage();
                        msg.deserialize(getExternalLoaders());
                        ArrayList list = (ArrayList) msg.getValue();
                        for (int i = 0; i < list.size(); i++) {
                            messageReceived( (Serializable) list.get(i), resp[0].getSource());
                        } //for
                    }
                } else {
                    log.warn("Transfer state, 0 replies, probably a timeout.");
                }
            }
        } catch (ChannelException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        } catch (IOException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        } catch (ClassNotFoundException x) {
            log.error("Unable to transfer LazyReplicatedMap state.", x);
        }
        stateTransferred = true;
    
public java.util.Collectionvalues()

            ArrayList values = new ArrayList();
            Iterator i = super.entrySet().iterator();
            while ( i.hasNext() ) {
                Map.Entry e = (Map.Entry)i.next();
                MapEntry entry = (MapEntry)super.get(e.getKey());
                if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
            }
            return Collections.unmodifiableCollection(values);
        
protected org.apache.catalina.tribes.Member[]wrap(org.apache.catalina.tribes.Member m)
Helper methods, wraps a single member in an array

param
m Member
return
Member[]

        if ( m == null ) return new Member[0];
        else return new Member[] {m};