@Override public Processor wrapProcessorInInterceptors(CamelContext context, final ProcessorDefinition<?> definition, final Processor target, final Processor nextTarget) throws Exception { return new DelegateAsyncProcessor(new Processor() { @Override public void process(Exchange exchange) throws Exception { // if(!camelConfig.isRunning()){ // System.err.println("系统将关闭,不在处理任务"); // return ; // } System.out.println("defainition :"+definition); System.out.println("nextTarget :"+nextTarget); target.process(exchange); } }); }
@Override public Processor getAfterProducer() { @SuppressWarnings("unchecked") final Processor processor = exchange -> { String jsonBean = ""; if (exchange.getIn().getBody(List.class) != null) { //Only grabbing the first record (map) in the list @SuppressWarnings("rawtypes") List<Map> maps = exchange.getIn().getBody(List.class); jsonBean = JSONBeanUtil.toJSONBean(maps.iterator().next()); } else { jsonBean = JSONBeanUtil.toJSONBean(exchange.getIn().getBody(Map.class)); } exchange.getIn().setBody(jsonBean); }; return processor; }
/**
 * Creates the connector endpoint and installs unmarshalling processors on it.
 * <p>
 * An {@link UnmarshallInputProcessor} for the endpoint's input data type becomes the
 * before-producer, and an {@link UnmarshallOutputProcessor} for its output data type becomes
 * the after-producer. If the endpoint already had a before/after producer, the unmarshaller
 * is placed in front of it in a {@link Pipeline} rather than replacing it.
 */
@Override
protected final Endpoint createEndpoint(final String uri, final String remaining,
        final Map<String, Object> parameters) throws Exception {
    final DefaultConnectorEndpoint endpoint =
        (DefaultConnectorEndpoint) super.createEndpoint(uri, remaining, parameters);

    // Before producer: unmarshal the input first, then run whatever was configured already.
    final UnmarshallProcessor inputUnmarshaller = new UnmarshallInputProcessor(endpoint.getInputDataType());
    final Processor priorBefore = endpoint.getBeforeProducer();
    Processor beforeProducer = inputUnmarshaller;
    if (priorBefore != null) {
        beforeProducer = Pipeline.newInstance(getCamelContext(), inputUnmarshaller, priorBefore);
    }
    endpoint.setBeforeProducer(beforeProducer);

    // After producer: same arrangement for the output side.
    final UnmarshallProcessor outputUnmarshaller = new UnmarshallOutputProcessor(endpoint.getOutputDataType());
    final Processor priorAfter = endpoint.getAfterProducer();
    Processor afterProducer = outputUnmarshaller;
    if (priorAfter != null) {
        afterProducer = Pipeline.newInstance(getCamelContext(), outputUnmarshaller, priorAfter);
    }
    endpoint.setAfterProducer(afterProducer);

    return endpoint;
}
/**
 * Verifies that creating an endpoint keeps pre-existing before/after processors: each ends
 * up as the second step of a two-step pipeline whose first step is the unmarshaller.
 */
@Test
public void shouldNotRemoveExistingProcessors() throws Exception {
    final DefaultConnectorEndpoint endpoint =
        (DefaultConnectorEndpoint) connectorWithExistingProcessors.createEndpoint("salesforce-connector");

    // before-producer side
    final Processor before = endpoint.getBeforeProducer();
    assertThat(before).isInstanceOf(Pipeline.class);
    final Pipeline beforeChain = (Pipeline) before;
    assertThat(beforeChain.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> beforeSteps = (List<Processor>) beforeChain.getProcessors();
    assertThat(beforeSteps.get(0)).isInstanceOf(UnmarshallInputProcessor.class);
    assertThat(beforeSteps.get(1)).isSameAs(beforeProcessor);

    // after-producer side
    final Processor after = endpoint.getAfterProducer();
    assertThat(after).isInstanceOf(Pipeline.class);
    final Pipeline afterChain = (Pipeline) after;
    assertThat(afterChain.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> afterSteps = (List<Processor>) afterChain.getProcessors();
    assertThat(afterSteps.get(0)).isInstanceOf(UnmarshallOutputProcessor.class);
    assertThat(afterSteps.get(1)).isSameAs(afterProcessor);
}
/**
 * Defines a route that fires every second, sends a fixed JSON body to the DEMO_ADD stored
 * procedure endpoint and prints the class and value of the resulting body.
 */
@Override
public void configure() throws Exception {
    final String jsonBody = "{\"a\":20,\"b\":30}";

    // Prints the type and the content of whatever the stored procedure call left in the body.
    final Processor printer = new Processor() {
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().getClass());
            System.out.println(exchange.getIn().getBody());
        }
    };

    from("timer://myTimer?period=1000")
        .setBody().constant(jsonBody)
        .to("sql-stored-connector:DEMO_ADD( "
            + "INTEGER ${body[a]}, "
            + "INTEGER ${body[b]}, "
            + "OUT INTEGER c)")
        .process(printer);
}
/**
 * Defines a route that consumes from the DEMO_OUT stored procedure endpoint (configured with
 * {@code schedulerPeriod=5000}) and prints the class and value of each received body.
 */
@Override
public void configure() throws Exception {
    // Prints the type and the content of the body produced by the stored procedure.
    final Processor printer = new Processor() {
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().getClass());
            System.out.println(exchange.getIn().getBody());
        }
    };

    from("sql-stored-start-connector:DEMO_OUT( "
            + "OUT INTEGER C )?schedulerPeriod=5000")
        .process(printer);
}
/**
 * Sends a mail with a body and an alternative body, then checks that converting the received
 * multipart content to a string yields the alternative body.
 */
@Test
public void testMultipartToString() throws Exception {
    MockEndpoint result = getMockEndpoint("mock:result");
    result.expectedMessageCount(1);

    Processor mailSetup = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Hello World");
            exchange.getIn().setHeader(MailConstants.MAIL_ALTERNATIVE_BODY, "Alternative World");
        }
    };
    template.send("direct:a", mailSetup);

    assertMockEndpointsSatisfied();

    Exchange received = result.getReceivedExchanges().get(0);
    Message mailMessage = received.getIn().getBody(MailMessage.class).getMessage();
    assertNotNull(mailMessage);

    Multipart multipart = assertIsInstanceOf(Multipart.class, mailMessage.getContent());
    assertEquals("Alternative World", MailConverters.toString(multipart));
}
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // leverage the fact that we can limit to max 50 files per poll // this will result in polling again and potentially picking up files // that already are in progress from("file:target/filestress?maxMessagesPerPoll=50&readLock=fileLock").routeId("foo").noAutoStartup() .threads(10) .process(new Processor() { public void process(Exchange exchange) throws Exception { // simulate some work with random time to complete Random ran = new Random(); int delay = ran.nextInt(250) + 10; Thread.sleep(delay); } }).to("mock:result"); } }; }
/**
 * Puts a multi-column row, then (only when the system is ready) issues a GET for every
 * column of the first row and checks each returned value header against the expected body.
 */
@Test
public void testPutAndGetMultiColumns() throws Exception {
    testPutMultiColumns();
    if (!systemReady) {
        return;
    }

    Processor getRequest = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
            // Header indexes are 1-based, column indexes are 0-based.
            for (int i = 0; i < column[0].length; i++) {
                int headerIndex = i + 1;
                exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(headerIndex), key[0]);
                exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(headerIndex), family[0]);
                exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(headerIndex), column[0][i]);
            }
        }
    };
    Exchange resp = template.request("direct:start", getRequest);

    for (int i = 0; i < column[0].length; i++) {
        Object actual = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(i + 1));
        assertEquals(body[0][i][0], actual);
    }
}
/**
 * Configures the jetty component for SSL with the test keystore and defines three HTTPS
 * routes: {@code /test} on port1 to mock:a, {@code /hello} on port1 answering with a fixed
 * HTML body, and {@code /test} on port2 to mock:b.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws URISyntaxException {
            JettyHttpComponent jetty = (JettyHttpComponent) context.getComponent("jetty");
            jetty.setSslPassword(pwd);
            jetty.setSslKeyPassword(pwd);

            // The keystore is looked up on the classpath and handed over as a file path.
            URL keystoreLocation = this.getClass().getClassLoader().getResource("jsse/localhost.ks");
            String keystorePath = keystoreLocation.toURI().getPath();
            jetty.setKeystore(keystorePath);

            from("jetty:https://localhost:" + port1 + "/test").to("mock:a");

            Processor hello = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    exchange.getOut().setBody("<b>Hello World</b>");
                }
            };
            from("jetty:https://localhost:" + port1 + "/hello").process(hello);

            from("jetty:https://localhost:" + port2 + "/test").to("mock:b");
        }
    };
}
/**
 * Picks a free port (searching from 8000) and defines a jetty route that answers with the
 * content of the classpath resource named by the mandatory {@code name} header, served as
 * {@code text/xml}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    port = AvailablePortFinder.getNextAvailable(8000);

    return new RouteBuilder() {
        public void configure() {
            Processor resourceLoader = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String requested = exchange.getIn().getHeader("name", String.class);
                    ObjectHelper.notNull(requested, "name");

                    // Resources are resolved relative to the itest jetty package.
                    String resource = "org/apache/camel/itest/jetty/" + requested;
                    InputStream stream = ResourceHelper.resolveMandatoryResourceAsInputStream(
                        exchange.getContext().getClassResolver(), resource);
                    String xml = exchange.getContext().getTypeConverter().convertTo(String.class, stream);

                    exchange.getOut().setBody(xml);
                    exchange.getOut().setHeader(Exchange.CONTENT_TYPE, "text/xml");
                }
            };

            from("jetty:http://localhost:" + port + "/test").process(resourceLoader);
        }
    };
}
/**
 * Creates a processor that divides the numeric in-body by the integer the given value
 * builder evaluates to, rounding {@code HALF_UP}, and stores the quotient as the new body.
 * <p>
 * Both the body and the evaluated divisor must be non-null; otherwise {@code checkNotNull}
 * rejects them with the messages given below.
 *
 * @param valueBuilder supplies the divisor for each exchange
 * @return the dividing processor
 */
private Processor divideByValueOf(final ValueBuilder valueBuilder) {
    return new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Message in = exchange.getIn();

            Number dividendValue = checkNotNull(in.getBody(Number.class), "Body of %s is null", in);
            BigDecimal dividend = new BigDecimal(dividendValue.toString());

            Integer divisorValue = checkNotNull(valueBuilder.evaluate(exchange, Integer.class),
                "No %s set in exchange %s", valueBuilder, exchange);
            BigDecimal divisor = new BigDecimal(divisorValue.toString());

            in.setBody(dividend.divide(divisor, HALF_UP));
        }
    };
}
/**
 * Defines a recipient-list route driven by the {@code slip} header and a jetty route that
 * doubles the integer {@code foo} header and replies with "Bye " plus the {@code bar} header.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:a").recipientList(header("slip"));

            Processor responder = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // Unboxing: a missing "foo" header fails here with a NullPointerException.
                    int foo = exchange.getIn().getHeader("foo", Integer.class);
                    String bar = exchange.getIn().getHeader("bar", String.class);

                    exchange.getOut().setHeader("foo", foo * 2);
                    exchange.getOut().setBody("Bye " + bar);
                }
            };
            from("jetty://http://localhost:{{port}}/myapp").process(responder);
        }
    };
}
/**
 * Defines a synchronous netty4 UDP route that sets the poet on the received {@code Poetry}
 * body and returns the same object as the reply.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor signPoem = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    Object received = exchange.getIn().getBody();
                    Poetry poem = (Poetry) received;
                    poem.setPoet("Dr. Sarojini Naidu");
                    exchange.getOut().setBody(poem);
                }
            };

            from("netty4:udp://localhost:{{port}}?sync=true").process(signPoem);
        }
    };
}
/** * Process 100 files with a sorted file endpoint. For each exchange the body will be replaced * by a large buffer. In reality a similar thing happens if you have a lot of large files * and use convertBodyTo(String.class). In both cases the Exchanges becomes quite large. * The test will consume a lot of memory if all exchanges are kept in a list while doing * the batch processing. This is because the garbage collector can not clean them as they * are referenced in the list of exchanges. * <p/> * The test is not really a good integration test as it simply waits and does not fail * or succeed fast */ public void xxxtestMemoryLeak() throws Exception { // run this manually and browse the memory usage, eg in IDEA there is a Statistics tab deleteDirectory("target/filesorter/archiv"); for (int c = 0; c < 100; c++) { template.sendBodyAndHeader(fileUrl + "c", "test", Exchange.FILE_NAME, c + ".dat"); } context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from(fileUrl + "c/?sortBy=ignoreCase:file:name") .process(new Processor() { public void process(Exchange exchange) throws Exception { StringBuilder buf = new StringBuilder(10000000); buf.setLength(1000000); exchange.getIn().setBody(buf.toString()); } }).to("file:target/filesorter/archiv"); } }); context.start(); Thread.sleep(30 * 1000L); }
@Test public void testHttpSimpleExchange() throws Exception { getMockEndpoint("mock:input").expectedBodiesReceived("Hello World"); Exchange out = template.request("netty-http:http://localhost:{{port}}/foo", new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Hello World"); } }); assertNotNull(out); assertTrue(out.hasOut()); NettyHttpMessage response = out.getOut(NettyHttpMessage.class); assertNotNull(response); assertEquals(200, response.getHttpResponse().getStatus().getCode()); // we can also get the response as body HttpResponse body = out.getOut().getBody(HttpResponse.class); assertNotNull(body); assertEquals(200, body.getStatus().getCode()); assertMockEndpointsSatisfied(); }
public void testMulticastParallel() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(10); mock.whenAnyExchangeReceived(new Processor() { public void process(Exchange exchange) throws Exception { // they should all be BA as B is faster than A assertEquals("BA", exchange.getIn().getBody(String.class)); } }); for (int i = 0; i < 10; i++) { template.sendBody("direct:start", "Hello"); } assertMockEndpointsSatisfied(); }
/**
 * Makes mock:end2 throw and checks that the request still returns "Stop!", with end1 and
 * end4 each receiving one message and end3 receiving none.
 */
public void testEnd2FailureTest() throws Exception {
    Processor alwaysFail = new Processor() {
        public void process(Exchange exchange) throws Exception {
            throw new RuntimeException("Simulated Exception");
        }
    };
    MockEndpoint failing = getMockEndpoint("mock:end2");
    failing.whenAnyExchangeReceived(alwaysFail);

    getMockEndpoint("mock:end1").expectedMessageCount(1);
    getMockEndpoint("mock:end3").expectedMessageCount(0);
    getMockEndpoint("mock:end4").expectedMessageCount(1);

    String reply = template.requestBody("direct:start", "Hello World!", String.class);
    assertEquals("Stop!", reply);

    assertMockEndpointsSatisfied();
}
@Test public void testFilterCamelHeaders() throws Exception { Exchange out = template.send("http://localhost:{{port}}/test/filter", new Processor() { public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Claus"); exchange.getIn().setHeader("bar", 123); } }); assertNotNull(out); assertEquals("Hi Claus", out.getOut().getBody(String.class)); // there should be no internal Camel headers // except for the response code Map<String, Object> headers = out.getOut().getHeaders(); for (String key : headers.keySet()) { boolean valid = key.equalsIgnoreCase(Exchange.HTTP_RESPONSE_CODE) || key.equalsIgnoreCase(Exchange.HTTP_RESPONSE_TEXT); if (!valid) { assertTrue("Should not contain any Camel internal headers", !key.toLowerCase().startsWith("camel")); } else { assertEquals(200, headers.get(Exchange.HTTP_RESPONSE_CODE)); } } }
/**
 * Creates a processor that removes the headers matching <tt>pattern</tt> from the message,
 * except for those matching one of the <tt>exceptionPatterns</tt>.
 * <p/>
 * The OUT message is used when the exchange has one; otherwise the IN message is used.
 *
 * @param pattern           pattern of the header names to remove
 * @param exceptionPatterns patterns of header names to keep even if they match <tt>pattern</tt>
 * @return the header-removing processor
 */
public static Processor removeHeaders(final String pattern, final String... exceptionPatterns) {
    return new Processor() {
        public void process(Exchange exchange) {
            if (!exchange.hasOut()) {
                exchange.getIn().removeHeaders(pattern, exceptionPatterns);
                return;
            }
            exchange.getOut().removeHeaders(pattern, exceptionPatterns);
        }

        @Override
        public String toString() {
            String exceptions = Arrays.toString(exceptionPatterns);
            return "removeHeaders(" + pattern + ", " + exceptions + ")";
        }
    };
}
public void testXMLRouteLoading() throws Exception { applicationContext = createApplicationContext(); SpringCamelContext context = applicationContext.getBeansOfType(SpringCamelContext.class).values().iterator().next(); assertValidContext(context); // now lets send a message ProducerTemplate template = context.createProducerTemplate(); template.start(); template.send("direct:start", new Processor() { public void process(Exchange exchange) { Message in = exchange.getIn(); in.setHeader("name", "James"); in.setBody(body); } }); template.stop(); MyProcessor myProcessor = applicationContext.getBean("myProcessor", MyProcessor.class); List<Exchange> list = myProcessor.getExchanges(); assertEquals("Should have received a single exchange: " + list, 1, list.size()); }
/**
 * Defines a route from {@code url} that always replies with a fixed GetPersonResponse
 * document.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor cannedReply = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String payload = "<GetPersonResponse xmlns=\"http://camel.apache.org/wsdl-first/types\">"
                        + "<personId>123</personId><ssn>456</ssn><name>Donald Duck</name>"
                        + "</GetPersonResponse>";

                    // The reply body is a DOM document, not the raw string.
                    Document reply = context.getTypeConverter().convertTo(Document.class, payload);
                    exchange.getOut().setBody(reply);
                }
            };

            from(url).process(cannedReply);
        }
    };
}
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { from("netty4-http:http://localhost:{{port}}/myapp/mytest").process(new Processor() { public void process(Exchange exchange) throws Exception { // headers received should be in case as well Map<String, Object> map = new LinkedHashMap<String, Object>(); map.putAll(exchange.getIn().getHeaders()); assertEquals("123", map.get("OTHER")); assertEquals(null, map.get("other")); assertEquals("Carlsberg", map.get("beer")); assertEquals(null, map.get("Beer")); exchange.getOut().setBody("Bye World"); exchange.getOut().setHeader("MyCaseHeader", "aBc123"); exchange.getOut().setHeader("otherCaseHeader", "456DEf"); } }); } }; }
@Override @SuppressWarnings("unchecked") public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); IdempotentRepository<String> idempotentRepository = (IdempotentRepository<String>) resolveMessageIdRepository(routeContext); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); Expression expression = getExpression().createExpression(routeContext); // these boolean should be true by default boolean eager = getEager() == null || getEager(); boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); // these boolean should be false by default boolean completionEager = getCompletionEager() != null && getCompletionEager(); return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor); }
/**
 * Defines a file route using {@code preMove} and asserts, for each consumed file, that its
 * relative path already points into the "inprogress" directory before it reaches the mock.
 */
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            Processor preMoveCheck = new Processor() {
                @SuppressWarnings("unchecked")
                public void process(Exchange exchange) throws Exception {
                    Object property = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
                    GenericFile<File> consumed = (GenericFile<File>) property;
                    assertNotNull(consumed);
                    assertTrue(consumed.getRelativeFilePath().contains("inprogress"));
                }
            };

            from("file://target/reports?preMove=../inprogress/${file:name}&consumer.delay=5000")
                .process(preMoveCheck)
                .to("mock:report");
        }
    };
}
/**
 * Defines a route from {@code fromURI} that fails every exchange with a SOAP "Server" fault
 * whose detail element is parsed from {@code DETAILS}.
 */
@Override
protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        public void configure() {
            Processor raiseFault = new Processor() {
                public void process(final Exchange exchange) throws Exception {
                    QName serverCode = new QName("http://schemas.xmlsoap.org/soap/envelope/", "Server");
                    SoapFault soapFault = new SoapFault("Get the null value of person name", serverCode);

                    Element detail = StaxUtils.read(new StringReader(DETAILS)).getDocumentElement();
                    soapFault.setDetail(detail);

                    exchange.setException(soapFault);
                }
            };

            from(fromURI).process(raiseFault);
        }
    };
}
/**
 * Sends five comma-separated names through the parallel splitter and checks that five
 * messages arrive, the "foo" header is kept on the result and the "aggregated" property is 5.
 */
public void testSplitterWithAggregationStrategyParallel() throws Exception {
    MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
    resultEndpoint.expectedMessageCount(5);

    Processor request = new Processor() {
        public void process(Exchange exchange) {
            Message in = exchange.getIn();
            in.setBody("James,Guillaume,Hiram,Rob,Roman");
            in.setHeader("foo", "bar");
        }
    };
    Exchange result = template.request("direct:parallel", request);

    assertMockEndpointsSatisfied();

    assertMessageHeader(result.getOut(), "foo", "bar");
    Integer aggregated = result.getProperty("aggregated", Integer.class);
    assertEquals(Integer.valueOf(5), aggregated);
}
/**
 * Defines a route from the activemq queue "foo" that checks the JMS session is in
 * client-acknowledge mode, acknowledges the message explicitly and forwards to the mock.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor clientAck = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    JmsMessage jmsMessage = exchange.getIn(JmsMessage.class);
                    assertNotNull(jmsMessage);

                    Session jmsSession = jmsMessage.getJmsSession();
                    assertNotNull("Should have JMS session", jmsSession);
                    assertEquals("Should be client ACK mode",
                        Session.CLIENT_ACKNOWLEDGE, jmsSession.getAcknowledgeMode());

                    jmsMessage.getJmsMessage().acknowledge();
                }
            };

            from("activemq:queue:foo")
                .process(clientAck)
                .to("mock:result");
        }
    };
}
/**
 * Sends a message carrying a workflow id and checks that the received exchange has both
 * the workflow id and a run id header.
 */
@Test
public void sendInOut() throws Exception {
    result.expectedMessageCount(1);

    Processor request = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(SWFConstants.WORKFLOW_ID, "123");
        }
    };
    template.send("direct:start", request);

    assertMockEndpointsSatisfied();

    Exchange received = result.getExchanges().get(0);
    assertNotNull(received.getIn().getHeader(SWFConstants.WORKFLOW_ID));
    assertNotNull(received.getIn().getHeader(SWFConstants.RUN_ID));
}
/**
 * Defines a jetty route guarded by {@code myAuthHandler} that asserts the request's user
 * principal is "donald" and then answers "Bye World".
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor principalCheck = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    HttpServletRequest request = exchange.getIn().getBody(HttpServletRequest.class);
                    assertNotNull(request);

                    Principal principal = request.getUserPrincipal();
                    assertNotNull(principal);
                    assertEquals("donald", principal.getName());
                }
            };

            from("jetty://http://localhost:{{port}}/test?handlers=myAuthHandler")
                .process(principalCheck)
                .transform(constant("Bye World"));
        }
    };
}
public void testOnExceptionTransformConstant() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0)); // START SNIPPET: e1 // we catch MyFunctionalException and want to mark it as handled (= no failure returned to client) // but we want to return a fixed text response, so we transform OUT body as Sorry. onException(MyFunctionalException.class) .handled(true) .transform().constant("Sorry"); // END SNIPPET: e1 from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { throw new MyFunctionalException("Sorry you cannot do this"); } }); } }); Object out = template.requestBody("direct:start", "Hello World"); assertEquals("Sorry", out); }
/**
 * Defines a synchronous netty4 TCP text-line route that delays three seconds for bodies
 * containing "Camel" and always answers "Bye World".
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor slowForCamel = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String text = exchange.getIn().getBody(String.class);
                    boolean mentionsCamel = text.contains("Camel");
                    if (mentionsCamel) {
                        Thread.sleep(3000);
                    }
                }
            };

            from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
                .process(slowForCamel)
                .transform().constant("Bye World");
        }
    };
}
/**
 * Loads the "camel2" context from the factory-bean XML and checks that it has exactly one
 * event-driven consumer route, with a processor, consuming from seda://test.c.
 */
public void testXMLRouteLoading() throws Exception {
    applicationContext =
        new ClassPathXmlApplicationContext("org/apache/camel/spring/camelContextFactoryBean.xml");

    CamelContext context = applicationContext.getBean("camel2", CamelContext.class);
    assertNotNull("No context found!", context);

    List<Route> routes = context.getRoutes();
    LOG.debug("Found routes: " + routes);

    assertNotNull("Should have found some routes", routes);
    assertEquals("One Route should be found", 1, routes.size());

    for (Route each : routes) {
        Endpoint from = each.getEndpoint();
        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, each);
        assertNotNull(consumerRoute.getProcessor());
        assertEndpointUri(from, "seda://test.c");
    }
}
public Mina2Consumer(final Mina2Endpoint endpoint, Processor processor) throws Exception { super(endpoint, processor); this.configuration = endpoint.getConfiguration(); // // All mina2 endpoints are InOut. The endpoints are asynchronous. // Endpoints can send "n" messages and receive "m" messages. // this.getEndpoint().setExchangePattern(ExchangePattern.InOut); String protocol = configuration.getProtocol(); if (protocol.equals("tcp")) { if (configuration.isClientMode()) { setupClientSocketProtocol(protocol, configuration); } else { setupSocketProtocol(protocol, configuration); } } else if (configuration.isDatagramProtocol()) { setupDatagramProtocol(protocol, configuration); } else if (protocol.equals("vm")) { setupVmProtocol(protocol, configuration); } }
/**
 * Defines an asynchronous sjms consumer route that logs before and after, delays two
 * seconds for the body "Hello Camel", and forwards to the mock result endpoint.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            Processor delayHelloCamel = new Processor() {
                public void process(Exchange exchange) throws Exception {
                    String text = exchange.getIn().getBody(String.class);
                    if (text.equals("Hello Camel")) {
                        Thread.sleep(2000);
                    }
                }
            };

            from("sjms:queue:in.only.consumer.async?synchronous=false")
                .to("log:before")
                .process(delayHelloCamel)
                .to("log:after")
                .to(MOCK_RESULT);
        }
    };
}
/**
 * Creates the {@link Enricher} for this definition.
 * <p>
 * {@code shareUnitOfWork} and {@code ignoreInvalidEndpoint} default to {@code false} when
 * unset. The aggregation strategy and {@code aggregateOnException} are only applied when
 * they are present.
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Expression expression = getExpression().createExpression(routeContext);

    // Unset (null) flags count as false.
    boolean shareUnitOfWork = Boolean.TRUE.equals(getShareUnitOfWork());
    boolean ignoreInvalidEndpoint = Boolean.TRUE.equals(getIgnoreInvalidEndpoint());

    Enricher result = new Enricher(expression);
    result.setShareUnitOfWork(shareUnitOfWork);
    result.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);

    AggregationStrategy aggregation = createAggregationStrategy(routeContext);
    if (aggregation != null) {
        result.setAggregationStrategy(aggregation);
    }
    if (aggregateOnException != null) {
        result.setAggregateOnException(aggregateOnException);
    }
    return result;
}
/** * Gather the route scoped error handler from the given route */ private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) { // only include error handlers if they are route scoped boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext()); List<Service> extra = new ArrayList<Service>(); if (includeErrorHandler) { for (Service service : services) { if (service instanceof Channel) { Processor eh = ((Channel) service).getErrorHandler(); if (eh != null && eh instanceof Service) { extra.add((Service) eh); } } } } if (!extra.isEmpty()) { services.addAll(extra); } }
/**
 * Returns a processor that, when the in-message body converts to a non-null string, parses
 * that string as a JSON bean into {@link Properties} and sets those as the new body. A null
 * body is left untouched.
 *
 * @return the processor to run before the producer
 */
@Override
public Processor getBeforeProducer() {
    return exchange -> {
        final String json = exchange.getIn().getBody(String.class);
        if (json == null) {
            return;
        }
        final Properties parsed = JSONBeanUtil.parsePropertiesFromJSONBean(json);
        exchange.getIn().setBody(parsed);
    };
}
/**
 * Creates a consumer that splits the List body into one exchange per element and passes
 * each of them through the JSON conversion step before the given processor.
 *
 * @param processor the processor that receives each converted exchange
 * @return the consumer created by the wrapped endpoint around the splitter
 */
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    // Per-element chain: convert to JSON first, then run the caller's processor.
    final Processor perElement = Pipeline.newInstance(getCamelContext(), new ToJSONProcessor(), processor);

    // Split on the body viewed as a List; no aggregation strategy is supplied.
    final Expression listBody = ExpressionBuilder.bodyExpression(List.class);
    final Splitter splitter = new Splitter(getCamelContext(), listBody, perElement, null);

    return endpoint.createConsumer(splitter);
}