我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存。我会尽快处理这些消息。由于来自外部源(例如HTTP)的响应,很少有消息没有得到完全处理。如果HTTP服务器在一段时间后不响应/返回错误消息以重试,我想为重试实现指数补偿机制。我想不出什么主意就能实现它们。我想知道如果还有其他可以容错的解决方案,那么其中哪一个将是更好的解决方案。由于这是用于实现指数补偿的,因此每个消息将具有不同的延迟时间。
虽然这几乎是我想要做的。我无法找到实现Kafka中提到的delayProcessingUntil的文档-使用高级使用者实现延迟队列
过去,我已经完成了数据存储中的预定工作,并使用Beanstalk进行了延迟,但是我更喜欢使用Kafka。
Kafka出口具有内置的指数补偿消息重试功能。您可以通过喷嘴配置来配置初始延迟,延迟乘数和最大延迟。如果螺栓有错误,则可以调用collector.fail(input)。之后,您只需将其喷出即可重试。
https://github.com/apache/storm/blob/v0.10.0/external/storm- kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java