RabbitMQ知识点总结
2022-04-09 12:18:35一、消息队列的核心价值
- 应用场景
- 异步处理:电商订单支付成功后,异步通知库存系统扣减,避免主流程阻塞。
- 削峰填谷:秒杀活动瞬间高并发请求先入队,后端按处理能力消费,保护系统稳定性。
- 系统解耦:支付系统与物流系统通过消息传递数据,无需直接接口调用。
- 核心概念
- 生产者(Producer):发送消息的程序(如订单系统)。
- 消费者(Consumer):接收消息的程序(如库存系统)。
- 队列(Queue):消息的缓冲存储区,遵循先进先出(FIFO)原则。
二、主流消息队列产品对比
| 产品 | 特点 | 适用场景 | | :------------: | :----------------------------------------------------------: | :----------------------: | | RabbitMQ | 基于AMQP协议,支持灵活的路由规则(交换机)、消息确认机制,功能全面 | 复杂业务逻辑、企业级应用 | | Kafka | 高吞吐、持久化、分布式设计,支持百万级TPS | 日志采集、实时流处理 | | RocketMQ | 阿里开源,低延迟、高可用,专为金融场景优化 | 电商交易、大规模事务消息 | | Redis List | 轻量级,基于内存操作,无高级特性(如重试、持久化) | 简单任务队列、缓存场景 |
三、消息模型深度解析
-
队列模型(P2P)
- 单消费者消费消息,适用于订单支付、库存扣减等需严格顺序的场景。
-
发布-订阅模型(Pub/Sub)
- 典型应用:新闻推送、用户行为广播(如积分系统、数据分析系统同时消费)。
-
交换机模式——RabbitMQ特有
四、AMQP协议
- AMQP核心组件
- 交换机(Exchange):接收生产者消息,按规则路由到队列(直连、广播、主题等模式)。
- 队列(Queue):临时存储消息的容器。
- 绑定(Binding):定义交换机与队列的映射关系。
- 工作流程
- 建立连接
- 选择/创建虚拟主机
- 创建交换机和队列
- 绑定交换机和队列
- 发布消息
- 路由消息
- 消息消费
- 消息确认 ACK
- 断开连接
五、RabbitMQ实战
1. RabbitMQ部署
docker run -d \
--name rabbit-01 \
--hostname my-rabbit \
-p 5672:5672 \ # AMQP端口
-p 15672:15672 \ # 管理界面端口
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
-v /home/rabbitmq/data:/var/lib/rabbitmq \
rabbitmq:3-management
2. 消息从生产到消费的过程
- 生产阶段:生产者--》Broker(消息队列服务节电)--》请求确认机制(ACK)
- 存储阶段:Broker存储消息
- 消费阶段:消费者--》从Broker拉去消息---》业务完成再发送ACK确认接收消息
3. 工作队列模式(默认交换机)
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync("sms.queue", false, false, false);
for (var i = 0; i < 100; i++)
{
var message = $"phone:13900000{i},message:您的车票已预定成功u";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync("", "sms.queue", body); //第一个参数默认交换机
Console.WriteLine($"消息已发送:{message} ");
}
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync("sms.queue", false, false, false);
var consumer = new AsyncEventingBasicConsumer(channel);
var count = 0;
consumer.ReceivedAsync += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// 模拟发短信的操作延迟
Thread.Sleep(800);
Console.WriteLine($"短信已发送{++count}条:{message}");
return Task.CompletedTask;
};
await channel.BasicConsumeAsync(queue: "sms.queue", true, consumer);
Console.ReadLine();
}
存在问题:消息可能丢失,如何保证消息不丢失,后面内容介绍。
自动确认一般用在消息允许丢失的场景:日志、传感器记录。
4. 发布订阅模式(扇形交换机Fanout)
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明交换机
const string exchangeName = "sample.msg.fanout";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout);
for (var i = 0; i < 100; i++)
{
var message = $"publisher:张三,message:hello {i}";
var body = Encoding.UTF8.GetBytes(message);
Console.WriteLine($"[{message}] 已发送");
await channel.BasicPublishAsync(exchangeName, "", body);
}
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明交换机
const string exchangeName = "sample.msg.fanout";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout);
Console.Write("请输入队列名称:");
var queueName = Console.ReadLine()!;
await channel.QueueDeclareAsync(queueName, false, false, false);
// 绑定队列
await channel.QueueBindAsync(queueName, exchangeName, "");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[{message}]已接收");
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync(queueName, false, consumer);
Console.ReadLine();
}
5. 路由模式(直连交换机Direct)
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明交换机
const string exchangeName = "sample.order.direct";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
// 创建供应商队列并绑定到交换机
await CreateSupplierQueueAsync(channel, "supplier1", exchangeName);
await CreateSupplierQueueAsync(channel, "supplier2", exchangeName);
// 模拟订单发送
await SendOrderAsync(channel, exchangeName, "supplier1", "Order 1");
await SendOrderAsync(channel, exchangeName, "supplier2", "Order 2");
}
private static async Task CreateSupplierQueueAsync(IChannel channel, string supplierId, string exchangeName)
{
// 声明队列
var queueName = "supplier.queue." + supplierId;
await channel.QueueDeclareAsync(queueName, false, false, false, null);
// 绑定队列到交换机
await channel.QueueBindAsync(queueName, exchangeName, supplierId);
}
private static async Task SendOrderAsync(IChannel channel, string exchangeName, string routingKey, string order)
{
var message = Encoding.UTF8.GetBytes(order);
// 发送消息到交换机
await channel.BasicPublishAsync(exchangeName, routingKey, message);
Console.WriteLine($"发送订单 [{order}] 到供应商[{routingKey}]");
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明交换机
const string exchangeName = "sample.order.direct";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
// 创建供应商队列并绑定到交换机
// supplier1,supplier2为路由键,路由器跟进路由键发送到队列
await CreateSupplierQueue(channel, "supplier1", exchangeName);
await CreateSupplierQueue(channel, "supplier2", exchangeName);
// 注册消费者
await RegisterConsumer(channel, "supplier1");
await RegisterConsumer(channel, "supplier2");
Console.ReadLine();
}
private static async Task CreateSupplierQueue(IChannel channel, string supplierId, string exchangeName)
{
// 声明队列
var queueName = "supplier.queue." + supplierId;
await channel.QueueDeclareAsync(queueName, false, false, false, null);
// 绑定队列到交换机
await channel.QueueBindAsync(queueName, exchangeName, supplierId);
}
private static async Task RegisterConsumer(IChannel channel, string supplierId)
{
var queueName = "supplier.queue." + supplierId;
// 创建消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息处理函数
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"接受订单[{message}] 来自供应商[{supplierId}]");
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync(queueName, false, consumer);
}
6. 主题模式(主题交换机Topic)
*:精准的匹配一个单词(用.来分割单词)
#:匹配0或多个单词
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "sample.recommendation.topic";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic);
while (true)
{
Console.WriteLine("请输入要发送的推荐信息:");
var message = Console.ReadLine();
Console.WriteLine("请输入要发布的主题,以逗号分隔:");
var topicsInput = Console.ReadLine();
var topics = topicsInput.Split(',');
// 发布推荐信息到多个主题
foreach (var topic in topics)
{
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchangeName, topic.Trim(), body);
Console.WriteLine($"推荐信息已发布到主题:[{topic.Trim()}]");
}
}
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "sample.recommendation.topic";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic);
Console.WriteLine("请输入要订阅的主题,以逗号分隔:");
var topicsInput = Console.ReadLine();
var topics = topicsInput.Split(',');
var queueName = (await channel.QueueDeclareAsync()).QueueName;
// 订阅多个主题的推荐信息
foreach (var topic in topics)
{
await channel.QueueBindAsync(queueName,exchangeName, topic.Trim());
}
Console.WriteLine("等待推荐信息...");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
var body = ea.Body.ToArray();
var receivedMessage = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($"收到来自主题:[{routingKey}] 的推荐信息:[{receivedMessage}]");
return Task.CompletedTask;
};
await channel.BasicConsumeAsync(queueName, true, consumer);
Console.ReadLine();
}
主题模式和路由模式的区别:路由是精准匹配,主题是模糊匹配
主题模式是最灵活的路由模式,也是运用最广的模式
六、RabbitMQ常见问题
1. 如何保证消息不丢失
- 事务机制:会将异步消息变成同步方法,一般不建议事务机制
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync("sms.queue", false, false, false);
for (var i = 0; i < 100; i++)
{
var message = $"phone:13900000{i},message:您的车票已预定成功";
var body = Encoding.UTF8.GetBytes(message);
// 开启事务
await channel.TxSelectAsync();
try
{
// 发送消息
await channel.BasicPublishAsync("", "sms.queue", body);
// 提交事务
await channel.TxCommitAsync();
Console.WriteLine($"消息已发送:{message} ");
}
catch (Exception e)
{
// 发送消息失败,回滚事务
await channel.TxRollbackAsync();
Console.WriteLine("发送消息失败");
}
}
}
- 消息确认机制
- 生产者:启用
confirm
模式(消息发送到交换机确认)。 - 队列:消息持久化(标记为
durable
)。 - 消费者:手动ACK(消费成功后再移除消息)。
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions(true, true));
await channel.QueueDeclareAsync("sms.queue", false, false, false);
try
{
for (var i = 0; i < 100; i++)
{
var message = $"phone:13900000{i},message:您的车票已预定成功";
var body = Encoding.UTF8.GetBytes(message);
// 发送消息
await channel.BasicPublishAsync("", "sms.queue", body);
Console.WriteLine($"消息已发送:{message} ");
}
}
catch (Exception e)
{
// 发送消息失败,处理重发逻辑
Console.WriteLine("发送消息失败");
}
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync("sms.queue", false, false, false);
var consumer = new AsyncEventingBasicConsumer(channel);
var count = 0;
consumer.ReceivedAsync += async (_, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// 模拟发短信的操作延迟
Thread.Sleep(800);
Console.WriteLine($"短信已发送{++count}条:{message}");
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync(queue: "sms.queue", false, consumer);
Console.ReadLine();
}
2. 消息重复消费问题
原因:
- 消费者确认失败:消费者处理完消息后,在发送确认(ACK)给RabbitMQ之前崩溃了,RabbitMQ会认为消息没被处理,重新投递
- 生产者重复发送:生产者不确定消息是否成功发送,可能会重复发送相同的消息
- 网络问题:网络延迟或中断可能导致确认信号丢失
解决方案:
- 幂等性消息:多次消费不影响结果
- 唯一约束:消息多次执行时因为唯一约束而失败
- 为消息执行提供前置条件:消息多次执行时条件不一致了,无法执行,比如版本号(乐观锁)
最佳实践:
- 为每条消息设置唯一ID:生产者发送消息时设置MessageId
- 消费者实现幂等性:通过记录已处理消息ID或业务本身的幂等性设计
- 使用手动确认模式:关闭自动确认(autoAck: false),正确处理后再手动确认
- 考虑消息过期时间:对于时效性强的消息,设置TTL(Time-To-Live)
3. 如何处理消息顺序问题
原因:
- 多个消费者:当有多个消费者同时处理队列中的消息时,处理速度快的消费者可能先完成,导致消息实际处理顺序与发送顺序不一致。
- 消息重试:当某条消息处理失败需要重试时,它会被重新放回队列或进入死信队列,导致顺序错乱。
- 多个队列:使用交换器将消息路由到多个队列时,不同队列的消费速度不同。
解决方案:
-
单队列单消费者(最简单方案):只使用一个队列和一个消费者,确保消息按顺序处理。
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 声明一个队列 channel.QueueDeclare(queue: "order_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 设置每次只接收一条消息,处理完再接收下一条 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] 收到消息: {message}"); // 模拟处理工作 Thread.Sleep(1000); Console.WriteLine($" [x] 处理完成: {message}"); // 手动确认消息已处理 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 关闭自动确认,改为手动确认 channel.BasicConsume(queue: "order_queue", autoAck: false, consumer: consumer); Console.WriteLine("按任意键退出"); Console.ReadKey(); }
-
消息分组:将需要保持顺序的消息分组,同一组的消息总是由同一个消费者处理。
// 生产者代码 using RabbitMQ.Client; using System.Text; var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 声明队列时设置x-single-active-consumer参数 var args = new Dictionary<string, object> { { "x-single-active-consumer", true } }; channel.QueueDeclare(queue: "grouped_queue", durable: true, exclusive: false, autoDelete: false, arguments: args); // 发送10条消息,分为3组 for (int i = 0; i < 10; i++) { string group = (i % 3).ToString(); // 分为3组 string message = $"消息 {i} (组 {group})"; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Headers = new Dictionary<string, object> { { "group", group } }; channel.BasicPublish(exchange: "", routingKey: "grouped_queue", basicProperties: properties, body: body); Console.WriteLine($" [x] 发送: {message}"); } } Console.WriteLine("按任意键退出"); Console.ReadKey();
-
使用单活跃消费者(Single Active Consumer):RabbitMQ 3.8+ 支持单活跃消费者特性
var args = new Dictionary<string, object> { { "x-single-active-consumer", true } }; channel.QueueDeclare(queue: "sac_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);
实际应用建议:
- 评估需求:不是所有场景都需要严格顺序,只有业务强依赖顺序时才需要处理。
- 权衡性能:保证顺序通常会降低吞吐量,需要在顺序和性能间权衡。
- 错误处理:确保消费者正确处理消息,避免因错误导致顺序问题。
- 监控:实现良好的监控系统,及时发现和处理顺序问题。
七、RabbitMQ高级特性
1. 消息过期特性
- 队列级别的TTL
- 消息级别的TTL
- 如果队列和消息都存在TTL,按时间短的生效
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 队列级别的TTL
await channel.QueueDeclareAsync("order.queue", false, false, false,
arguments: new Dictionary<string, object?>
{
{ "x-message-ttl", 60000 }
});
var message = "用户已下单";
var body = Encoding.UTF8.GetBytes(message);
// 消息级别的TTL
var props = new BasicProperties
{
// 设置消息的TTL为30秒
Expiration = "30000"
};
await channel.BasicPublishAsync("", "order.queue", true, props, body);
Console.WriteLine($"消息已发送:{message} ");
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
const string orderQueueName = "order.queue";
await channel.QueueDeclareAsync(orderQueueName, false, false, false,
new Dictionary<string, object?>
{
{ "x-message-ttl", 60000 }
});
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"收到消息:{message}");
// 处理订单支付逻辑
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
await channel.BasicConsumeAsync(orderQueueName, false, consumer);
Console.ReadLine();
}
2. 延迟队列和死信队列
场景:订单因为超时没有支付而过期,删除过期消息的同时,系统还需要发出一系列的后续操作。
**延迟队列:**需要再某个事件发生之后的一段时间内,如果某个条件仍未满足,就需要系统自动触发一个通知或操作。
死信队列:(死信交换机)
- 消息被拒绝:拒绝Nack
- 业务消息被正常的投递到业务队列
- 消费者从业务队列获取消息,处理过程发生异常,执行了拒绝消息的操作
- 被拒绝的消息自动投递绑定的死信交换机
- 死信交换机根据路由规则,投递相应的死信队列
- 死信消费者进行后续的补偿处理或者人工干预
- 消息过期:业务队列(延迟队列)中的过期消息,成为了死信队列里的正常消息
- 创建一个普通的队列(业务队列、延迟队列),订单消息发送到这个队列,设置一个TTL(订单的有效支付时间)
- 创建一个与之关联的死信交换机和死信队列,专门接收从延迟队列里过期淘汰出来的消息
- 将延迟队列与死信交换机绑定
- 当延迟队列中的订单消息过期后,标记为死信,发送到指定到死信交换机,路由到死信队列
- 消费者绑定到死信队列,一旦收到消息(死信),已经过期的订单消息,执行处理逻辑
- 如果订单的有效期内,用户完成支付,消费端就可以手动确认消息,就不会再进去死信队列
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明订单队列作为延迟队列
const string orderQueueName = "sample.order.queue";
await channel.QueueDeclareAsync(orderQueueName, false, false, false,
new Dictionary<string, object?>
{
// 设置默认 TTL 60秒
{ "x-message-ttl", 60000 },
// 死信交换机
{ "x-dead-letter-exchange", "sample.dl.exchange" },
// 死信路由键
{ "x-dead-letter-routing-key", "sample.dl.order" }
});
var props = new BasicProperties()
{
// 设置消息的TTL为30秒
Expiration = "30000"
};
// 发送100条订单消息
for (int i = 0; i < 100; i++)
{
var message = $"用户订单:{i},时间:{DateTime.Now:T}";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync("", orderQueueName,true, props , body);
Console.WriteLine($"消息已发送:{message} ");
}
}
// 消费者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "192.168.2.5",
Port = 5672,
UserName = "admin",
Password = "admin"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明死信交换机
const string deadLetterExchangeName = "sample.dl.exchange";
await channel.ExchangeDeclareAsync(deadLetterExchangeName, ExchangeType.Direct);
// 声明死信队列
const string deadLetterQueueName = "sample.dl.queue";
await channel.QueueDeclareAsync(deadLetterQueueName, false, false, false);
// 绑定死信队列到死信交换机
await channel.QueueBindAsync(deadLetterQueueName, deadLetterExchangeName, "sample.dl.order");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"收到过期消息:{message},当前时间:{DateTime.Now:T}");
// 处理过期的订单逻辑
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
await channel.BasicConsumeAsync(deadLetterQueueName, false, consumer);
Console.ReadLine();
}
3. 消息持久化机制
- 消息的持久化:消息本身存储在磁盘
- 队列的持久化:队列本身(元数据)存储
- 交换机的持久化
// 发布者
public static async Task Run()
{
var factory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5572,
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// 声明持久化交换机
await channel.ExchangeDeclareAsync("sample.exchange2", ExchangeType.Direct, true);
// 声明持久化队列
await channel.QueueDeclareAsync("sample.queue2", true, false, false);
await channel.QueueBindAsync("sample.queue2", "sample.exchange2", "sample.queue");
var message = "测试消息";
var body = Encoding.UTF8.GetBytes(message);
var props = new BasicProperties()
{
// 设置消息的持久化
Persistent = true
};
await channel.BasicPublishAsync("sample.exchange2", "sample.queue", true, props, body);
Console.WriteLine($"消息已发送:{message} ");
}
持久化的影响
- 性能影响:增加消息传递的延迟
- 磁盘空间
- 消息顺序
八、RabbitMQ集群
1. 普通集群
docker-compose部署普通集群
version: '3'
services:
rabbitmq1:
image: rabbitmq:3-management
ports:
- 5572:5672
- 15672:15672
environment:
- RABBITMQ_ERLANG_COOKIE=examplecookie
- RABBITMQ_NODENAME=rabbit@rabbitmq1
- RABBITMQ_DEFAULT_VHOST=my_vhost
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
rabbitmq2:
image: rabbitmq:3-management
ports:
- 5573:5672
- 15673:15672
environment:
- RABBITMQ_ERLANG_COOKIE=examplecookie
- RABBITMQ_NODENAME=rabbit@rabbitmq2
- RABBITMQ_DEFAULT_VHOST=my_vhost
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
rabbitmq3:
image: rabbitmq:3-management
ports:
- 5574:5672
- 15674:15672
environment:
- RABBITMQ_ERLANG_COOKIE=examplecookie
- RABBITMQ_NODENAME=rabbit@rabbitmq3
- RABBITMQ_DEFAULT_VHOST=my_vhost
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
特点:普通集群只同步了元数据,所以只能分担消息的压力。只实现了伸缩性(负载均衡),不能实现高可用
2. 镜像队列
实现镜像队列的方式:
- 命令行工具
- Web控制台上配置:在后台管理页面上手动配置,具体怎么配置可以自己百度。
特点
-
节电之间不仅会同步元数据,连消息内容本身也会在镜像节点之间同步
-
分布式复制机制:所有节电平等、没有主从
假如:RabbitMQ集群,客户端程序到底应该连接到哪个节点?
客户端:代理组件Keepalived+HAProxy