private void initializeListeners(Config config) { for (final ListenerConfig listenerCfg : config.getListenerConfigs()) { Object listener = listenerCfg.getImplementation(); if (listener == null) { try { listener = Serializer.newInstance(Serializer.loadClass(listenerCfg.getClassName())); } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } } if (listener instanceof InstanceListener) { factory.addInstanceListener((InstanceListener) listener); } else if (listener instanceof MembershipListener) { clusterImpl.addMembershipListener((MembershipListener) listener); } else if (listener instanceof MigrationListener) { concurrentMapManager.partitionServiceImpl.addMigrationListener((MigrationListener) listener); } else if (listener != null) { final String error = "Unknown listener type: " + listener.getClass(); Throwable t = new IllegalArgumentException(error); logger.log(Level.WARNING, error, t); } } }
private void setTopicConfig(Config config) { TopicConfig topicConfig = config.getTopicConfig("yourTopicName"); topicConfig.setGlobalOrderingEnabled(true); topicConfig.setStatisticsEnabled(true); MessageListener<String> implementation = new MessageListener<String>() { @Override public void onMessage(Message<String> message) { // process the message } }; topicConfig.addMessageListenerConfig(new ListenerConfig(implementation)); }
/** * Create and add ListenerItem during initialization of CMap, BQ and TopicInstance. */ void createAndAddListenerItem(String name, ListenerConfig lc, Instance.InstanceType instanceType) throws Exception { Object listener = lc.getImplementation(); if (listener == null) { listener = Serializer.newInstance(Serializer.loadClass(lc.getClassName())); } if (listener != null) { final ListenerItem listenerItem = new ListenerItem(name, null, listener, lc.isIncludeValue(), instanceType, lc.isLocal()); getOrCreateListenerList(name).add(listenerItem); } }
private void initializeListeners() { final TopicConfig topicConfig = node.config.findMatchingTopicConfig(name); for (ListenerConfig lc : topicConfig.getMessageListenerConfigs()) { try { node.listenerManager.createAndAddListenerItem(name, lc, InstanceType.TOPIC); for (MemberImpl member : node.clusterManager.getMembers()) { addListener(member.getAddress(), true); } } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } } }
private void initializeListeners() { for (ListenerConfig lc : topicConfig.getMessageListenerConfigs()) { try { node.listenerManager.createAndAddListenerItem(name, lc, InstanceType.TOPIC); for (MemberImpl member : node.clusterManager.getMembers()) { addListener(member.getAddress(), true); } } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } } }
@Test public void testMulticastJoinDuringSplitBrainHandlerRunning() throws InterruptedException { Properties props = new Properties(); props.setProperty(GroupProperties.PROP_WAIT_SECONDS_BEFORE_JOIN, "5"); props.setProperty(GroupProperties.PROP_MERGE_FIRST_RUN_DELAY_SECONDS, "0"); props.setProperty(GroupProperties.PROP_MERGE_NEXT_RUN_DELAY_SECONDS, "0"); final CountDownLatch latch = new CountDownLatch(1); Config config1 = new Config(); config1.getNetworkConfig().setPort(5901) ; // bigger port to make sure address.hashCode() check pass during merge! config1.setProperties(props); config1.addListenerConfig(new ListenerConfig(new LifecycleListener() { public void stateChanged(final LifecycleEvent event) { System.out.println(event); switch (event.getState()) { case MERGING: case RESTARTING: case RESTARTED: case MERGED: latch.countDown(); default: break; } } })); Hazelcast.newHazelcastInstance(config1); Thread.sleep(5000); Config config2 = new Config(); config2.getNetworkConfig().setPort(5701) ; config2.setProperties(props); Hazelcast.newHazelcastInstance(config2); assertFalse("Latch should not be countdown!", latch.await(3, TimeUnit.SECONDS)); }
@Test(timeout = 180000) public void testTcpIpSplitBrainStillWorksWhenTargetDisappears() throws Exception { // The ports are ordered like this so h3 will always attempt to merge with h1 Config c1 = buildConfig(false).setPort(25701); Config c2 = buildConfig(false).setPort(25704); Config c3 = buildConfig(false).setPort(25703); List<String> clusterOneMembers = Arrays.asList("127.0.0.1:25701"); List<String> clusterTwoMembers = Arrays.asList("127.0.0.1:25704"); List<String> clusterThreeMembers = Arrays.asList("127.0.0.1:25703"); c1.getNetworkConfig().getJoin().getTcpIpConfig().setMembers(clusterOneMembers); c2.getNetworkConfig().getJoin().getTcpIpConfig().setMembers(clusterTwoMembers); c3.getNetworkConfig().getJoin().getTcpIpConfig().setMembers(clusterThreeMembers); final HazelcastInstance h1 = Hazelcast.newHazelcastInstance(c1); final HazelcastInstance h2 = Hazelcast.newHazelcastInstance(c2); final CountDownLatch latch = new CountDownLatch(1); c3.addListenerConfig(new ListenerConfig(new LifecycleListener() { public void stateChanged(final LifecycleEvent event) { if (event.getState() == LifecycleState.MERGING) { h1.getLifecycleService().shutdown(); } else if (event.getState() == LifecycleState.MERGED) { System.out.println("h3 restarted"); latch.countDown(); } } })); final HazelcastInstance h3 = Hazelcast.newHazelcastInstance(c3); // We should have three clusters of one assertEquals(1, h1.getCluster().getMembers().size()); assertEquals(1, h2.getCluster().getMembers().size()); assertEquals(1, h3.getCluster().getMembers().size()); List<String> allMembers = Arrays.asList("127.0.0.1:25701", "127.0.0.1:25704", "127.0.0.1:25703"); h3.getConfig().getNetworkConfig().getJoin().getTcpIpConfig().setMembers(allMembers); latch.await(60, TimeUnit.SECONDS); // Both nodes from cluster two should have joined cluster one assertFalse(h1.getLifecycleService().isRunning()); assertEquals(2, h2.getCluster().getMembers().size()); assertEquals(2, h3.getCluster().getMembers().size()); }