@Activate protected void activate() { localNodeId = clusterService.getLocalNode().id(); listenerRegistry = new AbstractListenerRegistry<>(); eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); TopicConfig topicConfig = new TopicConfig(); topicConfig.setGlobalOrderingEnabled(true); topicConfig.setName(TOPIC_HZ_ID); storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig); messageHandlingExecutor = Executors.newSingleThreadExecutor( groupedThreads("onos/store/leadership", "message-handler")); clusterCommunicator.addSubscriber( LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener(), messageHandlingExecutor); log.info("Hazelcast Leadership Service started"); }
private void setTopicConfig(Config config) { TopicConfig topicConfig = config.getTopicConfig("yourTopicName"); topicConfig.setGlobalOrderingEnabled(true); topicConfig.setStatisticsEnabled(true); MessageListener<String> implementation = new MessageListener<String>() { @Override public void onMessage(Message<String> message) { // process the message } }; topicConfig.addMessageListenerConfig(new ListenerConfig(implementation)); }
private void initializeListeners() { final TopicConfig topicConfig = node.config.findMatchingTopicConfig(name); for (ListenerConfig lc : topicConfig.getMessageListenerConfigs()) { try { node.listenerManager.createAndAddListenerItem(name, lc, InstanceType.TOPIC); for (MemberImpl member : node.clusterManager.getMembers()) { addListener(member.getAddress(), true); } } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } } }
@JMXAttribute("Config") @JMXDescription("Topic configuration") public String getConfig() { final TopicConfig config = managementService.getInstance().getConfig().getTopicConfig(getName()); return config.toString(); }