/** * Utility method for Consumers to process API method invocation result. * @param consumer Consumer that wants to process results. * @param result result of API method invocation. * @param splitResult true if the Consumer wants to split result using {@link org.apache.camel.util.component.ResultInterceptor#splitResult(Object)} method. * @param <T> Consumer class that extends DefaultConsumer and implements {@link org.apache.camel.util.component.ResultInterceptor}. * @return number of result exchanges processed. * @throws Exception on error. */ public static <T extends DefaultConsumer & ResultInterceptor> int getResultsProcessed( T consumer, Object result, boolean splitResult) throws Exception { // process result according to type if (result != null && splitResult) { // try to split the result final Object resultArray = consumer.splitResult(result); if (resultArray != result && resultArray.getClass().isArray()) { // create an exchange for every element final int length = Array.getLength(resultArray); for (int i = 0; i < length; i++) { processResult(consumer, result, Array.get(resultArray, i)); } return length; } } processResult(consumer, result, result); return 1; // number of messages polled }
private static <T extends DefaultConsumer & ResultInterceptor> void processResult(T consumer, Object methodResult, Object result) throws Exception { Exchange exchange = consumer.getEndpoint().createExchange(); exchange.getIn().setBody(result); consumer.interceptResult(methodResult, exchange); try { // send message to next processor in the route consumer.getProcessor().process(exchange); } finally { // log exception if an exception occurred and was not handled final Exception exception = exchange.getException(); if (exception != null) { consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exception); } } }
@Override public void process(final Exchange exchange) throws Exception { final int count = getHeaderValue(exchange, HEADER_ITERATIONS); final int threads = getHeaderValue(exchange, HEADER_THREADS); PerformanceTestEndpoint endpoint = (PerformanceTestEndpoint)getEndpoint(); if (endpoint != null) { final DefaultConsumer consumer = (DefaultConsumer)endpoint.getConsumer(); ExecutorService executor = exchange.getContext().getExecutorServiceManager().newFixedThreadPool(this, "perf", threads); CompletionService<Exchange> tasks = new ExecutorCompletionService<Exchange>(executor); // StopWatch watch = new StopWatch(); // if we want to clock how long it takes for (int i = 0; i < count; i++) { tasks.submit(new Callable<Exchange>() { @Override public Exchange call() throws Exception { Exchange exch = ExchangeHelper.createCopy(exchange, false); try { consumer.getProcessor().process(exch); } catch (final Exception e) { exch.setException(e); } return exch; } }); } for (int i = 0; i < count; i++) { // Future<Exchange> result = tasks.take(); tasks.take(); // wait for all exchanges to complete } } }
/**
 * Creates a consumer whose processor is tracked in {@code processors} while it is started.
 * <p>
 * Starting the returned consumer adds its processor to {@code processors}; stopping it removes
 * the processor again. Neither lifecycle override delegates to the superclass implementation.
 *
 * @param processor the processor the new consumer wraps.
 * @return the newly created consumer for this endpoint.
 * @throws Exception declared by the overridden method; not thrown by the code shown here.
 */
@Override
public Consumer createConsumer(Processor processor) throws Exception {
    final DefaultConsumer trackingConsumer = new DefaultConsumer(this, processor) {

        /** Registers this consumer's processor. */
        @Override
        protected void doStart() throws Exception {
            processors.add(getProcessor());
        }

        /** Unregisters this consumer's processor. */
        @Override
        protected void doStop() throws Exception {
            processors.remove(getProcessor());
        }
    };
    return trackingConsumer;
}
/**
 * Returns the set of consumers held in {@code consumers}.
 * <p>
 * The set is returned as-is, not copied.
 *
 * @return the {@code consumers} set.
 */
protected Set<DefaultConsumer> getConsumers() {
    final Set<DefaultConsumer> current = consumers;
    return current;
}