private void simulateCacheInserts(Cache cache) { EntryListener EntryListener = EntryListeners.get(cache); if (EntryListener != null) { if (cache instanceof CacheWrapper) { Cache wrapped = ((CacheWrapper) cache).getWrappedCache(); if (wrapped instanceof ClusteredCache) { ClusteredCache clusteredCache = (ClusteredCache) wrapped; for (Map.Entry entry : (Set<Map.Entry>) cache.entrySet()) { EntryEvent event = new EntryEvent(clusteredCache.map.getName(), cluster.getLocalMember(), EntryEventType.ADDED.getType(), entry.getKey(), null, entry.getValue()); EntryListener.entryAdded(event); } } } } }
@Test @SuppressWarnings("unchecked") public void testAdd() throws Exception { CamelContext camelctx = createCamelContext(); camelctx.start(); try { MockEndpoint mock = camelctx.getEndpoint("mock:added", MockEndpoint.class); mock.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo"); argument.getValue().entryAdded(event); mock.assertIsSatisfied(3000); checkHeaders(mock.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED); } finally { camelctx.stop(); } }
@Test @SuppressWarnings("unchecked") public void testEvict() throws Exception { CamelContext camelctx = createCamelContext(); camelctx.start(); try { MockEndpoint mock = camelctx.getEndpoint("mock:evicted", MockEndpoint.class); mock.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo"); argument.getValue().entryEvicted(event); mock.assertIsSatisfied(3000); } finally { camelctx.stop(); } }
@Test @SuppressWarnings("unchecked") public void testUpdate() throws Exception { CamelContext camelctx = createCamelContext(); camelctx.start(); try { MockEndpoint mock = camelctx.getEndpoint("mock:updated", MockEndpoint.class); mock.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.UPDATED.getType(), "4711", "my-foo"); argument.getValue().entryUpdated(event); mock.assertIsSatisfied(3000); checkHeaders(mock.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.UPDATED); } finally { camelctx.stop(); } }
@Test @SuppressWarnings("unchecked") public void testRemove() throws Exception { CamelContext camelctx = createCamelContext(); camelctx.start(); try { MockEndpoint mock = camelctx.getEndpoint("mock:removed", MockEndpoint.class); mock.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo"); argument.getValue().entryRemoved(event); mock.assertIsSatisfied(3000); checkHeaders(mock.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED); } finally { camelctx.stop(); } }
@Test @SuppressWarnings("unchecked") public void testAdd() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:added"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo"); argument.getValue().entryAdded(event); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED); }
@Test @SuppressWarnings("unchecked") public void testEnict() throws InterruptedException { MockEndpoint out = super.getMockEndpoint("mock:evicted"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo"); argument.getValue().entryEvicted(event); assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS); }
@Test @SuppressWarnings("unchecked") public void testUpdate() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:updated"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.UPDATED.getType(), "4711", "my-foo"); argument.getValue().entryUpdated(event); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.UPDATED); }
@Test @SuppressWarnings("unchecked") public void testEvict() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:evicted"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo"); argument.getValue().entryEvicted(event); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.EVICTED); }
@Test @SuppressWarnings("unchecked") public void testRemove() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:removed"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo"); argument.getValue().entryRemoved(event); assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS); this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED); }
@Test @SuppressWarnings("unchecked") public void testEvict() throws InterruptedException { MockEndpoint out = getMockEndpoint("mock:evicted"); out.expectedMessageCount(1); EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo"); argument.getValue().entryEvicted(event); assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS); }
/** * Returns a predicate for {@link Sources#mapJournal} and * {@link Sources#remoteMapJournal} that passes only * {@link EntryEventType#ADDED ADDED} and {@link EntryEventType#UPDATED * UPDATED} events. */ public static <K, V> DistributedPredicate<EventJournalMapEvent<K, V>> mapPutEvents() { return e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED; }