@Override public void configure() throws Exception { from("activemq:queue:foo") .transacted() .to("mock:input") //.log("Incoming ${header.group} with body ${body}") .aggregate(header("group"), new MyConcatAggregatationStrategy()) .aggregationRepository(new HawtDBAggregationRepository("events", "target/data/hawtdb.dat")) .completionSize(10) .log("Aggregated #${header.counter} ${header.group} with body ${body}") .to("activemq:queue:out") .to("mock:out"); from("activemq:queue:out") .to("mock:result"); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { HawtDBAggregationRepository hawtDB = new HawtDBAggregationRepository("myrepo", "data/myrepo.dat"); from("file:target/inbox") // do a little logging when we load the file .log("Consuming file ${file:name}") // just aggregate all messages .aggregate(constant(true), new MyAggregationStrategy()) // use HawtDB as the persistent repository .aggregationRepository(hawtDB) // and complete when we got 3 messages .completionSize(3) // do a little logging for the published message .log("Sending out ${body}") // and send it to the mock .to("mock:result"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { HawtDBAggregationRepository hawtDB = new HawtDBAggregationRepository("myrepo", "data/myrepo.dat"); // will recover by default hawtDB.setUseRecovery(true); // try at most 4 times hawtDB.setMaximumRedeliveries(4); // send to mock:dead if exhausted hawtDB.setDeadLetterUri("mock:dead"); // have it retry every 3th second hawtDB.setRecoveryInterval(3000); from("direct:start") // do a little logging .log("Sending ${body} with correlation key ${header.myId}") // aggregate based on header correlation key // use class MyAggregationStrategy for aggregation // and complete when we have aggregated 3 messages .aggregate(header("myId"), new MyAggregationStrategy()) // use HawtDB as the persistent repository .aggregationRepository(hawtDB) // and complete when we got 3 messages .completionSize(3) // do a little logging for the published message .log("Sending out ${body}") // use a mock to check recovery .to("mock:aggregate") // force failure to have the message being recovered .throwException(new IllegalArgumentException("Damn does not work")) // and send it to the mock (not possible, due exception being thrown) .to("mock:result"); } }; }