@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { LevelDBAggregationRepository levelDB = new LevelDBAggregationRepository("myrepo", "data/myrepo.dat"); from("file:target/inbox") // do a little logging when we load the file .log("Consuming file ${file:name}") .convertBodyTo(String.class) // just aggregate all messages .aggregate(constant(true), new MyAggregationStrategy()) // use LevelDB as the persistent repository .aggregationRepository(levelDB) // and complete when we got 3 messages .completionSize(3) // do a little logging for the published message .log("Sending out ${body}") // and send it to the mock .to("mock:result"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { LevelDBAggregationRepository levelDB = new LevelDBAggregationRepository("myrepo", "data/myrepo.dat"); // will recover by default levelDB.setUseRecovery(true); // try at most 4 times levelDB.setMaximumRedeliveries(4); // send to mock:dead if exhausted levelDB.setDeadLetterUri("mock:dead"); // have it retry every 3th second levelDB.setRecoveryInterval(3000); from("direct:start") // do a little logging .log("Sending ${body} with correlation key ${header.myId}") // aggregate based on header correlation key // use class MyAggregationStrategy for aggregation // and complete when we have aggregated 3 messages .aggregate(header("myId"), new MyAggregationStrategy()) // use LevelDB as the persistent repository .aggregationRepository(levelDB) // and complete when we got 3 messages .completionSize(3) // do a little logging for the published message .log("Sending out ${body}") // use a mock to check recovery .to("mock:aggregate") // force failure to have the message being recovered .throwException(new IllegalArgumentException("Damn does not work")) // and send it to the mock (not possible, due exception being thrown) .to("mock:result"); } }; }
/**
 * Sends five messages sharing the same {@code id} header through an
 * aggregator backed by a LevelDB repository and expects a single
 * aggregated body {@code "ABCDE"} on {@code mock:aggregated}.
 */
@Test
public void testLevelDBAggregate() throws Exception {
    CamelContext context = new DefaultCamelContext();

    RouteBuilder aggregatingRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            LevelDBAggregationRepository store =
                    new LevelDBAggregationRepository("repo1", "target/leveldb/leveldb.dat");

            from("direct:start")
                // correlate on the id header; the group completes after 5 messages
                .aggregate(header("id"), new MyAggregationStrategy())
                    .completionSize(5)
                    .aggregationRepository(store)
                    .to("mock:aggregated");
        }
    };
    context.addRoutes(aggregatingRoute);

    // register the expectation before the context is started
    MockEndpoint aggregated = context.getEndpoint("mock:aggregated", MockEndpoint.class);
    aggregated.expectedBodiesReceived("ABCDE");

    context.start();
    try {
        ProducerTemplate producer = context.createProducerTemplate();
        // every message carries the same correlation id, so all five join one group
        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
            producer.sendBodyAndHeader("direct:start", body, "id", 123);
        }

        MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
    } finally {
        // stop the context whether or not the assertion passed
        context.stop();
    }
}
/**
 * Creates the LevelDB aggregation repository used by this test and then
 * runs the superclass set-up.
 */
@Before
@Override
public void setUp() throws Exception {
    final String repositoryName = "repo1";
    final String persistentFile = "target/data/leveldb.dat";

    // The repository is assigned before super.setUp() runs; presumably the
    // superclass set-up builds routes that reference it (confirm against
    // the base class).
    repo = new LevelDBAggregationRepository(repositoryName, persistentFile);
    super.setUp();
}