@Test public void testOnCommitChain() { List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>(); // we are testing two different interceptors by configuring the same interceptor differently, which is not // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1); FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2); interceptorList.add(interceptor1); interceptorList.add(interceptor2); ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList); // verify that onCommit is called for all interceptors in the chain Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(tp, new OffsetAndMetadata(0)); interceptors.onCommit(offsets); assertEquals(2, onCommitCount); // verify that even if one of the interceptors throws an exception, all interceptors' onCommit are called interceptor1.injectOnCommitError(true); interceptors.onCommit(offsets); assertEquals(4, onCommitCount); interceptors.close(); }
/** * This is called when commit request returns successfully from the broker. * <p> * This method calls {@link ConsumerInterceptor#onCommit(Map)} method for each interceptor. * <p> * This method does not throw exceptions. Exceptions thrown by any of the interceptors in the chain are logged, but not propagated. * * @param offsets A map of offsets by partition with associated metadata */ public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { for (ConsumerInterceptor<K, V> interceptor : this.interceptors) { try { interceptor.onCommit(offsets); } catch (Exception e) { // do not propagate interceptor exception, just log log.warn("Error executing interceptor onCommit callback", e); } } }
/**
 * Closes each interceptor held by this container.
 * <p>
 * A failure while closing one interceptor is logged and does not stop the remaining
 * interceptors from being closed.
 */
@Override
public void close() {
    for (ConsumerInterceptor<K, V> each : interceptors) {
        try {
            each.close();
        } catch (Exception failure) {
            // keep going so that every interceptor gets its close() call
            log.error("Failed to close consumer interceptor ", failure);
        }
    }
}
/**
 * Creates a container around the given chain of interceptors.
 * <p>
 * The list is stored by reference; no copy is made.
 *
 * @param interceptors the interceptors held by this container
 */
public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
    this.interceptors = interceptors;
}
@Test public void testOnConsumeChain() { List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>(); // we are testing two different interceptors by configuring the same interceptor differently, which is not // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1); FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2); interceptorList.add(interceptor1); interceptorList.add(interceptor2); ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList); // verify that onConsumer modifies ConsumerRecords Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = new HashMap<>(); List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>(); list1.add(consumerRecord); List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>(); list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1)); List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>(); list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1)); records.put(tp, list1); records.put(filterTopicPart1, list2); records.put(filterTopicPart2, list3); ConsumerRecords<Integer, Integer> consumerRecords = new ConsumerRecords<>(records); ConsumerRecords<Integer, Integer> interceptedRecords = interceptors.onConsume(consumerRecords); assertEquals(1, interceptedRecords.count()); assertTrue(interceptedRecords.partitions().contains(tp)); assertFalse(interceptedRecords.partitions().contains(filterTopicPart1)); assertFalse(interceptedRecords.partitions().contains(filterTopicPart2)); assertEquals(2, onConsumeCount); // verify that even if one of the intermediate interceptors throws an exception, all 
interceptors' onConsume are called interceptor1.injectOnConsumeError(true); ConsumerRecords<Integer, Integer> partInterceptedRecs = interceptors.onConsume(consumerRecords); assertEquals(2, partInterceptedRecs.count()); assertTrue(partInterceptedRecs.partitions().contains(filterTopicPart1)); // since interceptor1 threw exception assertFalse(partInterceptedRecs.partitions().contains(filterTopicPart2)); // interceptor2 should still be called assertEquals(4, onConsumeCount); // if all interceptors throw an exception, records should be unmodified interceptor2.injectOnConsumeError(true); ConsumerRecords<Integer, Integer> noneInterceptedRecs = interceptors.onConsume(consumerRecords); assertEquals(noneInterceptedRecs, consumerRecords); assertEquals(3, noneInterceptedRecs.count()); assertEquals(6, onConsumeCount); interceptors.close(); }
/** * This is called when the records are about to be returned to the user. * <p> * This method calls {@link ConsumerInterceptor#onConsume(ConsumerRecords)} for each * interceptor. Records returned from each interceptor get passed to onConsume() of the next interceptor * in the chain of interceptors. * <p> * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, * it gets caught and logged, and next interceptor in the chain is called with 'records' returned by the * previous successful interceptor onConsume call. * * @param records records to be consumed by the client. * @return records that are either modified by interceptors or same as records passed to this method. */ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { ConsumerRecords<K, V> interceptRecords = records; for (ConsumerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecords = interceptor.onConsume(interceptRecords); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors log.warn("Error executing interceptor onConsume callback", e); } } return interceptRecords; }