Spring Apache Kafka简介


Apache Kafka是一个社区分布式流平台,具有三个关键功能:发布和订阅记录流,以容错的持久方式存储记录流,然后在出现流时对其进行处理。Apache Kafka在Java世界中有几个成功的案例。这篇文章将介绍如何从Spring Universe中受益于此功能强大的工具 。

Apache Kafka Core Concepts

Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行;Kafka集群将记录流存储在称为主题的类别中,每个记录由一个键,一个值和一个时间戳组成。

根据文档,Kafka具有四个核心API:

  1. Producer API允许应用程序将记录流发布到一个或多个Kafka主题。
  2. 消费者API允许应用程序订阅一个或多个主题并处理为其生成的记录流。
  3. Streams API允许应用程序充当流处理器,使用一个或多个主题的输入流,并生成一个或多个输出主题的输出流,从而有效地将输入流转换为输出流。
  4. 连接器API允许构建和运行将Kafka主题连接到现有应用程序或数据系统的可重用生产者或使用者。

1621261302899.png

Using Docker 也有可能使用Docker。由于它需要两张图像,一个用于Zookeeper,一个用于Apache Kafka,因此本教程将使用docker-compose。请遵循以下指示:

version: '2.1'
services:
  zoo:
    image: zookeeper:3.4.9
   hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888

  kafka:
    image: confluentinc/cp-kafka:5.5.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zoo

然后,运行命令:

docker-compose -f docker-compose.yml up –d

要以本地主机身份进行连接,还必须在Linux中将Kafka定义为本地主机,在t后面附加以下值

他 /etc/hosts:

127.0.0.1       localhost kafka

Application With Spring

要探索Kafka,我们将使用 Spring- kafka项目。在项目中,我们将简化一个名称计数器,在该计数器的基础上,它将根据请求将事件触发到内存中的简单计数器。

用于Apache Kafka的Spring(spring- kafka)项目将Spring的核心概念应用于基于Kafka的消息传递解决方案的开发。它提供了一个“模板”作为发送消息的高级抽象。它还通过@KafkaListener批注和“侦听器容器”为消息驱动的POJO提供支持。这些库促进了依赖注入和声明式的使用。在 所有 这些情况下,您将看到与JMS支持的相似之处。

基于Spring项目Maven的第一步,我们将在其中添加Spring-Kafka,spring-boot-starter-web。

默认情况下,Spring-kafka使用String来同时进行序列化和反序列化。我们将覆盖此配置以使用JSON,其中将通过JSON发送Java对象。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*

第一类是配置,如果主题不存在,则可以创建主题。Spring有一个 TopicBuilder 来定义名称, 分区和 副本。

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicProducer {
    static final String NAME_INCREMENT = "name_increment";
    static final String NAME_DECREMENT = "name_decrement";
    @Bean
    public NewTopic increment() {
        return TopicBuilder.name(NAME_INCREMENT)
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic decrement() {
        return TopicBuilder.name(NAME_DECREMENT)
                .partitions(10)
                .replicas(1)
                .build();
    }

}

KafkaTemplate 是用于在Apache Kafka中执行高级操作的模板。我们将在名称服务中使用此类来触发Kafka中的两个事件,一个事件递增,另一个事件递减。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Service
public class NameService {

    private final NameCounter counter;

    private final KafkaTemplate<String, Name> template;

    public NameService(NameCounter counter, KafkaTemplate<String, Name> template) {
        this.counter = counter;
        this.template = template;
    }

    public List<NameStatus> findAll() {
        return counter.getValues()
                .map(NameStatus::of)
                .collect(Collectors.toUnmodifiableList());
    }

    public NameStatus findByName(String name) {
        return new NameStatus(name, counter.get(name));
    }

    public void decrement(String name) {
        template.send(TopicProducer.NAME_DECREMENT, new Name(name));
    }

    public void increment(String name) {
        template.send(TopicProducer.NAME_INCREMENT, new Name(name));
    }
}

一旦我们讨论了生产者, KafkaTemplate下一步就是定义一个消费者类。消费者类将侦听Kafka事件以执行操作。在此示例中, NameConsumer 将使用 注释轻松地监听事件 KafkaListener。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;

@Component
public class NameConsumer {

    private static final Logger LOGGER = Logger.getLogger(NameConsumer.class.getName());

    private final NameCounter counter;

    public NameConsumer(NameCounter counter) {
        this.counter = counter;
    }

    @KafkaListener(id = "increment", topics = TopicProducer.NAME_INCREMENT)
    public void increment(Name name) {
        LOGGER.info("Increment listener to the name" + name);
        counter.increment(name.get());
    }

    @KafkaListener(id = "decrement", topics = TopicProducer.NAME_DECREMENT)
    public void decrement(Name name) {
        LOGGER.info("Decrement listener to the name " + name);
        counter.decrement(name.get());
    }
}

总而言之,我们看到了Apache Kafka的潜力以及为什么这个项目对大数据参与者如此容易。这是一个简单示例,说明与Spring集成的安全性。


原文链接:https://codingdict.com/