现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量?
以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go)来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(https://github.com/bsm/sarama-cluster),我不确定使用哪个API来抵消我的消费者组消息。
我还与Sarama和Kafka合作,以抵消一个话题。
您可以使用以下代码获取偏移量。
package main import ( "gopkg.in/Shopify/sarama" "fmt" ) func main(){ client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration if err != nil { panic(err) } lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest) if err != nil { panic(err) } fmt.Println("Last Commited Offset ",lastoffset) }
让我知道这是否是您要找的答案,是否有帮助。