@Test public void testConsumerBackPressure() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // create an array with the messages String[] inbox = new String[100]; for (int i = 0; i < 100; i++) { inbox[i] = "Hello " + i; } // use stream engine create a publisher Flowable.fromArray(inbox) .doOnRequest(n -> { // log each time we are request more data from the publisher log.info("Requesting {} messages", n); }) .subscribe(rsCamel.streamSubscriber("inbox", String.class)); // let it run for 10 seconds Thread.sleep(10 * 1000L); }
@Test public void testNoBackPressure() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // create a published that receive from the inbox stream Publisher<String> inbox = rsCamel.fromStream("inbox", String.class); // use stream engine to subscribe from the publisher Flowable.fromPublisher(inbox) .doOnNext(c -> { log.info("Processing message {}", c); Thread.sleep(1000); }) .subscribe(); // send in 200 messages log.info("Sending 200 messages ..."); for (int i = 0; i < 200; i++) { fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send(); } log.info("Sent 200 messages done"); // let it run for 250 seconds Thread.sleep(250 * 1000L); }
@Test public void testInflightBackPressure() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // create a published that receive from the inbox stream Publisher<String> inbox = rsCamel.fromStream("inbox", String.class); // use stream engine to subscribe from the publisher Flowable.fromPublisher(inbox) .doOnNext(c -> { log.info("Processing message {}", c); Thread.sleep(1000); }) .subscribe(); // send in 200 messages log.info("Sending 200 messages ..."); for (int i = 0; i < 200; i++) { fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send(); } log.info("Sent 200 messages done"); // let it run for 250 seconds Thread.sleep(250 * 1000L); }
@Test public void testCamelFirst() throws Exception { LOG.info("Starting RX-Java2 Flowable Camel first"); // create Camel CamelContext camel = new DefaultCamelContext(); // create Reative Camel CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel); camel.start(); rsCamel.start(); // create a publisher from Camel seda:words endpoint Publisher<String> publisher = rsCamel.from("seda:words", String.class); Flowable.fromPublisher(publisher) // upper case the word .map(w -> w.toUpperCase()) // log the big number .doOnNext(w -> LOG.info(w)) .subscribe(); // send some words to Camel FluentProducerTemplate template = camel.createFluentProducerTemplate(); template.withBody("Camel").to("seda:words").send(); template.withBody("rocks").to("seda:words").send(); template.withBody("streams").to("seda:words").send(); template.withBody("as").to("seda:words").send(); template.withBody("well").to("seda:words").send(); // sleep a bit for reactive subscriber to complete Thread.sleep(1000); camel.stop(); rsCamel.stop(); }
@Test public void testFiles() throws Exception { getMockEndpoint("mock:inbox").expectedMessageCount(4); getMockEndpoint("mock:camel").expectedMessageCount(2); CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // use stream engine to subscribe from the publisher // where we filter out the big numbers which is logged Flowable.fromPublisher(rsCamel.from("file:target/inbox")) // call the direct:inbox Camel route from within this flow .doOnNext(e -> rsCamel.to("direct:inbox", e)) // filter out files which has Camel in the text .filter(e -> e.getIn().getBody(String.class).contains("Camel")) // let Camel also be subscriber by the endpoint direct:camel .subscribe(rsCamel.subscriber("direct:camel")); // create some test files fluentTemplate.to("file:target/inbox").withBody("Hello World").withHeader(Exchange.FILE_NAME, "hello.txt").send(); fluentTemplate.to("file:target/inbox").withBody("Hello Camel").withHeader(Exchange.FILE_NAME, "hello2.txt").send(); fluentTemplate.to("file:target/inbox").withBody("Bye Camel").withHeader(Exchange.FILE_NAME, "bye.txt").send(); fluentTemplate.to("file:target/inbox").withBody("Bye World").withHeader(Exchange.FILE_NAME, "bye2.txt").send(); assertMockEndpointsSatisfied(); }
@Test public void testNumbers() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // create a published that receive from the numbers stream Publisher<Integer> numbers = rsCamel.fromStream("numbers", Integer.class); // use stream engine to subscribe from the publisher // where we filter out the big numbers which is logged Flowable.fromPublisher(numbers) .filter(n -> n > 5) .doOnNext(c -> log.info("Streaming big number {}", c)) .subscribe(); // let it run for 10 seconds Thread.sleep(10000); }
@Test public void testLatestBackPressure() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // create a published that receive from the inbox stream Publisher<String> inbox = rsCamel.fromStream("inbox", String.class); // use stream engine to subscribe from the publisher Flowable.fromPublisher(inbox) .doOnNext(c -> { log.info("Processing message {}", c); Thread.sleep(1000); }) .subscribe(); // send in 200 messages log.info("Sending 200 messages ..."); for (int i = 0; i < 200; i++) { fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send(); } log.info("Sent 200 messages done"); // let it run for 250 seconds Thread.sleep(250 * 1000L); }
@Test public void testNumbers() throws Exception { CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context); // create a published that receive from the numbers stream Publisher<Integer> numbers = reactive.fromStream("numbers", Integer.class); // use stream engine to subscribe from the publisher // where we filter out the big numbers which is logged Flux.from(numbers) .filter(n -> n > 5) .doOnNext(c -> log.info("Streaming big number {}", c)) .subscribe(); // let it run for 10 seconds Thread.sleep(10000); }
/**
 * Exposes a timer endpoint (period 250 ms, 3 repetitions) as a publisher and
 * checks that the timer counter of each exchange matches a locally incremented
 * counter. The test passes once three elements have been seen within 2 seconds.
 */
@Test
public void testFrom() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);
        Publisher<Exchange> ticks = reactive.from("timer:reactive?period=250&repeatCount=3");

        AtomicInteger expected = new AtomicInteger(0);
        CountDownLatch done = new CountDownLatch(3);

        // reduce every exchange to its timer counter
        Flux<Integer> counters = Flux.from(ticks)
            .map(ex -> ExchangeHelper.getHeaderOrProperty(ex, Exchange.TIMER_COUNTER, Integer.class));

        counters
            .doOnNext(counter -> Assert.assertEquals(expected.incrementAndGet(), counter.intValue()))
            .doOnNext(counter -> done.countDown())
            .subscribe();

        boolean completed = done.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);
    } finally {
        camelctx.stop();
    }
}
/**
 * Sends an empty exchange into the {@code reactive} stream, whose route sets the
 * body to the constant "123", and verifies that the first exchange emitted by
 * the resulting publisher carries that body.
 */
@Test
public void testToStream() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() {
            from("reactive-streams:reactive")
                .setBody().constant("123");
        }
    };
    camelctx.addRoutes(routes);

    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // block until the first (processed) exchange comes back
        Exchange result = Flux.from(reactive.toStream("reactive", new DefaultExchange(camelctx)))
            .blockFirst();
        Assert.assertNotNull(result);

        String body = result.getIn().getBody(String.class);
        Assert.assertNotNull(body);
        Assert.assertEquals("123", body);
    } finally {
        camelctx.stop();
    }
}
/**
 * Sends the numbers 1, 2 and 3 to {@code bean:hello} from within a Reactor
 * {@code flatMap} and expects the replies "Hello 1", "Hello 2" and "Hello 3"
 * within 2 seconds. Replies are collected in a sorted set, so the comparison does
 * not depend on arrival order.
 *
 * NOTE(review): the {@code hello} bean is presumably registered by
 * {@code createWildFlyCamelContext()}, which is not visible here.
 */
@Test
public void testTo() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        // each number is turned into the String reply of the bean
        Flux<String> replies = Flux.just(1, 2, 3)
            .flatMap(number -> reactive.to("bean:hello", number, String.class));

        replies
            .doOnNext(reply -> received.add(reply))
            .doOnNext(reply -> done.countDown())
            .subscribe();

        Assert.assertTrue(done.await(2, TimeUnit.SECONDS));

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.stop();
    }
}
/**
 * Sends the numbers 1, 2 and 3 to {@code bean:hello}, receives the resulting
 * exchanges, extracts the String body of each out message and expects
 * "Hello 1", "Hello 2" and "Hello 3" within 2 seconds. A sorted set keeps the
 * comparison independent of arrival order.
 *
 * NOTE(review): the {@code hello} bean is presumably registered by
 * {@code createWildFlyCamelContext()}, which is not visible here.
 */
@Test
public void testToWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        // exchange -> out message -> String body
        Flux<String> replies = Flux.just(1, 2, 3)
            .flatMap(number -> reactive.to("bean:hello", number))
            .map(exchange -> exchange.getOut())
            .map(message -> message.getBody(String.class));

        replies
            .doOnNext(reply -> received.add(reply))
            .doOnNext(reply -> done.countDown())
            .subscribe();

        Assert.assertTrue(done.await(2, TimeUnit.SECONDS));

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.stop();
    }
}
/**
 * Obtains a reusable function for {@code bean:hello} from the reactive streams
 * service, applies it to 1, 2 and 3 via {@code flatMap} and expects the replies
 * "Hello 1", "Hello 2" and "Hello 3" within 2 seconds.
 *
 * NOTE(review): the {@code hello} bean is presumably registered by
 * {@code createWildFlyCamelContext()}, which is not visible here.
 */
@Test
public void testToFunction() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // The TreeSet sorts the replies alphabetically, whatever order they
        // arrive in. That matters because emissions of the Flux produced by
        // flatMap may interleave.
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Function<Object, Publisher<String>> sayHello = reactive.to("bean:hello", String.class);

        Flux<String> replies = Flux.just(1, 2, 3).flatMap(sayHello);
        replies
            .doOnNext(reply -> received.add(reply))
            .doOnNext(reply -> done.countDown())
            .subscribe();

        Assert.assertTrue(done.await(2, TimeUnit.SECONDS));

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.stop();
    }
}
/**
 * Obtains a reusable exchange-returning function for {@code bean:hello}, applies
 * it to 1, 2 and 3 via {@code flatMap}, extracts the String body of each out
 * message and expects "Hello 1", "Hello 2" and "Hello 3" within 2 seconds.
 *
 * NOTE(review): the {@code hello} bean is presumably registered by
 * {@code createWildFlyCamelContext()}, which is not visible here.
 */
@Test
public void testToFunctionWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // sorted set: the comparison below must not depend on arrival order
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Function<Object, Publisher<Exchange>> sayHello = reactive.to("bean:hello");

        // exchange -> out message -> String body
        Flux<String> replies = Flux.just(1, 2, 3)
            .flatMap(sayHello)
            .map(exchange -> exchange.getOut())
            .map(message -> message.getBody(String.class));

        replies
            .doOnNext(reply -> received.add(reply))
            .doOnNext(reply -> done.countDown())
            .subscribe();

        Assert.assertTrue(done.await(2, TimeUnit.SECONDS));

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.stop();
    }
}
@Test public void testConsumeNumbers() throws Exception { CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context); // use stream engine create a publisher // that just sends 5 numbers, which needs to be sorted // and then each data is send to Camel on the reactive-streams:number endpoint Flowable.just("3", "4", "1", "5", "2") .sorted(String::compareToIgnoreCase) .subscribe(rsCamel.streamSubscriber("numbers", String.class)); // let it run for 2 seconds Thread.sleep(2000); }
/**
 * Routes {@code direct:reactive} into the {@code numbers} stream, subscribes a
 * Flux that checks each element against a locally incremented counter, sends
 * 1, 2 and 3 and expects the counter to be 3 afterwards.
 *
 * NOTE(review): the final count is asserted immediately after the sends with no
 * waiting - presumably this relies on delivery being synchronous; confirm.
 */
@Test
public void testFromStreamDirect() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:reactive")
                .to("reactive-streams:numbers");
        }
    };
    camelctx.addRoutes(routes);

    camelctx.start();
    try {
        AtomicInteger seen = new AtomicInteger(0);

        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);
        Flux<Integer> numbers = Flux.from(reactive.fromStream("numbers", Integer.class));
        numbers
            .doOnNext(number -> Assert.assertEquals(seen.incrementAndGet(), number.intValue()))
            .subscribe();

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int number = 1; number <= 3; number++) {
            producer.sendBody("direct:reactive", number);
        }

        Assert.assertEquals(3, seen.get());
    } finally {
        camelctx.stop();
    }
}
/**
 * Routes a timer (period 5 ms, 30 repetitions) into the {@code tick} stream with
 * the timer counter as body, and verifies that a Flux subscribed before the
 * context starts receives the values 1..30 in order within 5 seconds.
 */
@Test
public void testFromStreamTimer() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:tick?period=5&repeatCount=30")
                .setBody().header(Exchange.TIMER_COUNTER)
                .to("reactive-streams:tick");
        }
    });

    final int num = 30;
    final CountDownLatch latch = new CountDownLatch(num);
    final AtomicInteger value = new AtomicInteger(0);

    // subscribe before the context is started
    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Flux.from(crs.fromStream("tick", Integer.class))
        .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
        .doOnNext(n -> latch.countDown())
        .subscribe();

    camelctx.start();
    try {
        // fail explicitly on a timeout instead of silently ignoring the result
        // of await(), as the other latch-based tests do
        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
        Assert.assertEquals(num, value.get());
    } finally {
        camelctx.stop();
    }
}
/**
 * Attaches two independent Flux subscriptions to the {@code direct} stream fed by
 * {@code direct:reactive}, sends two messages and expects each subscription to
 * see both of them within 5 seconds.
 */
@Test
public void testFromStreamMultipleSubscriptionsWithDirect() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactive")
                .to("reactive-streams:direct");
        }
    };
    camelctx.addRoutes(routes);

    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // first subscriber
        CountDownLatch firstDone = new CountDownLatch(2);
        Flux<Integer> firstStream = Flux.from(reactive.fromStream("direct", Integer.class));
        firstStream
            .doOnNext(item -> firstDone.countDown())
            .subscribe();

        // second subscriber on the same stream
        CountDownLatch secondDone = new CountDownLatch(2);
        Flux<Integer> secondStream = Flux.from(reactive.fromStream("direct", Integer.class));
        secondStream
            .doOnNext(item -> secondDone.countDown())
            .subscribe();

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int number = 1; number <= 2; number++) {
            producer.sendBody("direct:reactive", number);
        }

        Assert.assertTrue(firstDone.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(secondDone.await(5, TimeUnit.SECONDS));
    } finally {
        camelctx.stop();
    }
}
/**
 * Registers a reactive processor for {@code direct:stream} that negates the
 * Integer body of every exchange, and verifies that requests sent through
 * {@code direct:source} come back as {@code "after stream: <negated value>"}.
 */
@Test
public void testFromPublisher() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                .simple("after stream: ${body}");
        }
    };
    camelctx.addRoutes(routes);

    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // the stream step writes the negated input into the out message
        reactive.process("direct:stream", exchanges ->
            Flux.from(exchanges).map(exchange -> {
                int input = exchange.getIn().getBody(Integer.class);
                exchange.getOut().setBody(-input);
                return exchange;
            })
        );

        ProducerTemplate producer = camelctx.createProducerTemplate();
        int number = 1;
        while (number <= 3) {
            String expected = "after stream: " + (-number);
            String actual = producer.requestBody("direct:source", number, String.class);
            Assert.assertEquals(expected, actual);
            number++;
        }
    } finally {
        camelctx.stop();
    }
}
/**
 * Registers a reactive processor for {@code direct:stream} that receives the
 * body as {@code Integer} and negates it, and verifies that requests sent through
 * {@code direct:source} come back as {@code "after stream: <negated value>"}.
 */
@Test
public void testFromPublisherWithConversion() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                .simple("after stream: ${body}");
        }
    };
    camelctx.addRoutes(routes);

    camelctx.start();
    try {
        CamelReactiveStreamsService reactive = CamelReactiveStreams.get(camelctx);

        // the stream step works on Integer bodies directly
        reactive.process("direct:stream", Integer.class, numbers ->
            Flux.from(numbers).map(Math::negateExact)
        );

        ProducerTemplate producer = camelctx.createProducerTemplate();
        int number = 1;
        while (number <= 3) {
            String expected = "after stream: " + (-number);
            String actual = producer.requestBody("direct:source", number, String.class);
            Assert.assertEquals(expected, actual);
            number++;
        }
    } finally {
        camelctx.stop();
    }
}
/**
 * Uses a Camel endpoint ({@code direct:reactor}) as the subscriber of a Flux
 * emitting 1, 2 and 3 and verifies that {@code mock:result} receives the three
 * values in order.
 */
@Test
public void testSubscriber() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactor")
                .to("mock:result");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // register the expectation before any message is sent, so it cannot
        // race with messages that are already arriving at the mock
        MockEndpoint mock = camelctx.getEndpoint("mock:result", MockEndpoint.class);
        mock.expectedMessageCount(3);

        Flux.just(1, 2, 3)
            .subscribe(crs.subscriber("direct:reactor", Integer.class));

        mock.assertIsSatisfied();

        int idx = 1;
        for (Exchange ex : mock.getExchanges()) {
            // Integer.valueOf replaces the deprecated Integer constructor
            Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
        }
    } finally {
        camelctx.stop();
    }
}
@Test public void testMultipleSubscriptionsWithTimer() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("timer:tick?period=50") .setBody().header(Exchange.TIMER_COUNTER) .to("reactive-streams:tick"); } }); CountDownLatch latch1 = new CountDownLatch(5); CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx); Disposable disp1 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch1.countDown()); camelctx.start(); try { // Add another subscription CountDownLatch latch2 = new CountDownLatch(5); Disposable disp2 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch2.countDown()); Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); // Un subscribe both disp1.dispose(); disp2.dispose(); // No active subscriptions, warnings expected Thread.sleep(60); // Add another subscription CountDownLatch latch3 = new CountDownLatch(5); Disposable disp3 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch3.countDown()); Assert.assertTrue(latch3.await(5, TimeUnit.SECONDS)); disp3.dispose(); } finally { camelctx.stop(); } }