/**
 * Checks that building a {@code LocalRegionCache} through its three-argument constructor
 * looks up the map configuration for {@code CACHE_NAME} and registers a non-null message
 * listener on the topic of the same name.
 */
@Test
public void testThreeArgConstructorRegistersTopicListener() {
    // Configuration side: the config mock hands back a mocked MapConfig for the cache name.
    MapConfig cacheMapConfig = mock(MapConfig.class);
    Config hazelcastConfig = mock(Config.class);
    when(hazelcastConfig.findMapConfig(eq(CACHE_NAME))).thenReturn(cacheMapConfig);

    // Topic side: accept any non-null listener; the returned registration id is irrelevant here.
    ITopic<Object> cacheTopic = mock(ITopic.class);
    when(cacheTopic.addMessageListener(isNotNull(MessageListener.class))).thenReturn("ignored");

    HazelcastInstance hazelcast = mock(HazelcastInstance.class);
    when(hazelcast.getConfig()).thenReturn(hazelcastConfig);
    when(hazelcast.getTopic(eq(CACHE_NAME))).thenReturn(cacheTopic);

    new LocalRegionCache(CACHE_NAME, hazelcast, null);

    verify(hazelcastConfig).findMapConfig(eq(CACHE_NAME));
    verify(hazelcast).getConfig();
    verify(hazelcast).getTopic(eq(CACHE_NAME));
    verify(cacheTopic).addMessageListener(isNotNull(MessageListener.class));
}
/**
 * Registers a listener on the given topic that passes every received event to
 * {@code triggerEventHandling}, then records the registration id and a count of 1 under
 * the topic's name. The whole operation runs while holding {@code hazelcastEventPipeLock}.
 *
 * @param topic the Hazelcast topic to listen on; checked for null inside the lock
 * @param <T>   payload type carried by the events
 */
private <T extends Serializable> void registerHazelcastEventPipe(final ITopic<DolphinEvent<T>> topic) {
    hazelcastEventPipeLock.lock();
    try {
        Assert.requireNonNull(topic, "hazelcastTopic");

        final com.hazelcast.core.MessageListener<DolphinEvent<T>> forwarder =
                new com.hazelcast.core.MessageListener<DolphinEvent<T>>() {
                    @Override
                    public void onMessage(com.hazelcast.core.Message<DolphinEvent<T>> message) {
                        final DolphinEvent<T> received = message.getMessageObject();
                        triggerEventHandling(received);
                    }
                };

        final String listenerId = topic.addMessageListener(forwarder);
        Assert.requireNonBlank(listenerId, "registrationId");

        // Bookkeeping is keyed by topic name.
        iTopicRegistrations.put(topic.getName(), listenerId);
        iTopicCount.put(topic.getName(), 1);
    } finally {
        hazelcastEventPipeLock.unlock();
    }
}
/**
 * Removes the listener previously recorded for the given topic and clears the bookkeeping
 * entries stored under the topic's name. Runs while holding {@code hazelcastEventPipeLock}.
 *
 * @param topic the Hazelcast topic whose listener should be removed
 * @param <T>   payload type carried by the events
 * @throws IllegalStateException if the recorded count for the topic is missing or not exactly 1
 */
private <T extends Serializable> void unregisterHazelcastEventPipe(final ITopic<DolphinEvent<T>> topic) {
    hazelcastEventPipeLock.lock();
    try {
        Assert.requireNonNull(topic, "hazelcastTopic");

        final Integer recordedCount = iTopicCount.get(topic.getName());
        final boolean exactlyOne = recordedCount != null && recordedCount == 1;
        if (!exactlyOne) {
            throw new IllegalStateException("Count for topic " + topic.getName() + " is wrong: " + recordedCount);
        }

        final String listenerId = iTopicRegistrations.get(topic.getName());
        Assert.requireNonBlank(listenerId, "registrationId");
        topic.removeMessageListener(listenerId);

        iTopicRegistrations.remove(topic.getName());
        iTopicCount.remove(topic.getName());
    } finally {
        hazelcastEventPipeLock.unlock();
    }
}
/**
 * Publishes one message on the "addMessageListener" topic and expects a registered
 * listener to receive an equal message within 10 seconds.
 */
@Test
public void addMessageListener() throws InterruptedException {
    HazelcastClient client = getHazelcastClient();
    ITopic<String> topic = client.getTopic("addMessageListener");

    final String expected = "Hazelcast Rocks!";
    final CountDownLatch delivered = new CountDownLatch(1);

    MessageListener<String> listener = new MessageListener<String>() {
        public void onMessage(Message<String> msg) {
            // Only the expected payload releases the latch.
            if (msg.getMessageObject().equals(expected)) {
                delivered.countDown();
            }
        }
    };
    topic.addMessageListener(listener);

    topic.publish(expected);
    assertTrue(delivered.await(10000, TimeUnit.MILLISECONDS));
}
@Test public void removeMessageListener() throws InterruptedException { HazelcastClient hClient = getHazelcastClient(); ITopic<String> topic = hClient.getTopic("removeMessageListener"); final CountDownLatch latch = new CountDownLatch(2); final CountDownLatch cp = new CountDownLatch(1); // final String message = "Hazelcast Rocks!"; MessageListener<String> messageListener = new MessageListener<String>() { public void onMessage(Message<String> msg) { // if (msg.startsWith(message)) { System.out.println("Received " + msg + " at " + this); latch.countDown(); cp.countDown(); // } } }; final String message = "message_" + messageListener.hashCode() + "_"; topic.addMessageListener(messageListener); topic.publish(message + "1"); cp.await(); topic.removeMessageListener(messageListener); topic.publish(message + "2"); Thread.sleep(50); assertEquals(1, latch.getCount()); }
/**
 * Publishes 10000 messages to the "perf" topic from a pool of 10 threads, expects all
 * publishes to finish within 20 seconds and prints the achieved rate per second.
 *
 * <p>The executor is shut down in a {@code finally} block so its worker threads do not
 * outlive the test, whether or not the assertion passes.
 */
@Test
public void testPerformance() throws InterruptedException {
    HazelcastClient hClient = getHazelcastClient();
    long begin = System.currentTimeMillis();
    int count = 10000;
    final ITopic topic = hClient.getTopic("perf");
    ExecutorService ex = Executors.newFixedThreadPool(10);
    try {
        final CountDownLatch l = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            ex.submit(new Runnable() {
                public void run() {
                    topic.publish("my object");
                    l.countDown();
                }
            });
        }
        assertTrue(l.await(20, TimeUnit.SECONDS));
    } finally {
        // Release the pool threads; on a failed await this also cancels the remaining tasks.
        ex.shutdownNow();
    }
    long time = System.currentTimeMillis() - begin;
    // Clamp to 1 ms so a sub-millisecond run cannot divide by zero.
    System.out.println("per second: " + count * 1000 / Math.max(1, time));
}
/**
 * Publishes two messages on the reliable topic "default" and checks that the listener saw
 * both in order and that the local topic statistics report two publishes and two receives.
 * The first argument to {@code withHazelcast} is presumably the cluster size — confirm
 * against that helper.
 */
@Test
public void testSimpleUsage() {
    withHazelcast(3, hazelcastInstance -> {
        ITopic<String> reliableTopic = hazelcastInstance.getReliableTopic("default");

        List<String> received = new ArrayList<>();
        reliableTopic.addMessageListener(message -> received.add(message.getMessageObject()));

        reliableTopic.publish("Hello World");
        reliableTopic.publish("Hello Hazelcast!");

        List<String> expected = Arrays.asList("Hello World", "Hello Hazelcast!");
        assertThat(received).isEqualTo(expected);

        assertThat(reliableTopic.getLocalTopicStats().getPublishOperationCount()).isEqualTo(2L);
        assertThat(reliableTopic.getLocalTopicStats().getReceiveOperationCount()).isEqualTo(2L);
    });
}
/**
 * Publishes 10005 messages on the reliable topic "with-ttl" with no listener attached and
 * checks the publish/receive counters as well as that publishing took at least 30 seconds.
 * NOTE(review): the 30 second lower bound presumably comes from the ringbuffer capacity and
 * TTL configured for this topic elsewhere — confirm against the test configuration.
 */
@Test
public void testWithRingbufferTtl() {
    withHazelcast(3, hazelcastInstance -> {
        ITopic<String> reliableTopic = hazelcastInstance.getReliableTopic("with-ttl");

        long startedAt = System.currentTimeMillis();
        for (int i = 1; i <= 10005; i++) {
            reliableTopic.publish("message-" + i);
        }
        long elapsedMillis = System.currentTimeMillis() - startedAt;

        assertThat(reliableTopic.getLocalTopicStats().getPublishOperationCount()).isEqualTo(10005L);
        assertThat(reliableTopic.getLocalTopicStats().getReceiveOperationCount()).isEqualTo(0L);
        assertThat(elapsedMillis).isGreaterThanOrEqualTo(30 * 1000L);
    });
}
/**
 * Prepares the test state: obtains the shared counter, creates {@code topicCount} reliable
 * topics from generated names and attaches {@code listenersPerTopic} listeners to each.
 * Listener ids are assigned consecutively across all topics, starting at 0.
 */
@Setup
@SuppressWarnings("unchecked")
public void setup() {
    totalMessagesSend = targetInstance.getAtomicLong(name + ":TotalExpectedCounter");
    topics = new ITopic[topicCount];
    listeners = new LinkedList<MessageListenerImpl>();

    String[] topicNames = generateStringKeys(name, topicCount, keyLocality, targetInstance);
    int nextListenerId = 0;
    for (int topicIndex = 0; topicIndex < topics.length; topicIndex++) {
        ITopic<MessageEntity> reliableTopic = targetInstance.getReliableTopic(topicNames[topicIndex]);
        topics[topicIndex] = reliableTopic;

        for (int added = 0; added < listenersPerTopic; added++) {
            MessageListenerImpl listener = new MessageListenerImpl(nextListenerId++);
            reliableTopic.addMessageListener(listener);
            listeners.add(listener);
        }
    }
}
/**
 * Prepares the test state: obtains the expected/found counters, creates {@code topicCount}
 * topics named {@code name + index} and attaches {@code listenersPerTopic} listeners to each.
 */
@Setup
public void setup() {
    totalExpectedCounter = targetInstance.getAtomicLong(name + ":TotalExpectedCounter");
    totalFoundCounter = targetInstance.getAtomicLong(name + ":TotalFoundCounter");
    topics = new ITopic[topicCount];
    listeners = new LinkedList<TopicListener>();

    for (int i = 0; i < topics.length; i++) {
        ITopic<Long> current = targetInstance.getTopic(name + i);
        topics[i] = current;

        int attached = 0;
        while (attached < listenersPerTopic) {
            TopicListener listener = new TopicListener();
            current.addMessageListener(listener);
            listeners.add(listener);
            attached++;
        }
    }
}
/**
 * Registers this object as instance listener and as message listener on the "default"
 * topic, publishes one message, then logs every known instance and every cluster member.
 */
@Test
public void doTest() {
    logger.info( "do test" );

    Hazelcast.addInstanceListener( this );

    ITopic<Object> defaultTopic = Hazelcast.getTopic( "default" );
    defaultTopic.addMessageListener( this );
    defaultTopic.publish( "my-message-object" );

    // Log id and type of each distributed instance.
    for ( Instance each : Hazelcast.getInstances() ) {
        logger.info( "ID: [" + each.getId() + "] Type: [" + each.getInstanceType() + "]" );
    }

    // Log locality and address of each cluster member.
    for ( Member each : Hazelcast.getCluster().getMembers() ) {
        logger.info( "isLocalMember " + each.localMember() );
        logger.info( "member.inetsocketaddress " + each.getInetSocketAddress() );
    }
}
/** * Subscribe for Mule events under the specified topic name * <p/> * {@sample.xml ../../../doc/pubsub-module.xml.sample pubsub:listener} * * @param topic Name of the topic * @param callback flow to process */ @Source(exchangePattern = MessageExchangePattern.ONE_WAY) public void listener(String topic, final SourceCallback callback) { ITopic hazelcastTopic = HazelcastManager.getInstance().getHazelcastInstance().getTopic(topic); hazelcastTopic.addMessageListener(new MessageListener() { @Override public void onMessage(Message message) { Thread.currentThread().setContextClassLoader(muleContext.getExecutionClassLoader()); MuleEvent newEvent = createMuleEvent(message); // process it try { callback.processEvent(newEvent); } catch (MuleException e) { LOGGER.error(e.getMessage(), e); } } }); }
/**
 * Creates an {@link ITopic} proxy on the combination of a
 * {@link TransactionalQueue} and an actual {@link ITopic} instance. The proxy
 * will offer items to the transactional queue when they are published on the
 * topic. All other topic methods are simply passed through to the underlying
 * topic. By offering items to the queue on publish, a transactional topic can
 * be simulated via the ITopic interface.
 * <p>
 * Exceptions thrown by the underlying topic are rethrown unchanged rather than
 * wrapped by the reflection layer, so callers see the same exception types they
 * would see when using the topic directly.
 *
 * @param <E> the type of items in the topic
 * @param queue the transactional queue to offer all published objects
 * @param topic the underlying topic to handle all other operations
 *
 * @return the proxy around the queue and topic
 */
@SuppressWarnings("unchecked")
public static <E> ITopic<E> createTopicProxy(
    final TransactionalQueue<E> queue, final ITopic<E> topic) {

  InvocationHandler handler = new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      if (method.getName().equals("publish")) {
        // Redirect publishes into the transactional queue.
        return queue.offer((E) args[0]);
      }
      try {
        return method.invoke(topic, args);
      } catch (java.lang.reflect.InvocationTargetException e) {
        // Method.invoke wraps whatever the target threw; letting the wrapper
        // escape would surface as UndeclaredThrowableException at the caller.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
      }
    }
  };

  return (ITopic<E>) Proxy.newProxyInstance(
      ITopic.class.getClassLoader(), new Class[]{ITopic.class}, handler);
}
/**
 * Publishes 10000 messages to the "perf" topic from a pool of 10 threads, expects all
 * publishes to finish within 20 seconds and prints the achieved rate per second.
 *
 * <p>The executor is shut down in a {@code finally} block so its worker threads do not
 * outlive the test, whether or not the assertion passes.
 */
@Test
public void testPerformance() throws InterruptedException {
    HazelcastClient hClient = getHazelcastClient();
    long begin = Clock.currentTimeMillis();
    int count = 10000;
    final ITopic topic = hClient.getTopic("perf");
    ExecutorService ex = Executors.newFixedThreadPool(10);
    try {
        final CountDownLatch l = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            ex.submit(new Runnable() {
                public void run() {
                    topic.publish("my object");
                    l.countDown();
                }
            });
        }
        assertTrue(l.await(20, TimeUnit.SECONDS));
    } finally {
        // Release the pool threads; on a failed await this also cancels the remaining tasks.
        ex.shutdownNow();
    }
    long time = Clock.currentTimeMillis() - begin;
    // Clamp to 1 ms so a sub-millisecond run cannot divide by zero.
    System.out.println("per second: " + count * 1000 / Math.max(1, time));
}
public static void main(String[] args) { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); ITopic<String> topic = hz.getTopic("foo"); // topic.addMessageListener(System.out::println); while(true){ topic.publish("Hi from Anatole at " + new Date()); } }
/**
 * Registers this object as a message listener on the metrics publishing topic.
 *
 * @throws IllegalStateException if no Hazelcast instance is available
 */
@PostConstruct
public void listen() {
    if (hazelcastInstance == null) {
        throw new IllegalStateException("Could not create hazelcast instance to listen through");
    }

    ITopic metricsTopic = hazelcastInstance.getTopic(HazelcastMetricPublishingDelegate.METRICS_PUB_ENDPOINT);
    metricsTopic.addMessageListener(this);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Now listening on: " + HazelcastMetricPublishingDelegate.METRICS_PUB_ENDPOINT);
    }
}
/**
 * Publishes the given metrics collection on the metrics topic. Any exception raised while
 * looking up the topic or publishing is logged and not propagated.
 *
 * @param collection the metrics messages to broadcast
 */
@Override
public void publish(MetricsMessageCollection collection) {
    try {
        ITopic metricsTopic = hazelcastInstance.getTopic(METRICS_PUB_ENDPOINT);
        metricsTopic.publish(collection);
    } catch (Exception publishFailure) {
        // Best effort: a failed publish must not break the caller.
        LOG.error("Error sending message:", publishFailure);
    }
}
/**
 * Publishes the given session data on the session discovery topic. Any exception raised
 * while looking up the topic or publishing is logged and not propagated.
 *
 * @param sessionDataDTO the session data to broadcast
 */
public void advise(SessionDataDTO sessionDataDTO) {
    try {
        ITopic discoveryTopic = hazelcastInstance.getTopic(SESSION_DISCOVERY_EVENT);
        discoveryTopic.publish(sessionDataDTO);
    } catch (Exception publishFailure) {
        // Best effort: a failed publish must not break the caller.
        LOG.error("Error sending message:", publishFailure);
    }
}
/**
 * Registers this object as a message listener on the session discovery topic.
 *
 * @throws IllegalStateException if no Hazelcast instance is available
 */
@PostConstruct
public void listen() {
    if (hazelcastInstance == null) {
        throw new IllegalStateException("Could not create hazelcast instance to listen through");
    }

    ITopic discoveryTopic = hazelcastInstance.getTopic(HazelcastSessionDiscoveryDelegate.SESSION_DISCOVERY_EVENT);
    discoveryTopic.addMessageListener(this);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Now listening on: " + HazelcastSessionDiscoveryDelegate.SESSION_DISCOVERY_EVENT);
    }
}
/**
 * Creates a topic with the given name, choosing the implementation from the configured
 * clustering mode: a Hazelcast topic for {@code HAZELCAST}, a {@code SingleTopic} for
 * {@code IGNITE} and {@code SINGLE}.
 *
 * @param name the topic name
 * @param <E>  the message type
 * @return the topic, or {@code null} for any other clustering mode
 */
public <E> ITopic<E> createTopic(String name) {
    final ITopic<E> created;
    switch (Settings.INSTANCE.clusteringMode()) {
        case HAZELCAST:
            created = Hazelcast.INSTANCE.getTopic(name);
            break;
        case IGNITE:
        case SINGLE:
            // Both modes share the local, single-node topic implementation.
            created = new SingleTopic<E>(name);
            break;
        default:
            created = null;
            break;
    }
    return created;
}
/**
 * Removes the topic listener with the given registration id.
 *
 * @param topic name of the topic; it must appear in the Hazelcast topic configurations
 * @param regID registration id of the listener to remove
 * @return the result of the topic's {@code removeMessageListener} call, or {@code false}
 *         when no topic configuration exists under the given name
 */
public <E> boolean removeTopicListener(String topic, String regID) {
    final boolean configured = hazelcast.getConfig().getTopicConfigs().containsKey(topic);
    if (!configured) {
        return false;
    }
    ITopic<E> target = hazelcast.getTopic(topic);
    return target.removeMessageListener(regID);
}
/**
 * Publishes a cache eviction event for the given cache and key on the cache's topic.
 * A failure while publishing is logged as a warning and otherwise ignored.
 *
 * @param cache the cache the eviction refers to
 * @param key   the evicted key
 */
private void publish(final Ehcache cache, final Object key) {
    final CacheEvictionEvent eviction = new CacheEvictionEvent(cache.getName(), key);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Publishing cache eviction event: " + eviction);
    }

    // Topic lookup sits outside the try block, so a lookup failure propagates to the caller.
    final ITopic<CacheEvictionEvent> evictionTopic = HazelcastCacheManagerPeerProvider.getTopic(cache);
    try {
        evictionTopic.publish(eviction);
    } catch (Exception publishFailure) {
        LOG.warn("Couldn't publish cache eviction event: " + eviction, publishFailure);
    }
}
/**
 * Returns the topic with the given name: the value from {@code getBeanSafely} when that
 * lookup yields a non-null result, otherwise the topic obtained from {@code hz()}.
 *
 * @param name the topic name; must not be {@code null}
 * @param <E>  the message type
 * @return the resolved topic
 * @throws NullPointerException if {@code name} is {@code null}
 */
@Override
@SuppressWarnings("unchecked")
public <E> ITopic<E> getITopic(String name) {
    final String topicName = Objects.requireNonNull(name);
    final ITopic<E> fromBeans = getBeanSafely(topicName, ITopic.class);
    if (fromBeans == null) {
        return hz().getTopic(topicName);
    }
    return fromBeans;
}
/**
 * Attaches a listener to the named topic that prints the text of every received message
 * to standard output.
 *
 * @param instance  the Hazelcast instance providing the topic
 * @param topicName the name of the topic to consume from
 */
private void addConsumer(HazelcastInstance instance, String topicName) {
    final MessageListener<CustomEventFormat> printer = new MessageListener<CustomEventFormat>() {
        @Override
        public void onMessage(Message<CustomEventFormat> message) {
            CustomEventFormat payload = message.getMessageObject();
            System.out.println("Received: " + payload.getMyMessage());
        }
    };

    final ITopic<CustomEventFormat> source = instance.getTopic(topicName);
    source.addMessageListener(printer);
}
/**
 * {@inheritDoc}
 * <p>
 * Obtains a Hazelcast instance from a {@code HazelcastProvider} built with this worker's
 * port, stores it, and registers this object as listener on the topic named {@code name}.
 */
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    final int workerPort = context.getThisWorkerPort();
    this.hzInstance = new HazelcastProvider(workerPort).getHzInstance();

    ITopic<T> source = hzInstance.getTopic(name);
    source.addMessageListener(this);
}
/**
 * Obtains a Hazelcast instance from a {@code HazelcastProvider} built with this worker's
 * port, stores it, and registers this object as listener on the topic named {@code name}.
 *
 * @param conf      unused here
 * @param context   supplies the worker port used to obtain the Hazelcast instance
 * @param collector unused here
 */
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    final int workerPort = context.getThisWorkerPort();
    this.hzInstance = new HazelcastProvider(workerPort).getHzInstance();

    ITopic<T> source = hzInstance.getTopic(name);
    source.addMessageListener(this);
}
/**
 * Stores the Hazelcast instance and wires this object to it: subscribes to the counters
 * and population topics, keeps references to the health topic and the document map, and
 * registers this object as partition-lost listener.
 *
 * @param hzInstance the Hazelcast instance to attach to
 */
private void initialize(HazelcastInstance hzInstance) {
    this.hzInstance = hzInstance;

    // Counter updates are delivered to this object.
    ITopic<Counter> counterTopic = hzInstance.getTopic(TPN_XDM_COUNTERS);
    counterTopic.addMessageListener(this);

    // Population updates are handled by a dedicated listener.
    ITopic<Long> populationTopic = hzInstance.getTopic(TPN_XDM_POPULATION);
    populationTopic.addMessageListener(new PopulationStateListener());

    hTopic = hzInstance.getTopic(TPN_XDM_HEALTH);
    xddCache = hzInstance.getMap(CN_XDM_DOCUMENT);

    hzInstance.getPartitionService().addPartitionLostListener(this);
}
private boolean populateSchema(HazelcastInstance hz) { logger.debug("populateSchema.enter; HZ instance: {}", hz); ApplicationContext schemaCtx = getContext(schemaName); if (schemaCtx == null) { logger.info("populateSchema.exit; No Spring Context initialized yet"); return false; } IMap<Long, Transaction> xtxCache = hz.getMap(CN_XDM_TRANSACTION); xtxCache.loadAll(false); logger.info("populateSchema; transactions size after loadAll: {}", xtxCache.size()); IMap<Long, Document> xddCache = hz.getMap(CN_XDM_DOCUMENT); xddCache.loadAll(false); logger.info("populateSchema; documents size after loadAll: {}", xddCache.size()); ITopic<Long> pTopic = hz.getTopic(TPN_XDM_POPULATION); PopulationManagementImpl pm = (PopulationManagementImpl) hz.getUserContext().get(ctx_popService); int lo = pm.getActiveCount(); int hi = pm.getDocumentCount() - lo; long counts = ((long) hi << 32) + lo; pTopic.publish(counts); // adjusting tx idGen! TransactionManagementImpl txMgr = schemaCtx.getBean("txManager", TransactionManagementImpl.class); txMgr.adjustTxCounter(pm.getMaxTransactionId()); return true; }
/**
 * Two topics are considered equal when their names are equal.
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} is an {@code ITopic} with the same name as this one
 */
@Override
public boolean equals(Object o) {
    // instanceof is false for null, so no separate null check is needed.
    return o instanceof ITopic && getName().equals(((ITopic) o).getName());
}
/**
 * Registers the listener on the topic, first removing any earlier registration identified
 * by a non-empty {@code listenerId}.
 *
 * @param topic      Hazelcast topic
 * @param listener   Listener to register
 * @param listenerId ID of a previous registration to remove; skipped when empty
 * @return ID of the newly registered listener
 */
private String checkAndRegisterListerToTopic(ITopic<ClusterNotification> topic,
                                             MessageListener<ClusterNotification> listener,
                                             String listenerId) {
    final boolean hasPreviousRegistration = StringUtils.isNotEmpty(listenerId);
    if (hasPreviousRegistration) {
        topic.removeMessageListener(listenerId);
    }
    final String newRegistrationId = topic.addMessageListener(listener);
    return newRegistrationId;
}
/**
 * Publishes the event string on the Hazelcast topic derived from the given name, after
 * encoding it through the corresponding {@code TopicName}.
 *
 * @param name        the event name, from which the topic name is derived
 * @param event       not used by this implementation
 * @param eventString the serialized event to publish
 * @return always {@code true}
 */
@Override
protected boolean doPublish(String name, Event event, String eventString) throws IOException {
    final TopicName resolved = new TopicName(name);
    final ITopic<String> target = hazelcast.getTopic(resolved.getName());
    final String encoded = resolved.encode(eventString);
    target.publish(encoded);
    return true;
}
/**
 * Drops the registration stored for the event name and, if one existed, removes the
 * listener from the matching topic. Topics for event names starting with "reply." are
 * destroyed afterwards.
 *
 * @param eventName the event name to unsubscribe from
 */
@Override
protected synchronized void doUnsubscribe(String eventName) {
    final String id = registrations.remove(eventName);
    log.info("Unsubscribing from [{}] id [{}]", eventName, id);
    if (id == null) {
        // Nothing was registered under this name.
        return;
    }

    final String topicName = new TopicName(eventName).getName();
    final ITopic<String> topic = hazelcast.getTopic(topicName);
    topic.removeMessageListener(id);

    if (eventName.startsWith("reply.")) {
        topic.destroy();
    }
}
@Override public void onPublish(InterceptPublishMessage msg) { // TODO ugly, too much array copy ByteBuf payload = msg.getPayload(); byte[] payloadContent = readBytesAndRewind(payload); LOG.info("{} publish on {} message: {}", msg.getClientID(), msg.getTopicName(), new String(payloadContent)); ITopic<HazelcastMsg> topic = hz.getTopic("moquette"); HazelcastMsg hazelcastMsg = new HazelcastMsg(msg); topic.publish(hazelcastMsg); }
/**
 * Publishes the event string on the Hazelcast topic with the given name.
 *
 * @param name        the topic name
 * @param event       not used by this implementation
 * @param eventString the serialized event to publish
 * @return always {@code true}
 */
@Override
protected boolean doPublish(String name, Event event, String eventString) throws IOException {
    final ITopic<String> target = hazelcast.getTopic(name);
    target.publish(eventString);
    return true;
}
/**
 * Subscribes to the Hazelcast topic named after the event and records the registration id.
 * Received messages are passed to {@code onEvent}. The future is always completed: with
 * {@code null} on success, otherwise with the failure.
 *
 * @param eventName the event (and topic) name to subscribe to
 * @param future    completed when the subscription attempt has finished
 * @throws IllegalStateException if a registration already exists for {@code eventName}
 */
@Override
protected void doSubscribe(final String eventName, SettableFuture<?> future) {
    boolean subscribed = false;
    Throwable failure = null;
    try {
        if (registrations.containsKey(eventName)) {
            throw new IllegalStateException("Already subscribed to [" + eventName + "]");
        }

        ITopic<String> topic = hazelcast.getTopic(eventName);
        String id = topic.addMessageListener(new MessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                onEvent(null, eventName, message.getMessageObject());
            }
        });

        log.info("Subscribing to [{}] id [{}]", eventName, id);
        registrations.put(eventName, id);
        subscribed = true;
    } catch (RuntimeException e) {
        // Remember the cause for the future, then let it propagate to the caller as well.
        failure = e;
        throw e;
    } finally {
        if (!subscribed) {
            // A non-RuntimeException throwable bypasses the catch above; report a generic failure.
            if (failure == null) {
                failure = new IllegalStateException("Failed to subscribe to [" + eventName + "]");
            }
            future.setException(failure);
        } else {
            future.set(null);
        }
    }
}
@Override protected void doUnsubscribe(String eventName) { String id = registrations.remove(eventName); log.info("Unsubscribing from [{}] id [{}]", eventName, id); if ( id != null ) { ITopic<String> topic = hazelcast.getTopic(eventName); topic.removeMessageListener(id); // TODO GC topics.... // topic.destroy(); } }