/**
 * Sends a cluster message either to a single member or, when no
 * destination is given, to every member currently returned by the channel.
 * Any exception raised while sending is logged and not propagated.
 *
 * @param msg  message to transfer; its address is set to the local member
 * @param dest receiving member, or <code>null</code> to address all members
 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
 *      org.apache.catalina.tribes.Member)
 */
@Override
public void send(ClusterMessage msg, Member dest) {
    try {
        msg.setAddress(getLocalMember());

        // "All session data" transfers override the configured send options.
        int sendOptions = channelSendOptions;
        if (msg instanceof SessionMessage
                && ((SessionMessage) msg).getEventType() == SessionMessage.EVT_ALL_SESSION_DATA) {
            sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
        }

        if (dest == null) {
            // No explicit receiver: address every member the channel knows.
            Member[] destmembers = channel.getMembers();
            if (destmembers.length > 0) {
                channel.send(destmembers, msg, sendOptions);
            } else if (log.isDebugEnabled()) {
                log.debug("No members in cluster, ignoring message:" + msg);
            }
        } else if (getLocalMember().equals(dest)) {
            log.error("Unable to send message to local member " + msg);
        } else {
            channel.send(new Member[] { dest }, msg, sendOptions);
        }
    } catch (Exception x) {
        log.error("Unable to send message through cluster sender.", x);
    }
}
@Before public void setUp() throws Exception { channel1 = new GroupChannel(); channel1.addInterceptor(new MessageDispatch15Interceptor()); channel2 = new GroupChannel(); channel2.addInterceptor(new MessageDispatch15Interceptor()); ThroughputInterceptor tint = new ThroughputInterceptor(); tint.setInterval(500); ThroughputInterceptor tint2 = new ThroughputInterceptor(); tint2.setInterval(500); //channel1.addInterceptor(tint); channel2.addInterceptor(tint2); listener1 = new Listener(); ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); rb1.setUdpPort(50000); rb2.setUdpPort(50000); channel2.addChannelListener(listener1); TesterUtil.addRandomDomain(new ManagedChannel[] {channel1, channel2}); channel1.start(Channel.DEFAULT); channel2.start(Channel.DEFAULT); }
/**
 * Transfers a cluster message to one member, or to all members returned by
 * the channel when <code>dest</code> is <code>null</code>. Exceptions are
 * caught and logged.
 *
 * @param msg
 *            message to transfer
 * @param dest
 *            receiver member, <code>null</code> for every member
 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
 *      org.apache.catalina.tribes.Member)
 */
@Override
public void send(ClusterMessage msg, Member dest) {
    try {
        msg.setAddress(getLocalMember());

        final boolean allSessionData = msg instanceof SessionMessage
                && ((SessionMessage) msg).getEventType() == SessionMessage.EVT_ALL_SESSION_DATA;
        final int sendOptions = allSessionData
                ? (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK)
                : channelSendOptions;

        if (dest != null) {
            // Single receiver: refuse to send to ourselves.
            if (getLocalMember().equals(dest)) {
                log.error("Unable to send message to local member " + msg);
                return;
            }
            channel.send(new Member[] { dest }, msg, sendOptions);
            return;
        }

        Member[] destmembers = channel.getMembers();
        if (destmembers.length > 0) {
            channel.send(destmembers, msg, sendOptions);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("No members in cluster, ignoring message:" + msg);
        }
    } catch (Exception x) {
        log.error("Unable to send message through cluster sender.", x);
    }
}
/**
 * Sends a message to one or more members in the cluster. Messages flagged
 * with <code>Channel.SEND_OPTIONS_MULTICAST</code> are broadcast through the
 * membership service; all others go through the cluster sender.
 *
 * @param destination
 *            Member[] - the destinations; <code>null</code> is replaced by
 *            the members reported by the membership service
 * @param msg
 *            ChannelMessage - the message to send
 * @param payload
 *            not used by this method
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (destination == null) {
        destination = membershipService.getMembers();
    }

    final boolean multicast =
            (msg.getOptions() & Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST;
    if (multicast) {
        membershipService.broadcast(msg);
    } else {
        clusterSender.sendMessage(msg, destination);
    }

    if (Logs.MESSAGES.isTraceEnabled()) {
        Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId())
                + " at " + new java.sql.Timestamp(System.currentTimeMillis())
                + " to " + Arrays.toNameString(destination));
    }
}
/**
 * Stops the channel if one is present and records the outcome in
 * <code>status</code>. A failure is dumped to <code>System.err</code> and
 * remembered in <code>error</code>. In every case the channel and
 * interceptor references are cleared and <code>startstatus</code> is set
 * to "stopped".
 */
public void stop() {
    try {
        if (channel == null) {
            status = "Channel Already Stopped";
        } else {
            channel.stop(Channel.DEFAULT);
            status = "Channel Stopped";
        }
    } catch (Exception x) {
        // Hold the System.err monitor so the trace is printed as one unit.
        synchronized (System.err) {
            System.err.println("Stop failed:");
            for (StackTraceElement frame : x.getStackTrace()) {
                System.err.println(frame.toString());
            }
        }
        status = "Stop failed:" + x.getMessage();
        error = x;
    } finally {
        startstatus = "stopped";
        channel = null;
        interceptor = null;
    }
}
/**
 * Serializes the message into a data package and sends it as a single
 * datagram through the multicast implementation.
 *
 * @param message the message to broadcast; cast to <code>ChannelData</code>
 * @throws ChannelException if the multicast sender is not started, the
 *         package exceeds <code>McastServiceImpl.MAX_PACKET_SIZE</code>,
 *         or the send itself fails
 */
@Override
public void broadcast(ChannelMessage message) throws ChannelException {
    final boolean senderStarted =
            impl != null && (impl.startLevel & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ;
    if (!senderStarted) {
        throw new ChannelException("Multicast send is not started or enabled.");
    }

    byte[] data = XByteBuffer.createDataPackage((ChannelData) message);
    if (data.length > McastServiceImpl.MAX_PACKET_SIZE) {
        throw new ChannelException("Packet length[" + data.length + "] exceeds max packet size of "
                + McastServiceImpl.MAX_PACKET_SIZE + " bytes.");
    }

    DatagramPacket packet = new DatagramPacket(data, 0, data.length);
    try {
        impl.send(false, packet);
    } catch (Exception x) {
        throw new ChannelException(x);
    }
}
/**
 * Registers two interceptors with the same option flag and expects the
 * channel start to fail with an "option flag conflict" message.
 */
@Test
public void testOptionConflict() throws Exception {
    boolean conflictReported = false;
    channel.setOptionCheck(true);

    ChannelInterceptor first = new TestInterceptor();
    first.setOptionFlag(128);
    channel.addInterceptor(first);

    ChannelInterceptor second = new TestInterceptor();
    second.setOptionFlag(128);
    channel.addInterceptor(second);

    try {
        channel.start(Channel.DEFAULT);
    } catch (ChannelException x) {
        if (x.getMessage().contains("option flag conflict")) {
            conflictReported = true;
        }
    }
    assertTrue(conflictReported);
}
@Test public void testTcpMcastFail() throws Exception { System.out.println("testTcpMcastFail()"); clear(); channel1.start(Channel.DEFAULT); channel2.start(Channel.DEFAULT); //Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel2.stop(Channel.MBR_TX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); try { Thread.sleep(5000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel1.send(channel1.getMembers(), msg, 0); } catch ( ChannelException x ) { fail("Message send should have succeeded."); } channel1.stop(Channel.DEFAULT); channel2.stop(Channel.DEFAULT); }
@Test public void testMemberArrival() throws Exception { //purpose of this test is to make sure that we have received all the members //that we can expect before the start method returns Thread[] threads = new Thread[channels.length]; for (int i=0; i<channels.length; i++ ) { final Channel channel = channels[i]; Thread t = new Thread() { @Override public void run() { try { channel.start(Channel.DEFAULT); }catch ( Exception x ) { throw new RuntimeException(x); } } }; threads[i] = t; } for (int i=0; i<threads.length; i++ ) threads[i].start(); for (int i=0; i<threads.length; i++ ) threads[i].join(); System.out.println("All channels started."); for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size()); }
@Test public void testTcpSendFailureMemberDrop() throws Exception { System.out.println("testTcpSendFailureMemberDrop()"); clear(); channel1.start(Channel.DEFAULT); channel2.start(Channel.DEFAULT); //Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel2.stop(Channel.SND_RX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); try { channel1.send(channel1.getMembers(), msg, 0); fail("Message send should have failed."); } catch ( ChannelException x ) { // Ignore } assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); channel1.stop(Channel.DEFAULT); channel2.stop(Channel.DEFAULT); }
/** * Creates a new map * @param channel The channel to use for communication * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap * @param cls - a list of classloaders to be used for deserialization of objects. * @param terminate - Flag for whether to terminate this map that failed to start. */ public AbstractReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, int channelSendOptions, ClassLoader[] cls, boolean terminate) { innerMap = new ConcurrentHashMap<K,MapEntry<K, V>>(initialCapacity, loadFactor, 15); init(owner, channel, mapContextName, timeout, channelSendOptions, cls, terminate); }
/**
 * Initializes the parent and starts its <code>MBR_RX_SEQ</code> and
 * <code>MBR_TX_SEQ</code> services.
 *
 * @return <code>true</code> when both calls completed, <code>false</code>
 *         when either threw (the exception is logged as a warning)
 */
public boolean startService() {
    boolean started;
    try {
        parent.init();
        parent.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ);
        started = true;
    } catch (Exception x) {
        log.warn("Recovery thread failed to start membership service.", x);
        started = false;
    }
    return started;
}
/** * Send a message and wait for the response. * * @param destination * Member[] - the destination for the message, and the members * you request a reply from * @param message * Serializable - the message you are sending out * @param rpcOptions * int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY * @param channelOptions * channel sender options * @param timeout * long - timeout in milliseconds, if no reply is received within * this time null is returned * @return Response[] - an array of response objects. * @throws ChannelException */ public Response[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) throws ChannelException { if (destination == null || destination.length == 0) return new Response[0]; // avoid dead lock int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length); try { synchronized (collector) { if (rpcOptions != NO_REPLY) responseMap.put(key, collector); RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); channel.send(destination, rmsg, sendOptions); if (rpcOptions != NO_REPLY) collector.wait(timeout); } } catch (InterruptedException ix) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); } return collector.getResponses(); }
/**
 * Delivers a message to cluster members: by membership-service broadcast
 * when the <code>Channel.SEND_OPTIONS_MULTICAST</code> option is set,
 * otherwise through the cluster sender.
 * @param destination Member[] - the destinations; <code>null</code> means the members returned by the membership service
 * @param msg ChannelMessage - the message to send
 * @param payload not read by this method
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
    Member[] targets = destination;
    if (targets == null) {
        targets = membershipService.getMembers();
    }

    final int multicastBit = Channel.SEND_OPTIONS_MULTICAST;
    if ((msg.getOptions() & multicastBit) != multicastBit) {
        clusterSender.sendMessage(msg, targets);
    } else {
        membershipService.broadcast(msg);
    }

    if (!Logs.MESSAGES.isTraceEnabled()) {
        return;
    }
    Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at "
            + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(targets));
}
/** * Send a message and wait for the response. * @param destination Member[] - the destination for the message, and the members you request a reply from * @param message Serializable - the message you are sending out * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY * @param channelOptions channel sender options * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned * @return Response[] - an array of response objects. * @throws ChannelException */ public Response[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) throws ChannelException { if ( destination==null || destination.length == 0 ) return new Response[0]; //avoid dead lock int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length); try { synchronized (collector) { if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector); RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); channel.send(destination, rmsg, sendOptions); if ( rpcOptions != NO_REPLY ) collector.wait(timeout); } } catch ( InterruptedException ix ) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); } return collector.getResponses(); }
/** * Sends a <code>NoRpcChannelReply</code> message to a member<br> * This method gets invoked by the channel if a RPC message comes in * and no channel listener accepts the message. This avoids timeout * @param msg RpcMessage * @param destination Member - the destination for the reply */ protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { try { //avoid circular loop if ( msg instanceof RpcMessage.NoRpcChannelReply) return; RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid); send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS); } catch ( Exception x ) { log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x); } }
@Override public void start(int svc) throws ChannelException { //start the thread if (!run ) { synchronized (this) { if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender startQueue(); }//end if }//sync }//end if super.start(svc); }
@Override public void stop(int svc) throws ChannelException { // stop the thread if (run) { synchronized (this) { if (run && ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ)) { stopQueue(); } // end if } // sync } // end if super.stop(svc); }
@Override public void stop(int svc) throws ChannelException { //stop the thread if ( run ) { synchronized (this) { if ( run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ)) { stopQueue(); }//end if }//sync }//end if super.stop(svc); }
@Test public void testTcpFailureMemberAdd() throws Exception { System.out.println("testTcpFailureMemberAdd()"); clear(); channel1.start(Channel.DEFAULT); channel2.start(Channel.SND_RX_SEQ); channel2.start(Channel.SND_TX_SEQ); channel2.start(Channel.MBR_RX_SEQ); channel2.stop(Channel.SND_RX_SEQ); channel2.start(Channel.MBR_TX_SEQ); //Thread.sleep(1000); assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); channel1.stop(Channel.DEFAULT); channel2.stop(Channel.DEFAULT); }
/**
 * Shuts down the channel whose local member is the current coordinator and
 * then checks that the coordinators at index 1 and above (except the
 * stopped one) agree with the coordinator reported by a surviving channel.
 */
@Test
public void testCoord2() throws Exception {
    Member member = coordinators[1].getCoordinator();
    System.out.println("Coordinator[2a] is:" + member);

    int dead = -1;
    for (int i = 0; i < CHANNEL_COUNT; i++) {
        if (!channels[i].getLocalMember(false).equals(member)) {
            continue;
        }
        System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
        channels[i].stop(Channel.DEFAULT);
        dead = i;
    }

    Thread.sleep(1000);

    // Pick channel 0 as reference unless it is the one that was stopped.
    final int survivor = (dead == 0) ? 1 : 0;
    System.out.println("Member count:" + channels[survivor].getMembers().length);
    member = coordinators[survivor].getCoordinator();

    for (int i = 1; i < CHANNEL_COUNT; i++) {
        if (i == dead) {
            continue;
        }
        assertEquals(member, coordinators[i].getCoordinator());
    }
    System.out.println("Coordinator[2b] is:" + member);
}
/**
 * Sends <code>msgCount</code> random messages from channel1 to channel2
 * with the SYNCHRONIZED_ACK and USE_ACK options and checks the listener
 * count after a short pause.
 */
@Test
public void testDataSendSYNCACK() throws Exception {
    System.err.println("Starting SYNC_ACK");
    final int options = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
    int sent = 0;
    while (sent < msgCount) {
        channel1.send(new Member[] { channel2.getLocalMember(false) }, Data.createRandomData(), options);
        sent++;
    }
    Thread.sleep(250);
    System.err.println("Finished SYNC_ACK");
    assertEquals("Checking success messages.", msgCount, listener1.count);
}
/**
 * Starts a daemon <code>SystemExit</code> thread constructed with 5000 and
 * then stops the channel, printing any failure.
 */
@Override
public void run() {
    System.out.println("Shutting down...");

    SystemExit exitThread = new SystemExit(5000);
    exitThread.setDaemon(true);
    exitThread.start();

    try {
        channel.stop(Channel.DEFAULT);
    } catch (Exception x) {
        x.printStackTrace();
    }
    System.out.println("Channel stopped.");
}
/**
 * Stops the channel (printing any failure) and then interrupts the
 * <code>main</code> thread.
 */
@Override
public void run() {
    System.out.println("Shutting down...");
    try {
        channel.stop(Channel.DEFAULT);
    } catch (Exception stopFailure) {
        stopFailure.printStackTrace();
    }
    System.out.println("Channel stopped.");
    main.interrupt();
}
/** * Constructs a map demo object. * @param channel - the Tribes channel object to be used for communication * @param mapName - the name of this map */ public MapDemo(Channel channel, String mapName ) { //instantiate the replicated map map = new LazyReplicatedMap<String,StringBuilder>(null, channel, 5000, mapName, null); //create a gui, name it with the member name of this JVM table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName()); //add ourself as a listener for messages channel.addChannelListener(this); //add ourself as a listener for memberships channel.addMembershipListener(this); //initialize the map by receiving a fake message this.messageReceived(null,null); }
@Test public void testDataSendASYNC() throws Exception { System.err.println("Starting ASYNC"); for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),Channel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP); //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500); System.err.println("Finished ASYNC"); assertEquals("Checking success messages.",msgCount,listener1.count.get()); }
/**
 * Sends <code>msgCount</code> random messages from channel1 to channel2
 * with the USE_ACK option and checks the listener count after a short
 * pause.
 */
@Test
public void testDataSendACK() throws Exception {
    System.err.println("Starting ACK");
    int remaining = msgCount;
    while (remaining > 0) {
        channel1.send(new Member[] { channel2.getLocalMember(false) }, Data.createRandomData(),
                Channel.SEND_OPTIONS_USE_ACK);
        remaining--;
    }
    Thread.sleep(250);
    System.err.println("Finished ACK");
    assertEquals("Checking success messages.", msgCount, listener1.count);
}
/**
 * Configures the channel receiver with <code>udpPort</code>, then starts
 * the channel, waits one second and stops it again.
 */
@Test
public void testUdpReceiverStart() throws Exception {
    ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
    receiver.setUdpPort(udpPort);

    channel.start(Channel.DEFAULT);
    Thread.sleep(1000);
    channel.stop(Channel.DEFAULT);
}
/** * Start the service * @param level 1 starts the receiver, level 2 starts the sender * @throws IOException if the service fails to start * @throws IllegalStateException if the service is already started */ public synchronized void start(int level) throws IOException { boolean valid = false; if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) { if ( receiver != null ) throw new IllegalStateException("McastService.receive already running."); try { if ( sender == null ) socket.joinGroup(address); }catch (IOException iox) { log.error("Unable to join multicast group, make sure your system has multicasting enabled."); throw iox; } doRunReceiver = true; receiver = new ReceiverThread(); receiver.setDaemon(true); receiver.start(); valid = true; } if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { if ( sender != null ) throw new IllegalStateException("McastService.send already running."); if ( receiver == null ) socket.joinGroup(address); //make sure at least one packet gets out there send(false); doRunSender = true; sender = new SenderThread(sendFrequency); sender.setDaemon(true); sender.start(); //we have started the receiver, but not yet waited for membership to establish valid = true; } if (!valid) { throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ"); } //pause, once or twice waitForMembers(level); startLevel = (startLevel | level); }
/**
 * Stops the first <code>channelCount</code> channels after each test.
 */
@After
public void tearDown() throws Exception {
    System.out.println("tearDown");
    int index = 0;
    while (index < channelCount) {
        channels[index].stop(Channel.DEFAULT);
        index++;
    }
}
/** * Send notifications upwards * @param svc int * @throws ChannelException */ @Override public void start(int svc) throws ChannelException { if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ); if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ); final ChannelInterceptorBase base = this; for (final Member member : members) { Thread t = new Thread() { @Override public void run() { base.memberAdded(member); if (getfirstInterceptor().getMember(member) != null) { sendLocalMember(new Member[]{member}); } } }; t.start(); } super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ)); // check required interceptors TcpFailureDetector failureDetector = null; TcpPingInterceptor pingInterceptor = null; ChannelInterceptor prev = getPrevious(); while (prev != null) { if (prev instanceof TcpFailureDetector ) failureDetector = (TcpFailureDetector) prev; if (prev instanceof TcpPingInterceptor) pingInterceptor = (TcpPingInterceptor) prev; prev = prev.getPrevious(); } if (failureDetector == null) { log.warn("There is no TcpFailureDetector. Automatic detection of static members does" + " not work properly. By defining the StaticMembershipInterceptor under the" + " TcpFailureDetector, automatic detection of the static members will work."); } if (pingInterceptor == null) { log.warn("There is no TcpPingInterceptor. The health check of static member does" + " not work properly. By defining the TcpPingInterceptor, the health check of" + " static member will work."); } }
@After public void tearDown() throws Exception { for (int i = 0; i < channels.length; i++) { try { channels[i].stop(Channel.DEFAULT); } catch (Exception ignore) { // Ignore } } }
@Test public void testDataSendASYNC() throws Exception { System.err.println("Starting ASYNC"); for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),Channel.SEND_OPTIONS_ASYNCHRONOUS); //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); System.err.println("Finished ASYNC"); assertEquals("Checking success messages.",msgCount,listener1.count); }