public void testAggregateClosedCorrelationKey() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .aggregate(header("id"), new BodyInAggregatingStrategy()) .completionSize(2).closeCorrelationKeyOnCompletion(1000) .to("mock:result"); } }); context.start(); getMockEndpoint("mock:result").expectedBodiesReceived("A+B"); template.sendBodyAndHeader("direct:start", "A", "id", 1); template.sendBodyAndHeader("direct:start", "B", "id", 1); // should be closed try { template.sendBodyAndHeader("direct:start", "C", "id", 1); fail("Should throw an exception"); } catch (CamelExecutionException e) { ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause()); assertEquals("1", cause.getCorrelationKey()); assertTrue(cause.getMessage().startsWith("The correlation key [1] has been closed.")); } assertMockEndpointsSatisfied(); }
@Test public void testABCClose() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); // we expect ABC in the published message // notice: Only 1 message is expected mock.expectedBodiesReceived("ABC"); // send the first message template.sendBodyAndHeader("direct:start", "A", "myId", 1); // send the 2nd message with the same correlation key template.sendBodyAndHeader("direct:start", "B", "myId", 1); // the F message has another correlation key template.sendBodyAndHeader("direct:start", "F", "myId", 2); // now we have 3 messages with the same correlation key // and the Aggregator should publish the message template.sendBodyAndHeader("direct:start", "C", "myId", 1); // sending with correlation id 1 should fail as its closed try { template.sendBodyAndHeader("direct:start", "A2", "myId", 1); } catch (CamelExecutionException e) { ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause()); assertEquals("1", cause.getCorrelationKey()); } assertMockEndpointsSatisfied(); }