/** * Helper method to broadcast a message to all members in a channel * @param msgtype int * @param rpc boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0 ) return; //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if ( rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(),msg,channelSendOptions); } }
/** * Helper method to broadcast a message to all members in a channel * * @param msgtype * int * @param rpc * boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0) return; // send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if (rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(), msg, channelSendOptions); } }
/**
 * Sends the current member list to a single member, wrapped in a
 * {@code MemberListCommand}.
 *
 * @param member The member to whom the member list has to be sent
 * @throws RemoteProcessException if anything goes wrong while building or
 *                                sending the command; the original exception
 *                                is logged and attached as the cause
 */
public void sendMemberList(Member member) {
    try {
        MemberListCommand memListCmd = new MemberListCommand();

        // Work on a private copy of the member collection.
        List<Member> snapshot = new ArrayList<Member>(this.members);
        Member[] memberArray = snapshot.toArray(new Member[snapshot.size()]);
        memListCmd.setMembers(memberArray);

        Member[] destination = new Member[]{member};
        rpcMembershipChannel.send(destination,
                                  memListCmd,
                                  RpcChannel.ALL_REPLY,
                                  Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION,
                                  10000);

        if (log.isDebugEnabled()) {
            log.debug("Sent MEMBER_LIST to " + TribesUtil.getName(member));
        }
    } catch (Exception e) {
        String errMsg = "Could not send MEMBER_LIST to member " + TribesUtil.getName(member);
        log.error(errMsg, e);
        throw new RemoteProcessException(errMsg, e);
    }
}
public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) { this.channel = channel; this.count = count; this.message = message; this.pause = pause; this.options = options; this.rpc = new RpcChannel(name.getBytes(),channel,this); this.timeout = timeout; this.name = name; }
/** * Inform all members that a particular member just joined * * @param member The member who just joined */ public void sendMemberJoinedToAll(Member member) { try { MemberJoinedCommand cmd = new MemberJoinedCommand(); cmd.setMember(member); ArrayList<Member> membersToSend = (ArrayList<Member>) (((ArrayList) members).clone()); membersToSend.remove(member); // Do not send MEMBER_JOINED to the new member who just joined if (membersToSend.size() > 0) { rpcMembershipChannel.send(membersToSend.toArray(new Member[membersToSend.size()]), cmd, RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); if (log.isDebugEnabled()) { log.debug("Sent MEMBER_JOINED[" + TribesUtil.getName(member) + "] to all members in domain " + new String(domain)); } } } catch (Exception e) { String errMsg = "Could not send MEMBER_JOINED[" + TribesUtil.getName(member) + "] to all members "; log.error(errMsg, e); throw new RemoteProcessException(errMsg, e); } }
/** * Initializes the map by creating the RPC channel, registering itself as a channel listener * This method is also responsible for initiating the state transfer * @param owner Object * @param channel Channel * @param mapContextName String * @param timeout long * @param channelSendOptions int * @param cls ClassLoader[] */ protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName); this.mapOwner = owner; this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; this.mapname = mapContextName; //unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(CHARSET_ISO_8859_1); if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName)); //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); //add this map as a message listener this.channel.addChannelListener(this); //listen for membership notifications this.channel.addMembershipListener(this); try { //broadcast our map, this just notifies other members of our existence broadcast(MapMessage.MSG_INIT, true); //transfer state from another map transferState(); //state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); // remove listener from channel this.rpcChannel.breakdown(); this.channel.removeChannelListener(this); this.channel.removeMembershipListener(this); throw new RuntimeException("Unable to start replicated map.",x); } }
/**
 * Initializes the map: stores the configuration, creates the RPC channel,
 * registers this map as channel and membership listener, runs the startup
 * sequence (INIT broadcast, state transfer, START broadcast) and finally
 * marks the map as {@code State.INITIALIZED}.
 *
 * @param owner              the map owner
 * @param channel            the channel this map communicates over
 * @param mapContextName     the map context name; also kept as ISO-8859-1 bytes
 * @param timeout            the RPC timeout
 * @param channelSendOptions the send options used for channel messages
 * @param cls                the external class loaders
 * @param terminate          when {@code true}, a {@code ChannelException}
 *                           during startup causes {@code breakdown()} to be
 *                           called and a {@code RuntimeException} to be
 *                           thrown; when {@code false} the failure is only
 *                           logged and initialization continues
 */
protected void init(MapOwner owner, Channel channel, String mapContextName,
                    long timeout, int channelSendOptions, ClassLoader[] cls, boolean terminate) {
    long start = System.currentTimeMillis();
    if (log.isInfoEnabled()) {
        log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
    }
    this.mapOwner = owner;
    this.externalLoaders = cls;
    this.channelSendOptions = channelSendOptions;
    this.channel = channel;
    this.rpcTimeout = timeout;

    this.mapname = mapContextName;
    // The unique context is more efficient when stored as bytes.
    this.mapContextName = mapContextName.getBytes(CHARSET_ISO_8859_1);
    if (log.isTraceEnabled()) {
        log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
    }

    // Create an RPC channel and add the map as a listener.
    this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
    // Add this map as a message listener.
    this.channel.addChannelListener(this);
    // Listen for membership notifications.
    this.channel.addMembershipListener(this);

    try {
        // Broadcast our map; this just notifies other members of our existence.
        broadcast(MapMessage.MSG_INIT, true);
        // Transfer state from another map.
        transferState();
        // State is transferred; we are ready for messaging.
        broadcast(MapMessage.MSG_START, true);
    } catch (ChannelException x) {
        // Log the exception itself: when 'terminate' is false it is not
        // rethrown, so this is the only place its cause is recorded.
        log.warn("Unable to send map start message.", x);
        if (terminate) {
            breakdown();
            throw new RuntimeException("Unable to start replicated map.",x);
        }
    }
    this.state = State.INITIALIZED;
    long complete = System.currentTimeMillis() - start;
    if (log.isInfoEnabled()) {
        log.info("AbstractReplicatedMap[" +mapContextName + "] initialization was completed in " + complete + " ms.");
    }
}
@SuppressWarnings("unchecked") @Override public V get(Object key) { MapEntry<K,V> entry = innerMap.get(key); if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry); if ( entry == null ) return null; if ( !entry.isPrimary() ) { //if the message is not primary, we need to retrieve the latest value try { Member[] backup = null; MapMessage msg = null; if (entry.isBackup()) { //select a new backup node backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null,null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, RpcChannel.FIRST_REPLY, getChannelSendOptions(), getRpcTimeout()); if (resp == null || resp.length == 0 || resp[0].getMessage() == null) { //no responses log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue()); // notify member msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, (Serializable)entry.getKey(), null, null, channel.getLocalMember(false), backup); if ( backup != null && backup.length > 0) { getChannel().send(backup, msg, getChannelSendOptions()); } //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = getMapMembersExcl(backup); if ( dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); } if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); val.setOwner(getMapOwner()); } } else if ( entry.isCopy() ) { backup = getMapMembers(); if (backup.length > 0) { msg = new 
MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, (Serializable)key,null,null,channel.getLocalMember(false),backup); getChannel().send(backup, msg, getChannelSendOptions()); } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); if ( getMapOwner()!=null ) getMapOwner().objectMadePrimay(key, entry.getValue()); } catch (Exception x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.get operation", x); return null; } } if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue()); return entry.getValue(); }
/**
 * @return the RPC channel held in the {@code rpcChannel} field
 */
public RpcChannel getRpcChannel() {
    return this.rpcChannel;
}
public static void main(String[] args) throws Exception { long pause = 3000; int count = 1000000; int stats = 10000; String name = "EchoRpcId"; int options = RpcChannel.ALL_REPLY; long timeout = 15000; String message = "EchoRpcMessage"; if (args.length == 0) { usage(); System.exit(1); } for (int i = 0; i < args.length; i++) { if ("-threads".equals(args[i])) { // Not used } else if ("-count".equals(args[i])) { count = Integer.parseInt(args[++i]); System.out.println("Sending "+count+" messages."); } else if ("-pause".equals(args[i])) { pause = Long.parseLong(args[++i])*1000; } else if ("-break".equals(args[i])) { // Not used } else if ("-stats".equals(args[i])) { stats = Integer.parseInt(args[++i]); System.out.println("Stats every "+stats+" message"); } else if ("-timeout".equals(args[i])) { timeout = Long.parseLong(args[++i]); } else if ("-message".equals(args[i])) { message = args[++i]; } else if ("-name".equals(args[i])) { name = args[++i]; } else if ("-mode".equals(args[i])) { if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY; else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY; else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY; } else if ("-debug".equals(args[i])) { // Not used } else if ("-help".equals(args[i])) { usage(); System.exit(1); } } ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args); EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout); channel.start(Channel.DEFAULT); Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); test.run(); System.out.println("System test complete, sleeping to let threads finish."); Thread.sleep(60*1000*60); }
/** * Initializes the map by creating the RPC channel, registering itself as a * channel listener This method is also responsible for initiating the state * transfer * * @param owner * Object * @param channel * Channel * @param mapContextName * String * @param timeout * long * @param channelSendOptions * int * @param cls * ClassLoader[] * @param terminate * - Flag for whether to terminate this map that failed to start. */ protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions, ClassLoader[] cls, boolean terminate) { long start = System.currentTimeMillis(); if (log.isInfoEnabled()) log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName); this.mapOwner = owner; this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; this.mapname = mapContextName; // unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(CHARSET_ISO_8859_1); if (log.isTraceEnabled()) log.trace( "Created Lazy Map with name:" + mapContextName + ", bytes:" + Arrays.toString(this.mapContextName)); // create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); // add this map as a message listener this.channel.addChannelListener(this); // listen for membership notifications this.channel.addMembershipListener(this); try { // broadcast our map, this just notifies other members of our // existence broadcast(MapMessage.MSG_INIT, true); // transfer state from another map transferState(); // state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); if (terminate) { breakdown(); throw new RuntimeException("Unable to start replicated map.", x); } } this.state = State.INITIALIZED; long complete = System.currentTimeMillis() - start; if 
(log.isInfoEnabled()) log.info("AbstractReplicatedMap[" + mapContextName + "] initialization was completed in " + complete + " ms."); }
@SuppressWarnings("unchecked") @Override public V get(Object key) { MapEntry<K, V> entry = innerMap.get(key); if (log.isTraceEnabled()) log.trace("Requesting id:" + key + " entry:" + entry); if (entry == null) return null; if (!entry.isPrimary()) { // if the message is not primary, we need to retrieve the latest // value try { Member[] backup = null; MapMessage msg = null; if (entry.isBackup()) { // select a new backup node backup = publishEntryInfo(key, entry.getValue()); } else if (entry.isProxy()) { // make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null, null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(), msg, RpcChannel.FIRST_REPLY, getChannelSendOptions(), getRpcTimeout()); if (resp == null || resp.length == 0 || resp[0].getMessage() == null) { // no responses log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); if (msg.getValue() != null) entry.setValue((V) msg.getValue()); // notify member msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup); if (backup != null && backup.length > 0) { getChannel().send(backup, msg, getChannelSendOptions()); } // invalidate the previous primary msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, null, channel.getLocalMember(false), backup); Member[] dest = getMapMembersExcl(backup); if (dest != null && dest.length > 0) { getChannel().send(dest, msg, getChannelSendOptions()); } if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry) entry.getValue(); val.setOwner(getMapOwner()); } } else if (entry.isCopy()) { backup = getMapMembers(); if (backup.length > 0) { msg = new 
MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, (Serializable) key, null, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); if (getMapOwner() != null) getMapOwner().objectMadePrimay(key, entry.getValue()); } catch (Exception x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.get operation", x); return null; } } if (log.isTraceEnabled()) log.trace("Requesting id:" + key + " result:" + entry.getValue()); return entry.getValue(); }
/** * Initializes the map by creating the RPC channel, registering itself as a channel listener * This method is also responsible for initiating the state transfer * @param owner Object * @param channel Channel * @param mapContextName String * @param timeout long * @param channelSendOptions int * @param cls ClassLoader[] */ protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls, boolean terminate) { long start = System.currentTimeMillis(); log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName); this.mapOwner = owner; this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; this.mapname = mapContextName; //unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(CHARSET_ISO_8859_1); if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName)); //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); //add this map as a message listener this.channel.addChannelListener(this); //listen for membership notifications this.channel.addMembershipListener(this); try { //broadcast our map, this just notifies other members of our existence broadcast(MapMessage.MSG_INIT, true); //transfer state from another map transferState(); //state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); if (terminate) { breakdown(); throw new RuntimeException("Unable to start replicated map.",x); } } long complete = System.currentTimeMillis() - start; if (log.isInfoEnabled()) log.info("AbstractReplicatedMap[" +mapContextName + "] initialization was completed in " + complete + " ms."); }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/** * Initializes the map by creating the RPC channel, registering itself as a channel listener * This method is also responsible for initiating the state transfer * @param owner Object * @param channel Channel * @param mapContextName String * @param timeout long * @param channelSendOptions int * @param cls ClassLoader[] * @param terminate - Flag for whether to terminate this map that failed to start. */ protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls, boolean terminate) { long start = System.currentTimeMillis(); if (log.isInfoEnabled()) log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName); this.mapOwner = owner; this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; this.mapname = mapContextName; //unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(CHARSET_ISO_8859_1); if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName)); //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); //add this map as a message listener this.channel.addChannelListener(this); //listen for membership notifications this.channel.addMembershipListener(this); try { //broadcast our map, this just notifies other members of our existence broadcast(MapMessage.MSG_INIT, true); //transfer state from another map transferState(); //state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); if (terminate) { breakdown(); throw new RuntimeException("Unable to start replicated map.",x); } } long complete = System.currentTimeMillis() - start; if (log.isInfoEnabled()) log.info("AbstractReplicatedMap[" +mapContextName + "] 
initialization was completed in " + complete + " ms."); }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member).longValue(); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/**
 * Stores the RPC channel in the {@code rpcMembershipChannel} field.
 *
 * @param rpcMembershipChannel the RPC channel to store
 */
public void setRpcMembershipChannel(RpcChannel rpcMembershipChannel) {
    this.rpcMembershipChannel = rpcMembershipChannel;
}