我刚开始使用 RabbitMQ 和 AMQP。
大多数 RabbitMQ 文档似乎都集中在循环,即单个消费者使用单个消息,负载在每个消费者之间分散。这确实是我目睹的行为。
一个例子:生产者有一个队列,每 2 秒发送一次消息:
var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { var sendMessage = function(connection, queue_name, payload) { var encoded_payload = JSON.stringify(payload); connection.publish(queue_name, encoded_payload); } setInterval( function() { var test_message = 'TEST '+count sendMessage(connection, "my_queue_name", test_message) count += 1; }, 2000) })
这是一个消费者:
var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); connection.on('ready', function () { connection.queue("my_queue_name", function(queue){ queue.bind('#'); queue.subscribe(function (message) { var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) })
如果我启动消费者两次, 我可以看到每个消费者都在循环使用交替消息。 例如,我将在一个终端中看到消息 1、3、5,在另一个终端中看到 2、4、6。
我的问题是:
我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?
这通常是这样做的吗?我是否应该让交换将消息路由到两个单独的队列中,而只有一个消费者?
我可以让每个消费者收到相同的消息吗? 即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?
不,如果消费者在同一个队列中,则不会。来自 RabbitMQ 的AMQP 概念指南:
重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间是负载平衡的。
这似乎意味着 队列中的循环行为是给定的 ,并且不可配置。即,为了让多个消费者处理相同的消息 ID,需要单独的队列。
这通常是这样做的吗? 我是否应该让交换将消息路由到两个单独的队列中,而只有一个消费者?
不,不是,单个队列/多个消费者,每个消费者处理相同的消息 ID 是不可能的。让交换将消息路由到两个单独的队列中确实更好。
因为我不需要太复杂的路由,所以 扇出交换 会很好地处理这个问题。我之前并没有过多关注 Exchange,因为 node-amqp 具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息都会发布到特定的交换。
这是我的扇出交换,发送和接收:
var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) { var sendMessage = function(exchange, payload) { console.log('about to publish') var encoded_payload = JSON.stringify(payload); exchange.publish('', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log('Created queue') queue.bind(exchange, ''); queue.subscribe(function (message) { console.log('subscribed to queue') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) setInterval( function() { var test_message = 'TEST '+count sendMessage(exchange, test_message) count += 1; }, 2000) }) })