public void testAsyncCallbackBodyInOnly() throws Exception { ORDER.set(0); getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); final CountDownLatch latch = new CountDownLatch(1); template.asyncCallbackSendBody("direct:start", "Hello", new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); ORDER.addAndGet(2); latch.countDown(); } }); ORDER.addAndGet(1); assertTrue(latch.await(10, TimeUnit.SECONDS)); ORDER.addAndGet(4); assertMockEndpointsSatisfied(); assertEquals(7, ORDER.get()); }
public void testAsyncCallbackBodyInOut() throws Exception { ORDER.set(0); final CountDownLatch latch = new CountDownLatch(1); template.asyncCallbackRequestBody("direct:echo", "Hello", new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { assertEquals("HelloHello", exchange.getOut().getBody()); ORDER.addAndGet(2); latch.countDown(); } }); ORDER.addAndGet(1); assertTrue(latch.await(10, TimeUnit.SECONDS)); ORDER.addAndGet(4); assertEquals(7, ORDER.get()); }
public void testAsyncCallbackBodyInOnlyGetResult() throws Exception { ORDER.set(0); getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); Future<Object> future = template.asyncCallbackSendBody("direct:start", "Hello", new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); ORDER.addAndGet(2); } }); ORDER.addAndGet(1); Object reply = future.get(10, TimeUnit.SECONDS); ORDER.addAndGet(4); assertMockEndpointsSatisfied(); assertEquals(7, ORDER.get()); // no reply when in only assertEquals(null, reply); }
public void testAsyncCallbackBodyInOutGetResult() throws Exception { ORDER.set(0); Future<Object> future = template.asyncCallbackRequestBody("direct:echo", "Hello", new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { assertEquals("HelloHello", exchange.getOut().getBody()); ORDER.addAndGet(2); } }); ORDER.addAndGet(1); Object reply = future.get(10, TimeUnit.SECONDS); ORDER.addAndGet(4); assertEquals(7, ORDER.get()); assertEquals("HelloHello", reply); }
public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception { URI dispatchUri; Exchange reply; if (LOG.isDebugEnabled()) { LOG.debug("Dispatching exchange {} to endpoint {}", exchange, endpoint.getEndpointUri()); } dispatchUri = selectDispatchUri(endpoint, exchange); if (exchange.getPattern() == ExchangePattern.InOnly) { producer.asyncSend(dispatchUri.toASCIIString(), exchange); reply = exchange; } else { Future<Exchange> future = producer.asyncCallback(dispatchUri.toASCIIString(), exchange, new SynchronizationAdapter()); reply = future.get(endpoint.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS); } return reply; }
@Test public void testCamelCallback() throws Exception { // echos is the list of replies which could be modified by multiple thread final List<String> echos = new CopyOnWriteArrayList<String>(); final CountDownLatch latch = new CountDownLatch(3); // use this callback to gather the replies and add it to the echos list Synchronization callback = new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { // get the reply and add it to echoes echos.add(exchange.getOut().getBody(String.class)); // count down latch when we receive a response latch.countDown(); } }; // now submit 3 async request/reply messages and use the same callback to // handle the replies template.asyncCallbackRequestBody("seda:echo", "Donkey", callback); template.asyncCallbackRequestBody("seda:echo", "Tiger", callback); template.asyncCallbackRequestBody("seda:echo", "Camel", callback); // wait until the messages is done, or timeout after 6 seconds latch.await(6, TimeUnit.SECONDS); // assert we got 3 replies assertEquals(3, echos.size()); List result = new ArrayList(echos); // sort list so we can assert by index Collections.sort(result); assertEquals("CamelCamel", result.get(0)); assertEquals("DonkeyDonkey", result.get(1)); assertEquals("TigerTiger", result.get(2)); }
@Test public void testGRPCAsynchronousProducer() throws Exception { int port = Integer.parseInt(AvailablePortFinder.readServerData("grpc-port")); DefaultCamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .toF("grpc://localhost:%d/%s?method=pingAsyncSync", port, "org.wildfly.camel.test.grpc.subA.PingPong"); } }); camelctx.start(); try { final CountDownLatch latch = new CountDownLatch(1); PingRequest pingRequest = PingRequest.newBuilder() .setPingName(GRPC_TEST_PING_VALUE) .setPingId(GRPC_TEST_PING_ID_1) .build(); ProducerTemplate template = camelctx.createProducerTemplate(); template.asyncCallbackSendBody("direct:start", pingRequest, new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { latch.countDown(); List<PongResponse> response = exchange.getOut().getBody(List.class); Assert.assertEquals(2, response.size()); Assert.assertEquals(response.get(0).getPongId(), GRPC_TEST_PONG_ID_1); Assert.assertEquals(response.get(0).getPongName(), GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE); Assert.assertEquals(response.get(1).getPongId(), GRPC_TEST_PONG_ID_2); } }); Assert.assertTrue("Gave up waiting for latch", latch.await(5, TimeUnit.SECONDS)); } finally { camelctx.stop(); } }