@Override protected RouteBuilder createRouteBuilder() throws Exception { DataSetIterator iter = new IrisDataSetIterator(150, 150); next = iter.next(); next.normalizeZeroMeanZeroUnitVariance(); return new RouteBuilder() { @Override public void configure() throws Exception { final String kafkaUri = String.format("kafka:%s?topic=%s&groupId=dl4j-serving", kafkaCluster.getBrokerList(), topicName); from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { final INDArray arr = next.getFeatureMatrix(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); Nd4j.write(arr, dos); byte[] bytes = bos.toByteArray(); String base64 = Base64.encodeBase64String(bytes); exchange.getIn().setBody(base64, String.class); exchange.getIn().setHeader(KafkaConstants.KEY, UUID.randomUUID().toString()); exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, "1"); } }).to(kafkaUri); } }; }
private KafkaProducer<String, String> createKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + KAFKA_PORT); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER); props.put(ProducerConfig.ACKS_CONFIG, "1"); ClassLoader tccl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); return new KafkaProducer<String, String>(props); } finally { Thread.currentThread().setContextClassLoader(tccl); } }
@Test public void producedStringMessageIsReceivedByKafka() throws Exception { String epuri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1"; CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").to(epuri); } }); KafkaComponent kafka = new KafkaComponent(); kafka.setBrokers("localhost:" + KAFKA_PORT); camelctx.addComponent("kafka", kafka); camelctx.start(); try { ProducerTemplate template = camelctx.createProducerTemplate(); sendMessagesInRoute(10, template, "IT test message", KafkaConstants.PARTITION_KEY, "1"); sendMessagesInRoute(5, template, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); CountDownLatch latch = new CountDownLatch(15); boolean allReceived; try (KafkaConsumer<String, String> consumer = createKafkaConsumer()) { consumeKafkaMessages(consumer, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, latch); allReceived = latch.await(2, TimeUnit.SECONDS); } Assert.assertTrue("Messages published to the kafka topics were received: " + latch.getCount(), allReceived); } finally { camelctx.stop(); } }