/**
 * Creates a {@link PollingConsumer} and polls all pending messages on the endpoint
 * and invokes the given {@link Processor} to process each {@link Exchange} and then closes
 * down the consumer and throws any exceptions thrown.
 *
 * @param endpoint  the endpoint to create the polling consumer from
 * @param processor invoked once for every exchange that is received
 * @param timeout   handed to every {@link PollingConsumer#receive(long)} call
 * @throws Exception if starting the consumer, receiving or processing an exchange fails
 */
public static void pollEndpoint(Endpoint endpoint, Processor processor, long timeout) throws Exception {
    PollingConsumer consumer = endpoint.createPollingConsumer();
    try {
        ServiceHelper.startService(consumer);
        // keep draining until a receive comes back without an exchange
        Exchange exchange;
        while ((exchange = consumer.receive(timeout)) != null) {
            processor.process(exchange);
        }
    } finally {
        try {
            ServiceHelper.stopAndShutdownService(consumer);
        } catch (Exception e) {
            // best effort: a failure while stopping must not mask the outcome of the polling itself
            LOG.warn("Failed to stop PollingConsumer: " + consumer + ". This exception is ignored.", e);
        }
    }
}
@Override protected void configurePollingConsumer(PollingConsumer consumer) throws Exception { Map<String, Object> copy = new HashMap<String, Object>(getConsumerProperties()); Map<String, Object> throwaway = new HashMap<String, Object>(); // filter out unwanted options which is intended for the scheduled poll consumer // as these options are not supported on the polling consumer configureScheduledPollConsumerProperties(copy, throwaway); // set reference properties first as they use # syntax that fools the regular properties setter EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy); EndpointHelper.setProperties(getCamelContext(), consumer, copy); if (!isLenientProperties() && copy.size() > 0) { throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() + " parameters that couldn't be set on the endpoint polling consumer." + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." + " Unknown consumer parameters=[" + copy + "]"); } }
@Override public PollingConsumer acquirePollingConsumer(Endpoint endpoint) { // always create a new consumer PollingConsumer answer; try { answer = endpoint.createPollingConsumer(); boolean singleton = true; if (answer instanceof IsSingleton) { singleton = ((IsSingleton) answer).isSingleton(); } if (getCamelContext().isStartingRoutes() && singleton) { // if we are currently starting a route, then add as service and enlist in JMX // - but do not enlist non-singletons in JMX // - note addService will also start the service getCamelContext().addService(answer); } else { // must then start service so producer is ready to be used ServiceHelper.startService(answer); } } catch (Exception e) { throw new FailedToCreateConsumerException(endpoint, e); } return answer; }
/**
 * Creates a consumer cache backed by the given map and service pool.
 *
 * @param source       the source stored on this cache
 * @param camelContext the camel context
 * @param cache        the map used to hold the cached polling consumers
 * @param pool         the service pool used for pooled polling consumers
 */
public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache,
                     ServicePool<Endpoint, PollingConsumer> pool) {
    this.source = source;
    this.camelContext = camelContext;
    this.pool = pool;
    this.consumers = cache;

    // an LRU cache knows its own capacity, so remember it
    if (cache instanceof LRUCache) {
        this.maxCacheSize = ((LRUCache) cache).getMaxCacheSize();
    }

    // extended statistics only when a management agent is present (JMX enabled)
    this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent() != null
        && camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
}
/** * Releases an acquired producer back after usage. * * @param endpoint the endpoint * @param pollingConsumer the pollingConsumer to release */ public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) { if (pollingConsumer instanceof ServicePoolAware) { // release back to the pool pool.release(endpoint, pollingConsumer); } else { boolean singleton = false; if (pollingConsumer instanceof IsSingleton) { singleton = ((IsSingleton) pollingConsumer).isSingleton(); } if (!singleton) { try { // stop and shutdown non-singleton producers as we should not leak resources ServiceHelper.stopAndShutdownService(pollingConsumer); } catch (Exception ex) { if (ex instanceof RuntimeCamelException) { throw (RuntimeCamelException)ex; } else { throw new RuntimeCamelException(ex); } } } } }
protected void doStop() throws Exception { // when stopping we intend to shutdown ServiceHelper.stopAndShutdownServices(statistics, pool); try { ServiceHelper.stopAndShutdownServices(consumers.values()); } finally { // ensure consumers are removed, and also from JMX for (PollingConsumer consumer : consumers.values()) { getCamelContext().removeService(consumer); } } consumers.clear(); if (statistics != null) { statistics.clear(); } }
/**
 * Injects a polling consumer into the bean's {@code setConsumer} method via the post processor helper,
 * then checks that a body sent to {@code seda:foo} can be consumed by the bean and forwarded to the mock.
 */
public void testEndpointInjectPollingConsumer() throws Exception {
    CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context);
    MyEndpointBeanPollingConsumer bean = new MyEndpointBeanPollingConsumer();

    Method setter = bean.getClass().getMethod("setConsumer", PollingConsumer.class);
    EndpointInject inject = setter.getAnnotation(EndpointInject.class);

    // resolve an injection value for each setter parameter and invoke the setter with it
    for (Class<?> parameterType : setter.getParameterTypes()) {
        String propertyName = ObjectHelper.getPropertyName(setter);
        Object injected = helper.getInjectionValue(parameterType, inject.uri(), inject.ref(),
            inject.property(), propertyName, bean, "foo");
        ObjectHelper.invokeMethod(setter, bean, injected);
    }

    template.sendBody("seda:foo", "Hello World");

    MockEndpoint result = getMockEndpoint("mock:result");
    result.expectedBodiesReceived("Hello World");

    assertNotNull(bean.getConsumer());

    // let the bean consume and pass what it got on to the mock
    Exchange consumed = bean.consume();
    template.send("mock:result", consumed);

    assertMockEndpointsSatisfied();
}
public void testCacheConsumers() throws Exception { ConsumerCache cache = new ConsumerCache(this, context); cache.start(); assertEquals("Size should be 0", 0, cache.size()); // test that we cache at most 1000 consumers to avoid it eating to much memory for (int i = 0; i < 1003; i++) { Endpoint e = context.getEndpoint("direct:queue:" + i); PollingConsumer p = cache.getConsumer(e); assertNotNull("the polling consumer should not be null", p); } // the eviction is async so force cleanup cache.cleanUp(); assertEquals("Size should be 1000", 1000, cache.size()); cache.stop(); }
public void testPollingConsumer() throws Exception { template.sendBodyAndHeader("file:target/enrich", "Hello World", Exchange.FILE_NAME, "hello.txt"); PollingConsumer consumer = context.getEndpoint("file:target/enrich").createPollingConsumer(); consumer.start(); Exchange exchange = consumer.receive(5000); assertNotNull(exchange); assertEquals("Hello World", exchange.getIn().getBody(String.class)); // sleep a bit to ensure polling consumer would be suspended after we have used it Thread.sleep(500); // drop a new file which should not be picked up by the consumer template.sendBodyAndHeader("file:target/enrich", "Bye World", Exchange.FILE_NAME, "bye.txt"); // sleep a bit to ensure polling consumer would not have picked up that file Thread.sleep(1000); File file = new File("target/enrich/bye.txt"); assertTrue("File should exist " + file, file.exists()); consumer.stop(); }
@Test public void testPollingConsumer() throws Exception { template.sendBodyAndHeader(getFtpUrl(), "Hello World", Exchange.FILE_NAME, "hello.txt"); PollingConsumer consumer = context.getEndpoint(getFtpUrl()).createPollingConsumer(); consumer.start(); Exchange exchange = consumer.receive(5000); assertNotNull(exchange); assertEquals("Hello World", exchange.getIn().getBody(String.class)); // sleep a bit to ensure polling consumer would be suspended after we have used it Thread.sleep(1000); // drop a new file which should not be picked up by the consumer template.sendBodyAndHeader(getFtpUrl(), "Bye World", Exchange.FILE_NAME, "bye.txt"); // sleep a bit to ensure polling consumer would not have picked up that file Thread.sleep(1000); File file = new File(FTP_ROOT_DIR + "/polling/bye.txt"); assertTrue("File should exist " + file, file.exists()); consumer.stop(); }
/**
 * Sends the given any object to the {@code direct:createAnyObject} route and returns the result
 * received on {@code direct:createAnyObjectPort}.
 *
 * @param anyObjectTO       the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public Pair<String, List<PropagationStatus>> create(
        final AnyObjectTO anyObjectTO, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:createAnyObjectPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:createAnyObject", anyObjectTO, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given patch to the {@code direct:updateAnyObject} route and returns the result
 * received on {@code direct:updateAnyObjectPort}.
 *
 * @param anyPatch          the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public Pair<AnyObjectPatch, List<PropagationStatus>> update(
        final AnyObjectPatch anyPatch, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:updateAnyObjectPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:updateAnyObject", anyPatch, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given key to the {@code direct:deleteAnyObject} route and returns the result
 * received on {@code direct:deleteAnyObjectPort}.
 *
 * @param key               the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> delete(
        final String key, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deleteAnyObjectPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deleteAnyObject", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given key to the {@code direct:provisionAnyObject} route and returns the result
 * received on {@code direct:provisionAnyObjectPort}.
 *
 * @param key               the message body sent to the route
 * @param resources         passed to the route as the {@code resources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> provision(
        final String key, final Collection<String> resources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:provisionAnyObjectPort");

    Map<String, Object> props = new HashMap<>();
    props.put("resources", resources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:provisionAnyObject", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given key to the {@code direct:deprovisionAnyObject} route and returns the result
 * received on {@code direct:deprovisionAnyObjectPort}.
 *
 * @param key               the message body sent to the route
 * @param resources         passed to the route as the {@code resources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> deprovision(
        final String key, final Collection<String> resources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deprovisionAnyObjectPort");

    Map<String, Object> props = new HashMap<>();
    props.put("resources", resources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deprovisionAnyObject", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given group to the {@code direct:createGroup} route, with an empty set of excluded
 * resources, and returns the result received on {@code direct:createGroupPort}.
 *
 * @param groupTO           the message body sent to the route
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public Pair<String, List<PropagationStatus>> create(final GroupTO groupTO, final boolean nullPriorityAsync) {
    PollingConsumer pollingConsumer = getConsumer("direct:createGroupPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", Collections.<String>emptySet());
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:createGroup", groupTO, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given group to the {@code direct:createGroupInPull} route and returns the result
 * received on {@code direct:createGroupInPullPort}.
 *
 * @param groupTO           the message body sent to the route
 * @param groupOwnerMap     passed to the route as the {@code groupOwnerMap} property
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public Pair<String, List<PropagationStatus>> create(
        final GroupTO groupTO,
        final Map<String, String> groupOwnerMap,
        final Set<String> excludedResources,
        final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:createGroupInPullPort");

    Map<String, Object> props = new HashMap<>();
    props.put("groupOwnerMap", groupOwnerMap);
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:createGroupInPull", groupTO, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given patch to the {@code direct:updateGroup} route and returns the result
 * received on {@code direct:updateGroupPort}.
 *
 * @param anyPatch          the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public Pair<GroupPatch, List<PropagationStatus>> update(
        final GroupPatch anyPatch, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:updateGroupPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:updateGroup", anyPatch, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given key to the {@code direct:deleteGroup} route and returns the result
 * received on {@code direct:deleteGroupPort}.
 *
 * @param key               the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> delete(
        final String key, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deleteGroupPort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deleteGroup", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given key to the {@code direct:provisionGroup} route and returns the result
 * received on {@code direct:provisionGroupPort}.
 *
 * @param key               the message body sent to the route
 * @param resources         passed to the route as the {@code resources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> provision(
        final String key, final Collection<String> resources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:provisionGroupPort");

    Map<String, Object> props = new HashMap<>();
    props.put("resources", resources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:provisionGroup", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given key to the {@code direct:deprovisionGroup} route and returns the result
 * received on {@code direct:deprovisionGroupPort}.
 *
 * @param key               the message body sent to the route
 * @param resources         passed to the route as the {@code resources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> deprovision(
        final String key, final Collection<String> resources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deprovisionGroupPort");

    Map<String, Object> props = new HashMap<>();
    props.put("resources", resources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deprovisionGroup", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given patch to the {@code direct:updateUser} route and returns the result
 * received on {@code direct:updatePort}.
 *
 * @param userPatch         the message body sent to the route
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link Pair}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public Pair<UserPatch, List<PropagationStatus>> update(final UserPatch userPatch, final boolean nullPriorityAsync) {
    PollingConsumer pollingConsumer = getConsumer("direct:updatePort");

    Map<String, Object> props = new HashMap<>();
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:updateUser", userPatch, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(Pair.class);
}
/**
 * Sends the given key to the {@code direct:deleteUser} route and returns the result
 * received on {@code direct:deletePort}.
 *
 * @param key               the message body sent to the route
 * @param excludedResources passed to the route as the {@code excludedResources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> delete(
        final String key, final Set<String> excludedResources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deletePort");

    Map<String, Object> props = new HashMap<>();
    props.put("excludedResources", excludedResources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deleteUser", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given user to the {@code direct:deprovisionUser} route and returns the result
 * received on {@code direct:deprovisionPort}.
 *
 * @param user              the message body sent to the route
 * @param resources         passed to the route as the {@code resources} property
 * @param nullPriorityAsync passed to the route as the {@code nullPriorityAsync} property
 * @return the body of the received exchange as a {@link List}
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
@SuppressWarnings("unchecked")
public List<PropagationStatus> deprovision(
        final String user, final Collection<String> resources, final boolean nullPriorityAsync) {

    PollingConsumer pollingConsumer = getConsumer("direct:deprovisionPort");

    Map<String, Object> props = new HashMap<>();
    props.put("resources", resources);
    props.put("nullPriorityAsync", nullPriorityAsync);

    sendMessage("direct:deprovisionUser", user, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }

    return exchange.getIn().getBody(List.class);
}
/**
 * Sends the given key to the {@code direct:confirmPwdReset} route and waits for the outcome
 * on {@code direct:confirmPwdResetPort}.
 *
 * @param key      the message body, also passed to the route as the {@code key} property
 * @param token    passed to the route as the {@code token} property
 * @param password passed to the route as the {@code password} property
 * @throws RuntimeException the exception found under {@link Exchange#EXCEPTION_CAUGHT}, wrapped
 * when it is not itself a runtime exception
 */
@Override
public void confirmPasswordReset(final String key, final String token, final String password) {
    PollingConsumer pollingConsumer = getConsumer("direct:confirmPwdResetPort");

    Map<String, Object> props = new HashMap<>();
    props.put("key", key);
    props.put("token", token);
    props.put("password", password);

    sendMessage("direct:confirmPwdReset", key, props);

    Exchange exchange = pollingConsumer.receive();

    Object caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
    if (caught instanceof RuntimeException) {
        throw (RuntimeException) caught;
    }
    if (caught instanceof Throwable) {
        // checked exception or error: wrap it instead of failing with a ClassCastException
        throw new RuntimeException((Throwable) caught);
    }
}
/**
 * Returns the polling consumer registered for the given URI, creating and starting it on first use.
 * <p>
 * The URI is only recorded as known, and the consumer only cached, once the consumer has been
 * created and started successfully. A failed attempt is logged and leaves nothing behind, so the
 * next call tries again instead of returning the cached failure forever.
 *
 * @param uri the endpoint URI to poll
 * @return the started polling consumer, or {@code null} if it could not be created or started
 */
protected PollingConsumer getConsumer(final String uri) {
    if (knownURIs.contains(uri)) {
        return consumerMap.get(uri);
    }

    Endpoint endpoint = contextFactory.getContext().getEndpoint(uri);
    try {
        PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
        pollingConsumer.start();

        // register only after a successful start
        consumerMap.put(uri, pollingConsumer);
        knownURIs.add(uri);
        return pollingConsumer;
    } catch (Exception ex) {
        LOG.error("Unexpected error in Consumer creation ", ex);
        return null;
    }
}
/**
 * Writes a name to the socket on port 7999 and expects the greeting on {@code seda:end}
 * of the deployed {@code netty-context}.
 */
@Test
public void testDeployedContext() throws Exception {
    CamelContextRegistry registry = ServiceLocator.getRequiredService(CamelContextRegistry.class);
    CamelContext camelctx = registry.getCamelContext("netty-context");
    Assert.assertNotNull("CamelContext not null", camelctx);
    Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());

    PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
    pollingConsumer.start();
    try {
        Socket socket = new Socket(SOCKET_HOST, 7999);
        try {
            socket.setKeepAlive(true);
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            try {
                out.write("Kermit\n");
            } finally {
                out.close();
            }
        } finally {
            // closed on every path, also when configuring the socket fails
            socket.close();
        }

        String result = pollingConsumer.receive().getIn().getBody(String.class);
        Assert.assertEquals("Hello Kermit", result);
    } finally {
        // the context is looked up from the registry rather than created here,
        // so do not leave a started consumer on it
        pollingConsumer.stop();
    }
}
/**
 * Routes the split entries of the test atom feed to {@code seda:end} and checks
 * title and content of the first entry received there.
 */
@Test
public void testConsumeAtomFeed() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("atom://http://localhost:8080/atom-test/atom/feed?splitEntries=true")
                .to("seda:end");
        }
    });

    // the consumer is started before the context so it is already listening when the route runs
    PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
    pollingConsumer.start();

    camelctx.start();
    try {
        Exchange received = pollingConsumer.receive(5000);
        Entry entry = received.getIn().getBody(Entry.class);

        Assert.assertEquals(FeedConstants.ENTRY_TITLE, entry.getTitle());
        Assert.assertEquals(FeedConstants.ENTRY_CONTENT, entry.getContent());
    } finally {
        camelctx.stop();
    }
}
@Test public void testJMSTransactionToDLQ() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addComponent("jms", jmsComponent); camelctx.addRoutes(configureJmsRoutes()); camelctx.start(); PollingConsumer consumer = camelctx.getEndpoint("seda:dlq").createPollingConsumer(); consumer.start(); // Send a message to queue camel-jms-queue-one Connection connection = connectionFactory.createConnection(); sendMessage(connection, JmsQueue.QUEUE_ONE.getJndiName(), "Hello Bob"); // The JMS transaction should have been rolled back and the message sent to the DLQ String result = consumer.receive().getIn().getBody(String.class); Assert.assertNotNull(result); Assert.assertEquals("Hello Bob", result); connection.close(); camelctx.stop(); }
@Test public void testJMSTransaction() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addComponent("jms", jmsComponent); camelctx.addRoutes(configureJmsRoutes()); camelctx.start(); PollingConsumer consumer = camelctx.getEndpoint("seda:success").createPollingConsumer(); consumer.start(); // Send a message to queue camel-jms-queue-one Connection connection = connectionFactory.createConnection(); sendMessage(connection, JmsQueue.QUEUE_ONE.getJndiName(), "Hello Kermit"); // The JMS transaction should have been committed and the message payload sent to the direct:success endpoint String result = consumer.receive(3000).getIn().getBody(String.class); Assert.assertNotNull(result); Assert.assertEquals("Hello Kermit", result); connection.close(); camelctx.stop(); }
/**
 * Routes the result of the users query to {@code seda:end} and expects the {@code NAME}
 * column of the first row received there to be "SA".
 */
@Test
public void testSQLEndpoint() throws Exception {
    Assert.assertNotNull("DataSource not null", dataSource);

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("sql:select name from information_schema.users?dataSource=java:jboss/datasources/ExampleDS")
                .to("seda:end");
        }
    });

    // the consumer is started before the context so it is already listening when the route runs
    PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
    pollingConsumer.start();

    camelctx.start();
    try {
        Exchange received = pollingConsumer.receive(3000);
        Map<?, ?> row = received.getIn().getBody(Map.class);
        String name = (String) row.get("NAME");
        Assert.assertEquals("SA", name);
    } finally {
        camelctx.stop();
    }
}
/**
 * Deploys the CDI routes jar, expects the {@code NAME} column of the first row received on
 * {@code seda:end} of the {@code camel-sql-cdi-context} to be "SA", and undeploys the jar again.
 */
@Test
public void testSQLEndpointWithCDIContext() throws Exception {
    try {
        deployer.deploy(CAMEL_SQL_CDI_ROUTES_JAR);

        CamelContext camelctx = contextRegistry.getCamelContext("camel-sql-cdi-context");
        Assert.assertNotNull("Camel context not null", camelctx);

        PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
        pollingConsumer.start();

        Exchange received = pollingConsumer.receive(3000);
        Map<?, ?> row = received.getIn().getBody(Map.class);
        String name = (String) row.get("NAME");
        Assert.assertEquals("SA", name);
    } finally {
        // undeploy on every path, also when the deployment or an assertion failed
        deployer.undeploy(CAMEL_SQL_CDI_ROUTES_JAR);
    }
}
@Test public void testMessageConsumerRoute() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("jms:queue:" + QUEUE_NAME + "?connectionFactory=ConnectionFactory"). transform(body().prepend("Hello ")).to("direct:end"); } }); camelctx.start(); PollingConsumer consumer = camelctx.getEndpoint("direct:end").createPollingConsumer(); consumer.start(); try { // Send a message to the queue InitialContext initialctx = new InitialContext(); ConnectionFactory cfactory = (ConnectionFactory) initialctx.lookup("java:/ConnectionFactory"); Connection connection = cfactory.createConnection(); try { sendMessage(connection, QUEUE_JNDI_NAME, "Kermit"); String result = consumer.receive().getIn().getBody(String.class); Assert.assertEquals("Hello Kermit", result); } finally { connection.close(); } } finally { camelctx.stop(); } }
@Test public void testMessageConsumerRoute() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("jms:queue:" + Main.QUEUE_NAME + "?connectionFactory=ConnectionFactory"). transform(body().prepend("Hello ")).to("direct:end"); } }); camelctx.start(); PollingConsumer consumer = camelctx.getEndpoint("direct:end").createPollingConsumer(); consumer.start(); try { // Send a message to the queue InitialContext initialctx = new InitialContext(); ConnectionFactory cfactory = (ConnectionFactory) initialctx.lookup("java:/ConnectionFactory"); Connection connection = cfactory.createConnection(); try { sendMessage(connection, Main.QUEUE_JNDI_NAME, "Kermit"); String result = consumer.receive().getIn().getBody(String.class); Assert.assertEquals("Hello Kermit", result); } finally { connection.close(); } } finally { camelctx.stop(); } }
public PollingConsumer createPollingConsumer() throws Exception { // should not call configurePollingConsumer when its EventDrivenPollingConsumer if (LOG.isDebugEnabled()) { LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}", new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()}); } EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); consumer.setBlockTimeout(getPollingConsumerBlockTimeout()); return consumer; }
@Override public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) { // stop and shutdown the consumer as its not cache or reused try { ServiceHelper.stopAndShutdownService(pollingConsumer); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } }
/** * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point */ public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty, String injectionPointName, Object bean, String beanName) { if (type.isAssignableFrom(ProducerTemplate.class)) { return createInjectionProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean); } else if (type.isAssignableFrom(ConsumerTemplate.class)) { return createInjectionConsumerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName); } else { Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, true); if (endpoint != null) { if (type.isInstance(endpoint)) { return endpoint; } else if (type.isAssignableFrom(Producer.class)) { return createInjectionProducer(endpoint, bean, beanName); } else if (type.isAssignableFrom(PollingConsumer.class)) { return createInjectionPollingConsumer(endpoint, bean, beanName); } else if (type.isInterface()) { // lets create a proxy try { return ProxyHelper.createProxy(endpoint, type); } catch (Exception e) { throw createProxyInstantiationRuntimeException(type, endpoint, e); } } else { throw new IllegalArgumentException("Invalid type: " + type.getName() + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint); } } return null; } }
/**
 * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO.
 *
 * @param endpoint the endpoint to create the polling consumer from
 * @param bean     the bean receiving the injection, passed on to {@code startService}
 * @param beanName the bean name, passed on to {@code startService}
 * @return the polling consumer
 */
protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
    final PollingConsumer answer;
    try {
        answer = endpoint.createPollingConsumer();
        startService(answer, endpoint.getCamelContext(), bean, beanName);
    } catch (Exception cause) {
        throw ObjectHelper.wrapRuntimeCamelException(cause);
    }
    return answer;
}
protected synchronized PollingConsumer doGetPollingConsumer(Endpoint endpoint, boolean pooled) { String key = endpoint.getEndpointUri(); PollingConsumer answer = consumers.get(key); if (pooled && answer == null) { pool.acquire(endpoint); } if (answer == null) { try { answer = endpoint.createPollingConsumer(); answer.start(); } catch (Exception e) { throw new FailedToCreateConsumerException(endpoint, e); } if (pooled && answer instanceof ServicePoolAware) { LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); answer = pool.addAndAcquire(endpoint, answer); } else { boolean singleton = false; if (answer instanceof IsSingleton) { singleton = ((IsSingleton) answer).isSingleton(); } if (singleton) { LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer); consumers.put(key, answer); } else { LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key); } } } if (answer != null) { // record statistics if (extendedStatistics) { statistics.onHit(key); } } return answer; }
/**
 * Acquires a polling consumer for the endpoint, receives one exchange from it and
 * releases the consumer again.
 *
 * @param endpoint the endpoint to receive from
 * @return whatever the consumer's {@code receive()} returns
 */
public Exchange receive(Endpoint endpoint) {
    LOG.debug("<<<< {}", endpoint);

    // if acquiring fails there is nothing to release, so it can sit outside the try
    PollingConsumer acquired = acquirePollingConsumer(endpoint);
    try {
        return acquired.receive();
    } finally {
        if (acquired != null) {
            releasePollingConsumer(endpoint, acquired);
        }
    }
}