/**
 * Subscribes to {@code topics} through the superclass and then seeds this mock with
 * one entry per partition of every topic.
 *
 * <p>For each distinct topic name, partitions {@code 0} (inclusive) through
 * {@code KafkaMockFactory.NUMBER_OF_PARTITIONS} (exclusive) are created and mapped to
 * offset {@code 0L}. The resulting partitions are passed to {@code rebalance}, and the
 * offset map is passed to both {@code updateBeginningOffsets} and
 * {@code updateEndOffsets}.
 *
 * @param topics   topic names to subscribe to; repeated names are treated as one topic
 * @param listener rebalance listener forwarded to the superclass
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    super.subscribe(topics, listener);
    Map<TopicPartition, Long> offsets = topics
        .stream()
        // Collectors.toMap without a merge function throws IllegalStateException on a
        // duplicate key, so drop repeated topic names before expanding to partitions.
        .distinct()
        .flatMap(
            topic -> IntStream
                .range(0, KafkaMockFactory.NUMBER_OF_PARTITIONS)
                .mapToObj(i -> new TopicPartition(topic, i)))
        .collect(Collectors.toMap(Function.identity(), topicPartition -> 0L));
    rebalance(offsets.keySet());
    updateBeginningOffsets(offsets);
    updateEndOffsets(offsets);
}
/**
 * Subscribes the wrapped {@code inner} consumer to {@code topics}, executing the call
 * through {@code Retries.tryMe} with the strategy returned by {@code strategy()}.
 *
 * @param topics   topic names forwarded to the wrapped consumer
 * @param callback rebalance listener forwarded to the wrapped consumer
 */
@Override
public void subscribe(final Collection<String> topics, final ConsumerRebalanceListener callback) {
    final Runnable subscribeAttempt = new Runnable() {
        @Override
        public void run() {
            inner.subscribe(topics, callback);
        }
    };
    Retries.tryMe(subscribeAttempt, strategy());
}
/**
 * Subscribes the wrapped {@code inner} consumer to topics matching {@code pattern},
 * executing the call through {@code Retries.tryMe} with the strategy returned by
 * {@code strategy()}.
 *
 * @param pattern  topic pattern forwarded to the wrapped consumer
 * @param callback rebalance listener forwarded to the wrapped consumer
 */
@Override
public void subscribe(final Pattern pattern, final ConsumerRebalanceListener callback) {
    final Runnable subscribeAttempt = new Runnable() {
        @Override
        public void run() {
            inner.subscribe(pattern, callback);
        }
    };
    Retries.tryMe(subscribeAttempt, strategy());
}
/**
 * Subscribes to {@code topics} through the superclass and then registers partitions
 * {@code 0} and {@code 1} of every topic at offset {@code 0L}.
 *
 * <p>The partitions are passed to {@code rebalance}, and the offset map is passed to
 * both {@code updateBeginningOffsets} and {@code updateEndOffsets}.
 *
 * @param topics   topic names to subscribe to
 * @param listener rebalance listener forwarded to the superclass
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    super.subscribe(topics, listener);

    Map<TopicPartition, Long> startingOffsets = new HashMap<>();
    for (String topic : topics) {
        // Two partitions per topic: indices 0 and 1.
        for (int partitionIndex = 0; partitionIndex < 2; partitionIndex++) {
            startingOffsets.put(new TopicPartition(topic, partitionIndex), 0L);
        }
    }

    rebalance(startingOffsets.keySet());
    updateBeginningOffsets(startingOffsets);
    updateEndOffsets(startingOffsets);
}
/**
 * Records a pattern-based subscription.
 *
 * <p>Switches the subscription type to {@code SubscriptionType.AUTO_PATTERN} via
 * {@code setSubscriptionType}, then stores the listener and the pattern.
 *
 * @param pattern  topic pattern to store
 * @param listener rebalance listener to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (listener == null) {
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    }

    setSubscriptionType(SubscriptionType.AUTO_PATTERN);

    this.listener = listener;
    this.subscribedPattern = pattern;
}
@Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) { Set<String> newSubscription = new HashSet<>(topics); // TODO: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed. commitSync(); for (TopicPartition tp : _kafkaConsumer.assignment()) { if (!newSubscription.contains(tp.topic())) { _consumerRecordsProcessor.clear(tp); } } _consumerRebalanceListener.setUserListener(callback); _kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener); }
/**
 * Subscribes the underlying consumer to topics matching {@code pattern}, registering
 * the internal rebalance listener.
 *
 * <p>A non-null {@code callback} is installed as the user listener first; a
 * {@code null} callback leaves the currently installed user listener untouched.
 *
 * @param pattern  topic pattern forwarded to the underlying consumer
 * @param callback user rebalance listener, or {@code null} to keep the existing one
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
    boolean hasUserCallback = callback != null;
    if (hasUserCallback) {
        _consumerRebalanceListener.setUserListener(callback);
    }
    _kafkaConsumer.subscribe(pattern, _consumerRebalanceListener);
}
/**
 * Records a topic-list subscription.
 *
 * <p>Rejected when a user assignment is present or a pattern subscription is
 * already stored. Otherwise stores the listener and hands the topics to
 * {@code changeSubscription}.
 *
 * @param topics   topic names passed to {@code changeSubscription}
 * @param listener rebalance listener to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws IllegalStateException    if {@code userAssignment} is non-empty or
 *                                  {@code subscribedPattern} is set
 */
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null) {
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    }

    boolean topicSubscriptionAllowed =
        this.userAssignment.isEmpty() && this.subscribedPattern == null;
    if (!topicSubscriptionAllowed) {
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

    this.listener = listener;
    changeSubscription(topics);
}
/**
 * Records a pattern-based subscription.
 *
 * <p>Rejected when a topic subscription or a user assignment is already present.
 * Otherwise stores the listener and the pattern.
 *
 * @param pattern  topic pattern to store
 * @param listener rebalance listener to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws IllegalStateException    if {@code subscription} or {@code userAssignment}
 *                                  is non-empty
 */
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (listener == null) {
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    }

    boolean patternSubscriptionAllowed =
        this.subscription.isEmpty() && this.userAssignment.isEmpty();
    if (!patternSubscriptionAllowed) {
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

    this.listener = listener;
    this.subscribedPattern = pattern;
}
/**
 * Forwards the topic subscription unchanged to the wrapped {@code consumer}.
 *
 * @param topics   topic names passed through to the delegate
 * @param listener rebalance listener passed through to the delegate
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    this.consumer.subscribe(topics, listener);
}
/**
 * Forwards the pattern subscription unchanged to the wrapped {@code consumer}.
 *
 * @param pattern  topic pattern passed through to the delegate
 * @param listener rebalance listener passed through to the delegate
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    this.consumer.subscribe(pattern, listener);
}
/**
 * Forwards the topic subscription to whichever consumer {@code activeConsumer()}
 * returns at the time of the call.
 *
 * @param topics   topic names passed through to the active consumer
 * @param callback rebalance listener passed through to the active consumer
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
    activeConsumer()
        .subscribe(topics, callback);
}
/**
 * Forwards the pattern subscription to whichever consumer {@code activeConsumer()}
 * returns at the time of the call.
 *
 * @param pattern  topic pattern passed through to the active consumer
 * @param callback rebalance listener passed through to the active consumer
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
    activeConsumer()
        .subscribe(pattern, callback);
}
/**
 * No-op: this implementation ignores the topic subscription request.
 *
 * @param topics   unused
 * @param callback unused
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
    // Empty body: neither argument is used.
}
/**
 * No-op: this implementation ignores the pattern subscription request.
 *
 * @param pattern  unused
 * @param callback unused
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
    // Empty body: neither argument is used.
}
/**
 * Returns the currently stored rebalance listener.
 *
 * @return the value of the {@code listener} field
 */
public ConsumerRebalanceListener listener() {
    return this.listener;
}
/**
 * Returns the currently stored rebalance listener.
 *
 * @return the value of the {@code rebalanceListener} field
 */
public ConsumerRebalanceListener getRebalanceListener() {
    return this.rebalanceListener;
}
/**
 * Replaces the stored user rebalance listener.
 *
 * @param userListener the listener to store in {@code _userListener}
 */
public void setUserListener(ConsumerRebalanceListener userListener) {
    this._userListener = userListener;
}
/**
 * Records a topic-set subscription.
 *
 * <p>Switches the subscription type to {@code SubscriptionType.AUTO_TOPICS} via
 * {@code setSubscriptionType}, stores the listener, and hands the topics to
 * {@code changeSubscription}.
 *
 * @param topics   topic names passed to {@code changeSubscription}
 * @param listener rebalance listener to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null) {
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    }

    setSubscriptionType(SubscriptionType.AUTO_TOPICS);

    this.listener = listener;
    changeSubscription(topics);
}
/**
 * Subscribe to the given list of topics to get dynamically assigned partitions.
 * <b>Topic subscriptions are not incremental. This list will replace the current
 * assignment (if there is one).</b> Note that it is not possible to combine topic
 * subscription with group management with manual partition assignment through
 * {@link #assign(Collection)}.
 * <p>
 * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that
 * belong to a particular group and will trigger a rebalance operation if one of the
 * following events occurs:
 * <ul>
 * <li>The number of partitions changes for any of the subscribed topics
 * <li>A topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>
 * <p>
 * When any of these events occurs, the provided listener will be invoked first to
 * indicate that the consumer's assignment has been revoked, and then again when the new
 * assignment has been received. Note that this listener will immediately override any
 * listener set in a previous call to subscribe. It is guaranteed, however, that the
 * partitions revoked/assigned through this interface are from topics subscribed in this
 * call. See {@link ConsumerRebalanceListener} for more details.
 * <p>
 * To support large messages, the consumer tracks all the consumed messages for each
 * partition. When the user no longer subscribes to a topic, the consumer will discard
 * all the tracked messages of the partitions of that topic.
 *
 * @param topics   The list of topics to subscribe to
 * @param callback Non-null listener instance to get notifications on partition
 *                 assignment/revocation for the subscribed topics
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
/**
 * Subscribe to all topics matching the specified pattern to get dynamically assigned
 * partitions. The pattern matching will be done periodically against topics existing at
 * the time of the check.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that
 * belong to a particular group and will trigger a rebalance operation if one of the
 * following events occurs:
 * <ul>
 * <li>The number of partitions changes for any of the subscribed topics
 * <li>A topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>
 * <p>
 * To support large messages, the consumer tracks all the consumed messages for each
 * partition. When the user no longer subscribes to a topic, the consumer will discard
 * all the tracked messages of the partitions of that topic.
 *
 * @param pattern  Pattern to subscribe to
 * @param callback Listener instance to get notifications on partition
 *                 assignment/revocation for the subscribed topics
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);