/**
 * Routes {@code direct:start} through an idempotent consumer keyed on the
 * {@code messageId} header (with {@code eager(false)}) and expects
 * {@code mock:result} to receive only "one", "two" and "three" although
 * ids 1 and 2 are sent several times.
 */
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    RouteBuilder idempotentRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .eager(false)
                .to("mock:result");
        }
    };
    context.addRoutes(idempotentRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    // (messageId, body) pairs in send order; ids 1 and 2 are repeated
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
public void testNotEager() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false). process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); // should not contain assertFalse("Should not eager add to repo", repo.contains(id)); } }).to("mock:result"); } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testEager() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { final IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true). process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); // should contain assertTrue("Should eager add to repo", repo.contains(id)); } }).to("mock:result"); } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
/**
 * Routes {@code direct:start} through an idempotent consumer keyed on the
 * {@code messageId} header and expects {@code mock:result} to receive only
 * "one", "two" and "three" although ids 1 and 2 are sent several times.
 */
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    RouteBuilder idempotentRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .to("mock:result");
        }
    };
    context.addRoutes(idempotentRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    // (messageId, body) pairs in send order; ids 1 and 2 are repeated
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)) // in case of a failure we still want the message to be regarded as a duplicate, so we set the option to false .removeOnFailure(false) .process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damn I cannot handle id 2"); } } }).to("mock:result"); } }; }
/**
 * Same duplicate-filtering scenario with a {@code threads()} step after the
 * idempotent consumer; {@code mock:result} is therefore expected to receive
 * "one", "two" and "three" in any order.
 */
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    RouteBuilder threadedRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .threads()
                .to("mock:result");
        }
    };
    context.addRoutes(threadedRoute);
    context.start();

    resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "two", "three");

    // (messageId, body) pairs in send order; ids 1 and 2 are repeated
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
/**
 * Sends six messages carrying ids 1, 2, 1, 2, 1, 3 through an idempotent
 * consumer keyed on the {@code messageId} header and expects {@code mock:result}
 * to receive exactly the bodies "one", "two" and "three".
 */
public void testDuplicateMessagesAreFilteredOut() throws Exception {
    RouteBuilder idempotentRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .to("mock:result");
        }
    };
    context.addRoutes(idempotentRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "three");

    // (messageId, body) pairs in send order
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
/**
 * Sends one body without going through {@code sendMessage} (plain
 * {@code template.sendBody}) between regular messages and expects that body
 * ("Hello World") at {@code mock:dead}, while {@code mock:result} receives
 * "one" and "two".
 */
public void testNoMessageId() throws Exception {
    RouteBuilder routeWithDeadLetter = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:dead"));

            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .to("mock:result");
        }
    };
    context.addRoutes(routeWithDeadLetter);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two");
    getMockEndpoint("mock:dead").expectedBodiesReceived("Hello World");

    sendMessage("1", "one");
    // sent directly through the template rather than the sendMessage helper
    template.sendBody("direct:start", "Hello World");
    sendMessage("2", "two");
    sendMessage("1", "one");

    assertMockEndpointsSatisfied();
}
/**
 * Builds the idempotent consumer route and inspects its processor chain:
 * one route from {@code direct://a}, whose first processor is an
 * {@link IdempotentConsumer} using the {@code myMessageId} header and a
 * {@link MemoryIdempotentRepository}, followed by a send to {@code direct://b}.
 */
public void testIdempotentConsumer() throws Exception {
    List<Route> builtRoutes = buildIdempotentConsumer();

    log.debug("Created routes: " + builtRoutes);
    assertEquals("Number routes created", 1, builtRoutes.size());

    for (Route current : builtRoutes) {
        Endpoint fromEndpoint = current.getEndpoint();
        assertEquals("From endpoint", "direct://a", fromEndpoint.getEndpointUri());

        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, current);
        Channel outerChannel = unwrapChannel(consumerRoute.getProcessor());

        IdempotentConsumer idempotent = assertIsInstanceOf(IdempotentConsumer.class, outerChannel.getNextProcessor());
        String messageIdExpression = idempotent.getMessageIdExpression().toString();
        assertEquals("messageIdExpression", "header(myMessageId)", messageIdExpression);

        assertIsInstanceOf(MemoryIdempotentRepository.class, idempotent.getIdempotentRepository());

        // the processor wrapped by the idempotent consumer should send to direct:b
        Channel innerChannel = unwrapChannel(idempotent.getProcessor());
        SendProcessor sender = assertIsInstanceOf(SendProcessor.class, innerChannel.getNextProcessor());
        assertEquals("Endpoint URI", "direct://b", sender.getDestination().getEndpointUri());
    }
}
public void testBrowsableOneFile() throws Exception { template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); FileEndpoint endpoint = context.getEndpoint("file:target/browse", FileEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(1, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the file is still there File file = new File("target/browse/a.txt"); assertTrue("File should exist " + file, file.exists()); }
public void testBrowsableTwoFiles() throws Exception { template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "b.txt"); FileEndpoint endpoint = context.getEndpoint("file:target/browse?sortBy=file:name", FileEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(2, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the files is still there File fileA = new File("target/browse/a.txt"); assertTrue("File should exist " + fileA, fileA.exists()); File fileB = new File("target/browse/b.txt"); assertTrue("File should exist " + fileB, fileB.exists()); }
@Test public void testBrowsableOneFile() throws Exception { template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(1, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the file is still there File file = new File(FTP_ROOT_DIR + "/browse/a.txt"); assertTrue("File should exist " + file, file.exists()); }
/**
 * Defines a route from {@code direct:in} whose idempotent consumer does not
 * skip duplicates ({@code skipDuplicate(false)}); inside it, exchanges carrying
 * the {@code Exchange.DUPLICATE_MESSAGE} property go to {@code mock:duplicate}
 * and all others to {@code mock:ws}. Every exchange then continues to
 * {@code mock:out}.
 */
@Override
public void configure() throws Exception {
    MemoryIdempotentRepository repository = new MemoryIdempotentRepository();

    from("direct:in")
        .log("Received message ${header[messageId]}")
        .idempotentConsumer(header("messageId"), repository).skipDuplicate(false)
            .choice()
                .when(exchangeProperty(Exchange.DUPLICATE_MESSAGE))
                    .log("Duplicate")
                    .to("mock:duplicate")
                .otherwise()
                    .to("mock:ws")
            .endChoice()
        .end()
        .log("Completing")
        .to("mock:out");
}
/** * Camel routes for reading RSS feeds. Routes could be also defined in XML, Groovy or scripting knowledge bases. * * @return route builder. */ @Bean public RouteBuilder rssInputRoute() { return new RouteBuilder() { // @formatter:off @SuppressWarnings("unchecked") @Override public void configure() throws Exception { EngineOperations operations = camelRssEngine().getOperations(); Map<String, String> rssSources = operations.getVariable(Map.class, CamelRssConstants.VAR_RSS_SOURCES); // Read RSS feeds from all configured sources. rssSources.forEach((source, url) -> from("rss:" + url + operations.getVariable(CamelRssConstants.VAR_RSS_ENDPOINT_PARAMETERS, "")).routeId(source) .setHeader(HEADER_SOURCE).constant(source) .to("direct:rss")); // Gathers RSS from different sources and sends to Sponge engine as a normalized event. from("direct:rss").routeId("rss") .marshal().rss() // Deduplicate by title. .idempotentConsumer(xpath("/rss/channel/item/title/text()"), MemoryIdempotentRepository.memoryIdempotentRepository()) // Conversion from RSS XML to Sponge event with attributes. .process((exchange) -> exchange.getIn().setBody(operations.event("news") .set("source", exchange.getIn().getHeader(HEADER_SOURCE)) .set("channel", CamelUtils.xpath(exchange, "/rss/channel/title/text()")) .set("title", CamelUtils.xpath(exchange, "/rss/channel/item/title/text()")) .set("link", CamelUtils.xpath(exchange, "/rss/channel/item/link/text()")) .set("description", CamelUtils.xpath(exchange, "/rss/channel/item/description/text()")) .make())) //.filter((exchange) -> false) .to("sponge:camelRssEngine"); } // @formatter:on }; }
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false)); from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) ).eager(false).process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damm I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); // we send in 2 messages with id 2 that fails getMockEndpoint("mock:error").expectedMessageCount(2); resultEndpoint.expectedBodiesReceived("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
/**
 * Uses a non-eager idempotent consumer whose inner processor throws for
 * id "2", without configuring an error handler in the route builder. Expects
 * only "one" and "three" at {@code mock:result}.
 */
public void testFailedExchangesNotAdded() throws Exception {
    RouteBuilder failingRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // simulates a processing failure for one specific message id
            Processor failOnIdTwo = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String messageId = exchange.getIn().getHeader("messageId", String.class);
                    if (messageId.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            };

            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .eager(false)
                .process(failOnIdTwo)
                .to("mock:result");
        }
    };
    context.addRoutes(failingRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "three");

    // (messageId, body) pairs in send order
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
/**
 * With {@code skipDuplicate(false)} all six messages are expected at
 * {@code mock:result}; the repeated ones (positions 2, 3 and 4) must carry the
 * {@code Exchange.DUPLICATE_MESSAGE} property set to {@code TRUE}, the others
 * must not have it.
 */
public void testNotSkiDuplicate() throws Exception {
    RouteBuilder noSkipRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);

            from("direct:start")
                .idempotentConsumer(header("messageId"))
                    .messageIdRepository(repo)
                    .skipDuplicate(false)
                .to("mock:result");
        }
    };
    context.addRoutes(noSkipRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", "one", "three");

    // per received message: whether it is expected to be flagged as a duplicate
    boolean[] expectedDuplicate = {false, false, true, true, true, false};
    for (int index = 0; index < expectedDuplicate.length; index++) {
        if (expectedDuplicate[index]) {
            resultEndpoint.message(index).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
        } else {
            resultEndpoint.message(index).exchangeProperty(Exchange.DUPLICATE_MESSAGE).isNull();
        }
    }

    // (messageId, body) pairs in send order
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
public void testNotSkiDuplicateWithFilter() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { IdempotentRepository<String> repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); // START SNIPPET: e1 from("direct:start") // instruct idempotent consumer to not skip duplicates as we will filter then our self .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) // filter out duplicate messages by sending them to someplace else and then stop .to("mock:duplicate") .stop() .end() // and here we process only new messages (no duplicates) .to("mock:result"); // END SNIPPET: e1 } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "two", "three"); getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one"); getMockEndpoint("mock:duplicate").allMessages().exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false)); from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) ).process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damm I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); // we send in 2 messages with id 2 that fails getMockEndpoint("mock:error").expectedMessageCount(2); resultEndpoint.expectedBodiesReceived("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testFailedExchangesNotAddedDeadLetterChannelNotHandled() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false)); from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) ).process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damm I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); // we send in 2 messages with id 2 that fails getMockEndpoint("mock:error").expectedMessageCount(2); resultEndpoint.expectedBodiesReceived("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testFailedExchangesNotAdded() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { // use default error handler errorHandler(defaultErrorHandler()); from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) ).process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damm I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); resultEndpoint.expectedBodiesReceived("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testCompletionEager() throws Exception { repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:dead")); from("direct:start") .idempotentConsumer(header("messageId"), repo).completionEager(true) .to("log:a", "mock:a") .to("log:b", "mock:b") .end() .filter(simple("${header.messageId} == '2'")) .throwException(new IllegalArgumentException("Forced")) .end() .to("log:result", "mock:result"); } }); context.start(); // we are on block only scope as "two" was success in the block, and then "two" failed afterwards does not matter // the idempotent consumer will not receive "two" again a.expectedBodiesReceived("one", "two", "three"); b.expectedBodiesReceived("one", "two", "three"); dead.expectedBodiesReceived("two", "two"); resultEndpoint.expectedBodiesReceived("one", "one", "one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testNotCompletionEager() throws Exception { repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:dead")); from("direct:start") .idempotentConsumer(header("messageId"), repo).completionEager(false) .to("log:a", "mock:a") .to("log:b", "mock:b") .end() .filter(simple("${header.messageId} == '2'")) .throwException(new IllegalArgumentException("Forced")) .end() .to("log:result", "mock:result"); } }); context.start(); // we are on completion scope so the "two" will rollback and therefore the idempotent consumer receives those again a.expectedBodiesReceived("one", "two", "two", "three"); b.expectedBodiesReceived("one", "two", "two", "three"); dead.expectedBodiesReceived("two", "two"); resultEndpoint.expectedBodiesReceived("one", "one", "one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false)); from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) ).threads().process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damn I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); // we send in 2 messages with id 2 that fails getMockEndpoint("mock:error").expectedMessageCount(2); resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
/**
 * Uses a {@code threads()} step and a processor that throws for id "2",
 * without configuring an error handler in the route builder. Expects "one"
 * and "three" (in any order) at {@code mock:result}.
 */
public void testFailedExchangesNotAdded() throws Exception {
    RouteBuilder failingRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // simulates a processing failure for one specific message id
            Processor failOnIdTwo = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String messageId = exchange.getIn().getHeader("messageId", String.class);
                    if (messageId.equals("2")) {
                        throw new IllegalArgumentException("Damn I cannot handle id 2");
                    }
                }
            };

            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .threads()
                .process(failOnIdTwo)
                .to("mock:result");
        }
    };
    context.addRoutes(failingRoute);
    context.start();

    resultEndpoint.expectedBodiesReceivedInAnyOrder("one", "three");

    // (messageId, body) pairs in send order
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false)); from("direct:start").idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)) .process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { throw new IllegalArgumentException("Damm I cannot handle id 2"); } } }).to("mock:result"); } }); context.start(); // we send in 2 messages with id 2 that fails getMockEndpoint("mock:error").expectedMessageCount(2); resultEndpoint.expectedBodiesReceived("one", "three"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("2", "two"); sendMessage("1", "one"); sendMessage("3", "three"); assertMockEndpointsSatisfied(); }
/**
 * Places a processor that throws for id "2" inside the idempotent consumer,
 * without configuring an error handler in the route builder. Expects only
 * "one" and "three" at {@code mock:result}.
 */
public void testFailedExchangesNotAdded() throws Exception {
    RouteBuilder failingRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // simulates a processing failure for one specific message id
            Processor failOnIdTwo = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String messageId = exchange.getIn().getHeader("messageId", String.class);
                    if (messageId.equals("2")) {
                        throw new IllegalArgumentException("Damm I cannot handle id 2");
                    }
                }
            };

            from("direct:start")
                .idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .process(failOnIdTwo)
                .to("mock:result");
        }
    };
    context.addRoutes(failingRoute);
    context.start();

    resultEndpoint.expectedBodiesReceived("one", "three");

    // (messageId, body) pairs in send order
    String[][] deliveries = {
        {"1", "one"}, {"2", "two"}, {"1", "one"}, {"2", "two"}, {"1", "one"}, {"3", "three"}
    };
    for (String[] delivery : deliveries) {
        sendMessage(delivery[0], delivery[1]);
    }

    assertMockEndpointsSatisfied();
}
@Override protected void setUp() throws Exception { repo = MemoryIdempotentRepository.memoryIdempotentRepository(); // lets start with 4 repo.add("4"); super.setUp(); startEndpoint = resolveMandatoryEndpoint("direct:start"); resultEndpoint = getMockEndpoint("mock:result"); }
protected List<Route> buildIdempotentConsumer() throws Exception { // START SNIPPET: idempotent RouteBuilder builder = new RouteBuilder() { public void configure() { errorHandler(deadLetterChannel("mock:error")); from("direct:a") .idempotentConsumer(header("myMessageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)) .to("direct:b"); } }; // END SNIPPET: idempotent return getRouteList(builder); }
public void testBrowsableThreeFilesRecursive() throws Exception { template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "foo/b.txt"); template.sendBodyAndHeader("file:target/browse", "C", Exchange.FILE_NAME, "bar/c.txt"); FileEndpoint endpoint = context.getEndpoint("file:target/browse?recursive=true&sortBy=file:name", FileEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(3, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY)); assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the files is still there File fileA = new File("target/browse/a.txt"); assertTrue("File should exist " + fileA, fileA.exists()); File fileB = new File("target/browse/foo/b.txt"); assertTrue("File should exist " + fileB, fileB.exists()); File fileC = new File("target/browse/bar/c.txt"); assertTrue("File should exist " + fileC, fileC.exists()); }
public void testIdempotent() throws Exception { // consume the file the first time MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Hello World"); assertMockEndpointsSatisfied(); oneExchangeDone.matchesMockWaitTime(); // reset mock and set new expectations mock.reset(); mock.expectedMessageCount(0); // move file back File file = new File("target/idempotent/done/report.txt"); File renamed = new File("target/idempotent/report.txt"); file.renameTo(renamed); // should NOT consume the file again, let a bit time pass to let the consumer try to consume it but it should not Thread.sleep(100); assertMockEndpointsSatisfied(); FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class); assertNotNull(fe); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) fe.getInProgressRepository(); assertEquals("Should be no in-progress files", 0, repo.getCacheSize()); }
@Override public RemoteFileConsumer<T> createConsumer(Processor processor) throws Exception { afterPropertiesSet(); RemoteFileConsumer<T> consumer = buildConsumer(processor); if (isDelete() && getMove() != null) { throw new IllegalArgumentException("You cannot both set delete=true and move options"); } // if noop=true then idempotent should also be configured if (isNoop() && !isIdempotentSet()) { log.info("Endpoint is configured with noop=true so forcing endpoint to be idempotent as well"); setIdempotent(true); } // if idempotent and no repository set then create a default one if (isIdempotentSet() && isIdempotent() && idempotentRepository == null) { log.info("Using default memory based idempotent repository with cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE); idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE); } if (!getConfiguration().isUseList() && getFileName() == null) { throw new IllegalArgumentException("Endpoint is configured with useList=false, then fileName must be configured also"); } // set max messages per poll consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll()); configureConsumer(consumer); return consumer; }
@Test public void testBrowsableTwoFiles() throws Exception { template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "b.txt"); FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl() + "&sortBy=file:name", FtpEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(2, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the files is still there File fileA = new File(FTP_ROOT_DIR + "/browse/a.txt"); assertTrue("File should exist " + fileA, fileA.exists()); File fileB = new File(FTP_ROOT_DIR + "/browse/b.txt"); assertTrue("File should exist " + fileB, fileB.exists()); }
@Test public void testBrowsableThreeFilesRecursive() throws Exception { template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "foo/b.txt"); template.sendBodyAndHeader(getFtpUrl(), "C", Exchange.FILE_NAME, "bar/c.txt"); FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl() + "&recursive=true&sortBy=file:name", FtpEndpoint.class); assertNotNull(endpoint); MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository(); assertEquals(0, repo.getCacheSize()); List<Exchange> list = endpoint.getExchanges(); assertNotNull(list); assertEquals(3, list.size()); assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME)); assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY)); assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY)); // the in progress repo should not leak assertEquals(0, repo.getCacheSize()); // and the files is still there File fileA = new File(FTP_ROOT_DIR + "/browse/a.txt"); assertTrue("File should exist " + fileA, fileA.exists()); File fileB = new File(FTP_ROOT_DIR + "/browse/foo/b.txt"); assertTrue("File should exist " + fileB, fileB.exists()); File fileC = new File(FTP_ROOT_DIR + "/browse/bar/c.txt"); assertTrue("File should exist " + fileC, fileC.exists()); }
/**
 * Extends the parent registry with an in-memory idempotent repository
 * (cache size 5) bound under the name {@code myRepo}.
 *
 * @return the registry including the {@code myRepo} binding
 */
@Override
protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = super.createRegistry();

    // keep the repository in the field as well as in the registry
    repo = new MemoryIdempotentRepository();
    repo.setCacheSize(5);
    registry.bind("myRepo", repo);

    return registry;
}
/**
 * Defines a route from {@code direct:in} in which only the "Invoking WS" log
 * and the send to {@code mock:ws} sit inside the idempotent consumer block;
 * the "Completing" log and {@code mock:out} come after the block's
 * {@code end()}.
 */
@Override
public void configure() throws Exception {
    MemoryIdempotentRepository repository = new MemoryIdempotentRepository();

    from("direct:in")
        .log("Received message ${header[messageId]}")
        .idempotentConsumer(header("messageId"), repository)
            .log("Invoking WS")
            .to("mock:ws")
        .end()
        .log("Completing")
        .to("mock:out");
}
/**
 * Defines two routes: {@code direct:in} enriches each exchange from
 * {@code direct:invokeWs} and continues to {@code mock:out};
 * {@code direct:invokeWs} deduplicates on the {@code messageId} header before
 * sending to {@code mock:ws}.
 */
@Override
public void configure() throws Exception {
    // main route: delegates the (idempotent) call to the sub-route via enrich
    from("direct:in")
        .log("Received message ${header[messageId]}")
        .enrich("direct:invokeWs")
        .log("Completing")
        .to("mock:out");

    // sub-route holding the idempotent consumer
    MemoryIdempotentRepository repository = new MemoryIdempotentRepository();
    from("direct:invokeWs")
        .idempotentConsumer(header("messageId"), repository)
            .log("Invoking WS")
            .to("mock:ws");
}
@Bean @Override public RouteBuilder route() { return new RouteBuilder() { @Override public void configure() throws Exception { // @formatter:off // RSS from CNN, body set to the title, deduplicated by body (title), put into Sponge as a camel event // (containing exchange). from("rss:http://rss.cnn.com/rss/edition.rss?sortEntries=false&consumer.delay=1000").routeId("rss") .marshal().rss() .setBody(xpath("/rss/channel/item/title/text()")) .idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository()) .transform(body().prepend("CNN news: ")) .to("sponge:spongeEngine"); // RSS from BBC, deduplicated by title, put into Sponge as an event with attributes containing decomposed feed. from("rss:http://feeds.bbci.co.uk/news/world/rss.xml?consumer.delay=1000").routeId("rssDecomposed") .marshal().rss() .idempotentConsumer(xpath("/rss/channel/item/title/text()"), MemoryIdempotentRepository.memoryIdempotentRepository()) // Conversion from RSS XML to Sponge event with attributes. .process((exchange) -> { exchange.getIn().setBody(spongeEngine().getOperations().makeEvent("rssDecomposed") .set("source", "BBC") .set("title", CamelUtils.xpath(exchange, "/rss/channel/item/title/text()")) .set("link", CamelUtils.xpath(exchange, "/rss/channel/item/link/text()")) .set("description", CamelUtils.xpath(exchange, "/rss/channel/item/description/text()"))); }) .to("sponge:spongeEngine"); from("sponge:spongeEngine").routeId("spongeConsumer") .process(exchange -> spongeEngine().getOperations().getVariable(AtomicInteger.class, "receivedCamelMessages") .incrementAndGet()) .log("${body}"); from("direct:log").routeId("directLog") .process(exchange -> spongeEngine().getOperations().getVariable(AtomicInteger.class, "receivedCamelMessages") .incrementAndGet()) .log("${body}"); // @formatter:on } }; }