Java 类org.apache.camel.component.reactive.streams.api.CamelReactiveStreams 实例源码

项目:camelinaction2    文件:CamelConsumerBackPressureTest.java   
@Test
public void testConsumerBackPressure() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // create an array with the messages
    String[] inbox = new String[100];
    for (int i = 0; i < 100; i++) {
        inbox[i] = "Hello " + i;
    }

    // use stream engine create a publisher
    Flowable.fromArray(inbox)
        .doOnRequest(n -> {
            // log each time we are request more data from the publisher
            log.info("Requesting {} messages", n);
        })
        .subscribe(rsCamel.streamSubscriber("inbox", String.class));

    // let it run for 10 seconds
    Thread.sleep(10 * 1000L);
}
项目:camelinaction2    文件:CamelNoBackPressureTest.java   
@Test
public void testNoBackPressure() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // create a published that receive from the inbox stream
    Publisher<String> inbox = rsCamel.fromStream("inbox", String.class);

    // use stream engine to subscribe from the publisher
    Flowable.fromPublisher(inbox)
        .doOnNext(c -> {
            log.info("Processing message {}", c);
            Thread.sleep(1000);
        })
        .subscribe();

    // send in 200 messages
    log.info("Sending 200 messages ...");
    for (int i = 0; i < 200; i++) {
        fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // let it run for 250 seconds
    Thread.sleep(250 * 1000L);
}
项目:camelinaction2    文件:CamelInflightBackPressureTest.java   
@Test
public void testInflightBackPressure() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // create a published that receive from the inbox stream
    Publisher<String> inbox = rsCamel.fromStream("inbox", String.class);

    // use stream engine to subscribe from the publisher
    Flowable.fromPublisher(inbox)
        .doOnNext(c -> {
            log.info("Processing message {}", c);
            Thread.sleep(1000);
        })
        .subscribe();

    // send in 200 messages
    log.info("Sending 200 messages ...");
    for (int i = 0; i < 200; i++) {
        fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // let it run for 250 seconds
    Thread.sleep(250 * 1000L);
}
项目:camelinaction2    文件:CamelFirstTest.java   
@Test
public void testCamelFirst() throws Exception {
    LOG.info("Starting RX-Java2 Flowable Camel first");

    // create Camel
    CamelContext camel = new DefaultCamelContext();

    // create Reative Camel
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);

    camel.start();
    rsCamel.start();

    // create a publisher from Camel seda:words endpoint
    Publisher<String> publisher = rsCamel.from("seda:words", String.class);

    Flowable.fromPublisher(publisher)
        // upper case the word
        .map(w -> w.toUpperCase())
        // log the big number
        .doOnNext(w -> LOG.info(w))
        .subscribe();

    // send some words to Camel
    FluentProducerTemplate template = camel.createFluentProducerTemplate();

    template.withBody("Camel").to("seda:words").send();
    template.withBody("rocks").to("seda:words").send();
    template.withBody("streams").to("seda:words").send();
    template.withBody("as").to("seda:words").send();
    template.withBody("well").to("seda:words").send();

    // sleep a bit for reactive subscriber to complete
    Thread.sleep(1000);

    camel.stop();
    rsCamel.stop();
}
项目:camelinaction2    文件:CamelFilesTest.java   
@Test
public void testFiles() throws Exception {
    getMockEndpoint("mock:inbox").expectedMessageCount(4);
    getMockEndpoint("mock:camel").expectedMessageCount(2);

    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // use stream engine to subscribe from the publisher
    // where we filter out the big numbers which is logged
    Flowable.fromPublisher(rsCamel.from("file:target/inbox"))
        // call the direct:inbox Camel route from within this flow
        .doOnNext(e -> rsCamel.to("direct:inbox", e))
        // filter out files which has Camel in the text
        .filter(e -> e.getIn().getBody(String.class).contains("Camel"))
        // let Camel also be subscriber by the endpoint direct:camel
        .subscribe(rsCamel.subscriber("direct:camel"));

    // create some test files
    fluentTemplate.to("file:target/inbox").withBody("Hello World").withHeader(Exchange.FILE_NAME, "hello.txt").send();
    fluentTemplate.to("file:target/inbox").withBody("Hello Camel").withHeader(Exchange.FILE_NAME, "hello2.txt").send();
    fluentTemplate.to("file:target/inbox").withBody("Bye Camel").withHeader(Exchange.FILE_NAME, "bye.txt").send();
    fluentTemplate.to("file:target/inbox").withBody("Bye World").withHeader(Exchange.FILE_NAME, "bye2.txt").send();

    assertMockEndpointsSatisfied();
}
项目:camelinaction2    文件:CamelNumbersTest.java   
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // create a published that receive from the numbers stream
    Publisher<Integer> numbers = rsCamel.fromStream("numbers", Integer.class);

    // use stream engine to subscribe from the publisher
    // where we filter out the big numbers which is logged
    Flowable.fromPublisher(numbers)
        .filter(n -> n > 5)
        .doOnNext(c -> log.info("Streaming big number {}", c))
        .subscribe();

    // let it run for 10 seconds
    Thread.sleep(10000);
}
项目:camelinaction2    文件:CamelLatestBackPressureTest.java   
@Test
public void testLatestBackPressure() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // create a published that receive from the inbox stream
    Publisher<String> inbox = rsCamel.fromStream("inbox", String.class);

    // use stream engine to subscribe from the publisher
    Flowable.fromPublisher(inbox)
        .doOnNext(c -> {
            log.info("Processing message {}", c);
            Thread.sleep(1000);
        })
        .subscribe();

    // send in 200 messages
    log.info("Sending 200 messages ...");
    for (int i = 0; i < 200; i++) {
        fluentTemplate.withBody("Hello " + i).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // let it run for 250 seconds
    Thread.sleep(250 * 1000L);
}
项目:camelinaction2    文件:CamelNumbersTest.java   
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    // create a published that receive from the numbers stream
    Publisher<Integer> numbers = reactive.fromStream("numbers", Integer.class);

    // use stream engine to subscribe from the publisher
    // where we filter out the big numbers which is logged
    Flux.from(numbers)
        .filter(n -> n > 5)
        .doOnNext(c -> log.info("Streaming big number {}", c))
        .subscribe();

    // let it run for 10 seconds
    Thread.sleep(10000);
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFrom() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Publisher<Exchange> timer = crs.from("timer:reactive?period=250&repeatCount=3");

        AtomicInteger value = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(3);

        Flux.from(timer)
            .map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testToStream() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        public void configure() {
            from("reactive-streams:reactive")
                .setBody().constant("123");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Publisher<Exchange> publisher = crs.toStream("reactive", new DefaultExchange(camelctx));
        Exchange res = Flux.from(publisher).blockFirst();

        Assert.assertNotNull(res);

        String content = res.getIn().getBody(String.class);

        Assert.assertNotNull(content);
        Assert.assertEquals("123", content);
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testTo() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(e -> crs.to("bean:hello", e, String.class))
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testToWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(e -> crs.to("bean:hello", e))
            .map(e -> e.getOut())
            .map(e -> e.getBody(String.class))
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testToFunction() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        /* A TreeSet will order the messages alphabetically regardless of the insertion order
         * This is important because in the Flux returned by Flux.flatMap(Function<? super T, ? extends
         * Publisher<? extends R>>) the emissions may interleave */
        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);
        Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class);

        Flux.just(1, 2, 3)
            .flatMap(fun)
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testToFunctionWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);
        Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");

        Flux.just(1, 2, 3)
            .flatMap(fun)
            .map(e -> e.getOut())
            .map(e -> e.getBody(String.class))
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.stop();
    }
}
项目:camelinaction2    文件:CamelConsumeNumbersTest.java   
@Test
public void testConsumeNumbers() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // use stream engine create a publisher
    // that just sends 5 numbers, which needs to be sorted
    // and then each data is send to Camel on the reactive-streams:number endpoint
    Flowable.just("3", "4", "1", "5", "2")
        .sorted(String::compareToIgnoreCase)
        .subscribe(rsCamel.streamSubscriber("numbers", String.class));

    // let it run for 2 seconds
    Thread.sleep(2000);
}
项目:wildfly-camel    文件:ReactiveStreamsIntegrationTest.java   
@Test
public void testStreamPublishSubscribe() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
            .to("reactive-streams:pub");

            from("reactive-streams:sub")
            .to("mock:result");
        }
    });

    MockEndpoint mockEndpoint = camelctx.getEndpoint("mock:result", MockEndpoint.class);
    mockEndpoint.expectedBodiesReceivedInAnyOrder(1, 2, 3, 4, 5);

    camelctx.start();
    try {
        Subscriber<Integer> sub = CamelReactiveStreams.get(camelctx).streamSubscriber("sub", Integer.class);
        Publisher<Integer> pub = CamelReactiveStreams.get(camelctx).fromStream("pub", Integer.class);
        pub.subscribe(sub);

        ProducerTemplate template = camelctx.createProducerTemplate();
        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:start", i);
        }

        mockEndpoint.assertIsSatisfied();
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFromStreamDirect() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        public void configure() {
            from("direct:reactive")
                .to("reactive-streams:numbers");
        }
    });

    camelctx.start();
    try {
        AtomicInteger value = new AtomicInteger(0);

        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Flux.from(crs.fromStream("numbers", Integer.class))
            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
            .subscribe();

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBody("direct:reactive", 1);
        template.sendBody("direct:reactive", 2);
        template.sendBody("direct:reactive", 3);

        Assert.assertEquals(3, value.get());
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFromStreamTimer() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:tick?period=5&repeatCount=30")
                .setBody().header(Exchange.TIMER_COUNTER)
                .to("reactive-streams:tick");
        }
    });

    final int num = 30;
    final CountDownLatch latch = new CountDownLatch(num);
    final AtomicInteger value = new AtomicInteger(0);

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Flux.from(crs.fromStream("tick", Integer.class))
        .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
        .doOnNext(n -> latch.countDown())
        .subscribe();

    camelctx.start();
    try {
        latch.await(5, TimeUnit.SECONDS);
        Assert.assertEquals(num, value.get());
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFromStreamMultipleSubscriptionsWithDirect() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactive")
                .to("reactive-streams:direct");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        CountDownLatch latch1 = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(res -> latch1.countDown())
            .subscribe();

        CountDownLatch latch2 = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(res -> latch2.countDown())
            .subscribe();

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBody("direct:reactive", 1);
        template.sendBody("direct:reactive", 2);

        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFromPublisher() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                    .simple("after stream: ${body}");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        crs.process("direct:stream",
            publisher ->
                Flux.from(publisher)
                    .map(e -> {
                        int i = e.getIn().getBody(Integer.class);
                        e.getOut().setBody(-i);

                        return e;
                    }
                )
        );

        ProducerTemplate template = camelctx.createProducerTemplate();
        for (int i = 1; i <= 3; i++) {
            Assert.assertEquals(
                "after stream: " + (-i),
                template.requestBody("direct:source", i, String.class)
            );
        }
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testFromPublisherWithConversion() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                    .simple("after stream: ${body}");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        crs.process("direct:stream",
            Integer.class,
            publisher ->
                Flux.from(publisher).map(Math::negateExact)
        );

        ProducerTemplate template = camelctx.createProducerTemplate();
        for (int i = 1; i <= 3; i++) {
            Assert.assertEquals(
                "after stream: " + (-i),
                template.requestBody("direct:source", i, String.class)
            );
        }
    } finally {
        camelctx.stop();
    }
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testSubscriber() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactor")
                .to("mock:result");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        Flux.just(1, 2, 3)
            .subscribe(crs.subscriber("direct:reactor", Integer.class));

        MockEndpoint mock = camelctx.getEndpoint("mock:result", MockEndpoint.class);
        mock.expectedMessageCount(3);
        mock.assertIsSatisfied();

        int idx = 1;
        for (Exchange ex : mock.getExchanges()) {
            Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
        }
    } finally {
        camelctx.stop();
    }
}
项目:reactive-components    文件:CamelComponent.java   
CamelComponent(CamelContext context) {
    this.camelContext = context;
    this.camelreactive = CamelReactiveStreams.get(camelContext);
}
项目:reactive-components    文件:CamelComponent.java   
@Activate
public void activate(BundleContext context, CamelConfig config) throws Exception {
    this.camelContext = new OsgiDefaultCamelContext(context, new OsgiServiceRegistry(context));
    this.camelContext.start();
    this.camelreactive = CamelReactiveStreams.get(camelContext);
}
项目:wildfly-camel    文件:ReactorIntegrationTest.java   
@Test
public void testMultipleSubscriptionsWithTimer() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:tick?period=50")
                .setBody().header(Exchange.TIMER_COUNTER)
                .to("reactive-streams:tick");
        }
    });

    CountDownLatch latch1 = new CountDownLatch(5);
    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Disposable disp1 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch1.countDown());

    camelctx.start();
    try {
        // Add another subscription
        CountDownLatch latch2 = new CountDownLatch(5);
        Disposable disp2 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch2.countDown());

        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));

        // Un subscribe both
        disp1.dispose();
        disp2.dispose();

        // No active subscriptions, warnings expected
        Thread.sleep(60);

        // Add another subscription
        CountDownLatch latch3 = new CountDownLatch(5);
        Disposable disp3 = Flux.from(crs.fromStream("tick", Integer.class)).subscribe(res -> latch3.countDown());

        Assert.assertTrue(latch3.await(5, TimeUnit.SECONDS));
        disp3.dispose();
    } finally {
        camelctx.stop();
    }
}