小编典典

在Storm中延迟执行队列– Kafka,Cassandra,Redis或Beanstalk?

java

我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存。我会尽快处理这些消息。由于来自外部源(例如HTTP)的响应,很少有消息没有得到完全处理。如果HTTP服务器在一段时间后不响应/返回错误消息以重试,我想为重试实现指数补偿机制。我想不出什么主意就能实现它们。我想知道如果还有其他可以容错的解决方案,那么其中哪一个将是更好的解决方案。由于这是用于实现指数补偿的,因此每个消息将具有不同的延迟时间。

  • 将其发送给 Kafka 中的另一个主题,稍后再使用。 我的首选解决方案 。我知道我们可以使用Kafka偏移量,以便在后期使用消息。我怎么找不到文档/示例代码来做同样的事情。如果有人可以帮助我,这将非常有帮助。
  • 编写消息 Cassandra / Redis, 并编写调度程序以获取未处理的消息并准备使用,然后将其发送给Kafka,以便我的风暴拓扑可以使用它。(其他遗留项目中的现有解决方案(Non Storm))
  • 延迟发送给 Beanstalk (其他遗留项目中的现有解决方案(Non Storm)。我想避免使用该解决方案,而仅在无法使用时使用它)。

虽然这几乎是我想要做的。我无法找到实现Kafka中提到的delayProcessingUntil的文档-使用高级使用者实现延迟队列

过去,我已经完成了数据存储中的预定工作,并使用Beanstalk进行了延迟,但是我更喜欢使用Kafka。


阅读 222

收藏
2020-11-23

共1个答案

小编典典

Kafka出口具有内置的指数补偿消息重试功能。您可以通过喷嘴配置来配置初始延迟,延迟乘数和最大延迟。如果螺栓有错误,则可以调用collector.fail(input)。之后,您只需将其喷出即可重试。

https://github.com/apache/storm/blob/v0.10.0/external/storm-
kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java

2020-11-23