/** * This is called when client sends the record to KafkaProducer, before key and value gets serialized. * The method calls {@link ProducerInterceptor#onSend(ProducerRecord)} method. ProducerRecord * returned from the first interceptor's onSend() is passed to the second interceptor onSend(), and so on in the * interceptor chain. The record returned from the last interceptor is returned from this method. * * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored. * If an interceptor in the middle of the chain, that normally modifies the record, throws an exception, * the next interceptor in the chain will be called with a record returned by the previous interceptor that did not * throw an exception. * * @param record the record from client * @return producer record to send to topic/partition */ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; }
/** * This method is called when sending the record fails in {@link ProducerInterceptor#onSend * (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)} * method for each interceptor * * @param record The record from client * @param interceptTopicPartition The topic/partition for the record if an error occurred * after partition gets assigned; the topic part of interceptTopicPartition is the same as in record. * @param exception The exception thrown during processing of this record. */ public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) { for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { if (record == null && interceptTopicPartition == null) { interceptor.onAcknowledgement(null, exception); } else { if (interceptTopicPartition == null) { interceptTopicPartition = new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); } interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1), exception); } } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } }
/** * Closes every interceptor in a container. */ @Override public void close() { for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptor.close(); } catch (Exception e) { log.error("Failed to close producer interceptor ", e); } } }
@Test public void testOnSendChain() { List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>(); // we are testing two different interceptors by configuring the same interceptor differently, which is not // how it would be done in KafkaProducer, but ok for testing interceptor callbacks AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One"); AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two"); interceptorList.add(interceptor1); interceptorList.add(interceptor2); ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify that onSend() mutates the record as expected ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord); assertEquals(2, onSendCount); assertEquals(producerRecord.topic(), interceptedRecord.topic()); assertEquals(producerRecord.partition(), interceptedRecord.partition()); assertEquals(producerRecord.key(), interceptedRecord.key()); assertEquals(interceptedRecord.value(), producerRecord.value().concat("One").concat("Two")); // onSend() mutates the same record the same way ProducerRecord<Integer, String> anotherRecord = interceptors.onSend(producerRecord); assertEquals(4, onSendCount); assertEquals(interceptedRecord, anotherRecord); // verify that if one of the interceptors throws an exception, other interceptors' callbacks are still called interceptor1.injectOnSendError(true); ProducerRecord<Integer, String> partInterceptRecord = interceptors.onSend(producerRecord); assertEquals(6, onSendCount); assertEquals(partInterceptRecord.value(), producerRecord.value().concat("Two")); // verify the record remains valid if all onSend throws an exception interceptor2.injectOnSendError(true); ProducerRecord<Integer, String> noInterceptRecord = interceptors.onSend(producerRecord); assertEquals(producerRecord, noInterceptRecord); interceptors.close(); }
@Test public void testOnAcknowledgementChain() { List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>(); // we are testing two different interceptors by configuring the same interceptor differently, which is not // how it would be done in KafkaProducer, but ok for testing interceptor callbacks AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One"); AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two"); interceptorList.add(interceptor1); interceptorList.add(interceptor2); ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify onAck is called on all interceptors RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0); interceptors.onAcknowledgement(meta, null); assertEquals(2, onAckCount); // verify that onAcknowledgement exceptions do not propagate interceptor1.injectOnAcknowledgementError(true); interceptors.onAcknowledgement(meta, null); assertEquals(4, onAckCount); interceptor2.injectOnAcknowledgementError(true); interceptors.onAcknowledgement(meta, null); assertEquals(6, onAckCount); interceptors.close(); }
@Test public void testOnAcknowledgementChain() { List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>(); // we are testing two different interceptors by configuring the same interceptor differently, which is not // how it would be done in KafkaProducer, but ok for testing interceptor callbacks AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One"); AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two"); interceptorList.add(interceptor1); interceptorList.add(interceptor2); ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify onAck is called on all interceptors RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0); interceptors.onAcknowledgement(meta, null); assertEquals(2, onAckCount); // verify that onAcknowledgement exceptions do not propagate interceptor1.injectOnAcknowledgementError(true); interceptors.onAcknowledgement(meta, null); assertEquals(4, onAckCount); interceptor2.injectOnAcknowledgementError(true); interceptors.onAcknowledgement(meta, null); assertEquals(6, onAckCount); interceptors.close(); }
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) { this.interceptors = interceptors; }
@Test public void testOnAcknowledgementWithErrorChain() { List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>(); AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One"); interceptorList.add(interceptor1); ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify that metadata contains both topic and partition interceptors.onSendError(producerRecord, new TopicPartition(producerRecord.topic(), producerRecord.partition()), new KafkaException("Test")); assertEquals(1, onErrorAckCount); assertEquals(1, onErrorAckWithTopicPartitionSetCount); // verify that metadata contains both topic and partition (because record already contains partition) interceptors.onSendError(producerRecord, null, new KafkaException("Test")); assertEquals(2, onErrorAckCount); assertEquals(2, onErrorAckWithTopicPartitionSetCount); // if producer record does not contain partition, interceptor should get partition == -1 ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", null, 1, "value"); interceptors.onSendError(record2, null, new KafkaException("Test")); assertEquals(3, onErrorAckCount); assertEquals(3, onErrorAckWithTopicSetCount); assertEquals(2, onErrorAckWithTopicPartitionSetCount); // if producer record does not contain partition, but topic/partition is passed to // onSendError, then interceptor should get valid partition int reassignedPartition = producerRecord.partition() + 1; interceptors.onSendError(record2, new TopicPartition(record2.topic(), reassignedPartition), new KafkaException("Test")); assertEquals(4, onErrorAckCount); assertEquals(4, onErrorAckWithTopicSetCount); assertEquals(3, onErrorAckWithTopicPartitionSetCount); // if both record and topic/partition are null, interceptor should not receive metadata interceptors.onSendError(null, null, new KafkaException("Test")); assertEquals(5, onErrorAckCount); assertEquals(4, onErrorAckWithTopicSetCount); assertEquals(3, onErrorAckWithTopicPartitionSetCount); interceptors.close(); }
/** * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before * it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)} * method for each interceptor. * * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored. * * @param metadata The metadata for the record that was sent (i.e. the partition and offset). * If an error occurred, metadata will only contain valid topic and maybe partition. * @param exception The exception thrown during processing of this record. Null if no error occurred. */ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptor.onAcknowledgement(metadata, exception); } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } }