/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
private static void consumeRecords(String bootstrapServers) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "string-consumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); Consumer<Integer, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(TOPIC)); ConsumerRecords<Integer, String> records = consumer.poll(10000); for (ConsumerRecord<Integer, String> record : records) out.printf( "key = %s value = %s%n", record.key(), record.value()); consumer.close(); }
private static void consumeRecords(String bootstrapServers) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-consumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); Consumer<Integer, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(TOPIC)); ConsumerRecords<Integer, String> records = consumer.poll(10000); for (ConsumerRecord<Integer, String> record : records) { System.out.printf("key = %s value = %s\t", record.key(), record.value()); System.out.printf("ProducerRecord: topic=>%s partition=>%s offset=>%s timestamp=>%s checksum=>%s", record.topic(), record.partition(), record.offset(), FORMATTER.format(Instant.ofEpochMilli(record.timestamp())), record.checksum()); System.out.println(); } consumer.close(); }
private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
                                     final int numMessages,
                                     final String topic) throws InterruptedException {
    final Properties config = new Properties();
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());

    final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
            config, topic, numMessages, 60 * 1000);
    Collections.sort(received);
    return received;
}
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // The socket buffer needs to be large, especially when running in AWS with
    // high latency. If running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // The socket buffer needs to be large, especially when running in AWS with
    // high latency. If running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
@Test
public void testSinkWithInteger() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger expected = new AtomicInteger(2);
    usage.consumeIntegers(topic, 10, 10, TimeUnit.SECONDS, latch::countDown,
            (k, v) -> v == expected.getAndIncrement());

    KafkaSink<Integer> sink = new KafkaSink<>(vertx, getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );

    Source.from(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .transformPayload(i -> i + 1)
            .to(sink);

    assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}
@Test
public void testSource() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    List<Integer> results = new ArrayList<>();

    KafkaSource<Integer> source = new KafkaSource<>(vertx, getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );
    source
            .transformPayload(i -> i + 1)
            .to(Sink.forEachPayload(results::add));

    AtomicInteger counter = new AtomicInteger();
    usage.produceIntegers(10, null, () -> new ProducerRecord<>(topic, counter.getAndIncrement()));

    await().atMost(1, TimeUnit.MINUTES).until(() -> results.size() >= 10);
    assertThat(results).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
@Test
public void testCommonHeaders(TestContext context) throws InterruptedException {
    Async async = context.async();
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();

    KafkaSource<Integer> source = new KafkaSource<>(vertx, getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );
    source
            .to(data -> {
                KafkaConsumerRecord record = original(data);
                assertThat(record).isNotNull();
                assertThat(key(data)).isNotNull();
                async.complete();
                return complete();
            });

    usage.produceIntegers(1, null, () -> new ProducerRecord<>(topic, "key", 1));
}
@Test
public void testMulticastWithBufferSize() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();

    KafkaSource<Integer> source = new KafkaSource<>(vertx, getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.size", 20)
    );
    assertThat(source).isNotNull();
    checkMulticast(usage, topic, source);
}
@Test
public void testMulticastWithTime() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();

    KafkaSource<Integer> source = new KafkaSource<>(vertx, getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.period.ms", 2000)
    );
    assertThat(source).isNotNull();
    checkMulticast(usage, topic, source);
}
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });

    KafkaMessageListenerContainer<Integer, String> container =
            createContainer(containerProps, IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start

    KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
    template.setDefaultTopic("topic3");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();

    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testAutoCommit");
}
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        records.forEach(record -> {
            log.info("Received record: {}", record);
        });
        // Commit last offset
        consumer.commitSync();
    }
}
/**
 * Creates a consumer with two topics, with 10 partitions each.
 * numElements are (round-robin) assigned to all the 20 partitions.
 */
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
        int numElements,
        int maxNumRecords,
        @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {

    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
            .withBootstrapServers("myServer1:9092,myServer2:9092")
            .withTopics(topics)
            .withConsumerFactoryFn(new ConsumerFactoryFn(
                    topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .withMaxNumRecords(maxNumRecords);

    if (timestampFn != null) {
        return reader.withTimestampFn(timestampFn);
    } else {
        return reader;
    }
}
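// A hedged usage sketch for the mkKafkaReadTransform(...) helper above, not taken verbatim from
// the original source; it mirrors the other pipeline tests in this listing and assumes the
// TestPipeline `p`, ValueAsTimestampFn, and addCountingAsserts helpers that those tests use.
int numElements = 1000;
PCollection<Long> input = p
        .apply(mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn())
                .withoutMetadata())
        .apply(Values.<Long>create());
addCountingAsserts(input, numElements);
p.run();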
@Test
public void testUnboundedSourceWithSingleTopic() {
    // same as testUnboundedSource, but with a single topic
    int numElements = 1000;
    String topic = "my_topic";

    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
            .withBootstrapServers("none")
            .withTopic("my_topic")
            .withConsumerFactoryFn(new ConsumerFactoryFn(
                    ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
            .withMaxNumRecords(numElements)
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class);

    PCollection<Long> input = p
            .apply(reader.withoutMetadata())
            .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);
    p.run();
}
@Override public void run() { logger.info("Worker thread started"); try (Consumer<String, Integer> consumer = new KafkaConsumer<>(configuration, new StringDeserializer(), new IntegerDeserializer())) { SeekingConsumerLogic logic = new SeekingConsumerLogic(consumer, stateDao, messagesToChangeState, percentFailureProbability); consumer.subscribe(Collections.singletonList(topic), logic); while (!finish.get()) { ConsumerRecords<String, Integer> records = consumer.poll(pollTimeout.toMillis()); long startTime = System.nanoTime(); logic.processMessages(records); long duration = System.nanoTime() - startTime; logger.debug("Processing of poll batch finished: {} messages, {} ms", records.count(), TimeUnit.NANOSECONDS.toMillis(duration)); } logic.optionallyCommitAllOffsets(); } catch (Exception e) { logger.error("Unexpected exception occurred: {}", e.toString(), e); } logger.info("Worker thread stopped"); }
@Override public void run() { logger.info("Worker thread started"); try (Consumer<String, Integer> consumer = new KafkaConsumer<>(configuration, new StringDeserializer(), new IntegerDeserializer())) { consumer.subscribe(Collections.singletonList(topic), this); while (!finish.get()) { ConsumerRecords<String, Integer> records = consumer.poll(pollTimeout.toMillis()); long startTime = System.nanoTime(); for (ConsumerRecord<String, Integer> record : records) { logger.trace("Message consumed: {}, {}, {}-{}/{}", record.key(), record.value(), record.topic(), record.partition(), record.offset()); stateDao.markConsume(ConsumerType.autocommit, UUID.fromString(record.key()), record.value()); } long duration = System.nanoTime() - startTime; logger.debug("Processing of poll batch finished: {} messages, {} ms", records.count(), TimeUnit.NANOSECONDS.toMillis(duration)); } } catch (Exception e) { logger.error("Unexpected exception occurred: {}", e.toString(), e); } logger.info("Worker thread stopped"); }
public void doWork(Properties properties, ProteusTask task) {
    topicsList.add(ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
    properties.put("bootstrap.servers", properties.get("com.treelogic.proteus.kafka.bootstrapServers"));
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    properties.put("value.deserializer", ProteusSerializer.class.getName());
    properties.put("group.id", "proteus-" + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
    properties.put("max.poll.records", 100);
    properties.put("session.timeout.ms", 60000);
    properties.put("request.timeout.ms", 80000);
    properties.put("fetch.max.wait.ms", 60000);
    properties.put("auto.offset.reset", "latest");

    kafkaConsumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new ProteusSerializer());
    kafkaConsumer.subscribe(topicsList);
    try {
        while (true) {
            ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<Integer, Measurement> record : records) {
                logger.info("Task " + this.getClass().getSimpleName() + " doing work for coil "
                        + record.value().getCoilID() + " on topic "
                        + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
                task.doWork(record.key(), record.value(), proteusBucket, topicsList);
            }
        }
    } finally {
        System.out.println("Stopping execution of thread < "
                + this.runnerProperties.getProperty("eu.proteus.kafkaTopic") + " >");
    }
}
public static void main(String[] args) { ArrayList<String> topicsList = new ArrayList<String>(); HashMap<String, Object> kafkaProperties = new HashMap<String, Object>(); topicsList.add("proteus-flatness"); kafkaProperties.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667"); kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); kafkaProperties.put("value.deserializer", ProteusSerializer.class.getName()); kafkaProperties.put("group.id", "proteus"); KafkaConsumer<Integer, Measurement> kafkaConsumer; ProteusSerializer myValueDeserializer = new ProteusSerializer(); IntegerDeserializer keyDeserializer = new IntegerDeserializer(); kafkaConsumer = new KafkaConsumer<Integer, Measurement>(kafkaProperties, keyDeserializer, myValueDeserializer); kafkaConsumer.subscribe(topicsList); try { while (true) { ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(1); for (ConsumerRecord<Integer, Measurement> record : records) { System.out.println("traza"); System.out.println(record); } } } finally { kafkaConsumer.close(); } }
public static void main(String[] args) { ArrayList<String> topicsList = new ArrayList<String>(); HashMap<String, Object> kafkaProperties = new HashMap<String, Object>(); topicsList.add("proteus-realtime"); kafkaProperties.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667"); kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); kafkaProperties.put("value.deserializer", ProteusSerializer.class.getName()); kafkaProperties.put("group.id", "proteus"); KafkaConsumer<Integer, Measurement> kafkaConsumer; kafkaConsumer = new KafkaConsumer<Integer, Measurement>(kafkaProperties, new IntegerDeserializer(), new ProteusSerializer()); kafkaConsumer.subscribe(topicsList); try { while (true) { ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(1); for (ConsumerRecord<Integer, Measurement> record : records) { System.out.println("record realtime: " + record.toString()); } } } finally { kafkaConsumer.close(); } }
@Test
public void shouldAggregate() throws Exception {
    produceMessages(mockTime.milliseconds());
    groupedStream.aggregate(
            initializer,
            aggregator,
            Serdes.Integer(),
            "aggregate-by-selected-key")
            .to(Serdes.String(), Serdes.Integer(), outputTopic);

    startStreams();

    produceMessages(mockTime.milliseconds());

    final List<KeyValue<String, Integer>> results = receiveMessages(
            new StringDeserializer(),
            new IntegerDeserializer(),
            10);

    Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
        @Override
        public int compare(final KeyValue<String, Integer> o1, final KeyValue<String, Integer> o2) {
            return KStreamAggregationIntegrationTest.compare(o1, o2);
        }
    });

    assertThat(results, is(Arrays.asList(
            KeyValue.pair("A", 1),
            KeyValue.pair("A", 2),
            KeyValue.pair("B", 1),
            KeyValue.pair("B", 2),
            KeyValue.pair("C", 1),
            KeyValue.pair("C", 2),
            KeyValue.pair("D", 1),
            KeyValue.pair("D", 2),
            KeyValue.pair("E", 1),
            KeyValue.pair("E", 2)
    )));
}
@Before
public void before() {
    sourceOne = new MockSourceNode<>(new String[]{"t1"}, new StringDeserializer(), new StringDeserializer());
    sourceTwo = new MockSourceNode<>(new String[]{"t2"}, new IntegerDeserializer(), new IntegerDeserializer());
    processorNodes = Arrays.asList(sourceOne, sourceTwo, new MockProcessorNode<>(-1), new MockProcessorNode<>(-1));

    final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
    final Map<String, SourceNode> sourceByTopics = new HashMap<>();
    sourceByTopics.put("t1", sourceOne);
    sourceByTopics.put("t2", sourceTwo);

    final Map<String, String> storeToTopic = new HashMap<>();
    storeToTopic.put("t1-store", "t1");
    storeToTopic.put("t2-store", "t2");

    final ProcessorTopology topology = new ProcessorTopology(processorNodes,
            sourceByTopics,
            Collections.<String, SinkNode>emptyMap(),
            Collections.<StateStore>emptyList(),
            storeToTopic,
            Collections.<StateStore>emptyList());

    context = new NoOpProcessorContext();
    t1 = new TopicPartition("t1", 1);
    t2 = new TopicPartition("t2", 1);

    offsets = new HashMap<>();
    offsets.put(t1, 50L);
    offsets.put(t2, 100L);

    stateMgr = new GlobalStateManagerStub(storeNames, offsets);
    globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr);
}
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProductJsonDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, appName + " #product");
    propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, appName);
    return propsMap;
}
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<String, Object>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092"); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; }
@Bean
public Map<String, Object> consumerConfigs() {
    HashMap<String, Object> props = new HashMap<>();
    // List of host:port pairs used for establishing the initial connections
    // to the Kafka cluster.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Consumer groups allow a pool of processes to divide the work of
    // consuming and processing records.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
    return props;
}
@Test
public void testUnreachableKafkaBrokers() {
    // Expect an exception when the Kafka brokers are not reachable on the workers.
    // We specify partitions explicitly so that splitting does not involve server interaction.
    // Set request timeout to 10ms so that the test does not take long.
    thrown.expect(Exception.class);
    thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");

    int numElements = 1000;
    PCollection<Long> input = p
            .apply(KafkaIO.<Integer, Long>read()
                    .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
                    .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
                    .withKeyDeserializer(IntegerDeserializer.class)
                    .withValueDeserializer(LongDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.<String, Object>of(
                            ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
                            ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
                            ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
                            ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
                    .withMaxNumRecords(10)
                    .withoutMetadata())
            .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);
    p.run();
}
@Test
public void testUnboundedSourceSplits() throws Exception {
    int numElements = 1000;
    int numSplits = 10;

    // Coders must be specified explicitly here due to the way the transform
    // is used in the test.
    UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
            mkKafkaReadTransform(numElements, null)
                    .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
                    .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
                    .makeSource();

    List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
            initial.split(numSplits, p.getOptions());
    assertEquals("Expected exact splitting", numSplits, splits.size());

    long elementsPerSplit = numElements / numSplits;
    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
    PCollectionList<Long> pcollections = PCollectionList.empty(p);
    for (int i = 0; i < splits.size(); ++i) {
        pcollections = pcollections.and(
                p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
                        .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<Integer, Long>()))
                        .apply("collection " + i, Values.<Long>create()));
    }
    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());

    addCountingAsserts(input, numElements);
    p.run();
}
@Bean
public Map<String, Object> consumerConfigs() {
    // FIXME: 12factorize
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, getApplicationGroup());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
@Before
public void before() throws Exception {
    String brokerConnectionString = createKafkaCluster();
    properties = getProperties(brokerConnectionString, IntegerDeserializer.class, StringDeserializer.class);
    topic1Name = randomString();
    topic2Name = randomString();
    createTopic(topic1Name, INITIAL_PARTITION_COUNT);
    createTopic(topic2Name, INITIAL_PARTITION_COUNT);
}
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamingConfig.JOB_ID_CONFIG, "moving-avg-example");
    props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    StreamingConfig config = new StreamingConfig(props);

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, Integer> prices = builder.stream("ks_prices");
    KTable<String, String> names = builder.table("ks_names");

    KStream<String, Integer> namedPrices = prices.leftJoin(names, (price, name) -> {
        return new NamedPrice(name, price);
    }).map((ticket, namedPrice) -> new KeyValue<String, Integer>(namedPrice.name, namedPrice.price));

    KTable<Windowed<String>, AvgValue> tempTable = namedPrices.<AvgValue, TumblingWindow>aggregateByKey(
            () -> new AvgAggregator<String, Integer, AvgValue>(),
            TumblingWindows.of("avgWindow").with(10000),
            new StringSerializer(),
            new AvgValueSerializer(),
            new StringDeserializer(),
            new AvgValueDeserializer());

    // Should work after we implement "aggregateByKey"
    KTable<Windowed<String>, Double> avg = tempTable.<Double>mapValues((v) -> ((double) v.sum / v.count));

    avg.to("ks_avg_prices");

    KafkaStreaming kstream = new KafkaStreaming(builder, config);
    kstream.start();
}
public static void main(String[] args) { CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().build(); clusterCouchbase = CouchbaseCluster.create(env, "192.168.4.246", "192.168.4.247", "192.168.4.248"); proteusBucket = clusterCouchbase.openBucket("proteus-testing"); documentCounter = 15; ArrayList<String> topicsList = new ArrayList<String>(); HashMap<String, Object> kafkaProperties = new HashMap<String, Object>(); topicsList.add("proteus-realtime"); kafkaProperties.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667"); kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); kafkaProperties.put("value.deserializer", ProteusSerializer.class.getName()); kafkaProperties.put("group.id", "proteus"); KafkaConsumer<Integer, Measurement> kafkaConsumer = new KafkaConsumer<Integer, Measurement>(kafkaProperties); ProteusSerializer myValueDeserializer = new ProteusSerializer(); IntegerDeserializer keyDeserializer = new IntegerDeserializer(); kafkaConsumer = new KafkaConsumer<>(kafkaProperties, keyDeserializer, myValueDeserializer); kafkaConsumer.subscribe(topicsList); try { while (true) { ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(1); for (ConsumerRecord<Integer, Measurement> record : records) { System.out.println("Key: " + record.key()); if (!CouchbaseUtils.checkIfDocumentExists(String.valueOf(record.key()), proteusBucket)) { CouchbaseUtils.createDocumentFirstTime(String.valueOf(record.key()), record.value(), topicsList, proteusBucket); } else { CouchbaseUtils.updateDocument(proteusBucket, topicsList, record.value()); } } } } finally { kafkaConsumer.close(); } }
public static void main(String[] args) { CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().build(); clusterCouchbase = CouchbaseCluster.create(env, "192.168.4.246", "192.168.4.247", "192.168.4.248"); proteusBucket = clusterCouchbase.openBucket("proteus-testing"); ArrayList<String> topicsList = new ArrayList<String>(); HashMap<String, Object> kafkaProperties = new HashMap<String, Object>(); topicsList.add("proteus-hsm"); kafkaProperties.put("bootstrap.servers", "192.168.4.246:6667,192.168.4.247:6667,192.168.4.248:6667"); kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); kafkaProperties.put("value.deserializer", ProteusSerializer.class.getName()); kafkaProperties.put("group.id", "proteus"); KafkaConsumer<Integer, Measurement> kafkaConsumer; kafkaConsumer = new KafkaConsumer<Integer, Measurement>(kafkaProperties, new IntegerDeserializer(), new ProteusSerializer()); kafkaConsumer.subscribe(topicsList); try { while (true) { ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(1); for (ConsumerRecord<Integer, Measurement> record : records) { System.out.println("coilId: " + record.value().getCoilID()); System.out.println(record.value().toString()); if (record.value().getCoilID() == 40101001) { System.out.println(record.value().getCoilID()); if (CouchbaseUtils.checkIfDocumentExists(String.valueOf(record.value().getCoilID()), proteusBucket)) { System.out.println("Update en " + record.value().getCoilID()); CouchbaseUtils.updateDocument(proteusBucket, topicsList, record.value()); } else { CouchbaseUtils.createDocumentFirstTime(String.valueOf(record.value().getCoilID()), record.value(), topicsList, proteusBucket); } } } } } finally { kafkaConsumer.close(); } }
@Test
public void testSendIndexDTO() throws Exception {
    LOG.info("Start testSendIndexDTO");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");

    List<IndexDTO> dtos = prepareIndexableDtos(rdfHandler, "/data/discos/rmd18mddcw", null)
            .collect(Collectors.toList());
    Queue<IndexDTO> expectedDtos = new ArrayDeque<>(dtos);
    Queue<ExpectedActualDTOPair> receivedDtos = new ArrayDeque<>();

    final CountDownLatch latch = new CountDownLatch(3);
    containerProps.setMessageListener((MessageListener<Integer, IndexDTO>) message -> {
        LOG.info("received: " + message);
        IndexDTO expected = expectedDtos.remove();
        LOG.debug("expected: " + expected);
        IndexDTO actual = message.value();
        LOG.debug("actual: " + actual);
        receivedDtos.add(new ExpectedActualDTOPair(expected, actual));
        LOG.debug("Decrementing latch.");
        latch.countDown();
    });

    KafkaMessageListenerContainer<Integer, IndexDTO> container =
            createContainerForDto(containerProps, IntegerDeserializer.class, GenericJvmObjectDeserializer.class);
    container.setBeanName("testSendIndexDTO");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start

    KafkaTemplate<Integer, IndexDTO> template =
            createTemplate(IntegerSerializer.class, GenericJvmObjectSerializer.class);
    template.setDefaultTopic("topic1");

    prepareIndexableDtos(rdfHandler, "/data/discos/rmd18mddcw", null)
            .peek(dto -> LOG.debug("Prepared DTO {}", dto))
            .forEach(template::sendDefault);
    // do anything with the completablefuture returned by the template?
    template.flush();

    assertTrue(latch.await(120, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testSendIndexDTO");

    receivedDtos.forEach(pair -> assertEquals(pair.expected, pair.actual));
}
@Test(timeOut = 30000)
public void testSimpleProducerConsumer() throws Exception {
    String topic = "persistent://sample/standalone/ns/testSimpleProducerConsumer";

    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", lookupUrl.toString());
    producerProperties.put("key.serializer", IntegerSerializer.class.getName());
    producerProperties.put("value.serializer", StringSerializer.class.getName());
    Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);

    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers", lookupUrl.toString());
    consumerProperties.put("group.id", "my-subscription-name");
    consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
    consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
    consumerProperties.put("enable.auto.commit", "true");
    Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Arrays.asList(topic));

    List<Long> offsets = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
        RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
        offsets.add(md.offset());
        log.info("Published message at {}", Long.toHexString(md.offset()));
    }

    producer.flush();
    producer.close();

    for (int i = 0; i < 10; i++) {
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        assertEquals(records.count(), 1);

        int idx = i;
        records.forEach(record -> {
            log.info("Received record: {}", record);
            assertEquals(record.key().intValue(), idx);
            assertEquals(record.value(), "hello-" + idx);
            assertEquals(record.offset(), offsets.get(idx).longValue());
        });
    }

    consumer.close();
}
@Test
public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
    // Similar to testUnboundedSourceCheckpointMark(), but verifies that the source resumes
    // properly from empty partitions, without missing messages added since the checkpoint.

    // Initialize the consumer with fewer elements than the number of partitions so that some are empty.
    int initialNumElements = 5;
    UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
            mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
                    .makeSource()
                    .split(1, PipelineOptionsFactory.create())
                    .get(0);

    UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);

    for (int l = 0; l < initialNumElements; ++l) {
        advanceOnce(reader, l > 0);
    }

    // Checkpoint and restart, and confirm that the source continues correctly.
    KafkaCheckpointMark mark = CoderUtils.clone(
            source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());

    // Create another source with a MockConsumer with OffsetResetStrategy.LATEST. This ensures that
    // the reader needs to explicitly seek to the first offset for partitions that were empty.

    int numElements = 100; // all the 20 partitions will have elements
    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    source = KafkaIO.<Integer, Long>read()
            .withBootstrapServers("none")
            .withTopics(topics)
            .withConsumerFactoryFn(new ConsumerFactoryFn(
                    topics, 10, numElements, OffsetResetStrategy.LATEST))
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .withMaxNumRecords(numElements)
            .withTimestampFn(new ValueAsTimestampFn())
            .makeSource()
            .split(1, PipelineOptionsFactory.create())
            .get(0);

    reader = source.createReader(null, mark);

    // Verify in any order. As the partitions are unevenly read, the returned records are not in a
    // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order.
    List<Long> expected = new ArrayList<>();
    List<Long> actual = new ArrayList<>();
    for (long i = initialNumElements; i < numElements; i++) {
        advanceOnce(reader, i > initialNumElements);
        expected.add(i);
        actual.add(reader.getCurrent().getKV().getValue());
    }
    assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
}
/**
 * Asynchronously consume all messages from the cluster.
 *
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param completion the function to call when all messages have been consumed; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
        java.util.function.Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
    Deserializer<String> keyDes = new StringDeserializer();
    Deserializer<Integer> valDes = new IntegerDeserializer();
    String randomId = UUID.randomUUID().toString();
    consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes,
            continuation, completion, topics, consumerFunction);
}
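// A minimal calling sketch for the consumeIntegers(...) overload above, not taken from the
// original source. It assumes `usage` is an instance of the class defining that method; the
// topic name "my-topic", the counters, and the stop-after-10-records continuation are hypothetical.
AtomicInteger remaining = new AtomicInteger(10);
CountDownLatch done = new CountDownLatch(1);
usage.consumeIntegers(
        () -> remaining.get() > 0,             // continuation: keep consuming while records remain
        done::countDown,                       // completion: release the latch when the consumer stops
        Collections.singletonList("my-topic"), // topics: the (hypothetical) topic to read from
        record -> {
            System.out.printf("key=%s value=%d%n", record.key(), record.value());
            remaining.decrementAndGet();
        });
done.await(1, TimeUnit.MINUTES); // assumes the enclosing method declares throws InterruptedException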