Java 类org.apache.camel.component.kafka.KafkaConstants 实例源码

项目:deeplearning4j    文件:Dl4jServingRouteTest.java   
/**
 * Builds the route under test.
 *
 * <p>Before the route is created, the first batch of a
 * {@code IrisDataSetIterator(150, 150)} is stored in {@code next} and
 * normalized via {@code normalizeZeroMeanZeroUnitVariance()}.
 *
 * <p>The returned route consumes from {@code direct:start}, replaces the
 * message body with the Base64 text of the {@code Nd4j.write} output for the
 * feature matrix of {@code next}, sets the Kafka key header to a random UUID
 * and the partition-key header to {@code "1"}, and forwards the message to a
 * Kafka endpoint built from the cluster's broker list and {@code topicName}.
 *
 * @return the route builder describing the {@code direct:start} to Kafka route
 * @throws Exception declared by the overridden method
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    DataSetIterator irisIterator = new IrisDataSetIterator(150, 150);
    next = irisIterator.next();
    next.normalizeZeroMeanZeroUnitVariance();

    // Turns every incoming exchange into a Base64-encoded feature matrix message.
    final Processor featureEncoder = new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            Nd4j.write(next.getFeatureMatrix(), new DataOutputStream(buffer));
            String encoded = Base64.encodeBase64String(buffer.toByteArray());

            exchange.getIn().setBody(encoded, String.class);
            exchange.getIn().setHeader(KafkaConstants.KEY, UUID.randomUUID().toString());
            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, "1");
        }
    };

    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // The broker list is read when the route is configured, not when the builder is created.
            final String targetUri = String.format("kafka:%s?topic=%s&groupId=dl4j-serving",
                            kafkaCluster.getBrokerList(), topicName);
            from("direct:start")
                            .process(featureEncoder)
                            .to(targetUri);
        }
    };
}
项目:wildfly-camel    文件:KafkaConsumerIntegrationTest.java   
/**
 * Creates a {@code KafkaProducer<String, String>} pointed at
 * {@code localhost:KAFKA_PORT}.
 *
 * <p>Key and value serializers and the partitioner are taken from the
 * defaults in {@code KafkaConstants}; acks is set to {@code "1"}.
 *
 * <p>While the producer is being constructed, the thread context class loader
 * is switched to the class loader that loaded {@code KafkaProducer}, and it is
 * always restored afterwards.
 *
 * @return a newly constructed producer; the caller is responsible for closing it
 */
private KafkaProducer<String, String> createKafkaProducer() {
    final String bootstrapServers = "localhost:" + KAFKA_PORT;

    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
    config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
    config.put(ProducerConfig.ACKS_CONFIG, "1");

    final Thread current = Thread.currentThread();
    final ClassLoader previousLoader = current.getContextClassLoader();
    try {
        // NOTE(review): presumably needed so Kafka resolves the configured classes
        // from its own module rather than the test deployment - confirm.
        current.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
        return new KafkaProducer<String, String>(config);
    } finally {
        current.setContextClassLoader(previousLoader);
    }
}
项目:wildfly-camel    文件:KafkaProducerIntegrationTest.java   
/**
 * Sends string messages through a Camel route ending in a Kafka endpoint and
 * checks that a Kafka consumer sees all of them.
 *
 * <p>The route goes from {@code direct:start} to
 * {@code kafka:TOPIC_STRINGS?requestRequiredAcks=-1}. Ten messages are sent
 * with only a partition-key header and five more with an additional topic
 * header naming {@code TOPIC_STRINGS_IN_HEADER}. The test then waits up to two
 * seconds for a latch of 15 to reach zero.
 *
 * @throws Exception if the Camel context, the route or the Kafka client fails
 */
@Test
public void producedStringMessageIsReceivedByKafka() throws Exception {
    final String endpointUri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1";

    CamelContext context = new DefaultCamelContext();
    RouteBuilder directToKafka = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(endpointUri);
        }
    };
    context.addRoutes(directToKafka);

    // The component is registered after the routes but before the context starts.
    KafkaComponent kafkaComponent = new KafkaComponent();
    kafkaComponent.setBrokers("localhost:" + KAFKA_PORT);
    context.addComponent("kafka", kafkaComponent);

    context.start();
    try {
        ProducerTemplate producer = context.createProducerTemplate();

        sendMessagesInRoute(10, producer, "IT test message", KafkaConstants.PARTITION_KEY, "1");
        sendMessagesInRoute(5, producer, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);

        // 10 + 5 messages were sent above.
        CountDownLatch pending = new CountDownLatch(15);

        boolean receivedAll;
        try (KafkaConsumer<String, String> consumer = createKafkaConsumer()) {
            // NOTE(review): presumably counts the latch down once per consumed record - confirm in helper.
            consumeKafkaMessages(consumer, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, pending);
            receivedAll = pending.await(2, TimeUnit.SECONDS);
        }

        // The remaining latch count is read after the consumer has been closed.
        String description = "Messages published to the kafka topics were received: " + pending.getCount();
        Assert.assertTrue(description, receivedAll);
    } finally {
        context.stop();
    }
}