在轮询Kafka时,我已经使用该subscribe()功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后seek(),并poll()从一个话题。 在轮询数据之前seek(),是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中?
subscribe()
seek()
poll()
我每个主题有一个分区,并且只有一个使用者可以读取所有主题。
Kafka如何存储每个主题的偏移量?
卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人。原因如下:
Zookeeper不是服务于高写入负载(例如偏移量更新)的好方法,因为Zookeeper会将每个写入路由到每个节点,因此无法分区或扩展写入。 我们一直都知道这一点,但是由于已经依赖zk,因此选择此实现作为一种“方便婚姻”。
Kafka将偏移量提交存储在主题中,当使用者提交偏移量时,kafka将提交偏移量消息发布到“ commit- log”主题,并保留将组/主题/分区映射到最新偏移量的内存结构,以便快速检索。可以在此页面中找到有关偏移管理的更多设计信息。
现在,我要设置要从每个主题读取的偏移量,而无需在一个主题的每个seek()和poll()之后重新订阅。
kafka管理工具有一个新功能可重置偏移量。
kafka-consumer-group.sh --bootstrap-server 127.0.0.1:9092 --group your-consumer-group **--reset-offsets** --to-offset 1 --all-topics --execute
还有,你可以使用更多的选择。