@Override public TabularData browse(String routeId, int limit, boolean sortByLongestDuration) { try { TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType()); Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(routeId, limit, sortByLongestDuration); for (InflightRepository.InflightExchange entry : exchanges) { CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType(); String exchangeId = entry.getExchange().getExchangeId(); String fromRouteId = entry.getFromRouteId(); String atRouteId = entry.getAtRouteId(); String nodeId = entry.getNodeId(); String elapsed = "" + entry.getElapsed(); String duration = "" + entry.getDuration(); CompositeData data = new CompositeDataSupport(ct, new String[]{"exchangeId", "fromRouteId", "routeId", "nodeId", "elapsed", "duration"}, new Object[]{exchangeId, fromRouteId, atRouteId, nodeId, elapsed, duration}); answer.put(data); } return answer; } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").routeId("foo") .to("mock:a") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Collection<InflightRepository.InflightExchange> list = context.getInflightRepository().browse(); assertEquals(1, list.size()); InflightRepository.InflightExchange inflight = list.iterator().next(); assertNotNull(inflight); assertEquals(exchange, inflight.getExchange()); assertEquals("foo", inflight.getFromRouteId()); assertEquals("foo", inflight.getAtRouteId()); assertEquals("myProcessor", inflight.getNodeId()); } }).id("myProcessor") .to("mock:result"); } }; }
public void testDefaultInflightRepository() throws Exception { InflightRepository repo = new DefaultInflightRepository(); assertEquals(0, repo.size()); Exchange e1 = new DefaultExchange(context); repo.add(e1); assertEquals(1, repo.size()); Exchange e2 = new DefaultExchange(context); repo.add(e2); assertEquals(2, repo.size()); repo.remove(e2); assertEquals(1, repo.size()); repo.remove(e1); assertEquals(0, repo.size()); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").routeId("foo") .to("mock:a") .to("direct:bar") .to("mock:result"); from("direct:bar").routeId("bar") .to("mock:b") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Collection<InflightRepository.InflightExchange> list = context.getInflightRepository().browse("foo"); assertEquals(1, list.size()); InflightRepository.InflightExchange inflight = list.iterator().next(); assertNotNull(inflight); assertEquals(exchange, inflight.getExchange()); assertEquals("foo", inflight.getFromRouteId()); assertEquals("bar", inflight.getAtRouteId()); assertEquals("myProcessor", inflight.getNodeId()); } }).id("myProcessor"); } }; }
public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) { this.inflightRepository = inflightRepository; this.id = id; }
public ManagedInflightRepository(CamelContext context, InflightRepository inflightRepository) { super(context, inflightRepository); this.inflightRepository = inflightRepository; }
public InflightRepository getInflightRepository() { return inflightRepository; }
public void setInflightRepository(InflightRepository repository) { this.inflightRepository = repository; }
@Override public InflightRepository getInflightRepository() { return context.getInflightRepository(); }
@Override public void setInflightRepository(InflightRepository repository) { context.setInflightRepository(repository); }
/** * Gets the inflight repository * * @return the repository */ InflightRepository getInflightRepository();
/** * Sets a custom inflight repository to use * * @param repository the repository */ void setInflightRepository(InflightRepository repository);