/** * Helper method to broadcast a message to all members in a channel * @param msgtype int * @param rpc boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0 ) return; //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if ( rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(),msg,channelSendOptions); } }
@Override public void run() { long counter = 0; while (counter<count) { String msg = message + " cnt="+(++counter); try { System.out.println("Sending ["+msg+"]"); long start = System.currentTimeMillis(); Response[] resp = rpc.send(channel.getMembers(),msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout); System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms."); for ( int i=0; i<resp.length; i++ ) { System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]"); } Thread.sleep(pause); }catch(Exception x){ // Ignore } } }
/** * Helper method to broadcast a message to all members in a channel * * @param msgtype * int * @param rpc * boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0) return; // send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if (rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(), msg, channelSendOptions); } }
@SuppressWarnings("unchecked") @Override public V get(Object key) { MapEntry<K,V> entry = innerMap.get(key); if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry); if ( entry == null ) return null; if ( !entry.isPrimary() ) { //if the message is not primary, we need to retrieve the latest value try { Member[] backup = null; MapMessage msg = null; if (entry.isBackup()) { //select a new backup node backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null,null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, RpcChannel.FIRST_REPLY, getChannelSendOptions(), getRpcTimeout()); if (resp == null || resp.length == 0 || resp[0].getMessage() == null) { //no responses log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue()); // notify member msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, (Serializable)entry.getKey(), null, null, channel.getLocalMember(false), backup); if ( backup != null && backup.length > 0) { getChannel().send(backup, msg, getChannelSendOptions()); } //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = getMapMembersExcl(backup); if ( dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); } if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); val.setOwner(getMapOwner()); } } else if ( entry.isCopy() ) { backup = getMapMembers(); if (backup.length > 0) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, (Serializable)key,null,null,channel.getLocalMember(false),backup); getChannel().send(backup, msg, getChannelSendOptions()); } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); if ( getMapOwner()!=null ) getMapOwner().objectMadePrimay(key, entry.getValue()); } catch (Exception x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.get operation", x); return null; } } if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue()); return entry.getValue(); }
@SuppressWarnings("unchecked") @Override public V get(Object key) { MapEntry<K, V> entry = innerMap.get(key); if (log.isTraceEnabled()) log.trace("Requesting id:" + key + " entry:" + entry); if (entry == null) return null; if (!entry.isPrimary()) { // if the message is not primary, we need to retrieve the latest // value try { Member[] backup = null; MapMessage msg = null; if (entry.isBackup()) { // select a new backup node backup = publishEntryInfo(key, entry.getValue()); } else if (entry.isProxy()) { // make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null, null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(), msg, RpcChannel.FIRST_REPLY, getChannelSendOptions(), getRpcTimeout()); if (resp == null || resp.length == 0 || resp[0].getMessage() == null) { // no responses log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); if (msg.getValue() != null) entry.setValue((V) msg.getValue()); // notify member msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup); if (backup != null && backup.length > 0) { getChannel().send(backup, msg, getChannelSendOptions()); } // invalidate the previous primary msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, null, channel.getLocalMember(false), backup); Member[] dest = getMapMembersExcl(backup); if (dest != null && dest.length > 0) { getChannel().send(dest, msg, getChannelSendOptions()); } if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry) entry.getValue(); val.setOwner(getMapOwner()); } } else if (entry.isCopy()) { backup = getMapMembers(); if (backup.length > 0) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, (Serializable) key, null, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); if (getMapOwner() != null) getMapOwner().objectMadePrimay(key, entry.getValue()); } catch (Exception x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.get operation", x); return null; } } if (log.isTraceEnabled()) log.trace("Requesting id:" + key + " result:" + entry.getValue()); return entry.getValue(); }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member).longValue(); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/** * Send MEMBER_LIST message to WKA member * * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent * @return true - if the WKA member belongs to the domain of this local member */ private boolean sendMemberListToWellKnownMember(Member wkaMember) { /*if (wkaMember.isFailing() || wkaMember.isSuspect()) { return false; }*/ // send the member list to it MemberListCommand memListCmd; try { memListCmd = new MemberListCommand(); List<Member> members = new ArrayList<Member>(this.members); members.add(localMember); // Need to set the local member too memListCmd.setMembers(members.toArray(new Member[members.size()])); Response[] responses = rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd, RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); // Once a response is received from the WKA member to the MEMBER_LIST message, // if it does not belong to this domain, simply remove it from the members if (responses != null && responses.length > 0 && responses[0] != null) { nonRespondingWkaMembers.remove(wkaMember); Member source = responses[0].getSource(); if (!TribesUtil.areInSameDomain(source, wkaMember)) { if (log.isDebugEnabled()) { log.debug("WKA Member " + TribesUtil.getName(source) + " does not belong to local domain " + new String(domain) + ". Hence removing it from the list."); } return false; } } else { // No response from WKA member if(nonRespondingWkaMembers.add(wkaMember)){ log.debug("No response from WKA member added to the list of non-responsive members : " + wkaMember); } // we still don't know the details of this WKA member, hence should return false return false; } } catch (Exception e) { String errMsg = "Could not send MEMBER_LIST to well-known member " + TribesUtil.getName(wkaMember); log.error(errMsg, e); throw new RemoteProcessException(errMsg, e); } return true; }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Iterator<Map.Entry<Member, Long>> it = mapMembers.entrySet().iterator(); long now = System.currentTimeMillis(); while ( it.hasNext() ) { Map.Entry<Member,Long> entry = it.next(); long access = entry.getValue().longValue(); if ( (now - access) > timeout ) { it.remove(); memberDisappeared(entry.getKey()); } } }//synch }