Rabbitmq死信队列
死信,就是无法被消费的消息,一般来说, producer将消息投递到broker或者直接到queue里了, consumer从queue取出消息 进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信, 有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时, 将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信队列的形成场景消息TTL过期 队列达到最大长度(队列满了,无法再添加数据到mq中) 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
TTL场景创建死信队列生产者
public class Producter { //普通交换机的名称 public static final String normal_exchange = "normal_exchange"; public static void main(String[] args) throws Exception{ //声明通道 Channel channel = Rabbitmqutil.getChannel(); //死信消息 设置ttl过期时间 单位是ms 10000ms=10s AMQP.BasicProperties properties= new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i <11 ; i++) { String message = "info"+ i; channel.basicPublish(normal_exchange,"normal",properties,message.getBytes()); } } }书写消费者1:
public class Consumer1 { //普通交换机的名称 public static final String normal_exchange = "normal_exchange"; //死信交换机名称 public static final String dead_exchange = "dead_exchange"; //普通队列名称 public static final String normal_queue = "normal_queue"; //死信队列名称 public static final String dead_queue = "dead_queue"; public static void main(String[] args) throws Exception{ //声明通道 Channel channel = Rabbitmqutil.getChannel(); //声明死信和普通交换机类型 类型为direct channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT); Map<String,Object> arguments = new HashMap<>(); //设置队列中消息过期时间 但是一旦设置相当于该队列中所有消息的过期时间都是统一的不够灵活 // 一般都在生产者方设置消息的过期时间 10s=100000ms // arguments.put("x-message-ttl",100000); //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",dead_exchange); //设置死信 routingkey arguments.put("x-dead-letter-routing-key","dead"); //声明普通队列 //第五个参数即是普通队列需要绑定死信交换机的map参数 channel.queueDeclare(normal_queue,false,false,false,arguments); //声明死信队列 channel.queueDeclare(dead_queue,false,false,false,null); //绑定普通的交换机与普通队列 channel.queueBind(normal_queue,normal_exchange,"normal"); //绑定死信交换机与死信队列 channel.queueBind(dead_queue,dead_exchange,"dead"); System.out.println("待机中....准备接收消息...."); DeliverCallback deliverCallback =(consumerTag,message)->{ System.out.println("消费者1接收的消息是:"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(normal_queue,true,deliverCallback,consumerTag->{}); } }首先执行消费者1: 观察rabbitmq网页版管理页面
关闭消费者1,执行生产者 可以观察到此时normal_queue中有10条消息等待消费,而dead_queuq中没有一条消息需要消费.等待大约10s(我们在书写生产者时已经定义生产的消息在10s后过期) 再次观察normal_queue和dead_queue 可以看到,因为此时消费者1后台没有运行,而导致生产者发出的消息没有消费者消费,而一旦消息的过期时间到了,消息会自动进入到死信队列中,而此时我们需要书写消费者2来进行专门消费死信队列的消息 消费者2:
public class Consumer2 { //死信队列名称 public static final String dead_queue = "dead_queue"; public static void main(String[] args) throws Exception{ //声明通道 Channel channel = Rabbitmqutil.getChannel(); System.out.println("待机中....准备接收消息...."); DeliverCallback deliverCallback =(consumerTag,message)->{ System.out.println("消费者2接收死信队列的消息是:"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(dead_queue,true,deliverCallback,consumerTag->{}); } }执行消费者2:
消费者2直接将死信队列中的消息进行消费
超出队列长度场景有些队列中设置了消息的最大长度,而这时生产者发出的消息大于队列设置的最大长度,超出的消息此时也会进入到死信队列中
书写生产者:
消费者1代码: 只需要在前面消费者1代码中添加长度限制参数即可,其他代码不用动 在rabbitmq中删除normal_queue队列,因为消费者1中添加了长度限制参数,和之前的normal_queue出现冲突,首先删除normal_queue队列再运行消费者1 然后将消费者1停止,运行生产者 在消费者1中我们设置消息长度为6个,生产者发送了10条消息,这时normal_queue队列中最多存储6条消息,超出的4条消息会直接进入到死信队列中,此时再运行消费者2进行消费消息:
消息被拒绝当生产者发送消息到队列中时,如果消息被队列所拒绝的话,该条消息也会进入到死信队列中
在消费者1中更改代码:
同样,因为这里将队列的消息长度进行了注释更改,需要在rabbitmq网页管理中删掉normal_queue,然后重新启动消费者1,在消费者中我们设置了消息"info4"会被拒绝,也就是说我们应该可以看到有9条消息是在normal_queue中,而有一条消息因为被拒绝而进入到死信队列中 启动生产者
消费者1:
此时因为info4被拒绝,直接进入了死信队列中,这时我们需要启动消费者2来消费掉死信队列中的消息info4: