Java 类com.hazelcast.core.MessageListener 实例源码

项目:hazelcast-hibernate5    文件:LocalRegionCacheTest.java   
/**
 * Constructs a {@link LocalRegionCache} with three arguments and verifies that it
 * looks up the map configuration, fetches the topic named after the cache and
 * registers a non-null {@link MessageListener} on that topic.
 */
@Test
public void testThreeArgConstructorRegistersTopicListener() {
    // collaborators
    Config hazelcastConfig = mock(Config.class);
    MapConfig cacheMapConfig = mock(MapConfig.class);
    ITopic<Object> cacheTopic = mock(ITopic.class);
    HazelcastInstance hazelcast = mock(HazelcastInstance.class);

    // stubbing
    when(hazelcastConfig.findMapConfig(eq(CACHE_NAME))).thenReturn(cacheMapConfig);
    when(cacheTopic.addMessageListener(isNotNull(MessageListener.class))).thenReturn("ignored");
    when(hazelcast.getConfig()).thenReturn(hazelcastConfig);
    when(hazelcast.getTopic(eq(CACHE_NAME))).thenReturn(cacheTopic);

    // exercise
    new LocalRegionCache(CACHE_NAME, hazelcast, null);

    // expectations
    verify(hazelcastConfig).findMapConfig(eq(CACHE_NAME));
    verify(hazelcast).getConfig();
    verify(hazelcast).getTopic(eq(CACHE_NAME));
    verify(cacheTopic).addMessageListener(isNotNull(MessageListener.class));
}
项目:hazelcast-hibernate5    文件:TimestampsRegionCacheTest.java   
/**
 * Stubs the collaborators used by the cache, builds the {@link TimestampsRegionCache}
 * under test and keeps hold of the listener it registered on the topic.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setup() {
    // records whichever listener is handed to the topic during construction
    ArgumentCaptor<MessageListener> registered = ArgumentCaptor.forClass(MessageListener.class);
    when(topic.addMessageListener(registered.capture())).thenReturn("ignored");

    when(instance.getTopic(eq(CACHE_NAME))).thenReturn(topic);
    when(instance.getConfig()).thenReturn(config);
    when(instance.getCluster()).thenReturn(cluster);
    when(config.findMapConfig(eq(CACHE_NAME))).thenReturn(mapConfig);

    // make the message appear that it is coming from a different member of the cluster
    when(member.localMember()).thenReturn(false);

    target = new TimestampsRegionCache(CACHE_NAME, instance);
    this.listener = registered.getValue();
}
项目:hazelcast-hibernate    文件:LocalRegionCacheTest.java   
/**
 * Constructs a {@link LocalRegionCache} with three arguments and verifies that it
 * looks up the map configuration, fetches the topic named after the cache and
 * registers a non-null {@link MessageListener} on that topic.
 */
@Test
public void testThreeArgConstructorRegistersTopicListener() {
    // collaborators
    Config hazelcastConfig = mock(Config.class);
    MapConfig cacheMapConfig = mock(MapConfig.class);
    ITopic<Object> cacheTopic = mock(ITopic.class);
    HazelcastInstance hazelcast = mock(HazelcastInstance.class);

    // stubbing
    when(hazelcastConfig.findMapConfig(eq(CACHE_NAME))).thenReturn(cacheMapConfig);
    when(cacheTopic.addMessageListener(isNotNull(MessageListener.class))).thenReturn("ignored");
    when(hazelcast.getConfig()).thenReturn(hazelcastConfig);
    when(hazelcast.getTopic(eq(CACHE_NAME))).thenReturn(cacheTopic);

    // exercise
    new LocalRegionCache(CACHE_NAME, hazelcast, null);

    // expectations
    verify(hazelcastConfig).findMapConfig(eq(CACHE_NAME));
    verify(hazelcast).getConfig();
    verify(hazelcast).getTopic(eq(CACHE_NAME));
    verify(cacheTopic).addMessageListener(isNotNull(MessageListener.class));
}
项目:hazelcast-hibernate    文件:TimestampsRegionCacheTest.java   
/**
 * Stubs the collaborators used by the cache, builds the {@link TimestampsRegionCache}
 * under test and keeps hold of the listener it registered on the topic.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setup() {
    // records whichever listener is handed to the topic during construction
    ArgumentCaptor<MessageListener> registered = ArgumentCaptor.forClass(MessageListener.class);
    when(topic.addMessageListener(registered.capture())).thenReturn("ignored");

    when(instance.getTopic(eq(CACHE_NAME))).thenReturn(topic);
    when(instance.getConfig()).thenReturn(config);
    when(instance.getCluster()).thenReturn(cluster);
    when(config.findMapConfig(eq(CACHE_NAME))).thenReturn(mapConfig);

    // make the message appear that it is coming from a different member of the cluster
    when(member.localMember()).thenReturn(false);

    target = new TimestampsRegionCache(CACHE_NAME, instance);
    this.listener = registered.getValue();
}
项目:hazelcast-hibernate    文件:LocalRegionCacheTest.java   
/**
 * Constructs a {@link LocalRegionCache} with three arguments and verifies that it
 * looks up the map configuration, fetches the topic named after the cache and
 * registers a non-null {@link MessageListener} on that topic.
 */
@Test
public void testThreeArgConstructorRegistersTopicListener() {
    // collaborators
    Config hazelcastConfig = mock(Config.class);
    MapConfig cacheMapConfig = mock(MapConfig.class);
    ITopic<Object> cacheTopic = mock(ITopic.class);
    HazelcastInstance hazelcast = mock(HazelcastInstance.class);

    // stubbing
    when(hazelcastConfig.findMapConfig(eq(CACHE_NAME))).thenReturn(cacheMapConfig);
    when(cacheTopic.addMessageListener(isNotNull(MessageListener.class))).thenReturn("ignored");
    when(hazelcast.getConfig()).thenReturn(hazelcastConfig);
    when(hazelcast.getTopic(eq(CACHE_NAME))).thenReturn(cacheTopic);

    // exercise
    new LocalRegionCache(CACHE_NAME, hazelcast, null);

    // expectations
    verify(hazelcastConfig).findMapConfig(eq(CACHE_NAME));
    verify(hazelcast).getConfig();
    verify(hazelcast).getTopic(eq(CACHE_NAME));
    verify(cacheTopic).addMessageListener(isNotNull(MessageListener.class));
}
项目:hazelcast-hibernate    文件:TimestampsRegionCacheTest.java   
/**
 * Stubs the collaborators used by the cache, builds the {@link TimestampsRegionCache}
 * under test and keeps hold of the listener it registered on the topic.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setup() {
    // records whichever listener is handed to the topic during construction
    ArgumentCaptor<MessageListener> registered = ArgumentCaptor.forClass(MessageListener.class);
    when(topic.addMessageListener(registered.capture())).thenReturn("ignored");

    when(instance.getTopic(eq(CACHE_NAME))).thenReturn(topic);
    when(instance.getConfig()).thenReturn(config);
    when(instance.getCluster()).thenReturn(cluster);
    when(config.findMapConfig(eq(CACHE_NAME))).thenReturn(mapConfig);

    // make the message appear that it is coming from a different member of the cluster
    when(member.localMember()).thenReturn(false);

    target = new TimestampsRegionCache(CACHE_NAME, instance);
    this.listener = registered.getValue();
}
项目:hazelcast-archive    文件:TopicMBean.java   
/**
 * Delegates to the superclass and then, only when registration succeeded and the
 * management service asks for details, creates a statistics collector and attaches a
 * topic listener that records one event per received message.
 *
 * @param registrationDone flag forwarded to the superclass; nothing else happens when
 *                         it is {@code false}
 */
@SuppressWarnings("unchecked")
@Override
public void postRegister(Boolean registrationDone) {
    super.postRegister(registrationDone);
    if (registrationDone && managementService.showDetails()) {
        servedStats = ManagementService.newStatisticsCollector();
        listener = new MessageListener() {
            public void onMessage(Message msg) {
                // every delivered message counts as one served event
                servedStats.addEvent();
            }
        };
        getManagedObject().addMessageListener(listener);
    }
}
项目:hazelcast-archive    文件:HazelcastClientTopicTest.java   
/**
 * Publishes a message on a client-side topic and expects the registered listener to
 * see exactly that payload within ten seconds.
 */
@Test
public void addMessageListener() throws InterruptedException {
    final String expected = "Hazelcast Rocks!";
    final CountDownLatch received = new CountDownLatch(1);
    HazelcastClient client = getHazelcastClient();
    ITopic<String> topic = client.getTopic("addMessageListener");

    MessageListener<String> matcher = new MessageListener<String>() {
        public void onMessage(Message<String> msg) {
            // only the expected payload releases the latch
            if (msg.getMessageObject().equals(expected)) {
                received.countDown();
            }
        }
    };
    topic.addMessageListener(matcher);

    topic.publish(expected);
    assertTrue(received.await(10000, TimeUnit.MILLISECONDS));
}
项目:hazelcast-archive    文件:HazelcastClientTopicTest.java   
/**
 * Checks that a listener no longer receives messages after it has been removed from
 * the topic: the first publish must reach it, the second must not.
 */
@Test
public void removeMessageListener() throws InterruptedException {
    HazelcastClient hClient = getHazelcastClient();
    ITopic<String> topic = hClient.getTopic("removeMessageListener");
    // starts at 2 so that a count of 1 afterwards means "exactly one delivery"
    final CountDownLatch latch = new CountDownLatch(2);
    // released as soon as the first message has been delivered
    final CountDownLatch cp = new CountDownLatch(1);
    MessageListener<String> messageListener = new MessageListener<String>() {
        public void onMessage(Message<String> msg) {
            System.out.println("Received " + msg + " at " + this);
            latch.countDown();
            cp.countDown();
        }
    };
    final String message = "message_" + messageListener.hashCode() + "_";
    topic.addMessageListener(messageListener);
    topic.publish(message + "1");
    // Bounded wait: fail the test instead of hanging forever if delivery never happens.
    assertTrue(cp.await(10, TimeUnit.SECONDS));
    topic.removeMessageListener(messageListener);
    topic.publish(message + "2");
    // give a wrongly delivered second message a moment to arrive before checking
    Thread.sleep(50);
    assertEquals(1, latch.getCount());
}
项目:hazelcast-archive    文件:MessageListenerManagerTest.java   
/**
 * Registers a listener, removes it twice and checks after every step whether the
 * manager still reports a listener for the name; the repeated removal must not
 * change the outcome.
 */
@Test
public void testRemoveMessageListener() throws Exception {
    final String topicName = "default";
    MessageListenerManager listenerManager = new MessageListenerManager();
    MessageListener noOpListener = new MessageListener<Object>() {
        public void onMessage(Message<Object> message) {
            // nothing to do, only the registration matters here
        }
    };

    assertTrue(listenerManager.noListenerRegistered(topicName));

    listenerManager.registerListener(topicName, noOpListener);
    assertFalse(listenerManager.noListenerRegistered(topicName));

    listenerManager.removeListener(topicName, noOpListener);
    assertTrue(listenerManager.noListenerRegistered(topicName));

    // removing an already removed listener
    listenerManager.removeListener(topicName, noOpListener);
    assertTrue(listenerManager.noListenerRegistered(topicName));
}
项目:mule-module-publish-subscribe    文件:PubSubModule.java   
/**
 * Registers a message source on the named Hazelcast topic. Each received message is
 * turned into a Mule event and passed to the callback; a {@code MuleException} raised
 * by the callback is logged.
 * <p/>
 * {@sample.xml ../../../doc/pubsub-module.xml.sample pubsub:listener}
 *
 * @param topic    Name of the topic
 * @param callback flow to process
 */
@Source(exchangePattern = MessageExchangePattern.ONE_WAY)
public void listener(String topic, final SourceCallback callback) {
    MessageListener eventForwarder = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            // run the flow with Mule's execution class loader on the delivering thread
            ClassLoader executionLoader = muleContext.getExecutionClassLoader();
            Thread.currentThread().setContextClassLoader(executionLoader);

            MuleEvent event = createMuleEvent(message);

            try {
                callback.processEvent(event);
            } catch (MuleException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    };

    ITopic hazelcastTopic = HazelcastManager.getInstance().getHazelcastInstance().getTopic(topic);
    hazelcastTopic.addMessageListener(eventForwarder);
}
项目:health-and-care-developer-network    文件:LocalRegionCache.java   
/**
 * Builds the listener that applies incoming {@code Invalidation} messages to the
 * local cache.
 * <p>
 * Without a version comparator the key is removed unconditionally. With one, the
 * cached entry is removed only if it exists and the invalidation carries a version
 * that compares greater than the cached version.
 *
 * @return the listener handling invalidation messages
 */
protected MessageListener<Object> createMessageListener() {
    return new MessageListener<Object>() {
        public void onMessage(final Message<Object> message) {
            final Invalidation invalidation = (Invalidation) message.getMessageObject();

            if (versionComparator == null) {
                cache.remove(invalidation.getKey());
                return;
            }

            final Value cached = cache.get(invalidation.getKey());
            if (cached == null) {
                // nothing cached for this key, nothing to invalidate
                return;
            }

            Object cachedVersion = cached.getVersion();
            Object incomingVersion = invalidation.getVersion();
            boolean incomingIsNewer = versionComparator.compare(incomingVersion, cachedVersion) > 0;
            if (incomingIsNewer) {
                // conditional remove: only evicts the exact value that was inspected
                cache.remove(invalidation.getKey(), cached);
            }
        }
    };
}
项目:health-and-care-developer-network    文件:TopicMBean.java   
/**
 * Delegates to the superclass and then, only when registration succeeded and the
 * management service asks for details, creates a statistics collector and attaches a
 * topic listener that records one event per received message.
 *
 * @param registrationDone flag forwarded to the superclass; nothing else happens when
 *                         it is {@code false}
 */
@SuppressWarnings("unchecked")
@Override
public void postRegister(Boolean registrationDone) {
    super.postRegister(registrationDone);
    if (registrationDone && managementService.showDetails()) {
        servedStats = ManagementService.newStatisticsCollector();
        listener = new MessageListener() {
            public void onMessage(Message msg) {
                // every delivered message counts as one served event
                servedStats.addEvent();
            }
        };
        getManagedObject().addMessageListener(listener);
    }
}
项目:health-and-care-developer-network    文件:HazelcastClientTopicTest.java   
/**
 * Publishes a message on a client-side topic and expects the registered listener to
 * see exactly that payload within ten seconds.
 */
@Test
public void addMessageListener() throws InterruptedException {
    final String expected = "Hazelcast Rocks!";
    final CountDownLatch received = new CountDownLatch(1);
    HazelcastClient client = getHazelcastClient();
    ITopic<String> topic = client.getTopic("addMessageListener");

    MessageListener<String> matcher = new MessageListener<String>() {
        public void onMessage(Message<String> msg) {
            // only the expected payload releases the latch
            if (msg.getMessageObject().equals(expected)) {
                received.countDown();
            }
        }
    };
    topic.addMessageListener(matcher);

    topic.publish(expected);
    assertTrue(received.await(10000, TimeUnit.MILLISECONDS));
}
项目:health-and-care-developer-network    文件:HazelcastClientTopicTest.java   
/**
 * Checks that a listener no longer receives messages after it has been removed from
 * the topic: the first publish must reach it, the second must not.
 */
@Test
public void removeMessageListener() throws InterruptedException {
    HazelcastClient hClient = getHazelcastClient();
    ITopic<String> topic = hClient.getTopic("removeMessageListener");
    // starts at 2 so that a count of 1 afterwards means "exactly one delivery"
    final CountDownLatch latch = new CountDownLatch(2);
    // released as soon as the first message has been delivered
    final CountDownLatch cp = new CountDownLatch(1);
    MessageListener<String> messageListener = new MessageListener<String>() {
        public void onMessage(Message<String> msg) {
            System.out.println("Received " + msg + " at " + this);
            latch.countDown();
            cp.countDown();
        }
    };
    final String message = "message_" + messageListener.hashCode() + "_";
    topic.addMessageListener(messageListener);
    topic.publish(message + "1");
    // Bounded wait: fail the test instead of hanging forever if delivery never happens.
    assertTrue(cp.await(10, TimeUnit.SECONDS));
    topic.removeMessageListener(messageListener);
    topic.publish(message + "2");
    // give a wrongly delivered second message a moment to arrive before checking
    Thread.sleep(50);
    assertEquals(1, latch.getCount());
}
项目:health-and-care-developer-network    文件:MessageListenerManagerTest.java   
/**
 * Registers a listener, removes it twice and checks after every step whether the
 * manager still reports a listener for the name; the repeated removal must not
 * change the outcome.
 */
@Test
public void testRemoveMessageListener() throws Exception {
    final String topicName = "default";
    MessageListenerManager listenerManager = new MessageListenerManager();
    MessageListener noOpListener = new MessageListener<Object>() {
        public void onMessage(Message<Object> message) {
            // nothing to do, only the registration matters here
        }
    };

    assertTrue(listenerManager.noListenerRegistered(topicName));

    listenerManager.registerListener(topicName, noOpListener);
    assertFalse(listenerManager.noListenerRegistered(topicName));

    listenerManager.removeListener(topicName, noOpListener);
    assertTrue(listenerManager.noListenerRegistered(topicName));

    // removing an already removed listener
    listenerManager.removeListener(topicName, noOpListener);
    assertTrue(listenerManager.noListenerRegistered(topicName));
}
项目:lannister    文件:SingleTopic.java   
/**
 * Stores the listener under a freshly generated random UUID string.
 *
 * @param listener listener to register
 * @return the generated registration id under which the listener was stored
 */
@Override
public String addMessageListener(MessageListener<E> listener) {
    final String registrationId = UUID.randomUUID().toString();
    messageListeners.put(registrationId, listener);
    return registrationId;
}
项目:hazelcast-hibernate5    文件:LocalRegionCache.java   
/**
 * Builds the topic listener for this cache: the payload of every received message
 * is passed to {@code maybeInvalidate}.
 *
 * @return the listener forwarding message payloads to {@code maybeInvalidate}
 */
protected MessageListener<Object> createMessageListener() {
    final MessageListener<Object> invalidationListener = new MessageListener<Object>() {
        @Override
        public void onMessage(final Message<Object> message) {
            final Object payload = message.getMessageObject();
            maybeInvalidate(payload);
        }
    };
    return invalidationListener;
}
项目:eet.osslite.cz    文件:HazelcastConfiguration.java   
/**
 * Configures the topic named {@code "yourTopicName"}: turns on global ordering and
 * statistics, and adds a listener configuration wrapping a (currently empty)
 * {@code String} message listener.
 *
 * @param config configuration from which the topic configuration is obtained
 */
private void setTopicConfig(Config config) {
    final MessageListener<String> messageHandler = new MessageListener<String>() {
        @Override
        public void onMessage(Message<String> message) {
            // process the message
        }
    };
    final ListenerConfig handlerConfig = new ListenerConfig(messageHandler);

    final TopicConfig yourTopic = config.getTopicConfig("yourTopicName");
    yourTopic.setGlobalOrderingEnabled(true);
    yourTopic.setStatisticsEnabled(true);
    yourTopic.addMessageListenerConfig(handlerConfig);
}
项目:hazelcast-hibernate    文件:LocalRegionCache.java   
/**
 * Builds the topic listener for this cache. Messages published by the local member
 * are skipped; the payload of any other message is passed to {@code maybeInvalidate}.
 *
 * @return the listener handling messages from other members
 */
protected MessageListener<Object> createMessageListener() {
    return new MessageListener<Object>() {
        public void onMessage(final Message<Object> message) {
            final boolean publishedLocally = message.getPublishingMember().localMember();
            if (publishedLocally) {
                return;
            }
            maybeInvalidate(message.getMessageObject());
        }
    };
}
项目:hazelcast-hibernate    文件:LocalRegionCache.java   
/**
 * Builds the topic listener for this cache. Messages published by the local member
 * are skipped; the payload of any other message is passed to {@code maybeInvalidate}.
 *
 * @return the listener handling messages from other members
 */
protected MessageListener<Object> createMessageListener() {
    return new MessageListener<Object>() {
        public void onMessage(final Message<Object> message) {
            final boolean publishedLocally = message.getPublishingMember().localMember();
            if (publishedLocally) {
                return;
            }
            maybeInvalidate(message.getMessageObject());
        }
    };
}
项目:Camel    文件:HazelcastTopicConsumerTest.java   
/**
 * Stubs the given instance to hand out the {@code topic} field for the name
 * {@code "foo"}, and stubs that topic so every listener passed to it is captured in
 * {@code argument} while {@code "foo"} is returned as the registration result.
 *
 * @param hazelcastInstance the instance to stub
 */
@Override
@SuppressWarnings("unchecked")
protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
    // captor first, so it is ready when the topic stub is defined
    argument = ArgumentCaptor.forClass(MessageListener.class);
    when(topic.addMessageListener(argument.capture())).thenReturn("foo");
    when(hazelcastInstance.<String>getTopic("foo")).thenReturn(topic);
}
项目:dolphin-platform    文件:ExternalServer.java   
/**
 * Attaches a listener to the named topic that prints the text of every received
 * {@code CustomEventFormat} message to standard output.
 *
 * @param instance  Hazelcast instance providing the topic
 * @param topicName name of the topic to listen on
 */
private void addConsumer(HazelcastInstance instance, String topicName) {
    final MessageListener<CustomEventFormat> printer = new MessageListener<CustomEventFormat>() {
        @Override
        public void onMessage(Message<CustomEventFormat> message) {
            final CustomEventFormat event = message.getMessageObject();
            System.out.println("Received: " + event.getMyMessage());
        }
    };
    final ITopic<CustomEventFormat> topic = instance.getTopic(topicName);
    topic.addMessageListener(printer);
}
项目:s1    文件:NodeMessageExchange.java   
/**
 * Reads this node's id from the {@code "cluster.nodeId"} system option, obtains the
 * exchange topic and registers a listener on it that handles replies and requests
 * addressed either to {@code NodeMessageBean.ALL} or to this node.
 */
public NodeMessageExchange(){
    nodeId = Options.getStorage().getSystem("cluster.nodeId");
    topic = HazelcastWrapper.getInstance().getTopic(TOPIC);
    topic.addMessageListener(new MessageListener<NodeMessageBean>() {
        @Override
        public void onMessage(Message<NodeMessageBean> msg) {
            NodeMessageBean req = msg.getMessageObject();
            // ignore messages whose recipient is neither ALL nor this node
            if(Objects.newArrayList(NodeMessageBean.ALL,nodeId).contains(req.to)){
                if(req.operation.equals(NodeMessageBean.REPLY)){
                    if(LOG.isDebugEnabled())
                        LOG.debug("Received node reply <= "+req.toString());
                    //reply
                    synchronized (replies){
                        // replies for ids that have no entry in the map are dropped
                        if(replies.containsKey(req.id)){
                            if(req.from.equals(NodeMessageBean.ALL)){
                                // the entry is treated as a list and the reply data is appended
                                // NOTE(review): assumes the entry was created as a List - confirm at the sender side
                                List<Object> l = (List<Object>)replies.get(req.id);
                                l.add(req.data);
                            }else{
                                // otherwise the entry is replaced by the reply data
                                replies.put(req.id, req.data);
                            }
                        }
                    }
                }else{
                    //request
                    Object reply = process(req.operation, req.data);
                    // answer with a REPLY message carrying the same id; req.to and req.from are
                    // passed in that order - presumably swapping sender and recipient, verify
                    // against the NodeMessageBean constructor
                    topic.publish(new NodeMessageBean(req.id, req.to, req.from, NodeMessageBean.REPLY, reply));
                    // note: this debug line is written after the request has been processed and answered
                    if(LOG.isDebugEnabled())
                        LOG.debug("Received node request => "+req.toString());
                }
            }
        }
    });
    LOG.info("Node message exchange is ready");
}
项目:hazelcast-archive    文件:MessageListenerManager.java   
/**
 * Adds the listener to the list kept for {@code name}, installing a new
 * {@link CopyOnWriteArrayList} via {@code putIfAbsent} when no list exists yet.
 *
 * @param name            name to register the listener under
 * @param messageListener listener to add
 */
public void registerListener(String name, MessageListener messageListener) {
    final List<MessageListener> fresh = new CopyOnWriteArrayList<MessageListener>();
    final List<MessageListener> existing = messageListeners.putIfAbsent(name, fresh);
    // putIfAbsent returns null when the fresh list was installed
    final List<MessageListener> target = (existing != null) ? existing : fresh;
    target.add(messageListener);
}
项目:hazelcast-archive    文件:MessageListenerManager.java   
/**
 * Removes the listener from the list kept for {@code name} and drops the entry for
 * that name once its list is empty. Names without an entry are ignored.
 *
 * @param name            name the listener was registered under
 * @param messageListener listener to remove
 */
public void removeListener(String name, MessageListener messageListener) {
    // Single lookup: the previous containsKey()/get() sequence could observe the entry
    // disappearing in between and dereference null.
    final List<MessageListener> listeners = messageListeners.get(name);
    if (listeners == null) {
        return;
    }
    listeners.remove(messageListener);
    if (listeners.isEmpty()) {
        messageListeners.remove(name);
    }
}
项目:hazelcast-archive    文件:MessageListenerManager.java   
/**
 * Delivers the packet to every listener registered under the packet's name. Each
 * listener receives its own {@code DataMessage} built from the packet name and key;
 * nothing happens when no listeners are registered for that name.
 *
 * @param packet packet whose name selects the listeners and whose key is the payload
 */
public void notifyMessageListeners(Packet packet) {
    final List<MessageListener> registered = messageListeners.get(packet.getName());
    if (registered == null) {
        return;
    }
    for (MessageListener<Object> recipient : registered) {
        recipient.onMessage(new DataMessage(packet.getName(), new Data(packet.getKey())));
    }
}
项目:hazelcast-archive    文件:TopicClientProxy.java   
/**
 * Registers the listener locally and, if no listener was registered for this name
 * beforehand, also issues the add-listener call. Both steps run while holding
 * {@code lock}.
 *
 * @param messageListener listener to add; passed through {@code check} first
 */
public void addMessageListener(MessageListener messageListener) {
    check(messageListener);
    synchronized (lock) {
        // must be evaluated before registering, otherwise it would always be false
        final boolean firstForName = messageListenerManager().noListenerRegistered(name);
        messageListenerManager().registerListener(name, messageListener);
        if (!firstForName) {
            return;
        }
        doAddListenerCall(messageListener);
    }
}
项目:hazelcast-archive    文件:TopicClientProxy.java   
/**
 * Unregisters the listener locally and, once no listener remains for this name,
 * issues a {@code REMOVE_LISTENER} operation. Both steps run while holding
 * {@code lock}.
 *
 * @param messageListener listener to remove; passed through {@code check} first
 */
public void removeMessageListener(MessageListener messageListener) {
    check(messageListener);
    synchronized (lock) {
        messageListenerManager().removeListener(name, messageListener);
        final boolean noneLeft = messageListenerManager().noListenerRegistered(name);
        if (noneLeft) {
            proxyHelper.doOp(ClusterOperation.REMOVE_LISTENER, null, null);
        }
    }
}
项目:hazelcast-archive    文件:MessageListenerManagerTest.java   
/**
 * Checks that the manager reports no listener for a name until one is registered,
 * and reports one afterwards.
 */
@Test
public void testRegisterMessageListener() throws Exception {
    final String topicName = "default";
    MessageListenerManager listenerManager = new MessageListenerManager();
    MessageListener noOpListener = new MessageListener<Object>() {
        public void onMessage(Message<Object> message) {
            // nothing to do, only the registration matters here
        }
    };

    assertTrue(listenerManager.noListenerRegistered(topicName));

    listenerManager.registerListener(topicName, noOpListener);
    assertFalse(listenerManager.noListenerRegistered(topicName));
}
项目:hazelcast-archive    文件:MessageListenerManagerTest.java   
/**
 * Registers a listener, has a second thread push a packet through
 * {@code notifyMessageListeners} and expects the listener to see the packet's
 * payload within ten seconds.
 */
@Test
public void testNotifyMessageListeners() throws Exception {
    final String topicName = "default";
    final String payload = "my myMessage";
    final CountDownLatch delivered = new CountDownLatch(1);
    final MessageListenerManager listenerManager = new MessageListenerManager();
    assertTrue(listenerManager.noListenerRegistered(topicName));

    MessageListener payloadMatcher = new MessageListener<Object>() {
        public void onMessage(Message<Object> message) {
            // only the expected payload releases the latch
            if (message.getMessageObject().equals(payload)) {
                delivered.countDown();
            }
        }
    };
    listenerManager.registerListener(topicName, payloadMatcher);
    assertFalse(listenerManager.noListenerRegistered(topicName));

    Runnable notifier = new Runnable() {
        public void run() {
            Packet packet = new Packet();
            packet.setName(topicName);
            packet.setKey(toByte(payload));
            listenerManager.notifyMessageListeners(packet);
        }
    };
    new Thread(notifier).start();

    assertTrue(delivered.await(10, TimeUnit.SECONDS));
}
项目:HZSpatial    文件:HZServer.java   
/**
 * Entry point with two modes, selected by the argument count.
 * <p>
 * First, every address of every network interface is scanned; the last address that
 * is neither link-local nor loopback is stored in {@code m_inetAddress}.
 * <ul>
 * <li>No arguments: a Hazelcast member is started from {@code hazelcast.xml} on the
 * classpath, this object is registered on the {@code "stop"} topic, and a listener on
 * the {@code "shutdown"} topic shuts the member down.</li>
 * <li>Otherwise: a Hazelcast client publishes the host address found above (or
 * {@code "0.0.0.0"} when none was found) on the topic named by {@code args[0]} and is
 * then shut down.</li>
 * </ul>
 *
 * @param args command-line arguments; {@code args[0]}, if present, is a topic name
 * @throws UnknownHostException declared by the original signature
 * @throws SocketException      if the network interfaces cannot be enumerated
 */
public void doMain(final String[] args) throws UnknownHostException, SocketException {
    final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
    while (interfaces.hasMoreElements()) {
        final Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
        while (addresses.hasMoreElements()) {
            final InetAddress candidate = addresses.nextElement();
            final boolean usable = !candidate.isLinkLocalAddress() && !candidate.isLoopbackAddress();
            if (usable) {
                // no break: the last usable address wins
                m_inetAddress = candidate;
            }
        }
    }

    if (args.length == 0) {
        // member mode
        final Config config = new ClasspathXmlConfig("hazelcast.xml");
        m_hazelcastInstance = Hazelcast.newHazelcastInstance(config);
        m_hazelcastInstance.<String>getTopic("stop").addMessageListener(this);
        m_hazelcastInstance.<String>getTopic("shutdown").addMessageListener(new MessageListener<String>() {
            @Override
            public void onMessage(final Message<String> message) {
                m_hazelcastInstance.shutdown();
            }
        });
        return;
    }

    // client mode
    final HazelcastInstance client = HazelcastClient.newHazelcastClient();
    try {
        final String hostAddress;
        if (m_inetAddress == null) {
            hostAddress = "0.0.0.0";
        } else {
            hostAddress = m_inetAddress.getHostAddress();
        }
        client.<String>getTopic(args[0]).publish(hostAddress);
    } finally {
        client.shutdown();
    }
}
项目:andes    文件:HazelcastClusterNotificationListenerImpl.java   
/**
 * Registers the listener on the topic, first removing the registration identified
 * by {@code listenerId} when that id is not empty.
 *
 * @param topic      Hazelcast topic
 * @param listener   listener to register
 * @param listenerId id of a previous registration to remove; skipped when empty
 * @return id returned by the topic for the new registration
 */
private String checkAndRegisterListerToTopic(ITopic<ClusterNotification> topic,
                                             MessageListener<ClusterNotification> listener,
                                             String listenerId) {
    final boolean hasPreviousRegistration = StringUtils.isNotEmpty(listenerId);
    if (hasPreviousRegistration) {
        topic.removeMessageListener(listenerId);
    }

    final String newListenerId = topic.addMessageListener(listener);
    return newListenerId;
}
项目:cloud-cattle    文件:HazelcastEventService.java   
/**
 * Subscribes to the Hazelcast topic named after the event and records the
 * registration id. Received messages are forwarded to {@code onEvent}.
 * <p>
 * The future is always completed: with {@code null} on success, otherwise with the
 * caught {@link RuntimeException} (which is also rethrown) or, if none was caught,
 * with a generic {@link IllegalStateException}.
 *
 * @param eventName name of the event, also used as topic name
 * @param future    future completed with the outcome of the subscription
 * @throws IllegalStateException if the event name already has a registration
 */
@Override
protected void doSubscribe(final String eventName, SettableFuture<?> future) {
    Throwable failure = null;
    boolean subscribed = false;
    try {
        if ( registrations.containsKey(eventName) ) {
            throw new IllegalStateException("Already subscribed to [" + eventName + "]");
        }

        MessageListener<String> forwarder = new MessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                onEvent(null, eventName, message.getMessageObject());
            }
        };
        ITopic<String> eventTopic = hazelcast.getTopic(eventName);

        String registrationId = eventTopic.addMessageListener(forwarder);
        log.info("Subscribing to [{}] id [{}]", eventName, registrationId);
        registrations.put(eventName, registrationId);

        subscribed = true;
    } catch ( RuntimeException e ) {
        failure = e;
        throw e;
    } finally {
        // runs before a rethrown exception propagates, so the future is always completed
        if ( !subscribed ) {
            future.setException(failure != null
                    ? failure
                    : new IllegalStateException("Failed to subscribe to [" + eventName + "]"));
        } else {
            future.set(null);
        }
    }
}
项目:dstack    文件:HazelcastEventService.java   
/**
 * Subscribes to the Hazelcast topic named after the event and records the
 * registration id. Received messages are forwarded to {@code onEvent}.
 * <p>
 * The future is always completed: with {@code null} on success, otherwise with the
 * caught {@link RuntimeException} (which is also rethrown) or, if none was caught,
 * with a generic {@link IllegalStateException}.
 *
 * @param eventName name of the event, also used as topic name
 * @param future    future completed with the outcome of the subscription
 * @throws IllegalStateException if the event name already has a registration
 */
@Override
protected void doSubscribe(final String eventName, SettableFuture<?> future) {
    Throwable failure = null;
    boolean subscribed = false;
    try {
        if ( registrations.containsKey(eventName) ) {
            throw new IllegalStateException("Already subscribed to [" + eventName + "]");
        }

        MessageListener<String> forwarder = new MessageListener<String>() {
            @Override
            public void onMessage(Message<String> message) {
                onEvent(null, eventName, message.getMessageObject());
            }
        };
        ITopic<String> eventTopic = hazelcast.getTopic(eventName);

        String registrationId = eventTopic.addMessageListener(forwarder);
        log.info("Subscribing to [{}] id [{}]", eventName, registrationId);
        registrations.put(eventName, registrationId);

        subscribed = true;
    } catch ( RuntimeException e ) {
        failure = e;
        throw e;
    } finally {
        // runs before a rethrown exception propagates, so the future is always completed
        if ( !subscribed ) {
            future.setException(failure != null
                    ? failure
                    : new IllegalStateException("Failed to subscribe to [" + eventName + "]"));
        } else {
            future.set(null);
        }
    }
}
项目:mule-module-publish-subscribe    文件:PubSubModule.java   
/**
 * Subscribe for Mule events under the specified topic name using
 * the specified subscriberId. This is useful when you want to add
 * new subscribers to events dynamically; if your subscription map
 * is static you should use <pubsub:listener> instead.
 * <p/>
 * {@sample.xml ../../../doc/pubsub-module.xml.sample pubsub:subscribe}
 *
 * @param topic        Name of the topic
 * @param subscriberId Identification of the subscriber, this is useful for later removing the subscriber
 * @param flow         Flow that will handle the events for the specified topic
 */
@Processor
public void subscribe(String topic, final String subscriberId, final Flow flow) {
    final MessageListener flowInvoker = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            // run the flow with Mule's execution class loader on the delivering thread
            ClassLoader executionLoader = muleContext.getExecutionClassLoader();
            Thread.currentThread().setContextClassLoader(executionLoader);

            MuleEvent event = createMuleEvent(message, subscriberId, flow);

            try {
                flow.process(event);
            } catch (MuleException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    };

    // step 1: record the subscriber under the write lock
    subscribersLock.writeLock().lock();
    try {
        subscribers.put(topic, subscriberId, flowInvoker);
    } finally {
        subscribersLock.writeLock().unlock();
    }

    // step 2: under the read lock, attach whatever is currently recorded for this
    // topic/subscriber pair to the Hazelcast topic
    subscribersLock.readLock().lock();
    try {
        ITopic hazelcastTopic = HazelcastManager.getInstance().getHazelcastInstance().getTopic(topic);
        hazelcastTopic.addMessageListener(subscribers.get(topic, subscriberId));
    } finally {
        subscribersLock.readLock().unlock();
    }
}
项目:health-and-care-developer-network    文件:TimestampsRegionCache.java   
/**
 * Builds the listener that merges incoming {@code Timestamp} messages into the local
 * cache: a key's stored timestamp is only ever replaced by a greater one, and a key
 * without a stored timestamp gets the incoming one.
 *
 * @return the listener applying timestamp messages to {@code cache}
 */
@Override
    protected MessageListener<Object> createMessageListener() {
        return new MessageListener<Object>() {
            public void onMessage(final Message<Object> message) {
                final Timestamp ts = (Timestamp) message.getMessageObject();
//                System.err.println("ts = " + ts);
                final Object key = ts.getKey();

                // Retry loop: each pass re-reads the entry and leaves only after a
                // conditional cache update succeeded or no update is needed.
                // NOTE(review): presumably cache.replace/putIfAbsent follow ConcurrentMap
                // semantics, so a failed call means another thread changed the entry - confirm.
                for (;;) {
                    final Value value = cache.get(key);
                    final Long current = value != null ? (Long) value.getValue() : null;
                    if (current != null) {
                        if (ts.getTimestamp() > current) {
                            // incoming timestamp is greater: swap it in, keeping the old version
                            if (cache.replace(key, value, new Value(value.getVersion(),
                                    ts.getTimestamp(), Clock.currentTimeMillis()))) {
                                return;
                            }
                        } else {
                            // stored timestamp is already equal or greater: nothing to do
                            return;
                        }
                    } else {
                        // no timestamp stored (entry missing or holding a null value):
                        // insert one with a null version
                        if (cache.putIfAbsent(key, new Value(null, ts.getTimestamp(),
                                Clock.currentTimeMillis())) == null) {
                            return;
                        }
                    }
                }
            }
        };
    }
项目:health-and-care-developer-network    文件:MessageListenerManager.java   
/**
 * Adds the listener to the list kept for {@code name}, installing a new
 * {@link CopyOnWriteArrayList} via {@code putIfAbsent} when no list exists yet.
 *
 * @param name            name to register the listener under
 * @param messageListener listener to add
 */
public void registerListener(String name, MessageListener messageListener) {
    final List<MessageListener> fresh = new CopyOnWriteArrayList<MessageListener>();
    final List<MessageListener> existing = messageListeners.putIfAbsent(name, fresh);
    // putIfAbsent returns null when the fresh list was installed
    final List<MessageListener> target = (existing != null) ? existing : fresh;
    target.add(messageListener);
}
项目:health-and-care-developer-network    文件:MessageListenerManager.java   
/**
 * Removes the listener from the list kept for {@code name} and drops the entry for
 * that name once its list is empty. Names without an entry are ignored.
 *
 * @param name            name the listener was registered under
 * @param messageListener listener to remove
 */
public void removeListener(String name, MessageListener messageListener) {
    // Single lookup: the previous containsKey()/get() sequence could observe the entry
    // disappearing in between and dereference null.
    final List<MessageListener> listeners = messageListeners.get(name);
    if (listeners == null) {
        return;
    }
    listeners.remove(messageListener);
    if (listeners.isEmpty()) {
        messageListeners.remove(name);
    }
}
项目:health-and-care-developer-network    文件:MessageListenerManager.java   
/**
 * Delivers the packet to every listener registered under the packet's name. Each
 * listener receives its own {@code DataMessage} built from the packet name and key;
 * nothing happens when no listeners are registered for that name.
 *
 * @param packet packet whose name selects the listeners and whose key is the payload
 */
public void notifyMessageListeners(Packet packet) {
    final List<MessageListener> registered = messageListeners.get(packet.getName());
    if (registered == null) {
        return;
    }
    for (MessageListener<Object> recipient : registered) {
        recipient.onMessage(new DataMessage(packet.getName(), new Data(packet.getKey())));
    }
}