@Test public void testSimpleMock() { consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener()); assertEquals(0, consumer.poll(1000).count()); consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1))); // Mock consumers need to seek manually since they cannot automatically reset offsets HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); beginningOffsets.put(new TopicPartition("test", 0), 0L); beginningOffsets.put(new TopicPartition("test", 1), 0L); consumer.updateBeginningOffsets(beginningOffsets); consumer.seek(new TopicPartition("test", 0), 0); ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1"); ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2"); consumer.addRecord(rec1); consumer.addRecord(rec2); ConsumerRecords<String, String> recs = consumer.poll(1); Iterator<ConsumerRecord<String, String>> iter = recs.iterator(); assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); assertEquals(2L, consumer.position(new TopicPartition("test", 0))); consumer.commitSync(); assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); }
/**
 * Subscribes the consumer to {@code topic} and hands the value of every polled record to
 * {@code responseMatcher}, looping until the current thread is interrupted or a
 * {@link WakeupException} is thrown. The consumer is closed on every exit path.
 */
@Override
public void run() {
    try {
        // Counts the latch down each time partitions are assigned to this consumer.
        NoOpConsumerRebalanceListener assignmentSignal = new NoOpConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                latch.countDown();
            }
        };
        consumer.subscribe(Collections.singletonList(topic), assignmentSignal);

        for (;;) {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            ConsumerRecords<String, Response> batch = consumer.poll(Long.MAX_VALUE);
            batch.forEach(rec -> {
                logger.trace("Topic {}, partition {}, offset {}", rec.topic(), rec.partition(), rec.offset());
                responseMatcher.offerResponse(rec.value());
            });
        }
    } catch (WakeupException e) {
        // A wakeup is handled as a shutdown signal: it is logged and not rethrown.
        logger.warn("Response Consumer thread is shutting down");
    } finally {
        consumer.close();
    }
}
/**
 * Verifies that subscribing with a {@code null} pattern fails with an
 * {@link IllegalArgumentException}. The consumer is closed whether or not the call throws.
 */
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullPattern() {
    final KafkaConsumer<byte[], byte[]> kafkaConsumer = newConsumer();
    try {
        // The cast keeps the null argument typed as a Pattern.
        kafkaConsumer.subscribe((Pattern) null, new NoOpConsumerRebalanceListener());
    } finally {
        kafkaConsumer.close();
    }
}
/**
 * Subscribes to the given topics using a no-op rebalance listener.
 * This synchronized method delegates to the two-argument {@code subscribe} overload.
 *
 * @param topics the topics to subscribe to
 */
@Override
public synchronized void subscribe(Collection<String> topics) {
    final NoOpConsumerRebalanceListener noOpListener = new NoOpConsumerRebalanceListener();
    subscribe(topics, noOpListener);
}
/**
 * Subscribes to the given topics without a custom rebalance callback.
 * A freshly created no-op listener is passed to the two-argument {@code subscribe} overload.
 *
 * @param topics the topics to subscribe to
 */
@Override
public void subscribe(Collection<String> topics) {
    final NoOpConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    subscribe(topics, listener);
}
/**
 * Subscribe to the given list of topics to get dynamically assigned partitions.
 * <b>Topic subscriptions are not incremental: the supplied list replaces the current
 * assignment, if there is one.</b> Topic subscription with group management cannot be
 * combined with manual partition assignment through {@link #assign(Collection)}.
 *
 * An empty list of topics is treated the same as a call to {@link #unsubscribe()}.
 *
 * <p>
 * This method is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}
 * with a noop listener. Prefer {@link #subscribe(Collection, ConsumerRebalanceListener)}
 * if you need the ability to seek to particular offsets, since group rebalances will cause
 * partition offsets to be reset. You should also supply your own listener if you manage
 * offsets yourself, because the listener gives you an opportunity to commit offsets before
 * a rebalance finishes.
 *
 * @param topics The list of topics to subscribe to
 * @throws IllegalArgumentException If topics is null or contains null or empty elements
 */
@Override
public void subscribe(Collection<String> topics) {
    final NoOpConsumerRebalanceListener noopListener = new NoOpConsumerRebalanceListener();
    subscribe(topics, noopListener);
}
/**
 * Subscribe to the given list of topics to get dynamically assigned partitions.
 * <b>Topic subscriptions are not incremental. The given list replaces the current
 * assignment (if there is one).</b> Topic subscription with group management cannot be
 * combined with manual partition assignment through {@link #assign(Collection)}.
 *
 * If the given list of topics is empty, the call is treated the same as {@link #unsubscribe()}.
 *
 * <p>
 * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)} that
 * uses a noop listener. If you need the ability to seek to particular offsets, prefer
 * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will
 * cause partition offsets to be reset. You should also provide your own listener if you are
 * doing your own offset management, since the listener gives you an opportunity to commit
 * offsets before a rebalance finishes.
 *
 * @param topics The list of topics to subscribe to
 */
@Override
public void subscribe(Collection<String> topics) {
    final NoOpConsumerRebalanceListener defaultListener = new NoOpConsumerRebalanceListener();
    subscribe(topics, defaultListener);
}