/**
 * Recreates the test fixtures before each test: mock time, subscription state, metadata seeded
 * with the test cluster, the mock network client, metrics, listeners and the coordinator.
 */
@Before
public void setup() {
    time = new MockTime();
    subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);

    // Metadata is updated with the test cluster and an empty set of topics.
    metadata = new Metadata(0, Long.MAX_VALUE, true);
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    client = new MockClient(time, metadata);
    consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
    metrics = new Metrics(time);
    rebalanceListener = new MockRebalanceListener();
    mockOffsetCommitCallback = new MockCommitCallback();
    partitionAssignor.clear();

    client.setNode(node);
    coordinator = buildCoordinator(
            metrics,
            assignors,
            ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS,
            autoCommitEnabled,
            true);
}
@SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public Map<String, List<PartitionInfo>> listTopics() { throw new TimeoutException("KABOOM!"); } }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0); try { changelogReader.validatePartitionExists(topicPartition, "store"); fail("Should have thrown streams exception"); } catch (final StreamsException e) { // pass } }
/**
 * Uses a consumer whose {@code partitionsFor} returns the test partition info; the test passes
 * when {@code validatePartitionExists} completes without throwing.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception {
    final MockConsumer<byte[], byte[]> partitionsForConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            return Collections.singletonList(partitionInfo);
        }
    };

    final StoreChangelogReader reader = new StoreChangelogReader(partitionsForConsumer, new MockTime(), 10);
    reader.validatePartitionExists(topicPartition, "store");
}
@SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception { final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public List<PartitionInfo> partitionsFor(final String topic) { throw new TimeoutException("KABOOM!"); } }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5); try { changelogReader.validatePartitionExists(topicPartition, "store"); fail("Should have thrown streams exception"); } catch (final StreamsException e) { // pass } }
/**
 * Uses a consumer whose {@code listTopics()} returns an empty map while the partition info is
 * registered through {@code updatePartitions}; the test passes when validation does not throw.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
    final MockConsumer<byte[], byte[]> emptyListingConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
            return Collections.emptyMap();
        }
    };
    emptyListingConsumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));

    final StoreChangelogReader reader = new StoreChangelogReader(emptyListingConsumer, Time.SYSTEM, 5000);
    reader.validatePartitionExists(topicPartition, "store");
}
/**
 * Prepares two read-only stores ("t1-store" and "t2-store"), a topology that maps them to the
 * topics "t1" and "t2", a temporary state directory, a mock consumer, and the global state
 * manager under test together with its checkpoint file handle.
 */
@Before
public void before() throws IOException {
    final Map<String, String> changelogTopicByStore = new HashMap<>();
    changelogTopicByStore.put("t1-store", "t1");
    changelogTopicByStore.put("t2-store", "t2");

    // NOTE(review): this map is populated but not read anywhere else in this method.
    final Map<StateStore, ProcessorNode> processorNodeByStore = new HashMap<>();
    store1 = new NoOpReadOnlyStore<>("t1-store");
    processorNodeByStore.put(store1, new MockProcessorNode(-1));
    store2 = new NoOpReadOnlyStore("t2-store");
    processorNodeByStore.put(store2, new MockProcessorNode(-1));

    topology = new ProcessorTopology(
            Collections.<ProcessorNode>emptyList(),
            Collections.<String, SourceNode>emptyMap(),
            Collections.<String, SinkNode>emptyMap(),
            Collections.<StateStore>emptyList(),
            changelogTopicByStore,
            Arrays.<StateStore>asList(store1, store2));

    context = new NoOpProcessorContext();
    stateDirPath = TestUtils.tempDirectory().getPath();
    stateDirectory = new StateDirectory("appId", stateDirPath, time);
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
    checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
}
/**
 * Starts a global stream thread with a consumer whose {@code partitionsFor} throws a
 * {@code RuntimeException} and expects a {@code StreamsException} wrapping that cause; the
 * thread must not be running afterwards.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
    final MockConsumer<byte[], byte[]> failingConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            throw new RuntimeException("KABOOM!");
        }
    };

    globalStreamThread = new GlobalStreamThread(
            builder.buildGlobalStateTopology(),
            config,
            failingConsumer,
            new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
            new Metrics(),
            new MockTime(),
            "clientId");

    try {
        globalStreamThread.start();
        fail("Should have thrown StreamsException if start up failed");
    } catch (StreamsException startupFailure) {
        // The original failure must be preserved as the cause.
        assertThat(startupFailure.getCause(), instanceOf(RuntimeException.class));
        assertThat(startupFailure.getCause().getMessage(), equalTo("KABOOM!"));
    }

    assertFalse(globalStreamThread.stillRunning());
}
/**
 * For every {@code IsolationLevel}, builds a fetcher with that level and checks that the list
 * offset request it sends carries the same level, and that the position is reset to the
 * returned offset.
 */
@Test
public void testListOffsetsSendsIsolationLevel() {
    for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
        Fetcher<byte[], byte[]> isolatedFetcher = createFetcher(
                subscriptions,
                new Metrics(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer(),
                Integer.MAX_VALUE,
                isolationLevel);

        subscriptions.assignFromUser(singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        // The prepared response is only matched by a request carrying the current isolation level.
        final MockClient.RequestMatcher isolationMatcher = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return ((ListOffsetRequest) body).isolationLevel() == isolationLevel;
            }
        };
        client.prepareResponse(isolationMatcher, listOffsetResponse(Errors.NONE, 1L, 5L));

        isolatedFetcher.updateFetchPositions(singleton(tp1));

        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        assertTrue(subscriptions.isFetchable(tp1));
        assertEquals(5, subscriptions.position(tp1).longValue());
    }
}
@Test public void testUpdateFetchPositionDisconnect() { subscriptions.assignFromUser(singleton(tp1)); subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L), true); // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L)); fetcher.updateFetchPositions(singleton(tp1)); assertFalse(subscriptions.isOffsetResetNeeded(tp1)); assertTrue(subscriptions.isFetchable(tp1)); assertEquals(5, subscriptions.position(tp1).longValue()); }
@Test public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() { subscriptions.assignFromUser(singleton(tp1)); subscriptions.committed(tp1, new OffsetAndMetadata(0)); subscriptions.pause(tp1); // paused partition does not have a valid position subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 10L)); fetcher.updateFetchPositions(singleton(tp1)); assertFalse(subscriptions.isOffsetResetNeeded(tp1)); assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused assertTrue(subscriptions.hasValidPosition(tp1)); assertEquals(10, subscriptions.position(tp1).longValue()); }
/**
 * Builds the base consumer configuration: bootstrap servers, group id, auto-commit disabled,
 * and optionally the offset reset policy and the client id.
 *
 * @param groupId the consumer group id; must not be null
 * @param clientId the client id; the "client.id" property is omitted when this is null
 * @param autoOffsetReset the offset reset strategy; when non-null its lower-cased name is set
 *        as "auto.offset.reset", otherwise the property is omitted
 * @return a new {@link Properties} instance holding the settings above
 * @throws IllegalArgumentException if {@code groupId} is null
 */
public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
    if (groupId == null) {
        throw new IllegalArgumentException("The groupId is required");
    }

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokers);
    props.setProperty("group.id", groupId);
    props.setProperty("enable.auto.commit", Boolean.FALSE.toString());
    if (autoOffsetReset != null) {
        // Lower-case with Locale.ROOT: the default-locale variant is locale sensitive
        // (e.g. "EARLIEST" becomes "earl\u0131est" under a Turkish default locale),
        // which would produce an invalid configuration value.
        props.setProperty("auto.offset.reset", autoOffsetReset.toString().toLowerCase(java.util.Locale.ROOT));
    }
    if (clientId != null) {
        props.setProperty("client.id", clientId);
    }
    return props;
}
/** * Reset offsets for the given partition using the offset reset strategy. * * @param partition The given partition that needs reset offset * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined */ private void resetOffset(TopicPartition partition) { OffsetResetStrategy strategy = subscriptions.resetStrategy(partition); final long timestamp; if (strategy == OffsetResetStrategy.EARLIEST) timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; else if (strategy == OffsetResetStrategy.LATEST) timestamp = ListOffsetRequest.LATEST_TIMESTAMP; else throw new NoOffsetForPartitionException(partition); log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT)); long offset = listOffset(partition, timestamp); // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) this.subscriptions.seek(partition, offset); }
/**
 * Recreates the test fixtures before each test: mock time and client, subscription state,
 * metadata seeded with the test cluster, metrics, listeners and the coordinator.
 */
@Before
public void setup() {
    time = new MockTime();
    client = new MockClient(time);
    subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);

    // Metadata is updated with the test cluster at the current mock time.
    metadata = new Metadata(0, Long.MAX_VALUE);
    metadata.update(cluster, time.milliseconds());

    consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
    metrics = new Metrics(time);
    rebalanceListener = new MockRebalanceListener();
    defaultOffsetCommitCallback = new MockCommitCallback();
    partitionAssignor.clear();

    client.setNode(node);
    coordinator = buildCoordinator(
            metrics,
            assignors,
            ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS,
            autoCommitEnabled);
}
@Test public void testUpdateFetchPositionDisconnect() { subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true); // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, (long) subscriptions.position(tp)); }
/**
 * Produces 1000 string records to partition 0 of the topic, then subscribes a consumer and
 * completes once the record handler has been invoked that many times.
 */
@Test
public void testConsume(TestContext ctx) throws Exception {
    final String topicName = "testConsume";
    String consumerId = topicName;
    int numMessages = 1000;

    // Produce the records up front and wait for the producer to finish.
    Async produced = ctx.async();
    AtomicInteger sequence = new AtomicInteger();
    kafkaCluster.useTo().produceStrings(numMessages, produced::complete, () ->
        new ProducerRecord<>(topicName, 0, "key-" + sequence.get(), "value-" + sequence.getAndIncrement()));
    produced.awaitSuccess(20000);

    Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = createConsumer(vertx, config);

    Async allConsumed = ctx.async();
    AtomicInteger remaining = new AtomicInteger(numMessages);
    consumer.exceptionHandler(ctx::fail);
    consumer.handler(rec -> {
        if (remaining.decrementAndGet() == 0) {
            allConsumed.complete();
        }
    });
    consumer.subscribe(Collections.singleton(topicName));
}
/**
 * Creates a topic with two partitions and checks that {@code partitionsFor} reports two
 * entries for it.
 */
@Test
public void testPartitionsFor(TestContext ctx) throws Exception {
    String topicName = "testPartitionsFor";
    String consumerId = topicName;
    kafkaCluster.createTopic(topicName, 2, 1);

    Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    Context context = vertx.getOrCreateContext();
    consumer = createConsumer(context, config);

    Async finished = ctx.async();
    consumer.partitionsFor(topicName, lookup -> {
        if (!lookup.succeeded()) {
            ctx.fail();
        } else {
            List<PartitionInfo> partitions = lookup.result();
            ctx.assertEquals(2, partitions.size());
        }
        finished.complete();
    });
}
/**
 * Produces 500 string records to partition 0 of the topic, then subscribes a consumer whose
 * batch handler asserts that the batch it receives contains all of them.
 */
@Test
public void testBatchHandler(TestContext ctx) throws Exception {
    String topicName = "testBatchHandler";
    String consumerId = topicName;
    int numMessages = 500;

    // Produce the records up front and wait for the producer to finish.
    Async produced = ctx.async();
    AtomicInteger sequence = new AtomicInteger();
    kafkaCluster.useTo().produceStrings(numMessages, produced::complete, () ->
        new ProducerRecord<>(topicName, 0, "key-" + sequence.get(), "value-" + sequence.getAndIncrement()));
    produced.awaitSuccess(10000);

    Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    Context context = vertx.getOrCreateContext();
    consumer = createConsumer(context, config);

    Async batchSeen = ctx.async();
    consumer.batchHandler(batch -> {
        ctx.assertEquals(numMessages, batch.count());
        batchSeen.complete();
    });
    consumer.exceptionHandler(ctx::fail);
    consumer.handler(rec -> {});
    consumer.subscribe(Collections.singleton(topicName));
}
@Test public void testPollTimeout(TestContext ctx) throws Exception { Async async = ctx.async(); String topicName = "testPollTimeout"; Properties config = kafkaCluster.useTo().getConsumerProperties(topicName, topicName, OffsetResetStrategy.EARLIEST); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(topicName, 0); KafkaConsumer<Object, Object> consumerWithCustomTimeout = KafkaConsumer.create(vertx, config); int pollingTimeout = 1500; // Set the polling timeout to 1500 ms (default is 1000) consumerWithCustomTimeout.pollTimeout(pollingTimeout); // Subscribe to the empty topic (we want the poll() call to timeout!) consumerWithCustomTimeout.subscribe(topicName, subscribeRes -> { consumerWithCustomTimeout.handler(rec -> {}); // Consumer will now immediately poll once long beforeSeek = System.currentTimeMillis(); consumerWithCustomTimeout.seekToBeginning(topicPartition, seekRes -> { long durationWShortTimeout = System.currentTimeMillis() - beforeSeek; ctx.assertTrue(durationWShortTimeout >= pollingTimeout, "Operation must take at least as long as the polling timeout"); consumerWithCustomTimeout.close(); async.countDown(); }); }); }
/**
 * Produces 1000 string records to partition 0 of the topic, then consumes them through the
 * consumer's observable and completes once that many items have been observed.
 */
@Test
public void testConsume(TestContext ctx) throws Exception {
    String topicName = "testConsume";
    int numMessages = 1000;

    // Produce the records up front and wait for the producer to finish.
    Async produced = ctx.async();
    AtomicInteger sequence = new AtomicInteger();
    kafkaCluster.useTo().produceStrings(numMessages, produced::complete, () ->
        new ProducerRecord<>(topicName, 0, "key-" + sequence.get(), "value-" + sequence.getAndIncrement()));
    produced.awaitSuccess(20000);

    Properties config = kafkaCluster.useTo().getConsumerProperties("testConsume_consumer", "testConsume_consumer", OffsetResetStrategy.EARLIEST);
    Map<String, String> consumerSettings = mapConfig(config);
    consumer = KafkaConsumer.create(vertx, consumerSettings, String.class, String.class);

    Async allConsumed = ctx.async();
    AtomicInteger remaining = new AtomicInteger(numMessages);
    consumer.toObservable().subscribe(item -> {
        if (remaining.decrementAndGet() == 0) {
            allConsumed.complete();
        }
    }, ctx::fail);
    consumer.subscribe(Collections.singleton(topicName));
}
@Test // Regression test for ISS-73: undeployment of a verticle with unassigned consumer fails public void testUndeployUnassignedConsumer(TestContext ctx) throws Exception { Properties config = kafkaCluster.useTo().getConsumerProperties("testUndeployUnassignedConsumer_consumer", "testUndeployUnassignedConsumer_consumer", OffsetResetStrategy.EARLIEST); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); Async async = ctx.async(1); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config); vertx.setTimer(20, record -> { // Very rarely, this throws a AlreadyUndedeployed error vertx.undeploy(context.deploymentID(), ctx.asyncAssertSuccess(ar -> { async.complete(); })); }); } }, ctx.asyncAssertSuccess()); }
/**
 * Feeds a single record ("abc" -> "def") for partition 0 of "the_topic" through a mock
 * consumer and asserts the record handler observes exactly those values before closing.
 */
@Test
public void testConsume(TestContext ctx) throws Exception {
    MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
    Async closed = ctx.async();

    consumer.handler(received -> {
        ctx.assertEquals("the_topic", received.topic());
        ctx.assertEquals(0, received.partition());
        ctx.assertEquals("abc", received.key());
        ctx.assertEquals("def", received.value());
        consumer.close(v -> closed.complete());
    });

    consumer.subscribe(Collections.singleton("the_topic"), v -> {
        // Assign the partition, queue the record, and position the mock at offset 0
        // from within a scheduled poll task.
        mock.schedulePollTask(() -> {
            mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
            mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def"));
            mock.seek(new TopicPartition("the_topic", 0), 0L);
        });
    });
}
/**
 * Creates a publisher backed by a mock consumer that is pre-loaded with 100 records carrying
 * random keys and values.
 *
 * <p>NOTE(review): the {@code l} argument is not read by this method - confirm that is intended.
 */
@Override
public Publisher<ConsumerRecord<Long, Double>> createPublisher(final long l) {
    long nRecords = 100;

    mockConsumer = new MockConsumer<Long, Double>(OffsetResetStrategy.LATEST);
    mockConsumer.assign(Arrays.asList(topicPartition));

    // The same map instance is used first for the beginning offsets and then for the end offsets.
    final HashMap<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(topicPartition, 0L);
    mockConsumer.updateBeginningOffsets(offsets);
    offsets.put(topicPartition, nRecords - 1);
    mockConsumer.updateEndOffsets(offsets);

    final Random random = new Random();
    for (int offset = 0; offset < nRecords; offset++) {
        final ConsumerRecord<Long, Double> record = new ConsumerRecord<Long, Double>(
                topicPartition.topic(),
                topicPartition.partition(),
                offset,
                random.nextLong(),
                random.nextDouble());
        mockConsumer.addRecord(record);
    }

    return new KafkaPublisher<Long, Double>(mockConsumer, 100, Executors.newSingleThreadExecutor());
}
/** * Creates a consumer with two topics, with 10 partitions each. * numElements are (round-robin) assigned all the 20 partitions. */ private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform( int numElements, int maxNumRecords, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) { List<String> topics = ImmutableList.of("topic_a", "topic_b"); KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(maxNumRecords); if (timestampFn != null) { return reader.withTimestampFn(timestampFn); } else { return reader; } }
@Test public void testUnboundedSourceWithSingleTopic() { // same as testUnboundedSource, but with single topic int numElements = 1000; String topic = "my_topic"; KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() .withBootstrapServers("none") .withTopic("my_topic") .withConsumerFactoryFn(new ConsumerFactoryFn( ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) .withMaxNumRecords(numElements) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); PCollection<Long> input = p .apply(reader.withoutMetadata()) .apply(Values.<Long>create()); addCountingAsserts(input, numElements); p.run(); }
@Test public void testSourceWithExplicitPartitionsDisplayData() { KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 6))) .withConsumerFactoryFn(new ConsumerFactoryFn( Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6")); assertThat(displayData, hasDisplayItem("enable.auto.commit", false)); assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092")); assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest")); assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288)); }
@Test public void testConsume() throws Exception { Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test")); MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.NONE); consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0))); HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); beginningOffsets.put(new TopicPartition("test_topic", 0), 0L); consumer.updateBeginningOffsets(beginningOffsets); ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0"); ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1"); ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2"); consumer.addRecord(record0); consumer.addRecord(record1); consumer.addRecord(record2); try (Kafka09ConsumerClient<String, String> kafka09Client = new Kafka09ConsumerClient<>(testConfig, consumer);) { // Consume from 0 offset Set<KafkaConsumerRecord> consumedRecords = Sets.newHashSet(kafka09Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic") .build(), 0l, 100l)); Set<Kafka09ConsumerRecord<String, String>> expected = ImmutableSet.<Kafka09ConsumerRecord<String, String>> of(new Kafka09ConsumerRecord<>(record0), new Kafka09ConsumerRecord<>(record1), new Kafka09ConsumerRecord<>(record2)); Assert.assertEquals(consumedRecords, expected); } }
/**
 * Asynchronously consumes messages from the cluster on a dedicated thread named
 * {@code clientId + "-thread"}, handing each record to the supplied function and issuing an
 * asynchronous commit after every record.
 *
 * @param groupId the name of the group; may not be null
 * @param clientId the name of the client; may not be null
 * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
 *        out of range; may be null for the default to be used
 * @param keyDeserializer the deserializer for the keys; may not be null
 * @param valueDeserializer the deserializer for the values; may not be null
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param completion the function to call when the consumer terminates; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, Runnable completion, Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);

    Runnable pollLoop = () -> {
        LOGGER.debug("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            // Keep polling until the caller signals that consumption should stop.
            while (continuation.getAsBoolean()) {
                for (ConsumerRecord<K, V> record : consumer.poll(10)) {
                    LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    consumer.commitAsync();
                }
            }
        } finally {
            // The completion hook runs whether the loop ended normally or with an exception.
            if (completion != null) {
                completion.run();
            }
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    };

    Thread worker = new Thread(pollLoop);
    worker.setName(clientId + "-thread");
    worker.start();
}
/**
 * Creates a proxy mock consumer, registers every topic's partition info from
 * {@code getInfos()} with it, records it in {@code CONSUMERS} and returns it.
 */
private Consumer createConsumer() {
    ProxyMockConsumer mock = new ProxyMockConsumer(OffsetResetStrategy.EARLIEST);

    for (Map.Entry<String, List<PartitionInfo>> topicInfo : getInfos().entrySet()) {
        String topic = topicInfo.getKey();
        List<PartitionInfo> partitions = topicInfo.getValue();
        mock.updatePartitions(topic, partitions);
    }

    CONSUMERS.add(mock);
    return mock;
}
/**
 * Creates a proxy mock consumer, registers every topic's partition info from
 * {@code getInfos()} with it, wraps it in a {@code ConsumerForTests}, records the wrapper in
 * {@code CONSUMERS} and returns the wrapper.
 */
private Consumer createConsumer() {
    ProxyMockConsumer mock = new ProxyMockConsumer(OffsetResetStrategy.EARLIEST);

    for (Map.Entry<String, List<PartitionInfo>> topicInfo : getInfos().entrySet()) {
        String topic = topicInfo.getKey();
        List<PartitionInfo> partitions = topicInfo.getValue();
        mock.updatePartitions(topic, partitions);
    }

    ConsumerForTests wrapped = new ConsumerForTests(mock);
    CONSUMERS.add(wrapped);
    return wrapped;
}
/**
 * Creates a partial mock of {@code KafkaBasedLog} (mocking "createConsumer" and
 * "createProducer") and a mock consumer that knows both topic partitions, each with a
 * beginning offset of 0.
 */
@Before
public void setUp() throws Exception {
    final String[] mockedMethods = new String[]{"createConsumer", "createProducer"};
    store = PowerMock.createPartialMock(KafkaBasedLog.class, mockedMethods,
            TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);

    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));

    Map<TopicPartition, Long> startOffsets = new HashMap<>();
    startOffsets.put(TP0, 0L);
    startOffsets.put(TP1, 0L);
    consumer.updateBeginningOffsets(startOffsets);
}
/**
 * Creates a mock consumer whose {@code committed(TopicPartition)} always throws the given
 * exception.
 *
 * @param toThrow the exception thrown by every {@code committed} call
 * @return the failing mock consumer
 */
private Consumer mockConsumer(final RuntimeException toThrow) {
    final Consumer failingOnCommitted = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public OffsetAndMetadata committed(final TopicPartition partition) {
            throw toThrow;
        }
    };
    return failingOnCommitted;
}
/**
 * Creates an empty subscription state: no subscription, no assignment, no subscribed pattern,
 * and subscription type {@code NONE}.
 *
 * @param defaultResetStrategy the reset strategy stored as this state's default
 */
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
    this.defaultResetStrategy = defaultResetStrategy;

    // Nothing is subscribed or assigned yet.
    this.subscriptionType = SubscriptionType.NONE;
    this.subscribedPattern = null;
    this.subscription = Collections.emptySet();
    this.groupSubscription = new HashSet<>();
    this.assignment = new PartitionStates<>();

    // initialize to true for the consumers to fetch offset upon starting up
    this.needsFetchCommittedOffsets = true;
}
/**
 * Records the timestamp to look up for a partition based on its reset strategy.
 *
 * @param partition the partition being resolved
 * @param output receives the partition mapped to the earliest timestamp (EARLIEST strategy)
 *        or to {@code endTimestamp()} (LATEST strategy)
 * @param partitionsWithNoOffsets receives the partition when its strategy is neither of those
 */
private void offsetResetStrategyTimestamp(
        final TopicPartition partition,
        final Map<TopicPartition, Long> output,
        final Set<TopicPartition> partitionsWithNoOffsets) {
    final OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);

    if (strategy == OffsetResetStrategy.EARLIEST) {
        output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP);
        return;
    }
    if (strategy == OffsetResetStrategy.LATEST) {
        output.put(partition, endTimestamp());
        return;
    }
    partitionsWithNoOffsets.add(partition);
}
/**
 * Flags the partition for a reset to LATEST and checks that updating fetch positions moves the
 * position to the offset returned by the list-offset response.
 */
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    client.prepareResponse(
            listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
            listOffsetResponse(Errors.NONE, 1L, 5L));
    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertTrue(subscriptions.isFetchable(tp1));
    final long resolvedPosition = subscriptions.position(tp1).longValue();
    assertEquals(5, resolvedPosition);
}
/**
 * Flags the partition for a reset to EARLIEST and checks that updating fetch positions moves
 * the position to the offset returned by the list-offset response.
 */
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);

    client.prepareResponse(
            listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
            listOffsetResponse(Errors.NONE, 1L, 5L));
    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertTrue(subscriptions.isFetchable(tp1));
    final long resolvedPosition = subscriptions.position(tp1).longValue();
    assertEquals(5, resolvedPosition);
}
/**
 * Asynchronously consumes messages from the cluster on a dedicated thread named
 * {@code clientId + "-thread"}, handing each record to the supplied function and, when a commit
 * callback is given, issuing an asynchronous commit with it after every record.
 *
 * @param groupId the name of the group; may not be null
 * @param clientId the name of the client; may not be null
 * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
 *        out of range; may be null for the default to be used
 * @param keyDeserializer the deserializer for the keys; may not be null
 * @param valueDeserializer the deserializer for the values; may not be null
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param offsetCommitCallback the callback that should be used after committing offsets; may be null if offsets are
 *        not to be committed
 * @param completion the function to call when the consumer terminates; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback,
                           Runnable completion, Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);

    Runnable pollLoop = () -> {
        LOGGER.info("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            // Keep polling until the caller signals that consumption should stop.
            while (continuation.getAsBoolean()) {
                for (ConsumerRecord<K, V> record : consumer.poll(10)) {
                    LOGGER.info("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    // Offsets are only committed when the caller supplied a callback.
                    if (offsetCommitCallback != null) {
                        consumer.commitAsync(offsetCommitCallback);
                    }
                }
            }
        } finally {
            // The completion hook runs whether the loop ended normally or with an exception.
            if (completion != null) {
                completion.run();
            }
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    };

    Thread worker = new Thread(pollLoop);
    worker.setName(clientId + "-thread");
    worker.start();
}
/**
 * Consumes records with String keys and Integer values from the given topics, using a random
 * UUID as both group id and client id, the EARLIEST reset strategy, and a null offset commit
 * callback.
 *
 * @param continuation the function that determines if the consumer should continue
 * @param completion the function to call when the consumer terminates
 * @param topics the topics to consume
 * @param consumerFunction the function that receives each consumed record
 */
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics,
                            Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
    String randomId = UUID.randomUUID().toString();
    Deserializer<String> keyDes = new StringDeserializer();
    Deserializer<Integer> valDes = new IntegerDeserializer();

    // A null callback is passed on to the delegate.
    OffsetCommitCallback noCommitCallback = null;

    this.consume(
            randomId,
            randomId,
            OffsetResetStrategy.EARLIEST,
            keyDes,
            valDes,
            continuation,
            noCommitCallback,
            completion,
            topics,
            consumerFunction);
}
/**
 * Creates an empty subscription state: empty subscription, user assignment, assignment and
 * group subscription collections, no subscribed pattern, and no pending partition assignment.
 *
 * @param defaultResetStrategy the reset strategy stored as this state's default
 */
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
    this.defaultResetStrategy = defaultResetStrategy;

    // Nothing is subscribed or assigned yet.
    this.subscribedPattern = null;
    this.subscription = new HashSet<>();
    this.groupSubscription = new HashSet<>();
    this.userAssignment = new HashSet<>();
    this.assignment = new HashMap<>();

    this.needsPartitionAssignment = false;
    // initialize to true for the consumers to fetch offset upon starting up
    this.needsFetchCommittedOffsets = true;
}
/**
 * Flags the partition for a reset to LATEST and checks that updating fetch positions moves the
 * position to the offset returned by the list-offset response.
 */
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
    subscriptions.assignFromUser(Arrays.asList(tp));
    subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

    client.prepareResponse(
            listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
            listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
    fetcher.updateFetchPositions(Collections.singleton(tp));

    assertFalse(subscriptions.isOffsetResetNeeded(tp));
    assertTrue(subscriptions.isFetchable(tp));
    final long resolvedPosition = (long) subscriptions.position(tp);
    assertEquals(5, resolvedPosition);
}
/**
 * Flags the partition for a reset to EARLIEST and checks that updating fetch positions moves
 * the position to the offset returned by the list-offset response.
 */
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
    subscriptions.assignFromUser(Arrays.asList(tp));
    subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);

    client.prepareResponse(
            listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
            listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
    fetcher.updateFetchPositions(Collections.singleton(tp));

    assertFalse(subscriptions.isOffsetResetNeeded(tp));
    assertTrue(subscriptions.isFetchable(tp));
    final long resolvedPosition = (long) subscriptions.position(tp);
    assertEquals(5, resolvedPosition);
}