I have configured several Kafka consumers in Spring Boot. This is what kafka.properties looks like (only one consumer's configuration is listed here):
    kafka.topics=
    bootstrap.servers=
    group.id=
    enable.auto.commit=
    auto.commit.interval.ms=
    session.timeout.ms=
    schema.registry.url=
    auto.offset.reset=
    kafka.enabled=
This is the configuration:
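    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.core.env.Environment;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    @PropertySource({"classpath:kafka.properties"})
    public class KafkaConsumerConfig {

        @Autowired
        private Environment env;

        // Builds the consumer factory from the values in kafka.properties.
        @Bean
        public ConsumerFactory<String, String> pindropConsumerFactory() {
            Map<String, Object> dataRiverProps = new HashMap<>();
            dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
            dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
            dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
            dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
            dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
            dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
            return new DefaultKafkaConsumerFactory<>(dataRiverProps);
        }

        // Container factory referenced by name from @KafkaListener.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(pindropConsumerFactory());
            return factory;
        }
    }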
This is the consumer:
    @Component
    public class KafkaConsumer {

        @Autowired
        private MessageProcessor messageProcessor;

        @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",
                containerFactory = "kafkaListenerContainerFactory")
        public void consumeJson(String message) {
            // processing message
        }
    }
Can I use the property "kafka.enabled" to control whether this consumer is created, or whether it retrieves messages? Thanks very much!
You can do this with the autoStartup attribute (true / false) on the listener, as follows:
    @KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
            containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "${listen.auto.start:false}")
    public void consume(String message) {
        // System.out.println("Consumed message: " + message);
    }
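To tie this back to the question, here is a minimal sketch of how the asker's own `kafka.enabled` property might be wired into `autoStartup`. It assumes a Spring for Apache Kafka version that supports `autoStartup` on `@KafkaListener` (2.2 or later, as far as I know). The listener id `pindropListener` and the class names `ToggleableKafkaConsumer` and `ListenerSwitch` are hypothetical and only there for illustration. Note that with `autoStartup` resolved to `false` the listener container is still created; it just does not start polling until something starts it.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
class ToggleableKafkaConsumer {

    // autoStartup is resolved from kafka.enabled; the ":false" suffix is the
    // default used when the property is missing.
    // The id "pindropListener" is a hypothetical name chosen for this sketch.
    @KafkaListener(id = "pindropListener",
            topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "${kafka.enabled:false}")
    public void consumeJson(String message) {
        // processing message
    }
}

// Hypothetical helper: starts or stops the listener at runtime by its id.
@Component
class ListenerSwitch {

    private final KafkaListenerEndpointRegistry registry;

    ListenerSwitch(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void setEnabled(boolean enabled) {
        // getListenerContainer returns null if no listener has this id.
        MessageListenerContainer container = registry.getListenerContainer("pindropListener");
        if (container == null) {
            return;
        }
        if (enabled && !container.isRunning()) {
            container.start();
        } else if (!enabled && container.isRunning()) {
            container.stop();
        }
    }
}
```

If the goal is to skip creating the consumer bean altogether rather than leaving it idle, Spring Boot's `@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")` on the listener class is another option worth trying, though I have not verified it against this exact setup.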