protected void sendExchange(String operation, Object key, Object value) { Exchange exchange = consumer.getEndpoint().createExchange(); // set object to body exchange.getIn().setBody(value); // set headers if (key != null) { exchange.getIn().setHeader(HazelcastConstants.OBJECT_ID, key); } HazelcastComponentHelper.setListenerHeaders(exchange, HazelcastConstants.CACHE_LISTENER, operation, cacheName); try { consumer.getProcessor().process(exchange); } catch (Exception e) { exchange.setException(e); } if (exchange.getException() != null) { consumer.getExceptionHandler().handleException(String.format("Error processing exchange for hazelcast consumer on object '%s' in cache '%s'.", key, cacheName), exchange, exchange.getException()); } }
private void sendExchange(MembershipEvent event, String action) { Exchange exchange = getEndpoint().createExchange(); HazelcastComponentHelper.setListenerHeaders(exchange, HazelcastConstants.INSTANCE_LISTENER, action); // instance listener header values InetSocketAddress adr = event.getMember().getSocketAddress(); if (adr != null) { exchange.getIn().setHeader(HazelcastConstants.INSTANCE_HOST, adr.getHostName()); exchange.getIn().setHeader(HazelcastConstants.INSTANCE_PORT, adr.getPort()); } try { getProcessor().process(exchange); } catch (Exception e) { exchange.setException(e); } if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange for Hazelcast consumer on your Hazelcast cluster.", exchange, exchange.getException()); } }
public void process(Exchange exchange) throws Exception { final int operation = lookupOperationNumber(exchange); switch (operation) { case -1: // default operation to publish case HazelcastConstants.PUBLISH_OPERATION: this.publish(exchange); break; default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the TOPIC cache.", operation, HazelcastConstants.OPERATION)); } // finally copy headers HazelcastComponentHelper.copyHeaders(exchange); }
/**
 * Fires an ADDED entry event through the listener obtained from
 * {@code argument.getValue()} and expects one message on mock:added
 * whose headers pass {@code checkHeaders} for the ADDED action.
 */
@Test
@SuppressWarnings("unchecked")
public void testAdd() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final EntryEvent<Object, Object> addedEvent = new EntryEvent<Object, Object>(
                "foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo");

        final MockEndpoint addedEndpoint = context.getEndpoint("mock:added", MockEndpoint.class);
        addedEndpoint.expectedMessageCount(1);

        argument.getValue().entryAdded(addedEvent);

        addedEndpoint.assertIsSatisfied(3000);
        checkHeaders(addedEndpoint.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
    } finally {
        context.stop();
    }
}
/**
 * Fires an UPDATED entry event through the listener obtained from
 * {@code argument.getValue()} and expects one message on mock:updated
 * whose headers pass {@code checkHeaders} for the UPDATED action.
 */
@Test
@SuppressWarnings("unchecked")
public void testUpdate() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final EntryEvent<Object, Object> updatedEvent = new EntryEvent<Object, Object>(
                "foo", null, EntryEventType.UPDATED.getType(), "4711", "my-foo");

        final MockEndpoint updatedEndpoint = context.getEndpoint("mock:updated", MockEndpoint.class);
        updatedEndpoint.expectedMessageCount(1);

        argument.getValue().entryUpdated(updatedEvent);

        updatedEndpoint.assertIsSatisfied(3000);
        checkHeaders(updatedEndpoint.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.UPDATED);
    } finally {
        context.stop();
    }
}
/**
 * Fires a REMOVED entry event through the listener obtained from
 * {@code argument.getValue()} and expects one message on mock:removed
 * whose headers pass {@code checkHeaders} for the REMOVED action.
 */
@Test
@SuppressWarnings("unchecked")
public void testRemove() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final EntryEvent<Object, Object> removedEvent = new EntryEvent<Object, Object>(
                "foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo");

        final MockEndpoint removedEndpoint = context.getEndpoint("mock:removed", MockEndpoint.class);
        removedEndpoint.expectedMessageCount(1);

        argument.getValue().entryRemoved(removedEvent);

        removedEndpoint.assertIsSatisfied(3000);
        checkHeaders(removedEndpoint.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED);
    } finally {
        context.stop();
    }
}
/**
 * Builds a route that consumes from the "foo" hazelcast map endpoint and
 * routes each message to mock:added, mock:evicted, mock:updated or
 * mock:removed according to the LISTENER_ACTION header; any other value
 * is only logged.
 */
private RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            final String sourceUri = String.format("hazelcast-%sfoo", HazelcastConstants.MAP_PREFIX);
            final String actionHeader = HazelcastConstants.LISTENER_ACTION;

            from(sourceUri)
                .log("object...")
                .choice()
                    .when(header(actionHeader).isEqualTo(HazelcastConstants.ADDED))
                        .log("...added").to("mock:added")
                    .when(header(actionHeader).isEqualTo(HazelcastConstants.EVICTED))
                        .log("...evicted").to("mock:evicted")
                    .when(header(actionHeader).isEqualTo(HazelcastConstants.UPDATED))
                        .log("...updated").to("mock:updated")
                    .when(header(actionHeader).isEqualTo(HazelcastConstants.REMOVED))
                        .log("...removed").to("mock:removed")
                    .otherwise()
                        .log("fail!");
        }
    };
}
/**
 * Sends "test" to direct:put with an object id and a one-minute TTL in the
 * headers and verifies that the mocked map receives the four-argument put.
 */
@Test
public void testPutWithTTL() throws Exception {
    CamelContext camelctx = createCamelContext();
    camelctx.start();
    try {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(HazelcastConstants.OBJECT_ID, "4711");
        // Long.valueOf replaces the deprecated Long(long) constructor
        headers.put(HazelcastConstants.TTL_VALUE, Long.valueOf(1L));
        headers.put(HazelcastConstants.TTL_UNIT, TimeUnit.MINUTES);

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBodyAndHeaders("direct:put", "test", headers);

        Mockito.verify(map).put("4711", "test", 1, TimeUnit.MINUTES);
    } finally {
        camelctx.stop();
    }
}
/**
 * Stubs the mocked map to return "my-foo" for key "4711", sends a get
 * request to direct:get and checks that the body received from seda:out
 * equals the stubbed value.
 */
@Test
public void testGet() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String stored = "my-foo";
        Mockito.when(map.get(objectId)).thenReturn(stored);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:get", null, HazelcastConstants.OBJECT_ID, objectId);

        final ConsumerTemplate receiver = context.createConsumerTemplate();
        final String received = receiver.receiveBody("seda:out", 5000, String.class);

        Mockito.verify(map).get(objectId);
        Assert.assertEquals(stored, received);
    } finally {
        context.stop();
    }
}
/**
 * Sends an empty key set to direct:getAll while the mocked map is stubbed
 * to return three entries for any set, then checks that the string body
 * received from seda:out contains all three key=value pairs.
 */
@Test
public void testGetAllEmptySet() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final Set<Object> requestedKeys = new HashSet<Object>();

        // raw types kept as in the stubbed call below
        Map stubbedResult = new HashMap();
        stubbedResult.put("key1", "value1");
        stubbedResult.put("key2", "value2");
        stubbedResult.put("key3", "value3");
        Mockito.when(map.getAll(Mockito.anySet())).thenReturn(stubbedResult);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:getAll", null, HazelcastConstants.OBJECT_ID, requestedKeys);

        final ConsumerTemplate receiver = context.createConsumerTemplate();
        final String received = receiver.receiveBody("seda:out", 5000, String.class);

        Mockito.verify(map).getAll(requestedKeys);
        Assert.assertTrue(received.contains("key1=value1"));
        Assert.assertTrue(received.contains("key2=value2"));
        Assert.assertTrue(received.contains("key3=value3"));
    } finally {
        context.stop();
    }
}
/**
 * Sends a single-key set to direct:getAll while the mocked map is stubbed
 * to return one entry for that set, then checks the exact string body
 * received from seda:out.
 */
@Test
public void testGetAllOnlyOneKey() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final Set<Object> requestedKeys = new HashSet<Object>();
        requestedKeys.add("key1");

        // raw types kept as in the stubbed call below
        Map stubbedResult = new HashMap();
        stubbedResult.put("key1", "value1");
        Mockito.when(map.getAll(requestedKeys)).thenReturn(stubbedResult);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:getAll", null, HazelcastConstants.OBJECT_ID, requestedKeys);

        final ConsumerTemplate receiver = context.createConsumerTemplate();
        final String received = receiver.receiveBody("seda:out", 5000, String.class);

        Mockito.verify(map).getAll(requestedKeys);
        Assert.assertEquals("{key1=value1}", received);
    } finally {
        context.stop();
    }
}
/**
 * Sends a query string in the QUERY header to direct:queue while the
 * mocked map returns two Dummy objects for any SqlPredicate, then checks
 * that a two-element collection arrives on seda:out.
 */
@Test
public void testQuery() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        Mockito.when(map.values(Mockito.any(SqlPredicate.class)))
               .thenReturn(Arrays.<Object>asList(new Dummy("beta", 2000), new Dummy("gamma", 3000)));

        final String query = "bar > 1000";
        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:queue", null, HazelcastConstants.QUERY, query);
        Mockito.verify(map).values(Mockito.any(SqlPredicate.class));

        final ConsumerTemplate receiver = context.createConsumerTemplate();
        final Collection<?> result = receiver.receiveBody("seda:out", 5000, Collection.class);
        Assert.assertNotNull(result);
        Assert.assertEquals(2, result.size());
    } finally {
        context.stop();
    }
}
/**
 * Sends "replaced" to direct:update with both an object id and an
 * OBJECT_VALUE header and verifies lock, three-argument replace and unlock
 * on the mocked map.
 */
@Test
public void testUpdateOldValue() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String oldValue = "my-foo";
        final String newValue = "replaced";

        final Map<String, Object> requestHeaders = new HashMap<String, Object>();
        requestHeaders.put(HazelcastConstants.OBJECT_ID, objectId);
        requestHeaders.put(HazelcastConstants.OBJECT_VALUE, oldValue);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeaders("direct:update", newValue, requestHeaders);

        Mockito.verify(map).lock(objectId);
        Mockito.verify(map).replace(objectId, oldValue, newValue);
        Mockito.verify(map).unlock(objectId);
    } finally {
        context.stop();
    }
}
/**
 * Sends "replaced" to direct:putIfAbsent with an object id and a one-minute
 * TTL in the headers and verifies the four-argument putIfAbsent on the
 * mocked map.
 */
@Test
public void testPutIfAbsentWithTtl() throws Exception {
    CamelContext camelctx = createCamelContext();
    camelctx.start();
    try {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(HazelcastConstants.OBJECT_ID, "4711");
        // Long.valueOf replaces the deprecated Long(long) constructor
        headers.put(HazelcastConstants.TTL_VALUE, Long.valueOf(1L));
        headers.put(HazelcastConstants.TTL_UNIT, TimeUnit.MINUTES);

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBodyAndHeaders("direct:putIfAbsent", "replaced", headers);

        Mockito.verify(map).putIfAbsent("4711", "replaced", 1L, TimeUnit.MINUTES);
    } finally {
        camelctx.stop();
    }
}
/**
 * Stubs containsKey to return true for "testOk" and false for "testKo",
 * sends each key to direct:containsKey in turn and checks the Boolean
 * body received from seda:out for each.
 */
@Test
public void testContainsKey() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String presentKey = "testOk";
        final String absentKey = "testKo";
        Mockito.when(map.containsKey(presentKey)).thenReturn(true);
        Mockito.when(map.containsKey(absentKey)).thenReturn(false);

        final ProducerTemplate producer = context.createProducerTemplate();

        // key that the mocked map reports as present
        producer.sendBodyAndHeader("direct:containsKey", null, HazelcastConstants.OBJECT_ID, presentKey);
        final ConsumerTemplate receiver = context.createConsumerTemplate();
        final Boolean foundPresent = receiver.receiveBody("seda:out", 5000, Boolean.class);
        Mockito.verify(map).containsKey(presentKey);
        Assert.assertEquals(true, foundPresent);

        // key that the mocked map reports as absent
        producer.sendBodyAndHeader("direct:containsKey", null, HazelcastConstants.OBJECT_ID, absentKey);
        final Boolean foundAbsent = receiver.receiveBody("seda:out", 5000, Boolean.class);
        Mockito.verify(map).containsKey(absentKey);
        Assert.assertEquals(false, foundAbsent);
    } finally {
        context.stop();
    }
}
public void process(Exchange exchange) throws Exception { final int operation = lookupOperationNumber(exchange); switch (operation) { case HazelcastConstants.ADD_OPERATION: this.add(exchange); break; case HazelcastConstants.REMOVEVALUE_OPERATION: this.remove(exchange); break; case HazelcastConstants.CLEAR_OPERATION: this.clear(); break; case HazelcastConstants.ADD_ALL_OPERATION: this.addAll(exchange); break; case HazelcastConstants.REMOVE_ALL_OPERATION: this.removeAll(exchange); break; case HazelcastConstants.RETAIN_ALL_OPERATION: this.retainAll(exchange); break; default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the LIST cache.", operation, HazelcastConstants.OPERATION)); } // finally copy headers HazelcastComponentHelper.copyHeaders(exchange); }
public void process(Exchange exchange) throws Exception { Map<String, Object> headers = exchange.getIn().getHeaders(); int operation = lookupOperationNumber(exchange); switch (operation) { case HazelcastConstants.READ_ONCE_HEAD_OPERATION: this.readOnceHead(exchange); break; case HazelcastConstants.READ_ONCE_TAIL_OPERATION: this.readOnceTail(exchange); break; case HazelcastConstants.GET_CAPACITY_OPERATION: this.getCapacity(exchange); break; case HazelcastConstants.REMAINING_CAPACITY_OPERATION: this.getRemainingCapacity(exchange); break; case HazelcastConstants.ADD_OPERATION: this.add(exchange); break; default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the RINGBUFFER.", operation, HazelcastConstants.OPERATION)); } // finally copy headers HazelcastComponentHelper.copyHeaders(exchange); }
@Override public void configure() throws Exception { // HTTP service fromF("jetty:http://localhost:" + port) // increase the atomic clustered counter from the hazelcast cache .setHeader(HazelcastConstants.OBJECT_ID, constant("myCounter")) .to("hazelcast:atomicvalue:Cache?hazelcastInstance=#hz&defaultOperation=increment") // prepare http response .log(name + ": counter is now ${body}") .setHeader(Exchange.CONTENT_TYPE, constant("text/plain")) .transform().simple("Atomic Counter is now ${body}\n"); }
@Override public void configure() throws Exception { // HTTP service fromF("jetty:http://localhost:" + port) // get the counter from the hazelcast cache .setHeader(HazelcastConstants.OBJECT_ID, constant("myCounter")) .to("hazelcast:map:myCache?hazelcastInstance=#hz&defaultOperation=get") // update the counter using java code .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Integer counter = exchange.getIn().getBody(Integer.class); if (counter == null) { counter = 0; } counter++; exchange.getIn().setBody(counter); } }) // update the counter in the hazelcast cache .setHeader(HazelcastConstants.OBJECT_ID, constant("myCounter")) .to("hazelcast:map:myCache?hazelcastInstance=#hz&defaultOperation=put") // prepare http response .log(name + ": counter is now ${body}") .setHeader(Exchange.CONTENT_TYPE, constant("text/plain")) .transform().simple("Counter is now ${body}\n"); }
/**
 * Sends "my-foo" with object id "4711" to direct:put and verifies the
 * two-argument put on the mocked map.
 */
@Test
public void testPut() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String payload = "my-foo";

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:put", payload, HazelcastConstants.OBJECT_ID, objectId);

        Mockito.verify(map).put(objectId, payload);
    } finally {
        context.stop();
    }
}
/**
 * Sends "my-foo" with object id "4711" to direct:putWithOperationNumber
 * and verifies the two-argument put on the mocked map.
 */
@Test
public void testPutWithOperationNumber() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String payload = "my-foo";

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:putWithOperationNumber", payload, HazelcastConstants.OBJECT_ID, objectId);

        Mockito.verify(map).put(objectId, payload);
    } finally {
        context.stop();
    }
}
/**
 * Sends "my-foo" with object id "4711" to direct:putWithOperationName
 * and verifies the two-argument put on the mocked map.
 */
@Test
public void testPutWithOperationName() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String payload = "my-foo";

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:putWithOperationName", payload, HazelcastConstants.OBJECT_ID, objectId);

        Mockito.verify(map).put(objectId, payload);
    } finally {
        context.stop();
    }
}
/**
 * Sends "my-fooo" with object id "4711" to direct:update and verifies
 * lock, two-argument replace and unlock on the mocked map.
 */
@Test
public void testUpdate() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String newValue = "my-fooo";

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:update", newValue, HazelcastConstants.OBJECT_ID, objectId);

        Mockito.verify(map).lock(objectId);
        Mockito.verify(map).replace(objectId, newValue);
        Mockito.verify(map).unlock(objectId);
    } finally {
        context.stop();
    }
}
/**
 * Sends a null body with the integer object id 4711 to direct:delete and
 * verifies that remove is called with that id on the mocked map.
 */
@Test
public void testDelete() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final int objectId = 4711;

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeader("direct:delete", null, HazelcastConstants.OBJECT_ID, objectId);

        Mockito.verify(map).remove(objectId);
    } finally {
        context.stop();
    }
}
/**
 * Sends "replaced" with object id "4711" to direct:putIfAbsent and
 * verifies the two-argument putIfAbsent on the mocked map.
 */
@Test
public void testPutIfAbsent() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";
        final String payload = "replaced";

        final Map<String, Object> requestHeaders = new HashMap<String, Object>();
        requestHeaders.put(HazelcastConstants.OBJECT_ID, objectId);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeaders("direct:putIfAbsent", payload, requestHeaders);

        Mockito.verify(map).putIfAbsent(objectId, payload);
    } finally {
        context.stop();
    }
}
/**
 * Sends an empty string body with object id "4711" to direct:evict and
 * verifies that evict is called with that id on the mocked map.
 */
@Test
public void testEvict() throws Exception {
    final CamelContext context = createCamelContext();
    context.start();
    try {
        final String objectId = "4711";

        final Map<String, Object> requestHeaders = new HashMap<String, Object>();
        requestHeaders.put(HazelcastConstants.OBJECT_ID, objectId);

        final ProducerTemplate producer = context.createProducerTemplate();
        producer.sendBodyAndHeaders("direct:evict", "", requestHeaders);

        Mockito.verify(map).evict(objectId);
    } finally {
        context.stop();
    }
}
/**
 * Forwards the item event via {@code sendExchange} with the ADDED action,
 * a null key and the event itself as the value.
 *
 * @param itemEvent the event to forward
 */
public void itemAdded(ItemEvent<Object> itemEvent) {
    sendExchange(HazelcastConstants.ADDED, null, itemEvent);
}
/**
 * Forwards the item event via {@code sendExchange} with the REMOVED action,
 * a null key and the event itself as the value.
 *
 * @param itemEvent the event to forward
 */
public void itemRemoved(ItemEvent<Object> itemEvent) {
    sendExchange(HazelcastConstants.REMOVED, null, itemEvent);
}
/**
 * Forwards the received message via {@code sendExchange} with the RECEIVED
 * action, a null key and the message object itself as the value.
 *
 * @param objectMessage the message to forward
 */
public void onMessage(Message<Object> objectMessage) {
    sendExchange(HazelcastConstants.RECEIVED, null, objectMessage);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the ADDED action.
 *
 * @param event the entry event to forward
 */
@Override
public void entryAdded(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.ADDED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the EVICTED action.
 *
 * @param event the entry event to forward
 */
@Override
public void entryEvicted(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.EVICTED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the REMOVED action.
 *
 * @param event the entry event to forward
 */
@Override
public void entryRemoved(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.REMOVED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the UPDATED action.
 *
 * @param event the entry event to forward
 */
@Override
public void entryUpdated(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.UPDATED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the ADDED action.
 *
 * @param event the entry event to forward
 */
public void entryAdded(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.ADDED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the EVICTED action.
 *
 * @param event the entry event to forward
 */
public void entryEvicted(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.EVICTED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the REMOVED action.
 *
 * @param event the entry event to forward
 */
public void entryRemoved(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.REMOVED, entryKey, entryValue);
}
/**
 * Forwards the key and value of the event via {@code sendExchange} with
 * the UPDATED action.
 *
 * @param event the entry event to forward
 */
public void entryUpdated(EntryEvent<Object, Object> event) {
    final Object entryKey = event.getKey();
    final Object entryValue = event.getValue();
    sendExchange(HazelcastConstants.UPDATED, entryKey, entryValue);
}
public void process(Exchange exchange) throws Exception { Map<String, Object> headers = exchange.getIn().getHeaders(); // get header parameters Integer pos = null; if (headers.containsKey(HazelcastConstants.OBJECT_POS)) { if (!(headers.get(HazelcastConstants.OBJECT_POS) instanceof Integer)) { throw new IllegalArgumentException("OBJECT_POS Should be of type Integer"); } pos = (Integer) headers.get(HazelcastConstants.OBJECT_POS); } final int operation = lookupOperationNumber(exchange); switch (operation) { case HazelcastConstants.ADD_OPERATION: this.add(pos, exchange); break; case HazelcastConstants.GET_OPERATION: this.get(pos, exchange); break; case HazelcastConstants.SETVALUE_OPERATION: this.set(pos, exchange); break; case HazelcastConstants.REMOVEVALUE_OPERATION: this.remove(pos, exchange); break; case HazelcastConstants.CLEAR_OPERATION: this.clear(); break; case HazelcastConstants.ADD_ALL_OPERATION: this.addAll(pos, exchange); break; case HazelcastConstants.REMOVE_ALL_OPERATION: this.removeAll(exchange); break; case HazelcastConstants.RETAIN_ALL_OPERATION: this.retainAll(exchange); break; default: throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the LIST cache.", operation, HazelcastConstants.OPERATION)); } // finally copy headers HazelcastComponentHelper.copyHeaders(exchange); }
/**
 * Forwards the membership event via {@code sendExchange} with the ADDED
 * action.
 *
 * @param event the membership event to forward
 */
public void memberAdded(MembershipEvent event) {
    final String action = HazelcastConstants.ADDED;
    sendExchange(event, action);
}
/**
 * Forwards the membership event via {@code sendExchange} with the REMOVED
 * action.
 *
 * @param event the membership event to forward
 */
public void memberRemoved(MembershipEvent event) {
    final String action = HazelcastConstants.REMOVED;
    sendExchange(event, action);
}