您现在的位置是:网站首页> 编程资料编程资料
运用.NetCore实例讲解RabbitMQ死信队列,延时队列_实用技巧_
2023-05-24
255人已围观
简介 运用.NetCore实例讲解RabbitMQ死信队列,延时队列_实用技巧_
一、死信队列

描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2)
P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息。
特定情况有哪些呢:
- 1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
- 2.当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。
- 3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
这里演示情况1:
假如场景:Q1中队列数据不完整,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列另外处理。
生产者:
public static void SendMessage() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "queue_a"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //发布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"向队列:{queueName}发送消息:{message}"); } } } 消费者:
public static void Consumer() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "queue_a"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认"); //channel.BasicAck(ea.DeliveryTag, false); //不ack(BasicNack),且不把消息放回队列(requeue:false) channel.BasicNack(ea.DeliveryTag, false, requeue: false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } } } 消费者加上channel.BasickNack()模拟消息处理不了,不ack确认。
执行结果:

RabbitMQ管理界面:

看到消息队列为queue_a,特性有DLX(死信交换机),DLK(死信路由)。因为消费端不nack,触发了死信,被转发到了死信队列dlx.queue。
二、延时队列
延时队列其实也是配合死信队列一起用,其实就是上面死信队列的第二中情况。给队列添加消息过时时间(TTL),变成延时队列。

简单的描述就是:P(生产者)发送消息到Q1(延时队列),Q1的消息有过期时间,比如10s,那10s后消息过期就会触发死信,从而把消息转发到Q2(死信队列)。
解决问题场景:像商城下单,未支付时取消订单场景。下单时写一条记录入Q1,延时30分钟后转到Q2,消费Q2,检查订单,支付则不做操作,没支付则取消订单,恢复库存。
生产者代码:
public static void SendMessage() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 { "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //发布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message}"); } } } 消费者代码:
public static void Consumer() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{DateTime.Now},队列{dlxQueueName}消费消息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(dlxQueueName, autoAck: false, consumer); } } }执行代码:

向延时队列发送消息,监听死信队列,发送和收到消息时间刚好是设置的10s。
RabbitMQ管理界面:

三、延时消息设置不同过期时间
上面的延时队列能解决消息过期时间都是相同的场景,能不能解决消息的过期时间是不一样的呢?
例如场景:机器人客服,为了更像人为操作,收到消息后要随机3-10秒回复客户。
- 1)队列不设置TTL(消息过期时间),把过期时间设置在消息上。
生产者代码:
public static void SendMessage() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 //{ "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message 10s后处理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "10000";//消息的有效期10s //发布消息,延时10s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; properties2.Expiration = "5000";//消息有效期5s //发布消息,延时5s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.No
相关内容
- 运用.net core中实例讲解RabbitMQ_实用技巧_
- .Net中异步任务的取消和监控的具体实现_实用技巧_
- 理解ASP.NET Core 中间件(Middleware)_实用技巧_
- .Net Core项目中NLog整合Exceptionless实例_实用技巧_
- .NET Core对象池的应用:扩展篇_实用技巧_
- .NET Core对象池的应用:设计篇_实用技巧_
- .NET Core对象池的应用:编程篇_实用技巧_
- NAT网络地址转换详情_ASP.NET_
- .Net Framework .Net .NET Standard的概念及区别_ASP.NET_
- C# 有关Assembly.Unload详解_实用技巧_
点击排行
本栏推荐
