Example source code for the Java class org.springframework.boot.autoconfigure.kafka.KafkaProperties

Project: spring-cloud-stream-binder-kafka    File: KafkaTopicProvisioner.java
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
                            KafkaProperties kafkaProperties) {
    Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
    Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
    String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString();

    if (ObjectUtils.isEmpty(adminClientProperties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))
            || !kafkaConnectionString.equals(kafkaBinderConfigurationProperties.getDefaultKafkaConnectionString())) {
        adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionString);
    }
    this.configurationProperties = kafkaBinderConfigurationProperties;
    this.adminClient = AdminClient.create(adminClientProperties);
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderUnitTests.java
@Test
public void testPropertyOverrides() throws Exception {
    KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties();
    KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(binderConfigurationProperties, new KafkaProperties());
    KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfigurationProperties,
            provisioningProvider);
    KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
    ExtendedConsumerProperties<KafkaConsumerProperties> ecp =
            new ExtendedConsumerProperties<KafkaConsumerProperties>(consumerProps);
    Method method = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
            String.class, ExtendedConsumerProperties.class);
    method.setAccessible(true);

    // test default for anon
    Object factory = method.invoke(binder, true, "foo", ecp);
    Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");

    // test default for named
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");

    // binder level setting
    binderConfigurationProperties.setConfiguration(
            Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");

    // consumer level setting
    consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
    factory = method.invoke(binder, false, "foo", ecp);
    configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
    assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
Project: spring-cloud-stream-binder-kafka    File: KafkaTransactionTests.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
    KafkaProperties kafkaProperties = new KafkaProperties();
    kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString()));
    KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
    configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
    KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(configurationProperties, kafkaProperties);
    provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
    final Producer mockProducer = mock(Producer.class);
    willReturn(Collections.singletonList(new TopicPartition("foo", 0))).given(mockProducer).partitionsFor(anyString());
    KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {

        @Override
        protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
                ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
                    spy(super.getProducerFactory(transactionIdPrefix, producerProperties));
            willReturn(mockProducer).given(producerFactory).createProducer();
            return producerFactory;
        }

    };
    GenericApplicationContext applicationContext = new GenericApplicationContext();
    applicationContext.refresh();
    binder.setApplicationContext(applicationContext);
    DirectChannel channel = new DirectChannel();
    KafkaProducerProperties extension = new KafkaProducerProperties();
    ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(extension);
    binder.bindProducer("foo", channel, properties);
    channel.send(new GenericMessage<>("foo".getBytes()));
    InOrder inOrder = inOrder(mockProducer);
    inOrder.verify(mockProducer).beginTransaction();
    inOrder.verify(mockProducer).send(any(ProducerRecord.class), any(Callback.class));
    inOrder.verify(mockProducer).commitTransaction();
    inOrder.verify(mockProducer).close();
    inOrder.verifyNoMoreInteractions();
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderTests.java
@Override
protected KafkaTestBinder getBinder() {
    if (binder == null) {
        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(binderConfiguration, new KafkaProperties());
        try {
            kafkaTopicProvisioner.afterPropertiesSet();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
    }
    return binder;
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderTests.java
private Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    KafkaTopicProvisioner provisioningProvider =
            new KafkaTopicProvisioner(kafkaBinderConfigurationProperties, new KafkaProperties());
    try {
        provisioningProvider.afterPropertiesSet();
    }
    catch (Exception e) {
        throw new RuntimeException(e);
    }
    return new KafkaTestBinder(kafkaBinderConfigurationProperties, provisioningProvider);
}
Project: spring-cloud-sleuth    File: ZipkinKafkaSenderConfiguration.java
@Bean Sender kafkaSender(KafkaProperties config) {
    Map<String, Object> properties = config.buildProducerProperties();
    properties.put("key.serializer", ByteArraySerializer.class.getName());
    properties.put("value.serializer", ByteArraySerializer.class.getName());
    // Kafka expects the input to be a String, but KafkaProperties returns a list
    Object bootstrapServers = properties.get("bootstrap.servers");
    if (bootstrapServers instanceof List) {
        properties.put("bootstrap.servers", join((List) bootstrapServers));
    }
    return KafkaSender.newBuilder()
            .topic(this.topic)
            .overrides(properties)
            .build();
}
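The join(...) helper called above is not shown in this snippet. A minimal sketch of what it presumably looks like, assuming it simply comma-separates the configured servers into the single String form that Kafka's bootstrap.servers property expects (the method body here is a reconstruction for illustration, not the original source):

// Presumed shape of the helper: joins the configured bootstrap servers with commas.
static String join(List<?> parts) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0, length = parts.size(); i < length; i++) {
        sb.append(parts.get(i));
        if (i + 1 < length) {
            sb.append(',');
        }
    }
    return sb.toString();
}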
Project: storm_spring_boot_demo    File: KafkaConfig.java
/**
 * Customized ProducerFactory bean (a KafkaTemplate built on top of it is sketched below).
 * @param properties the Kafka properties.
 * @return the bean.
 */
@Bean("kafkaProducerFactory")
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    return new DefaultKafkaProducerFactory<>(producerProperties);
}
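A typical companion bean, assuming the stock KafkaTemplate from spring-kafka is what consumes this factory and that the auto-configured template is not already in play; the wiring below is illustrative, not part of the original configuration:

// Hypothetical companion bean: a KafkaTemplate built on the customized factory so that
// sends go through the same producer configuration.
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(
        @Qualifier("kafkaProducerFactory") ProducerFactory<?, ?> kafkaProducerFactory) {
    return new KafkaTemplate<>(kafkaProducerFactory);
}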
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderAutoConfigurationPropertiesTest.java
@Bean
KafkaProperties kafkaProperties() {
    return new KafkaProperties();
}
Project: storm_spring_boot_demo    File: KafkaConfig.java
/**
 * TODO: annoyingly, Kryo serialization requires a default constructor and 'implements Serializable',
 * and neither KafkaProperties nor DefaultKafkaProducerFactory can be serialized, so the raw
 * producer properties are exposed as a plain Map instead (see the sketch below for how such a
 * map can be used on the Storm side).
 * @param properties the Kafka properties.
 * @return the producer properties map.
 */
@Bean("kafkaPropertiesMap")
public Map<String, Object> kafkaPropertiesMap(KafkaProperties properties) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    return producerProperties;
}
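Under the assumption in the comment above, it is this map, not the factory, that gets shipped with the Storm topology, and a plain Kafka producer is rebuilt from it on the worker. A minimal sketch; the helper name is hypothetical and the type parameters must match whichever serializers are actually configured in the map:

// Hypothetical helper on the Storm/worker side: rebuilds a plain KafkaProducer from the
// serializable properties map shipped with the topology. The <byte[], byte[]> parameters
// assume ByteArraySerializer is configured for both key and value.
static Producer<byte[], byte[]> producerFrom(Map<String, Object> kafkaPropertiesMap) {
    return new KafkaProducer<>(kafkaPropertiesMap);
}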
Project: storm_spring_boot_demo    File: KafkaConfig.java
/**
 * Customized ConsumerFactory bean.
 * Note: Spring Boot auto-configuration currently creates consumers for a single group-id only.
 * If multiple groups are needed, define additional DefaultKafkaConsumerFactory beans, each with
 * its own group id (e.g. via properties.getConsumer().setGroupId(groupId); see the sketch
 * after this method).
 * @param properties the Kafka properties.
 * @return the bean.
 */
@Bean("kafkaConsumerFactory")
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProperties = properties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
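A minimal sketch of the multi-group workaround mentioned in the comment, assuming a second factory bean; the bean name and group id are illustrative:

// Hypothetical second factory for another consumer group. The group id is overridden on a
// copy of the built map so the shared KafkaProperties bean is left untouched.
@Bean("anotherKafkaConsumerFactory")
public ConsumerFactory<?, ?> anotherKafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProperties = new HashMap<>(properties.buildConsumerProperties());
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "another-group");
    return new DefaultKafkaConsumerFactory<>(consumerProperties);
}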