@Test public void testCamelFirst() throws Exception { LOG.info("Starting RX-Java2 Flowable Camel first"); // create Camel CamelContext camel = new DefaultCamelContext(); // create Reative Camel CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel); camel.start(); rsCamel.start(); // create a publisher from Camel seda:words endpoint Publisher<String> publisher = rsCamel.from("seda:words", String.class); Flowable.fromPublisher(publisher) // upper case the word .map(w -> w.toUpperCase()) // log the big number .doOnNext(w -> LOG.info(w)) .subscribe(); // send some words to Camel FluentProducerTemplate template = camel.createFluentProducerTemplate(); template.withBody("Camel").to("seda:words").send(); template.withBody("rocks").to("seda:words").send(); template.withBody("streams").to("seda:words").send(); template.withBody("as").to("seda:words").send(); template.withBody("well").to("seda:words").send(); // sleep a bit for reactive subscriber to complete Thread.sleep(1000); camel.stop(); rsCamel.stop(); }
@Test @InSequence(2) public void sendMessageToInbound(@Uri("direct:inbound") FluentProducerTemplate in, @Uri("mock:outbound") MockEndpoint out) throws InterruptedException { out.expectedMessageCount(1); out.expectedBodiesReceived("test-processed"); in.withBody("test").send(); assertIsSatisfied(2L, TimeUnit.SECONDS, out); }
@Produces @Default @Uri("") // Qualifiers are dynamically added in CdiCamelExtension private static FluentProducerTemplate fluentProducerTemplate(InjectionPoint ip, @Any Instance<CamelContext> instance, CdiCamelExtension extension) { return getQualifierByType(ip, Uri.class) .map(uri -> fluentProducerTemplateFromUri(ip, instance, extension, uri)) .orElseGet(() -> defaultFluentProducerTemplate(ip, instance, extension)); }
private static FluentProducerTemplate fluentProducerTemplateFromUri(InjectionPoint ip, @Any Instance<CamelContext> instance, CdiCamelExtension extension, Uri uri) { CamelContext context = selectContext(ip, instance, extension); FluentProducerTemplate producerTemplate = context.createFluentProducerTemplate(); Endpoint endpoint = context.getEndpoint(uri.value(), Endpoint.class); producerTemplate.setDefaultEndpoint(endpoint); return producerTemplate; }
private static FluentProducerTemplate defaultFluentProducerTemplate(InjectionPoint ip, @Any Instance<CamelContext> instance, CdiCamelExtension extension) { return selectContext(ip, instance, extension).createFluentProducerTemplate(); }
@Override public void configure(final Env env, final Config config, final Binder binder) throws Throwable { Config $camel = config.getConfig("camel"); DefaultCamelContext ctx = configure(new DefaultCamelContext(), $camel .withoutPath("shutdown") .withoutPath("threads") .withoutPath("jmx") .withoutPath("streamCaching")); if (!$camel.getBoolean("jmx")) { ctx.disableJMX(); } /** * Executor and thread poll */ ThreadPoolProfile threadPool = configure(new ThreadPoolProfile(), $camel.getConfig("threads")); ctx.getExecutorServiceManager().setDefaultThreadPoolProfile(threadPool); /** * Shutdown options. */ configure(ctx.getShutdownStrategy(), $camel.getConfig("shutdown")); if ($camel.getBoolean("streamCaching.enabled")) { ctx.setStreamCaching(true); configure(ctx.getStreamCachingStrategy(), $camel.getConfig("streamCaching")); } else { ctx.setStreamCaching(false); } /** * Components etc.. */ if (configurer != null) { configurer.configure(ctx, config); } /** * Routes */ if (routes != null) { routes(routes, ctx, config); } /** * Properties */ PropertiesComponent properties = new PropertiesComponent(config.root().origin().description()); properties.setIgnoreMissingLocation(true); properties.setPropertiesResolver((context, ignoreMissingLocation, location) -> { Properties props = new Properties(); config.entrySet() .forEach(e -> props.setProperty(e.getKey(), e.getValue().unwrapped().toString())); return props; }); properties.setPrefixToken("${"); properties.setSuffixToken("}"); ctx.addComponent("properties", properties); env.lifeCycle(CamelFinalizer.class); /** * Guice! */ binder.bind(CamelContext.class).toInstance(ctx); binder.bind(DefaultCamelContext.class).toInstance(ctx); binder.bind(ProducerTemplate.class).toInstance(ctx.createProducerTemplate()); binder.bind(FluentProducerTemplate.class).toInstance(ctx.createFluentProducerTemplate()); binder.bind(ConsumerTemplate.class).toInstance(ctx.createConsumerTemplate()); binder.bind(CamelFinalizer.class).asEagerSingleton(); binder.bind(RouteBuilder.class).toInstance(rb()); Multibinder<Object> routesBinder = Multibinder .newSetBinder(binder, Object.class, Names.named("camel.routes")); routeList.forEach(routeType -> routesBinder.addBinding().to(routeType)); }
@Test public void testPutAndGet() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { public void configure() { from("direct:start").toF("atomix-map:%s", MAP_NAME); } }); final String key = camelctx.getUuidGenerator().generateUuid(); final String val = camelctx.getUuidGenerator().generateUuid(); AtomixMapComponent component = camelctx.getComponent("atomix-map", AtomixMapComponent.class); component.setNodes(Collections.singletonList(replicaAddress)); camelctx.start(); try { Message result; FluentProducerTemplate fluent = camelctx.createFluentProducerTemplate().to("direct:start"); result = fluent.clearAll() .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixMap.Action.PUT) .withHeader(AtomixClientConstants.RESOURCE_KEY, key) .withBody(val) .request(Message.class); Assert.assertFalse(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); Assert.assertEquals(val, result.getBody()); Assert.assertEquals(val, map.get(key).join()); result = fluent.clearAll() .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixMap.Action.GET) .withHeader(AtomixClientConstants.RESOURCE_KEY, key) .request(Message.class); Assert.assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); Assert.assertEquals(val, result.getBody(String.class)); Assert.assertTrue(map.containsKey(key).join()); } finally { camelctx.stop(); } }