@Override
public KafkaConsumer<Long, byte[]> worker() {
    Properties props = AbstractKafkaClient.configBuilder()
            .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)
            .put(GROUP_ID_CONFIG, MallConstants.ORDER_GROUP)
            .put(ENABLE_AUTO_COMMIT_CONFIG, true)
            .put(MAX_POLL_RECORDS_CONFIG, "100")
            .put(SESSION_TIMEOUT_MS_CONFIG, "30000")
            .put(FETCH_MIN_BYTES_CONFIG, 1)
            .put(AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS)
            .put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())
            // ByteArrayDeserializer matches the byte[] value type; BytesDeserializer would
            // yield org.apache.kafka.common.utils.Bytes and fail at the read site.
            .put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
            .build();
    this.worker = new KafkaConsumer<>(props);
    this.worker.subscribe(topics);
    System.out.println("started");
    return this.worker;
}
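A minimal poll-loop sketch (not from the original source) showing how the consumer returned by worker() above might be driven; the worker field and the Long/byte[] types follow the example, and poll(Duration) assumes kafka-clients 2.0 or newer (older clients use the poll(long) overload seen elsewhere in this listing).

// requires: java.time.Duration, org.apache.kafka.clients.consumer.ConsumerRecord,
//           org.apache.kafka.clients.consumer.ConsumerRecords
public void pollLoop() {
    while (true) {
        // With enable.auto.commit=true (set above), offsets are committed automatically
        // at the configured auto.commit.interval.ms, so no explicit commitSync() is needed.
        ConsumerRecords<Long, byte[]> records = this.worker.poll(Duration.ofMillis(500));
        for (ConsumerRecord<Long, byte[]> record : records) {
            System.out.printf("offset=%d key=%d valueBytes=%d%n",
                    record.offset(), record.key(), record.value().length);
        }
    }
}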
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-array-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    Consumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC));
    ConsumerRecords<Long, byte[]> records = consumer.poll(10000);
    for (ConsumerRecord<Long, byte[]> record : records) {
        out.printf("key = %s value = %s%n", record.key(), new String(record.value()));
    }
    consumer.close();
}
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
                                              final String groupId) throws Exception {
    if (groupId != null) {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                            IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords
        );
    }

    // read uncommitted
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
        SINGLE_PARTITION_OUTPUT_TOPIC,
        numberOfRecords
    );
}
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", args[0]);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", "earliest");
    KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(props);
    List<String> topics = Arrays.asList(args[1]);
    consumer.subscribe(topics);
    System.out.println("Subscribed to topics " + topics);

    long count = 0;
    long start = System.nanoTime();
    while (true) {
        ConsumerRecords<Long, Trade> poll = consumer.poll(5000);
        System.out.println("Partitions in batch: " + poll.partitions());
        LongSummaryStatistics stats = StreamSupport.stream(poll.spliterator(), false)
                .mapToLong(r -> r.value().getTime())
                .summaryStatistics();
        System.out.println("Oldest record time: " + stats.getMin()
                + ", newest record: " + stats.getMax());
        count += poll.count();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        long rate = (long) ((double) count / elapsed * 1000);
        System.out.printf("Total count: %,d in %,dms. Average rate: %,d records/s %n",
                count, elapsed, rate);
    }
}
/**
 * Creates a consumer with two topics, with 10 partitions each.
 * numElements are (round-robin) assigned across all 20 partitions.
 */
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
        int numElements,
        int maxNumRecords,
        @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {

    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
        .withBootstrapServers("myServer1:9092,myServer2:9092")
        .withTopics(topics)
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class)
        .withMaxNumRecords(maxNumRecords);

    if (timestampFn != null) {
        return reader.withTimestampFn(timestampFn);
    } else {
        return reader;
    }
}
@Test
public void testUnboundedSourceWithSingleTopic() {
    // same as testUnboundedSource, but with a single topic
    int numElements = 1000;
    String topic = "my_topic";

    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
        .withBootstrapServers("none")
        .withTopic("my_topic")
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
        .withMaxNumRecords(numElements)
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class);

    PCollection<Long> input = p
        .apply(reader.withoutMetadata())
        .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);
    p.run();
}
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
    KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
        .withBootstrapServers("myServer1:9092,myServer2:9092")
        .withTopicPartitions(ImmutableList.of(
            new TopicPartition("test", 5),
            new TopicPartition("test", 6)))
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(LongDeserializer.class);

    DisplayData displayData = DisplayData.from(read);

    assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
@Test
public void testInferKeyCoder() {
    CoderRegistry registry = CoderRegistry.createDefault();

    assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
        instanceof VarLongCoder);
    assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
        instanceof StringUtf8Coder);
    assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
        instanceof InstantCoder);
    assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
        instanceof VarLongCoder);
}
@Override
public KafkaProducer<Long, byte[]> worker() {
    Properties props = AbstractKafkaClient.configBuilder()
            .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)
            .put(ACKS_CONFIG, "all")
            .put(RETRIES_CONFIG, 3)
            .put(BATCH_SIZE_CONFIG, 16384)
            .put(LINGER_MS_CONFIG, 1)
            .put(BUFFER_MEMORY_CONFIG, 33554432)
            // A producer needs serializers, not deserializers; LongSerializer and
            // ByteArraySerializer match the KafkaProducer<Long, byte[]> type parameters.
            .put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName())
            .put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName())
            .build();
    return this.worker = new KafkaProducer<>(props);
}
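A hypothetical send-side sketch for the producer built above (Long keys, byte[] values); the topic name "orders" and the payload parameter are illustrative assumptions, not taken from the original project.

// requires: org.apache.kafka.clients.producer.ProducerRecord,
//           org.apache.kafka.clients.producer.RecordMetadata
public void sendExample(byte[] payload) throws Exception {
    ProducerRecord<Long, byte[]> record =
            new ProducerRecord<>("orders", System.currentTimeMillis(), payload);
    // send() is asynchronous; get() blocks until the broker acknowledges the write (acks=all above).
    RecordMetadata metadata = this.worker.send(record).get();
    System.out.println("written to " + metadata.topic() + "-" + metadata.partition()
            + " at offset " + metadata.offset());
}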
private Collection<String> listTopics() {
    Properties consumerConfig = new Properties() {{
        setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                String.format("%s:%s", LOCALHOST, kafkaPort));
        setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    }};

    try (Consumer<Long, Long> consumer = new KafkaConsumer<>(consumerConfig)) {
        return consumer.listTopics().keySet();
    }
}
private void shouldCountHelper() throws Exception {
    startStreams();
    produceMessages(mockTime.milliseconds());

    final List<KeyValue<String, Long>> results = receiveMessages(
        new StringDeserializer(),
        new LongDeserializer(),
        10);
    Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
        @Override
        public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
            return KStreamAggregationIntegrationTest.compare(o1, o2);
        }
    });

    assertThat(results, is(Arrays.asList(
        KeyValue.pair("A", 1L),
        KeyValue.pair("A", 2L),
        KeyValue.pair("B", 1L),
        KeyValue.pair("B", 2L),
        KeyValue.pair("C", 1L),
        KeyValue.pair("C", 2L),
        KeyValue.pair("D", 1L),
        KeyValue.pair("D", 2L),
        KeyValue.pair("E", 1L),
        KeyValue.pair("E", 2L)
    )));
}
private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs)
        throws InterruptedException {
    final Properties config = new Properties();
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
        config,
        topic,
        numRecs,
        120 * 1000);
}
public void run(Configuration configuration, Environment environment) throws Exception {
    // INSTRUMENTATION

    // Metrics Instrumentation
    final CollectorRegistry collectorRegistry = new CollectorRegistry();
    collectorRegistry.register(new DropwizardExports(environment.metrics()));

    environment.admin()
        .addServlet("metrics", new MetricsServlet(collectorRegistry))
        .addMapping("/metrics");

    final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
        .withCollectorRegistry(collectorRegistry)
        .withConstLabel("service", getName())
        .build();

    // Tracing Instrumentation
    final Tracer tracer = getTracer();
    final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
    GlobalTracer.register(metricsTracer);

    final HttpHost httpHost = new HttpHost("tweets-elasticsearch", 9200);
    final RestClientBuilder restClientBuilder = RestClient.builder(httpHost)
        .setHttpClientConfigCallback(new TracingHttpClientConfigCallback(metricsTracer));
    final RestClient restClient = restClientBuilder.build();
    final ElasticsearchTweetRepository elasticsearchRepository =
        new ElasticsearchTweetRepository(restClient);

    final Properties consumerConfigs = new Properties();
    consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, getName());
    consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    final KafkaConsumer<Long, String> kafkaConsumer =
        new KafkaConsumer<>(consumerConfigs, new LongDeserializer(), new StringDeserializer());
    final TracingKafkaConsumer<Long, String> tracingKafkaConsumer =
        new TracingKafkaConsumer<>(kafkaConsumer, metricsTracer);
    final Runnable kafkaTweetEventConsumer =
        new KafkaTweetEventConsumer(tracingKafkaConsumer, elasticsearchRepository);

    final ExecutorService executorService =
        environment.lifecycle().executorService("kafka-consumer").build();
    executorService.submit(kafkaTweetEventConsumer);
}
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokerUrl);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", offsetReset);
    props.setProperty("max.poll.records", "32768");
    //props.setProperty("metadata.max.age.ms", "5000");
    return props;
}
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokerUrl);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", offsetReset);
    props.setProperty("max.poll.records", "32768");
    return props;
}
private static Map<String, Object> getKafkaProperties(String brokerUrl) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", brokerUrl);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", LongDeserializer.class);
    props.put("value.deserializer", TradeDeserializer.class);
    props.put("auto.offset.reset", "latest");
    return props;
}
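For context, a sketch of how a config map like the one returned above can be passed straight to the KafkaConsumer constructor, which accepts Map<String, Object> as well as Properties; the topic name "trades" and the helper name are assumptions, while Trade and TradeDeserializer come from the surrounding examples.

// requires: java.util.Collections, org.apache.kafka.clients.consumer.KafkaConsumer
private static KafkaConsumer<Long, Trade> newTradeConsumer(String brokerUrl) {
    // The map's key/value deserializer entries determine the consumer's generic types.
    KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(getKafkaProperties(brokerUrl));
    consumer.subscribe(Collections.singletonList("trades")); // hypothetical topic name
    return consumer;
}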
@Test
@SuppressWarnings("unchecked")
public void testConsumerCustomDeserializer() throws Exception {
    Binding<?> binding = null;
    try {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
        propertiesToOverride.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        propertiesToOverride.put("value.deserializer",
                "org.apache.kafka.common.serialization.LongDeserializer");
        configurationProperties.setConfiguration(propertiesToOverride);
        String testTopicName = "existing" + System.currentTimeMillis();
        configurationProperties.setAutoCreateTopics(false);
        Binder binder = getBinder(configurationProperties);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
        DirectChannel input = createBindableChannel("input",
                createConsumerBindingProperties(consumerProperties));
        binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
        DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
        assertTrue("Expected StringDeserializer as a custom key deserializer",
                consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
        assertTrue("Expected LongDeserializer as a custom value deserializer",
                consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
    }
    finally {
        if (binding != null) {
            binding.unbind();
        }
    }
}
@Test
public void testUnreachableKafkaBrokers() {
    // Expect an exception when the Kafka brokers are not reachable on the workers.
    // We specify partitions explicitly so that splitting does not involve server interaction.
    // Set request timeout to 10ms so that test does not take long.

    thrown.expect(Exception.class);
    thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");

    int numElements = 1000;
    PCollection<Long> input = p
        .apply(KafkaIO.<Integer, Long>read()
            .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
            .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            .updateConsumerProperties(ImmutableMap.<String, Object>of(
                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
                ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
            .withMaxNumRecords(10)
            .withoutMetadata())
        .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);
    p.run();
}
@Test
public void testUnboundedSourceWithExplicitPartitions() {
    int numElements = 1000;

    List<String> topics = ImmutableList.of("test");

    KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read()
        .withBootstrapServers("none")
        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(LongDeserializer.class)
        .withMaxNumRecords(numElements / 10);

    PCollection<Long> input = p
        .apply(reader.withoutMetadata())
        .apply(Values.<Long>create());

    // assert that every element is a multiple of 5.
    PAssert
        .that(input)
        .satisfies(new AssertMultipleOf(5));

    PAssert
        .thatSingleton(input.apply(Count.<Long>globally()))
        .isEqualTo(numElements / 10L);

    p.run();
}
@Test
public void testUnboundedSourceSplits() throws Exception {
    int numElements = 1000;
    int numSplits = 10;

    // Coders must be specified explicitly here due to the way the transform
    // is used in the test.
    UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
        mkKafkaReadTransform(numElements, null)
            .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
            .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
            .makeSource();

    List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
        initial.split(numSplits, p.getOptions());
    assertEquals("Expected exact splitting", numSplits, splits.size());

    long elementsPerSplit = numElements / numSplits;
    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
    PCollectionList<Long> pcollections = PCollectionList.empty(p);
    for (int i = 0; i < splits.size(); ++i) {
        pcollections = pcollections.and(
            p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
             .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<Integer, Long>()))
             .apply("collection " + i, Values.<Long>create()));
    }
    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());

    addCountingAsserts(input, numElements);
    p.run();
}
private void runSimpleCopyTest(final int numberOfRestarts,
                               final String inputTopic,
                               final String throughTopic,
                               final String outputTopic) throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<Long, Long> input = builder.stream(inputTopic);
    KStream<Long, Long> output = input;
    if (throughTopic != null) {
        output = input.through(throughTopic);
    }
    output.to(outputTopic);

    for (int i = 0; i < numberOfRestarts; ++i) {
        final long factor = i;
        final KafkaStreams streams = new KafkaStreams(
            builder,
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                new Properties() {
                    {
                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                    }
                }));

        try {
            streams.start();

            final List<KeyValue<Long, Long>> inputData =
                prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                inputTopic,
                inputData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time
            );

            final List<KeyValue<Long, Long>> committedRecords =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                    TestUtils.consumerConfig(
                        CLUSTER.bootstrapServers(),
                        CONSUMER_GROUP_ID,
                        LongDeserializer.class,
                        LongDeserializer.class,
                        new Properties() {
                            {
                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                                    IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                            }
                        }),
                    inputTopic,
                    inputData.size()
                );

            checkResultPerKey(committedRecords, inputData);
        } finally {
            streams.close();
        }
    }
}
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

    final KafkaStreams streams = new KafkaStreams(
        builder,
        StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            new Properties() {
                {
                    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                }
            }));

    try {
        streams.start();

        final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
        final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            firstBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> firstCommittedRecords =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                                IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                firstBurstOfData.size()
            );

        assertThat(firstCommittedRecords, equalTo(firstBurstOfData));

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            secondBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> secondCommittedRecords =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                                IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                secondBurstOfData.size()
            );

        assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
    } finally {
        streams.close();
    }
}
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
    assertNotNull(this.kafkaMessageChannelBinder);
    ExtendedProducerProperties<KafkaProducerProperties> producerProperties =
            new ExtendedProducerProperties<>(new KafkaProducerProperties());
    Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
            "getProducerFactory", String.class, ExtendedProducerProperties.class);
    getProducerFactoryMethod.setAccessible(true);
    DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
    Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class,
            "configs", Map.class);
    ReflectionUtils.makeAccessible(producerFactoryConfigField);
    Map<String, Object> producerConfigs =
            (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
    assertTrue(producerConfigs.get("batch.size").equals(10));
    assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("key.deserializer") == null);
    assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("value.deserializer") == null);
    assertTrue(producerConfigs.get("compression.type").equals("snappy"));
    List<String> bootstrapServers = new ArrayList<>();
    bootstrapServers.add("10.98.09.199:9092");
    bootstrapServers.add("10.98.09.196:9092");
    assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));

    Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
            "createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
    createKafkaConsumerFactoryMethod.setAccessible(true);
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
    DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
    Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class,
            "configs", Map.class);
    ReflectionUtils.makeAccessible(consumerFactoryConfigField);
    Map<String, Object> consumerConfigs =
            (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
    assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("key.serializer") == null);
    assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
    // "value.serializer" (the original checked the nonexistent key "value.serialized")
    assertTrue(consumerConfigs.get("value.serializer") == null);
    assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
    assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
    assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
@Test
public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
    // Similar to testUnboundedSourceCheckpointMark(), but verifies that the source resumes
    // properly from empty partitions, without missing messages added since the checkpoint.

    // Initialize the consumer with fewer elements than the number of partitions so that some are empty.
    int initialNumElements = 5;
    UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
        mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
            .makeSource()
            .split(1, PipelineOptionsFactory.create())
            .get(0);

    UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);

    for (int l = 0; l < initialNumElements; ++l) {
        advanceOnce(reader, l > 0);
    }

    // Checkpoint and restart, and confirm that the source continues correctly.
    KafkaCheckpointMark mark = CoderUtils.clone(
        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());

    // Create another source with a MockConsumer with OffsetResetStrategy.LATEST. This ensures
    // that the reader needs to explicitly seek to the first offset for partitions that were empty.

    int numElements = 100; // all the 20 partitions will have elements
    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    source = KafkaIO.<Integer, Long>read()
        .withBootstrapServers("none")
        .withTopics(topics)
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            topics, 10, numElements, OffsetResetStrategy.LATEST))
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class)
        .withMaxNumRecords(numElements)
        .withTimestampFn(new ValueAsTimestampFn())
        .makeSource()
        .split(1, PipelineOptionsFactory.create())
        .get(0);

    reader = source.createReader(null, mark);

    // Verify in any order. As the partitions are unevenly read, the returned records are not in a
    // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order.

    List<Long> expected = new ArrayList<>();
    List<Long> actual = new ArrayList<>();
    for (long i = initialNumElements; i < numElements; i++) {
        advanceOnce(reader, i > initialNumElements);
        expected.add(i);
        actual.add(reader.getCurrent().getKV().getValue());
    }
    assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
}