Methods Summary |
---|
public boolean | accept(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
    // A message is accepted only when it is a MapMessage whose map id equals this map's context name.
    if (!(msg instanceof MapMessage)) {
        return false;
    }
    if (log.isTraceEnabled()) {
        log.trace("Map["+mapname+"] accepting...."+msg);
    }
    boolean accepted = Arrays.equals(mapContextName, ((MapMessage) msg).getMapId());
    if (log.isTraceEnabled()) {
        log.trace("Msg["+mapname+"] accepted["+accepted+"]...."+msg);
    }
    return accepted;
|
public void | breakdown()
    // Shutting the map down is delegated entirely to finalize().
    this.finalize();
|
protected void | broadcast(int msgtype, boolean rpc)Helper method to broadcast a message to all members in a channel
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, wrap(channel.getLocalMember(false)));
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
mapMemberAdded(resp[i].getSource());
messageReceived(resp[i].getMessage(), resp[i].getSource());
}
} else {
channel.send(channel.getMembers(),msg,channelSendOptions);
}
|
public void | clear()
    // Clears the map with notification enabled.
    this.clear(true);
|
public void | clear(boolean notify)
    // Without notification the underlying map is simply emptied.
    if (!notify) {
        super.clear();
        return;
    }
    // With notification only the keys reported by keySet() are removed, one remove(key) call each.
    // keySet() builds a fresh set, so removing while iterating over it is safe.
    for (Iterator it = keySet().iterator(); it.hasNext(); ) {
        remove(it.next());
    }
|
public java.lang.Object | clone()
throw new UnsupportedOperationException("This operation is not valid on a replicated map");
|
public boolean | containsKey(java.lang.Object key)Returns true if the key has an entry in the map.
The entry can be a proxy or a backup entry, invoking get(key)
will make this entry primary for the group
return super.containsKey(key);
|
public boolean | containsValue(java.lang.Object value)
    // Returns true if a primary entry of this map holds a value equal to the argument.
    // A null argument is handed unchanged to the superclass lookup.
    if (value == null) {
        return super.containsValue(value);
    }
    Iterator i = super.entrySet().iterator();
    while (i.hasNext()) {
        Map.Entry e = (Map.Entry) i.next();
        MapEntry entry = (MapEntry) super.get(e.getKey());
        // the lookup can come back empty; skip it instead of failing, as size() and mapMemberAdded() do
        if (entry == null) continue;
        if (entry.isPrimary() && value.equals(entry.getValue())) return true;
    }
    return false;
|
public java.util.Set | entrySet()
    // Returns an unmodifiable snapshot containing one element per primary entry, in the
    // iteration order of the underlying map.
    LinkedHashSet set = new LinkedHashSet(super.size());
    Iterator i = super.entrySet().iterator();
    while (i.hasNext()) {
        Map.Entry e = (Map.Entry) i.next();
        Object key = e.getKey();
        MapEntry entry = (MapEntry) super.get(key);
        // skip keys whose lookup comes back empty, as size() and mapMemberAdded() do
        if (entry == null) continue;
        // Map.entrySet() must contain key/value entries, not bare values, so add the entry itself.
        // NOTE(review): assumes MapEntry implements Map.Entry (it exposes getKey/getValue/setValue) - confirm.
        if (entry.isPrimary()) set.add(entry);
    }
    return Collections.unmodifiableSet(set);
|
public java.util.Set | entrySetFull()Returns the entire contents of the map
Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
about the object.
return super.entrySet();
|
public boolean | equals(java.lang.Object o)
    // Equal means: same concrete class and the same map context name bytes.
    // instanceof is false for null, so the null case is covered by the first test.
    boolean sameKind = (o instanceof AbstractReplicatedMap) && o.getClass().equals(this.getClass());
    if (!sameKind) {
        return false;
    }
    return Arrays.equals(mapContextName, ((AbstractReplicatedMap) o).mapContextName);
|
public org.apache.catalina.tribes.Member[] | excludeFromSet(org.apache.catalina.tribes.Member[] mbrs, org.apache.catalina.tribes.Member[] set)
ArrayList result = new ArrayList();
for (int i=0; i<set.length; i++ ) {
boolean include = true;
for (int j=0; j<mbrs.length; j++ )
if ( mbrs[j].equals(set[i]) ) include = false;
if ( include ) result.add(set[i]);
}
return (Member[])result.toArray(new Member[result.size()]);
|
public void | finalize()
    // Best-effort stop notification to the other members; any failure is ignored.
    try {
        broadcast(MapMessage.MSG_STOP, false);
    } catch (Exception ignore) {
    }
    // Release the rpc channel and unregister both listeners before dropping the references.
    if (this.rpcChannel != null) this.rpcChannel.breakdown();
    if (this.channel != null) {
        this.channel.removeChannelListener(this);
        this.channel.removeMembershipListener(this);
    }
    this.rpcChannel = null;
    this.channel = null;
    // Reset the local bookkeeping: known members, stored entries, transfer flag, class loaders.
    this.mapMembers.clear();
    super.clear();
    this.stateTransferred = false;
    this.externalLoaders = null;
|
public java.lang.Object | get(java.lang.Object key)
// Returns the value stored under key. Returns null when there is no local entry, when no
// backup node answered the retrieve request, or when an exception occurred while promoting.
// A non-primary entry is promoted in place: its backup and proxy flags are both cleared
// before the value is returned.
MapEntry entry = (MapEntry)super.get(key);
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
//if the message is not primary, we need to retrieve the latest value
try {
Member[] backup = null;
MapMessage msg = null;
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
// not a backup copy: ask the entry's backup nodes for the value and use the first reply
msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null);
Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
log.warn("Unable to retrieve remote object for key:" + key);
return null;
}
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
backup = entry.getBackupNodes();
// the owner is set on the value currently held by the entry, before it is replaced below
if ( entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
val.setOwner(getMapOwner());
}
// a null value in the reply leaves the current value untouched
if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
}
if (entry.isBackup()) {
//select a new backup node
backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
// every map member except the backup nodes is sent a MSG_PROXY message for this key
msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
}
// record the backup nodes and clear both flags on the local entry
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
} catch (Exception x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
return null;
}
}
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
//hack, somehow this is not being set above
val.setOwner(getMapOwner());
}
return entry.getValue();
|
public long | getAccessTimeout()
    // Timeout that heartbeat() hands to ping().
    return this.accessTimeout;
|
public org.apache.catalina.tribes.Channel | getChannel()
    // Channel this map was initialized with; null after finalize() has run.
    return this.channel;
|
public int | getChannelSendOptions()
    // Send options passed along with this map's channel sends.
    return this.channelSendOptions;
|
public java.lang.ClassLoader[] | getExternalLoaders()
    // Class loaders handed to MapMessage.deserialize(); null after finalize() has run.
    return this.externalLoaders;
|
public org.apache.catalina.tribes.tipis.AbstractReplicatedMap$MapEntry | getInternal(java.lang.Object key)
return (MapEntry)super.get(key);
|
public byte[] | getMapContextName()
    // The context name in byte form, as produced in init(); the array itself is returned, not a copy.
    return this.mapContextName;
|
public org.apache.catalina.tribes.Member[] | getMapMembers(java.util.HashMap members)
    // Copies the keys of the supplied map into a Member array while holding the map's monitor.
    synchronized (members) {
        Member[] snapshot = new Member[members.size()];
        return (Member[]) members.keySet().toArray(snapshot);
    }
|
public org.apache.catalina.tribes.Member[] | getMapMembers()
    // Array snapshot of the members currently tracked in mapMembers.
    return this.getMapMembers(this.mapMembers);
|
public org.apache.catalina.tribes.Member[] | getMapMembersExcl(org.apache.catalina.tribes.Member[] exclude)
    // Returns the tracked map members minus the ones listed in 'exclude'.
    // Works on a clone, so mapMembers itself is never modified here.
    synchronized (mapMembers) {
        HashMap list = (HashMap) mapMembers.clone();
        // a null exclusion list means nothing to exclude (inSet() treats a null array the same way)
        if (exclude != null) {
            for (int i = 0; i < exclude.length; i++) list.remove(exclude[i]);
        }
        return getMapMembers(list);
    }
|
public java.lang.Object | getMapOwner()
    // Owner object that is pushed into ReplicatedMapEntry values via setOwner().
    return this.mapOwner;
|
public int | getNextBackupIndex()
    // Round-robin pick of the next backup index over the tracked map members.
    // Returns -1 when no members are known.
    int size = mapMembers.size();
    if (size == 0) return -1;
    int node = currentNode++;
    if (node >= size) {
        // wrapped around: hand out index 0 now and continue with index 1 on the next call;
        // resetting the cursor to 0 would return index 0 twice in a row
        node = 0;
        currentNode = 1;
    }
    return node;
|
public org.apache.catalina.tribes.Member | getNextBackupNode()
    // Picks the member at the next round-robin index, or null when no member is available.
    Member[] candidates = getMapMembers();
    int index = getNextBackupIndex();
    if (candidates.length == 0 || index == -1) {
        return null;
    }
    // the index was computed separately from the snapshot, so clamp it to the snapshot's bounds
    return candidates[index >= candidates.length ? 0 : index];
|
public org.apache.catalina.tribes.group.RpcChannel | getRpcChannel()
    // RPC channel created in init(); null after finalize() has run.
    return this.rpcChannel;
|
public long | getRpcTimeout()
    // Timeout passed to rpc sends; assigned from init()'s timeout argument.
    return this.rpcTimeout;
|
protected abstract int | getStateMessageType()
// Supplies the message type that transferState() puts into its state request.
// replyRequest() answers MSG_STATE and MSG_STATE_COPY requests, so implementations
// presumably return one of those two - confirm against the subclasses.
|
public java.lang.Object | getStateMutex()
    // Monitor used around state transfer and backup selection in this class.
    return this.stateMutex;
|
public int | hashCode()
return Arrays.hashCode(this.mapContextName);
|
public void | heartbeat()
    // Runs one ping round using the access timeout; failures are logged, never propagated.
    try {
        ping(this.accessTimeout);
    } catch (Exception pingFailure) {
        log.error("Unable to send AbstractReplicatedMap.ping message", pingFailure);
    }
|
public boolean | inSet(org.apache.catalina.tribes.Member m, org.apache.catalina.tribes.Member[] set)
    // True when 'm' equals at least one element of 'set'; a null array never contains anything.
    if (set == null) {
        return false;
    }
    for (int idx = 0; idx < set.length; idx++) {
        if (m.equals(set[idx])) {
            return true;
        }
    }
    return false;
|
protected void | init(java.lang.Object owner, org.apache.catalina.tribes.Channel channel, java.lang.String mapContextName, long timeout, int channelSendOptions, java.lang.ClassLoader[] cls)Initializes the map by creating the RPC channel, registering itself as a channel listener
This method is also responsible for initiating the state transfer
// Stores the configuration, registers this map on the channel, then announces itself
// (MSG_INIT), pulls state from another member and announces readiness (MSG_START).
// A ChannelException during that last phase is rethrown as a RuntimeException.
log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
this.mapOwner = owner;
this.externalLoaders = cls;
this.channelSendOptions = channelSendOptions;
this.channel = channel;
// the same timeout value is used for all rpc sends of this map
this.rpcTimeout = timeout;
try {
this.mapname = mapContextName;
//unique context is more efficient if it is stored as bytes
this.mapContextName = mapContextName.getBytes(chset);
} catch (UnsupportedEncodingException x) {
// fall back to the platform default charset when chset is not available
log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
this.mapContextName = mapContextName.getBytes();
}
if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
//create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
//add this map as a message listener
this.channel.addChannelListener(this);
//listen for membership notifications
this.channel.addMembershipListener(this);
try {
//broadcast our map, this just notifies other members of our existence
broadcast(MapMessage.MSG_INIT, true);
//transfer state from another map
transferState();
//state is transferred, we are ready for messaging
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
// the cause is not logged here; it travels with the RuntimeException thrown below
log.warn("Unable to send map start message.");
throw new RuntimeException("Unable to start replicated map.",x);
}
|
public boolean | isEmpty()
return size()==0;
|
public boolean | isStateTransferred()
    // Set to true at the end of transferState() and reset to false by finalize().
    return this.stateTransferred;
|
public java.util.Set | keySet()
    // Returns an unmodifiable snapshot of the keys whose entries are primary,
    // in the iteration order of the underlying map.
    LinkedHashSet set = new LinkedHashSet(super.size());
    Iterator i = super.entrySet().iterator();
    while (i.hasNext()) {
        Map.Entry e = (Map.Entry) i.next();
        Object key = e.getKey();
        MapEntry entry = (MapEntry) super.get(key);
        // skip keys whose lookup comes back empty, as size() and mapMemberAdded() do
        if (entry == null) continue;
        if (entry.isPrimary()) set.add(key);
    }
    return Collections.unmodifiableSet(set);
|
public java.util.Set | keySetFull()
return super.keySet();
|
public void | leftOver(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)If the reply has already been sent to the requesting thread,
the rpc callback can handle any data that comes in after the fact.
    // Only late membership replies are of interest; anything that is not a MapMessage is dropped.
    if (!(msg instanceof MapMessage)) {
        return;
    }
    MapMessage mapmsg = (MapMessage) msg;
    try {
        mapmsg.deserialize(getExternalLoaders());
        // the first backup node of the message identifies the member being reported
        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
            mapMemberAdded(mapmsg.getBackupNodes()[0]);
            return;
        }
        if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
            memberAlive(mapmsg.getBackupNodes()[0]);
        }
    } catch (IOException x) {
        log.error("Unable to deserialize MapMessage.",x);
    } catch (ClassNotFoundException x) {
        log.error("Unable to deserialize MapMessage.",x);
    }
|
public void | mapMemberAdded(org.apache.catalina.tribes.Member member)
    // Registers a new map member and, if it really was new, publishes every primary entry
    // that currently has no backup nodes. The local member is never registered.
    if (member.equals(getChannel().getLocalMember(false))) {
        return;
    }
    boolean isNew;
    synchronized (mapMembers) {
        isNew = !mapMembers.containsKey(member);
        if (isNew) {
            mapMembers.put(member, new Long(System.currentTimeMillis()));
        }
    }
    if (!isNew) {
        return;
    }
    synchronized (stateMutex) {
        for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry e = (Map.Entry) it.next();
            MapEntry entry = (MapEntry) super.get(e.getKey());
            if (entry == null) {
                continue;
            }
            boolean lacksBackup = entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0);
            if (!lacksBackup) {
                continue;
            }
            try {
                Member[] chosen = publishEntryInfo(entry.getKey(), entry.getValue());
                entry.setBackupNodes(chosen);
            } catch (ChannelException x) {
                log.error("Unable to select backup node.", x);
            }
        }
    }
|
public void | memberAdded(org.apache.catalina.tribes.Member member)
// No-op: this callback does nothing. Members are recorded through mapMemberAdded(),
// which this class calls from its own message handling.
|
protected void | memberAlive(org.apache.catalina.tribes.Member member)We have received a member alive notification
    synchronized (mapMembers) {
        boolean alreadyKnown = mapMembers.containsKey(member);
        if (!alreadyKnown) {
            mapMemberAdded(member);
        }
        // refresh the last-seen timestamp in every case
        mapMembers.put(member, new Long(System.currentTimeMillis()));
    }
|
public void | memberDisappeared(org.apache.catalina.tribes.Member member)
    // Forgets the member and re-publishes every primary entry that listed it as a backup node,
    // so that those entries get a fresh backup selection.
    synchronized (mapMembers) {
        mapMembers.remove(member);
    }
    Iterator i = super.entrySet().iterator();
    while (i.hasNext()) {
        Map.Entry e = (Map.Entry) i.next();
        MapEntry entry = (MapEntry) super.get(e.getKey());
        // skip keys whose lookup comes back empty, as mapMemberAdded() does
        if (entry == null) continue;
        if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
            try {
                Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                entry.setBackupNodes(backup);
            } catch (ChannelException x) {
                log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
            }
        }
    }
|
public void | messageReceived(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
// Applies an incoming MapMessage to the local map. Anything that is not a MapMessage, or
// that fails to deserialize, is ignored (the latter after logging). The message type decides
// the action: START/STOP update membership, PROXY/BACKUP/COPY create or update an entry,
// REMOVE deletes one. None of the local changes made here are sent back out.
if (! (msg instanceof MapMessage)) return;
MapMessage mapmsg = (MapMessage) msg;
if ( log.isTraceEnabled() ) {
log.trace("Map["+mapname+"] received message:"+mapmsg);
}
try {
mapmsg.deserialize(getExternalLoaders());
} catch (IOException x) {
log.error("Unable to deserialize MapMessage.", x);
return;
} catch (ClassNotFoundException x) {
log.error("Unable to deserialize MapMessage.", x);
return;
}
if ( log.isTraceEnabled() )
log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
// membership messages: the first backup node identifies the member concerned
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
}
if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
memberDisappeared(mapmsg.getBackupNodes()[0]);
}
// proxy message: mark (or create) the local entry as a proxy with the advertised backup nodes
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if ( entry==null ) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
entry.setBackup(false);
entry.setProxy(true);
entry.setBackupNodes(mapmsg.getBackupNodes());
super.put(entry.getKey(), entry);
} else {
// existing entry: only the flags and backup nodes change, the stored value is kept
entry.setProxy(true);
entry.setBackup(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
}
}
// removal is applied to the underlying map only; no message is sent from here
if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
super.remove(mapmsg.getKey());
}
// backup/copy message: store the data; the backup flag is set only for MSG_BACKUP
if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
} else {
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
if (mapmsg.isDiff()) {
// diff message: patch the existing value in place while holding its lock
try {
diff.lock();
diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
} catch (Exception x) {
log.error("Unable to apply diff to key:" + entry.getKey(), x);
} finally {
diff.unlock();
}
} else {
// full value: replace it when present, then set the owner on whatever the entry now holds
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
} //end if
} else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
re.setOwner(getMapOwner());
entry.setValue(re);
} else {
// plain values: a null in the message leaves the current value untouched
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
} //end if
} //end if
super.put(entry.getKey(), entry);
} //end if
|
protected void | ping(long timeout)Sends a ping out to all the members in the cluster, not just map members
that this map is alive.
// Two phases: (1) send a MSG_INIT rpc to all channel members and mark each responder as
// alive; (2) expire every tracked map member whose last-seen timestamp is older than 'timeout'.
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_INIT,
false,
null,
null,
null,
wrap(channel.getLocalMember(false)));
if ( channel.getMembers().length > 0 ) {
//send a ping, wait for all nodes to reply
// NOTE(review): the rpc wait uses the accessTimeout field (narrowed to int), not the
// 'timeout' parameter; the parameter is only used for the expiry check below - confirm intent.
Response[] resp = rpcChannel.send(channel.getMembers(),
msg, rpcChannel.ALL_REPLY,
(channelSendOptions),
(int) accessTimeout);
for (int i = 0; i < resp.length; i++) {
memberAlive(resp[i].getSource());
} //for
}
//update our map of members, expire some if we didn't receive a ping back
synchronized (mapMembers) {
Iterator it = mapMembers.entrySet().iterator();
long now = System.currentTimeMillis();
while ( it.hasNext() ) {
Map.Entry entry = (Map.Entry)it.next();
long access = ((Long)entry.getValue()).longValue();
if ( (now - access) > timeout ) {
// remove through the iterator first, then let memberDisappeared() relocate affected entries
it.remove();
memberDisappeared( (Member) entry.getKey());
}
}
}//synch
|
protected void | printMap(java.lang.String header)
    // Debug helper: prints the context name, the tracked members and every stored entry to stdout.
    try {
        System.out.println("\nDEBUG MAP:"+header);
        System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
        Member[] mbrs = getMapMembers();
        for (int m = 0; m < mbrs.length; m++) {
            System.out.println("Mbr["+(m+1)+"="+mbrs[m].getName());
        }
        int cnt = 0;
        for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry e = (Map.Entry) it.next();
            cnt++;
            System.out.println(cnt + ". " + super.get(e.getKey()));
        }
        System.out.println("EndMap]\n\n");
    } catch (Exception ignore) {
        ignore.printStackTrace();
    }
|
protected abstract org.apache.catalina.tribes.Member[] | publishEntryInfo(java.lang.Object key, java.lang.Object value)
// Implemented by subclasses. The callers in this class (get, put, mapMemberAdded,
// memberDisappeared) store the returned array as the entry's backup nodes; put,
// mapMemberAdded and memberDisappeared catch ChannelException around the call.
|
public java.lang.Object | put(java.lang.Object key, java.lang.Object value)
    // Stores the value with notification enabled.
    return this.put(key, value, true);
|
public java.lang.Object | put(java.lang.Object key, java.lang.Object value, boolean notify)
    // Stores the value as a new entry with both backup and proxy flags cleared and returns the
    // value removed for the same key, if any. With notify set, the entry is published and the
    // resulting backup nodes are recorded; a publish failure is logged and the entry is still stored.
    MapEntry fresh = new MapEntry(key, value);
    fresh.setBackup(false);
    fresh.setProxy(false);
    // an existing mapping is removed through remove(key) before the new one is stored
    Object previous = containsKey(key) ? remove(key) : null;
    if (notify) {
        try {
            fresh.setBackupNodes(publishEntryInfo(key, value));
        } catch (ChannelException x) {
            log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
        }
    }
    super.put(key, fresh);
    return previous;
|
public void | putAll(java.util.Map m)Copies all values from one map to this instance
    // Each mapping goes through put(key, value), i.e. with notification enabled.
    for (Iterator it = m.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry pair = (Map.Entry) it.next();
        put(pair.getKey(), pair.getValue());
    }
|
public java.lang.Object | remove(java.lang.Object key)Removes an object from this map, it will also remove it from
    // Removes the key with notification enabled.
    return this.remove(key, true);
|
public java.lang.Object | remove(java.lang.Object key, boolean notify)
    // Removes the local entry and, when asked to notify and at least one map member is known,
    // sends a MSG_REMOVE for the key to all map members. Send failures are only logged.
    MapEntry removed = (MapEntry) super.remove(key);
    try {
        boolean havePeers = getMapMembers().length > 0;
        if (havePeers && notify) {
            MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
            getChannel().send(getMapMembers(), msg, getChannelSendOptions());
        }
    } catch (ChannelException x) {
        log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
    }
    if (removed == null) {
        return null;
    }
    return removed.getValue();
|
protected boolean | removeEldestEntry(java.util.Map$Entry eldest)
// Always answers false, so this hook never requests removal of the eldest entry.
return false;
|
public void | replicate(java.lang.Object key, boolean complete)Replicates any changes to the object since the last time
The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated
// Sends a MSG_BACKUP for the entry to its backup nodes. Nothing happens when the key is
// unknown, the entry is not serializable, is not primary, or has no backup nodes.
// Unless 'complete' is set, only ReplicatedMapEntry values reporting isDirty() are sent.
if ( log.isTraceEnabled() )
log.trace("Replicate invoked on key:"+key);
MapEntry entry = (MapEntry)super.get(key);
if ( entry == null ) return;
if ( !entry.isSerializable() ) return;
if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
Object value = entry.getValue();
//check to see if we need to replicate this object isDirty()||complete
boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
if (!repl) {
if ( log.isTraceEnabled() )
log.trace("Not replicating:"+key+", no change made");
return;
}
//check to see if the message is diffable
boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
MapMessage msg = null;
if (diff) {
ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
try {
// the diff is taken while holding the value's lock
rentry.lock();
//construct a diff message
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
true, (Serializable) entry.getKey(), null,
rentry.getDiff(),
entry.getBackupNodes());
} catch (IOException x) {
// msg stays null, which triggers the full-value message below
log.error("Unable to diff object. Will replicate the entire object instead.", x);
} finally {
rentry.unlock();
}
}
if (msg == null) {
//construct a complete
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
null, entry.getBackupNodes());
}
try {
// the channel is null after finalize(); the backup nodes are re-checked before sending
if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
channel.send(entry.getBackupNodes(), msg, channelSendOptions);
}
} catch (ChannelException x) {
log.error("Unable to replicate data.", x);
}
} //end if
|
public void | replicate(boolean complete)This can be invoked by a periodic thread to replicate out any changes.
For maps that don't store objects that implement ReplicatedMapEntry, this
method should be used infrequently to avoid large amounts of data transfer
    // Walks every stored key and delegates to replicate(key, complete).
    for (Iterator it = super.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry stored = (Map.Entry) it.next();
        replicate(stored.getKey(), complete);
    }
|
public java.io.Serializable | replyRequest(java.io.Serializable msg, org.apache.catalina.tribes.Member sender)
    // Builds the reply to an rpc request. Returns null for anything that is not a MapMessage,
    // for unknown message types, and for a backup request whose entry is missing or not
    // serializable. In all other cases the incoming message object is filled in and returned.
    if (!(msg instanceof MapMessage)) return null;
    MapMessage mapmsg = (MapMessage) msg;
    //map init request
    if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
        // answer with the local member as the only backup node
        mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
        return mapmsg;
    }
    //map start request
    if (mapmsg.getMsgType() == MapMessage.MSG_START) {
        mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
        // the sender announced itself as started, so register it as a map member
        mapMemberAdded(sender);
        return mapmsg;
    }
    //backup request
    if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) {
        MapEntry entry = (MapEntry) super.get(mapmsg.getKey());
        if (entry == null || (!entry.isSerializable())) return null;
        mapmsg.setValue((Serializable) entry.getValue());
        return mapmsg;
    }
    //state transfer request
    if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) {
        synchronized (stateMutex) { //make sure we dont do two things at the same time
            // a copy request ships the values as MSG_COPY; otherwise only MSG_PROXY stubs are sent
            boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY);
            ArrayList list = new ArrayList();
            Iterator i = super.entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry e = (Map.Entry) i.next();
                MapEntry entry = (MapEntry) super.get(e.getKey());
                // skip keys whose lookup comes back empty, as mapMemberAdded() does
                if (entry == null) continue;
                if (entry.isSerializable()) {
                    MapMessage me = new MapMessage(mapContextName,
                                                   copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY,
                                                   false, (Serializable) entry.getKey(), copy ? (Serializable) entry.getValue() : null, null, entry.getBackupNodes());
                    list.add(me);
                }
            }
            mapmsg.setValue(list);
            return mapmsg;
        } //synchronized
    }
    return null;
|
public void | setAccessTimeout(long accessTimeout)
// Sets the timeout that heartbeat() passes to ping() and that ping() uses for its rpc send.
this.accessTimeout = accessTimeout;
|
public void | setChannelSendOptions(int channelSendOptions)
// Replaces the send options used for this map's subsequent channel sends.
this.channelSendOptions = channelSendOptions;
|
public void | setExternalLoaders(java.lang.ClassLoader[] externalLoaders)
// Replaces the class loaders handed to MapMessage.deserialize(); the array is stored as given, not copied.
this.externalLoaders = externalLoaders;
|
public void | setMapOwner(java.lang.Object mapOwner)
// Replaces the owner object that is pushed into ReplicatedMapEntry values via setOwner().
this.mapOwner = mapOwner;
|
public int | size()
    // Counts only primary entries that hold a non-null value; backup and proxy entries are skipped.
    //todo, implement a counter variable instead
    int primaries = 0;
    for (Iterator it = super.entrySet().iterator(); it != null && it.hasNext(); ) {
        Map.Entry e = (Map.Entry) it.next();
        if (e == null) {
            continue;
        }
        MapEntry entry = (MapEntry) super.get(e.getKey());
        if (entry == null) {
            continue;
        }
        if (entry.isPrimary() && entry.getValue() != null) {
            primaries++;
        }
    }
    return primaries;
|
public int | sizeFull()
return super.size();
|
public void | transferState()
    // Requests the map state from the first tracked map member and replays every returned
    // message through messageReceived(). Failures are logged, not propagated, and
    // stateTransferred is set to true at the end in every case.
    try {
        Member[] members = getMapMembers();
        Member backup = members.length > 0 ? (Member) members[0] : null;
        if (backup != null) {
            MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false,
                                            null, null, null, null);
            Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
            // a null reply array is handled like an empty one, as get() does
            if (resp != null && resp.length > 0) {
                synchronized (stateMutex) {
                    msg = (MapMessage) resp[0].getMessage();
                    msg.deserialize(getExternalLoaders());
                    ArrayList list = (ArrayList) msg.getValue();
                    // a reply without a payload carries nothing to replay
                    if (list != null) {
                        for (int i = 0; i < list.size(); i++) {
                            messageReceived((Serializable) list.get(i), resp[0].getSource());
                        }
                    }
                }
            } else {
                log.warn("Transfer state, 0 replies, probably a timeout.");
            }
        }
    } catch (ChannelException x) {
        log.error("Unable to transfer LazyReplicatedMap state.", x);
    } catch (IOException x) {
        log.error("Unable to transfer LazyReplicatedMap state.", x);
    } catch (ClassNotFoundException x) {
        log.error("Unable to transfer LazyReplicatedMap state.", x);
    }
    stateTransferred = true;
|
public java.util.Collection | values()
    // Returns an unmodifiable snapshot of the non-null values held by primary entries.
    ArrayList values = new ArrayList();
    Iterator i = super.entrySet().iterator();
    while (i.hasNext()) {
        Map.Entry e = (Map.Entry) i.next();
        MapEntry entry = (MapEntry) super.get(e.getKey());
        // skip keys whose lookup comes back empty, as size() and mapMemberAdded() do
        if (entry == null) continue;
        if (entry.isPrimary() && entry.getValue() != null) values.add(entry.getValue());
    }
    return Collections.unmodifiableCollection(values);
|
protected org.apache.catalina.tribes.Member[] | wrap(org.apache.catalina.tribes.Member m)Helper methods, wraps a single member in an array
if ( m == null ) return new Member[0];
else return new Member[] {m};
|