/**
 * Lists the entries returned by the await manager's {@code browse()} as JMX tabular data.
 * <p>
 * Each entry becomes one row with the items {@code id} and {@code name} (taken from the
 * blocked thread), {@code exchangeId}, {@code routeId}, {@code nodeId} and {@code duration}.
 * All values are exposed as strings.
 *
 * @return the tabular data, empty when the manager reports no entries
 * @throws RuntimeException any failure is re-thrown wrapped via
 *         {@code ObjectHelper.wrapRuntimeCamelException}
 */
@Override
public TabularData browse() {
    try {
        TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listAwaitThreadsTabularType());

        // the row type and the item names are identical for every entry, so resolve them once
        // up front instead of once per row
        CompositeType ct = CamelOpenMBeanTypes.listAwaitThreadsCompositeType();
        String[] itemNames = {"id", "name", "exchangeId", "routeId", "nodeId", "duration"};

        for (AsyncProcessorAwaitManager.AwaitThread entry : manager.browse()) {
            String id = String.valueOf(entry.getBlockedThread().getId());
            String name = entry.getBlockedThread().getName();
            String exchangeId = entry.getExchange().getExchangeId();
            String routeId = entry.getRouteId();
            String nodeId = entry.getNodeId();
            String duration = String.valueOf(entry.getWaitDuration());

            CompositeData data = new CompositeDataSupport(ct, itemNames,
                    new Object[]{id, name, exchangeId, routeId, nodeId, duration});
            answer.put(data);
        }
        return answer;
    } catch (Exception e) {
        throw ObjectHelper.wrapRuntimeCamelException(e);
    }
}
/**
 * Invokes the asynchronous {@code process} method of the given processor and does not
 * return until the processing has completed. {@link AsyncProcessor} implementations can
 * use this to provide the synchronous variant of their process method.
 * <p>
 * <b>Important:</b> Usage of this method is discouraged; whenever possible it is better to call the asynchronous
 * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method directly.
 *
 * @param processor the processor
 * @param exchange  the exchange
 * @throws Exception can be thrown if waiting is interrupted
 */
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
    final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
    final CountDownLatch completion = new CountDownLatch(1);

    final AsyncCallback callback = new AsyncCallback() {
        public void done(boolean doneSync) {
            // a synchronous completion never waits on the latch, so there is nothing to release
            if (doneSync) {
                return;
            }
            awaitManager.countDown(exchange, completion);
        }

        @Override
        public String toString() {
            return "Done " + processor;
        }
    };

    final boolean completedSynchronously = processor.process(exchange, callback);
    if (completedSynchronously) {
        return;
    }

    // processing continues asynchronously: hand over to the await manager until the callback fires
    awaitManager.await(exchange, completion);
}
@Override public void interrupt(String exchangeId) { // need to find the exchange with the given exchange id Exchange found = null; for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) { Exchange exchange = entry.getExchange(); if (exchangeId.equals(exchange.getExchangeId())) { found = exchange; break; } } if (found != null) { interrupt(found); } }
/**
 * Builds the route under test: {@code direct:start} (route id {@code myRoute}) goes through
 * {@code mock:before}, the {@code async:bye:camel} endpoint (node id {@code myAsync}),
 * {@code mock:after}, a processor inspecting the await manager, and finally {@code mock:result}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            context.addComponent("async", new MyAsyncComponent());

            // asserts that exactly one thread is registered with the await manager and that it
            // is attributed to the expected route and node
            Processor awaitInspector = new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    AsyncProcessorAwaitManager awaitManager = context.getAsyncProcessorAwaitManager();

                    int size = awaitManager.size();
                    log.info("async inflight: {}", size);
                    assertEquals(1, size);

                    Collection<AsyncProcessorAwaitManager.AwaitThread> waiting = awaitManager.browse();
                    AsyncProcessorAwaitManager.AwaitThread first = waiting.iterator().next();

                    long wait = first.getWaitDuration();
                    log.info("Thread {} has waited for {} msec.", first.getBlockedThread().getName(), wait);

                    assertEquals("myRoute", first.getRouteId());
                    assertEquals("myAsync", first.getNodeId());
                }
            };

            from("direct:start").routeId("myRoute")
                    .to("mock:before")
                    .to("async:bye:camel").id("myAsync")
                    .to("mock:after")
                    .process(awaitInspector)
                    .to("mock:result");
        }
    };
}
/**
 * Creates the managed object for the given await manager.
 * <p>
 * Both arguments are passed on to the superclass constructor; the manager is additionally
 * kept in the {@code manager} field of this class.
 *
 * @param context the camel context
 * @param manager the await manager
 */
public ManagedAsyncProcessorAwaitManager(CamelContext context, AsyncProcessorAwaitManager manager) { super(context, manager); this.manager = manager; }
/**
 * Returns the await manager held in the {@code manager} field.
 *
 * @return the await manager
 */
public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
    return this.manager;
}
/**
 * Returns the await manager currently held in the {@code asyncProcessorAwaitManager} field.
 *
 * @return the await manager
 */
public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
    return this.asyncProcessorAwaitManager;
}
/**
 * Stores the given await manager in the {@code asyncProcessorAwaitManager} field.
 * No validation is performed on the argument.
 *
 * @param asyncProcessorAwaitManager the await manager to store
 */
public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) { this.asyncProcessorAwaitManager = asyncProcessorAwaitManager; }
/**
 * Returns the await manager of the underlying {@code context} this object delegates to.
 *
 * @return whatever {@code context.getAsyncProcessorAwaitManager()} returns
 */
@Override
public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
    final AsyncProcessorAwaitManager delegated = context.getAsyncProcessorAwaitManager();
    return delegated;
}
/**
 * Passes the given await manager on to the underlying {@code context}
 * via {@code context.setAsyncProcessorAwaitManager(manager)}.
 *
 * @param manager the await manager to set on the context
 */
@Override public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager) { context.setAsyncProcessorAwaitManager(manager); }
/**
 * Gets the await manager for {@link org.apache.camel.AsyncProcessor}s.
 *
 * @return the {@link AsyncProcessorAwaitManager}
 */
AsyncProcessorAwaitManager getAsyncProcessorAwaitManager();
/**
 * Sets a custom await manager for {@link org.apache.camel.AsyncProcessor}s.
 *
 * @param manager the custom {@link AsyncProcessorAwaitManager}
 */
void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager);