public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) { Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null"); Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties(); String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString(); if (ObjectUtils.isEmpty(adminClientProperties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) || !kafkaConnectionString.equals(kafkaBinderConfigurationProperties.getDefaultKafkaConnectionString())) { adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionString); } this.configurationProperties = kafkaBinderConfigurationProperties; this.adminClient = AdminClient.create(adminClientProperties); }
@Test public void testPropertyOverrides() throws Exception { KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties(); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(binderConfigurationProperties, new KafkaProperties()); KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfigurationProperties, provisioningProvider); KafkaConsumerProperties consumerProps = new KafkaConsumerProperties(); ExtendedConsumerProperties<KafkaConsumerProperties> ecp = new ExtendedConsumerProperties<KafkaConsumerProperties>(consumerProps); Method method = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class); method.setAccessible(true); // test default for anon Object factory = method.invoke(binder, true, "foo", ecp); Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest"); // test default for named factory = method.invoke(binder, false, "foo", ecp); configs = TestUtils.getPropertyValue(factory, "configs", Map.class); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest"); // binder level setting binderConfigurationProperties.setConfiguration( Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")); factory = method.invoke(binder, false, "foo", ecp); configs = TestUtils.getPropertyValue(factory, "configs", Map.class); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest"); // consumer level setting consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")); factory = method.invoke(binder, false, "foo", ecp); configs = TestUtils.getPropertyValue(factory, "configs", Map.class); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest"); }
@SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testProducerRunsInTx() { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString())); KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(); configurationProperties.getTransaction().setTransactionIdPrefix("foo-"); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(configurationProperties, kafkaProperties); provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); final Producer mockProducer = mock(Producer.class); willReturn(Collections.singletonList(new TopicPartition("foo", 0))).given(mockProducer).partitionsFor(anyString()); KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) { @Override protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) { DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = spy(super.getProducerFactory(transactionIdPrefix, producerProperties)); willReturn(mockProducer).given(producerFactory).createProducer(); return producerFactory; } }; GenericApplicationContext applicationContext = new GenericApplicationContext(); applicationContext.refresh(); binder.setApplicationContext(applicationContext); DirectChannel channel = new DirectChannel(); KafkaProducerProperties extension = new KafkaProducerProperties(); ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(extension); binder.bindProducer("foo", channel, properties); channel.send(new GenericMessage<>("foo".getBytes())); InOrder inOrder = inOrder(mockProducer); inOrder.verify(mockProducer).beginTransaction(); inOrder.verify(mockProducer).send(any(ProducerRecord.class), any(Callback.class)); inOrder.verify(mockProducer).commitTransaction(); inOrder.verify(mockProducer).close(); inOrder.verifyNoMoreInteractions(); }
@Override protected KafkaTestBinder getBinder() { if (binder == null) { KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(binderConfiguration, new KafkaProperties()); try { kafkaTopicProvisioner.afterPropertiesSet(); } catch (Exception e) { throw new RuntimeException(e); } binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner); } return binder; }
private Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) { KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(kafkaBinderConfigurationProperties, new KafkaProperties()); try { provisioningProvider.afterPropertiesSet(); } catch (Exception e) { throw new RuntimeException(e); } return new KafkaTestBinder(kafkaBinderConfigurationProperties, provisioningProvider); }
@Bean Sender kafkaSender(KafkaProperties config) { Map<String, Object> properties = config.buildProducerProperties(); properties.put("key.serializer", ByteArraySerializer.class.getName()); properties.put("value.serializer", ByteArraySerializer.class.getName()); // Kafka expects the input to be a String, but KafkaProperties returns a list Object bootstrapServers = properties.get("bootstrap.servers"); if (bootstrapServers instanceof List) { properties.put("bootstrap.servers", join((List) bootstrapServers)); } return KafkaSender.newBuilder() .topic(this.topic) .overrides(properties) .build(); }
/** * Customized ProducerFactory bean. * @param properties the kafka properties. * @return the bean. */ @Bean("kafkaProducerFactory") public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) { Map<String, Object> producerProperties = properties.buildProducerProperties(); return new DefaultKafkaProducerFactory<>(producerProperties); }
@Bean KafkaProperties kafkaProperties() { return new KafkaProperties(); }
/** * TODO: eggs hurt:Kyro Serialization needs the default constructor and 'implements Serializable' * KafkaProperties and DefaultKafkaProducerFactory can not be Serialized * @param properties * @return */ @Bean("kafkaPropertiesMap") public Map<String, Object> kafkaPropertiesMap(KafkaProperties properties) { Map<String, Object> producerProperties = properties.buildProducerProperties(); return producerProperties; }
/** * 注意:目前Spring Boot自动配置只支持单个分组group-id创建consumer, * 如需要多个应该创建多个不同的DefaultKafkaConsumerFactory properties.getConsumer().setGroupId(groupId); * Customized ConsumerFactory bean. * @param properties the kafka properties. * @return the bean. */ @Bean("kafkaConsumerFactory") public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) { Map<String, Object> consumerProperties = properties.buildConsumerProperties(); return new DefaultKafkaConsumerFactory<>(consumerProperties); }