@Override public Processor createProcessor(RouteContext routeContext) throws Exception { Policy policy = resolvePolicy(routeContext); ObjectHelper.notNull(policy, "policy", this); // before wrap policy.beforeWrap(routeContext, this); // create processor after the before wrap Processor childProcessor = this.createChildProcessor(routeContext, true); // wrap Processor target = policy.wrap(routeContext, childProcessor); if (!(target instanceof Service)) { // wrap the target so it becomes a service and we can manage its lifecycle target = new WrapProcessor(target, childProcessor); } return target; }
@Test public void testRollbackUsingXmlQueueToQueue() throws Exception { // configure routes and add to camel context context.addRoutes(new SpringRouteBuilder() { @Override public void configure() throws Exception { Policy required = lookup("PROPAGATION_REQUIRED_POLICY", SpringTransactionPolicy.class); from("activemq:queue:foo?transacted=true").policy(required).process(new ConditionalExceptionProcessor()) .to("activemq:queue:bar?transacted=true"); } }); assertResult(); }
protected Policy resolvePolicy(RouteContext routeContext) { if (policy != null) { return policy; } // reuse code on transacted definition to do the resolution return TransactedDefinition.doResolvePolicy(routeContext, getRef(), type); }
@Test public void testRollbackUsingXmlQueueToProcessor() throws Exception { // configure routes and add to camel context context.addRoutes(new SpringRouteBuilder() { @Override public void configure() throws Exception { Policy required = lookup("PROPAGATION_REQUIRED_POLICY", SpringTransactionPolicy.class); from("activemq:queue:foo").policy(required).process(new ConditionalExceptionProcessor()); } }); assertResult(); }
@Test public void testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector() throws Exception { final ConditionalExceptionProcessor cp = new ConditionalExceptionProcessor(5); context.addRoutes(new SpringRouteBuilder() { @Override public void configure() throws Exception { Policy required = lookup("PROPAGATION_REQUIRED_POLICY", SpringTransactionPolicy.class); from("activemq:queue:foo").policy(required).process(cp).to("activemq-1:queue:bar?replyTo=queue:bar.reply"); from("activemq-1:queue:bar").process(new Processor() { public void process(Exchange e) { String request = e.getIn().getBody(String.class); Message out = e.getOut(); String selectorValue = e.getIn().getHeader("camelProvider", String.class); if (selectorValue != null) { out.setHeader("camelProvider", selectorValue); } out.setBody("Re: " + request); } }); } }); for (int i = 0; i < 5; ++i) { Object reply = template.requestBody("activemq:queue:foo", "blah" + i); assertTrue("Received unexpeced reply", reply.equals("Re: blah" + i)); assertTrue(cp.getErrorMessage(), cp.getErrorMessage() == null); } }
public PolicyDefinition(Policy policy) { this.policy = policy; }
public TransactedDefinition(Policy policy) { this.policy = policy; }
protected Policy resolvePolicy(RouteContext routeContext) { if (policy != null) { return policy; } return doResolvePolicy(routeContext, getRef(), type); }
protected static Policy doResolvePolicy(RouteContext routeContext, String ref, Class<? extends Policy> type) { // explicit ref given so lookup by it if (ObjectHelper.isNotEmpty(ref)) { return CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, Policy.class); } // no explicit reference given from user so we can use some convention over configuration here // try to lookup by scoped type Policy answer = null; if (type != null) { // try find by type, note that this method is not supported by all registry Map<String, ?> types = routeContext.lookupByType(type); if (types.size() == 1) { // only one policy defined so use it Object found = types.values().iterator().next(); if (type.isInstance(found)) { return type.cast(found); } } } // for transacted routing try the default REQUIRED name if (type == TransactedPolicy.class) { // still not found try with the default name PROPAGATION_REQUIRED answer = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); } // this logic only applies if we are a transacted policy // still no policy found then try lookup the platform transaction manager and use it as policy if (answer == null && type == TransactedPolicy.class) { Class<?> tmClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager"); if (tmClazz != null) { // see if we can find the platform transaction manager in the registry Map<String, ?> maps = routeContext.lookupByType(tmClazz); if (maps.size() == 1) { // only one platform manager then use it as default and create a transacted // policy with it and default to required // as we do not want dependency on spring jars in the camel-core we use // reflection to lookup classes and create new objects and call methods // as this is only done during route building it does not matter that we // use reflection as performance is no a concern during route building Object transactionManager = maps.values().iterator().next(); LOG.debug("One instance of PlatformTransactionManager found in registry: {}", transactionManager); Class<?> txClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy"); if (txClazz != null) { LOG.debug("Creating a new temporary SpringTransactionPolicy using the PlatformTransactionManager: {}", transactionManager); TransactedPolicy txPolicy = ObjectHelper.newInstance(txClazz, TransactedPolicy.class); Method method; try { method = txClazz.getMethod("setTransactionManager", tmClazz); } catch (NoSuchMethodException e) { throw new RuntimeCamelException("Cannot get method setTransactionManager(PlatformTransactionManager) on class: " + txClazz); } ObjectHelper.invokeMethod(method, txPolicy, transactionManager); return txPolicy; } else { // camel-spring is missing on the classpath throw new RuntimeCamelException("Cannot create a transacted policy as camel-spring.jar is not on the classpath!"); } } else { if (maps.isEmpty()) { throw new NoSuchBeanException(null, "PlatformTransactionManager"); } else { throw new IllegalArgumentException("Found " + maps.size() + " PlatformTransactionManager in registry. " + "Cannot determine which one to use. Please configure a TransactionTemplate on the transacted policy."); } } } } return answer; }
/** * Sets a policy type that this definition should scope within. * <p/> * Is used for convention over configuration situations where the policy * should be automatic looked up in the registry and it should be based * on this type. For instance a {@link org.apache.camel.spi.TransactedPolicy} * can be set as type for easy transaction configuration. * <p/> * Will by default scope to the wide {@link Policy} * * @param type the policy type */ public void setType(Class<? extends Policy> type) { this.type = type; }