Spiga

RabbitMQ知识点总结

2022-04-09 12:18:35

一、消息队列的核心价值

  1. 应用场景
    • 异步处理:电商订单支付成功后,异步通知库存系统扣减,避免主流程阻塞。
    • 削峰填谷:秒杀活动瞬间高并发请求先入队,后端按处理能力消费,保护系统稳定性。
    • 系统解耦:支付系统与物流系统通过消息传递数据,无需直接接口调用。
  2. 核心概念
    • 生产者(Producer):发送消息的程序(如订单系统)。
    • 消费者(Consumer):接收消息的程序(如库存系统)。
    • 队列(Queue):消息的缓冲存储区,遵循先进先出(FIFO)原则。

二、主流消息队列产品对比

| 产品 | 特点 | 适用场景 | | :------------: | :----------------------------------------------------------: | :----------------------: | | RabbitMQ | 基于AMQP协议,支持灵活的路由规则(交换机)、消息确认机制,功能全面 | 复杂业务逻辑、企业级应用 | | Kafka | 高吞吐、持久化、分布式设计,支持百万级TPS | 日志采集、实时流处理 | | RocketMQ | 阿里开源,低延迟、高可用,专为金融场景优化 | 电商交易、大规模事务消息 | | Redis List | 轻量级,基于内存操作,无高级特性(如重试、持久化) | 简单任务队列、缓存场景 |

三、消息模型深度解析

  1. 队列模型(P2P)

    • 单消费者消费消息,适用于订单支付、库存扣减等需严格顺序的场景。
  2. 发布-订阅模型(Pub/Sub)

    • 典型应用:新闻推送、用户行为广播(如积分系统、数据分析系统同时消费)。
  3. 交换机模式——RabbitMQ特有

四、AMQP协议

  1. AMQP核心组件
    • 交换机(Exchange):接收生产者消息,按规则路由到队列(直连、广播、主题等模式)。
    • 队列(Queue):临时存储消息的容器。
    • 绑定(Binding):定义交换机与队列的映射关系。
  2. 工作流程
    1. 建立连接
    2. 选择/创建虚拟主机
    3. 创建交换机和队列
    4. 绑定交换机和队列
    5. 发布消息
    6. 路由消息
    7. 消息消费
    8. 消息确认 ACK
    9. 断开连接

五、RabbitMQ实战

1. RabbitMQ部署

docker run -d \
  --name rabbit-01 \
  --hostname my-rabbit \
  -p 5672:5672 \  # AMQP端口
  -p 15672:15672 \ # 管理界面端口
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  -v /home/rabbitmq/data:/var/lib/rabbitmq \
  rabbitmq:3-management

2. 消息从生产到消费的过程

  • 生产阶段:生产者--》Broker(消息队列服务节电)--》请求确认机制(ACK)
  • 存储阶段:Broker存储消息
  • 消费阶段:消费者--》从Broker拉去消息---》业务完成再发送ACK确认接收消息

3. 工作队列模式(默认交换机)

// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    await channel.QueueDeclareAsync("sms.queue", false, false, false);

    for (var i = 0; i < 100; i++)
    {
        var message = $"phone:13900000{i},message:您的车票已预定成功u";
        var body = Encoding.UTF8.GetBytes(message);
        await channel.BasicPublishAsync("", "sms.queue", body); //第一个参数默认交换机
        Console.WriteLine($"消息已发送:{message} ");
    }
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };
    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    await channel.QueueDeclareAsync("sms.queue", false, false, false);

    var consumer = new AsyncEventingBasicConsumer(channel);
    var count = 0;
    consumer.ReceivedAsync += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        // 模拟发短信的操作延迟
        Thread.Sleep(800);
        Console.WriteLine($"短信已发送{++count}条:{message}");
        return Task.CompletedTask;
    };
    await channel.BasicConsumeAsync(queue: "sms.queue", true, consumer);
    Console.ReadLine();
}

存在问题:消息可能丢失,如何保证消息不丢失,后面内容介绍。

自动确认一般用在消息允许丢失的场景:日志、传感器记录。

4. 发布订阅模式(扇形交换机Fanout)

// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明交换机
    const string exchangeName = "sample.msg.fanout";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout);

    for (var i = 0; i < 100; i++)
    {
        var message = $"publisher:张三,message:hello {i}";
        var body = Encoding.UTF8.GetBytes(message);
        Console.WriteLine($"[{message}] 已发送");
        await channel.BasicPublishAsync(exchangeName, "", body);
    }
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明交换机
    const string exchangeName = "sample.msg.fanout";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout);

    Console.Write("请输入队列名称:");
    var queueName = Console.ReadLine()!;
    await channel.QueueDeclareAsync(queueName, false, false, false);

    // 绑定队列
    await channel.QueueBindAsync(queueName, exchangeName, "");

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($"[{message}]已接收");
        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
    };
    await channel.BasicConsumeAsync(queueName, false, consumer);
    Console.ReadLine();
}

5. 路由模式(直连交换机Direct)

// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明交换机
    const string exchangeName = "sample.order.direct";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);

    // 创建供应商队列并绑定到交换机
    await CreateSupplierQueueAsync(channel, "supplier1", exchangeName);
    await CreateSupplierQueueAsync(channel, "supplier2", exchangeName);

    // 模拟订单发送
    await SendOrderAsync(channel, exchangeName, "supplier1", "Order 1");
    await SendOrderAsync(channel, exchangeName, "supplier2", "Order 2");
}

private static async Task CreateSupplierQueueAsync(IChannel channel, string supplierId, string exchangeName)
{
    // 声明队列
    var queueName = "supplier.queue." + supplierId;
    await channel.QueueDeclareAsync(queueName, false, false, false, null);

    // 绑定队列到交换机
    await channel.QueueBindAsync(queueName, exchangeName, supplierId);
}

private static async Task SendOrderAsync(IChannel channel, string exchangeName, string routingKey, string order)
{
    var message = Encoding.UTF8.GetBytes(order);

    // 发送消息到交换机
    await channel.BasicPublishAsync(exchangeName, routingKey, message);

    Console.WriteLine($"发送订单 [{order}] 到供应商[{routingKey}]");
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明交换机
    const string exchangeName = "sample.order.direct";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);

    // 创建供应商队列并绑定到交换机
    // supplier1,supplier2为路由键,路由器跟进路由键发送到队列
    await CreateSupplierQueue(channel, "supplier1", exchangeName);
    await CreateSupplierQueue(channel, "supplier2", exchangeName);

    // 注册消费者
    await RegisterConsumer(channel, "supplier1");
    await RegisterConsumer(channel, "supplier2");

    Console.ReadLine();
}

private static async Task CreateSupplierQueue(IChannel channel, string supplierId, string exchangeName)
{
    // 声明队列
    var queueName = "supplier.queue." + supplierId;
    await channel.QueueDeclareAsync(queueName, false, false, false, null);

    // 绑定队列到交换机
    await channel.QueueBindAsync(queueName, exchangeName, supplierId);
}

private static async Task RegisterConsumer(IChannel channel, string supplierId)
{
    var queueName = "supplier.queue." + supplierId;

    // 创建消费者
    var consumer = new AsyncEventingBasicConsumer(channel);

    // 注册消息处理函数
    consumer.ReceivedAsync += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($"接受订单[{message}] 来自供应商[{supplierId}]");
        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
    };

    await channel.BasicConsumeAsync(queueName, false,  consumer);
}

6. 主题模式(主题交换机Topic)

*:精准的匹配一个单词(用.来分割单词)

#:匹配0或多个单词

// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    const string exchangeName = "sample.recommendation.topic";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic);

    while (true)
    {
        Console.WriteLine("请输入要发送的推荐信息:");
        var message = Console.ReadLine();

        Console.WriteLine("请输入要发布的主题,以逗号分隔:");
        var topicsInput = Console.ReadLine();
        var topics = topicsInput.Split(',');

        // 发布推荐信息到多个主题
        foreach (var topic in topics)
        {
            var body = Encoding.UTF8.GetBytes(message);
            await channel.BasicPublishAsync(exchangeName, topic.Trim(), body);

            Console.WriteLine($"推荐信息已发布到主题:[{topic.Trim()}]");
        }
    }
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    const string exchangeName = "sample.recommendation.topic";
    await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic);

    Console.WriteLine("请输入要订阅的主题,以逗号分隔:");
    var topicsInput = Console.ReadLine();
    var topics = topicsInput.Split(',');

    var queueName = (await channel.QueueDeclareAsync()).QueueName;

    // 订阅多个主题的推荐信息
    foreach (var topic in topics)
    {
        await channel.QueueBindAsync(queueName,exchangeName, topic.Trim());
    }

    Console.WriteLine("等待推荐信息...");

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var receivedMessage = Encoding.UTF8.GetString(body);
        var routingKey = ea.RoutingKey;

        Console.WriteLine($"收到来自主题:[{routingKey}] 的推荐信息:[{receivedMessage}]");
        return Task.CompletedTask;
    };

    await channel.BasicConsumeAsync(queueName, true, consumer);

    Console.ReadLine();
}

主题模式和路由模式的区别:路由是精准匹配,主题是模糊匹配

主题模式是最灵活的路由模式,也是运用最广的模式

六、RabbitMQ常见问题

1. 如何保证消息不丢失

  1. 事务机制:会将异步消息变成同步方法,一般不建议事务机制
// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    await channel.QueueDeclareAsync("sms.queue", false, false, false);

    for (var i = 0; i < 100; i++)
    {
        var message = $"phone:13900000{i},message:您的车票已预定成功";
        var body = Encoding.UTF8.GetBytes(message);
        // 开启事务
        await channel.TxSelectAsync();
        try
        {
            // 发送消息
            await channel.BasicPublishAsync("", "sms.queue", body);
            // 提交事务
            await channel.TxCommitAsync();

            Console.WriteLine($"消息已发送:{message} ");
        }
        catch (Exception e)
        {
            // 发送消息失败,回滚事务
            await channel.TxRollbackAsync();
            Console.WriteLine("发送消息失败");
        }
    }
}
  1. 消息确认机制
  • 生产者:启用confirm模式(消息发送到交换机确认)。
  • 队列:消息持久化(标记为durable)。
  • 消费者:手动ACK(消费成功后再移除消息)。
// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync(new CreateChannelOptions(true, true));

    await channel.QueueDeclareAsync("sms.queue", false, false, false);
    try
    {
        for (var i = 0; i < 100; i++)
        {
            var message = $"phone:13900000{i},message:您的车票已预定成功";
            var body = Encoding.UTF8.GetBytes(message);
            // 发送消息
            await channel.BasicPublishAsync("", "sms.queue", body);
            Console.WriteLine($"消息已发送:{message} ");
        }
    }
    catch (Exception e)
    {
        // 发送消息失败,处理重发逻辑
        Console.WriteLine("发送消息失败");
    }
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    await channel.QueueDeclareAsync("sms.queue", false, false, false);

    var consumer = new AsyncEventingBasicConsumer(channel);
    var count = 0;
    consumer.ReceivedAsync += async (_, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        // 模拟发短信的操作延迟
        Thread.Sleep(800);
        Console.WriteLine($"短信已发送{++count}条:{message}");
        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
    };
    await channel.BasicConsumeAsync(queue: "sms.queue", false, consumer);
    Console.ReadLine();
}

2. 消息重复消费问题

原因:

  1. 消费者确认失败:消费者处理完消息后,在发送确认(ACK)给RabbitMQ之前崩溃了,RabbitMQ会认为消息没被处理,重新投递
  2. 生产者重复发送:生产者不确定消息是否成功发送,可能会重复发送相同的消息
  3. 网络问题:网络延迟或中断可能导致确认信号丢失

解决方案:

  1. 幂等性消息:多次消费不影响结果
  2. 唯一约束:消息多次执行时因为唯一约束而失败
  3. 为消息执行提供前置条件:消息多次执行时条件不一致了,无法执行,比如版本号(乐观锁)

最佳实践:

  1. 为每条消息设置唯一ID:生产者发送消息时设置MessageId
  2. 消费者实现幂等性:通过记录已处理消息ID或业务本身的幂等性设计
  3. 使用手动确认模式:关闭自动确认(autoAck: false),正确处理后再手动确认
  4. 考虑消息过期时间:对于时效性强的消息,设置TTL(Time-To-Live)

3. 如何处理消息顺序问题

原因:

  1. 多个消费者:当有多个消费者同时处理队列中的消息时,处理速度快的消费者可能先完成,导致消息实际处理顺序与发送顺序不一致。
  2. 消息重试:当某条消息处理失败需要重试时,它会被重新放回队列或进入死信队列,导致顺序错乱。
  3. 多个队列:使用交换器将消息路由到多个队列时,不同队列的消费速度不同。

解决方案:

  • 单队列单消费者(最简单方案):只使用一个队列和一个消费者,确保消息按顺序处理。

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    var factory = new ConnectionFactory() { HostName = "localhost" };
    
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // 声明一个队列
        channel.QueueDeclare(queue: "order_queue",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        
        // 设置每次只接收一条消息,处理完再接收下一条
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($" [x] 收到消息: {message}");
            
            // 模拟处理工作
            Thread.Sleep(1000);
            Console.WriteLine($" [x] 处理完成: {message}");
            
            // 手动确认消息已处理
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        
        // 关闭自动确认,改为手动确认
        channel.BasicConsume(queue: "order_queue",
                             autoAck: false,
                             consumer: consumer);
    
        Console.WriteLine("按任意键退出");
        Console.ReadKey();
    }
    
  • 消息分组:将需要保持顺序的消息分组,同一组的消息总是由同一个消费者处理。

    // 生产者代码
    using RabbitMQ.Client;
    using System.Text;
    
    var factory = new ConnectionFactory() { HostName = "localhost" };
    
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // 声明队列时设置x-single-active-consumer参数
        var args = new Dictionary<string, object>
        {
            { "x-single-active-consumer", true }
        };
        
        channel.QueueDeclare(queue: "grouped_queue",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: args);
    
        // 发送10条消息,分为3组
        for (int i = 0; i < 10; i++)
        {
            string group = (i % 3).ToString(); // 分为3组
            string message = $"消息 {i} (组 {group})";
            var body = Encoding.UTF8.GetBytes(message);
            
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.Headers = new Dictionary<string, object>
            {
                { "group", group }
            };
            
            channel.BasicPublish(exchange: "",
                                 routingKey: "grouped_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine($" [x] 发送: {message}");
        }
    }
    
    Console.WriteLine("按任意键退出");
    Console.ReadKey();
    
  • 使用单活跃消费者(Single Active Consumer):RabbitMQ 3.8+ 支持单活跃消费者特性

    var args = new Dictionary<string, object>
    {
        { "x-single-active-consumer", true }
    };
    
    channel.QueueDeclare(queue: "sac_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: args);
    

实际应用建议:

  1. 评估需求:不是所有场景都需要严格顺序,只有业务强依赖顺序时才需要处理。
  2. 权衡性能:保证顺序通常会降低吞吐量,需要在顺序和性能间权衡。
  3. 错误处理:确保消费者正确处理消息,避免因错误导致顺序问题。
  4. 监控:实现良好的监控系统,及时发现和处理顺序问题。

七、RabbitMQ高级特性

1. 消息过期特性

  • 队列级别的TTL
  • 消息级别的TTL
  • 如果队列和消息都存在TTL,按时间短的生效
// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();
	// 队列级别的TTL
    await channel.QueueDeclareAsync("order.queue", false, false, false, 
        arguments: new Dictionary<string, object?>
        {
            { "x-message-ttl", 60000 }
        });

    var message = "用户已下单";
    var body = Encoding.UTF8.GetBytes(message);
	// 消息级别的TTL
    var props = new BasicProperties
    {
        // 设置消息的TTL为30秒
        Expiration = "30000"
    };

    await channel.BasicPublishAsync("", "order.queue", true, props, body);
    Console.WriteLine($"消息已发送:{message} ");
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    const string orderQueueName = "order.queue";
    await channel.QueueDeclareAsync(orderQueueName, false, false, false, 
        new Dictionary<string, object?>
        {
            { "x-message-ttl", 60000 }
        });

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($"收到消息:{message}");

        // 处理订单支付逻辑

        await channel.BasicAckAsync(ea.DeliveryTag, false);

    };
    await channel.BasicConsumeAsync(orderQueueName, false, consumer);
    Console.ReadLine();
}

2. 延迟队列和死信队列

场景:订单因为超时没有支付而过期,删除过期消息的同时,系统还需要发出一系列的后续操作。

**延迟队列:**需要再某个事件发生之后的一段时间内,如果某个条件仍未满足,就需要系统自动触发一个通知或操作。

死信队列:(死信交换机)

  • 消息被拒绝:拒绝Nack
  1. 业务消息被正常的投递到业务队列
  2. 消费者从业务队列获取消息,处理过程发生异常,执行了拒绝消息的操作
  3. 被拒绝的消息自动投递绑定的死信交换机
  4. 死信交换机根据路由规则,投递相应的死信队列
  5. 死信消费者进行后续的补偿处理或者人工干预
  • 消息过期:业务队列(延迟队列)中的过期消息,成为了死信队列里的正常消息
  1. 创建一个普通的队列(业务队列、延迟队列),订单消息发送到这个队列,设置一个TTL(订单的有效支付时间)
  2. 创建一个与之关联的死信交换机和死信队列,专门接收从延迟队列里过期淘汰出来的消息
  3. 将延迟队列与死信交换机绑定
  4. 当延迟队列中的订单消息过期后,标记为死信,发送到指定到死信交换机,路由到死信队列
  5. 消费者绑定到死信队列,一旦收到消息(死信),已经过期的订单消息,执行处理逻辑
  6. 如果订单的有效期内,用户完成支付,消费端就可以手动确认消息,就不会再进去死信队列
// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明订单队列作为延迟队列
    const string orderQueueName = "sample.order.queue";
    await channel.QueueDeclareAsync(orderQueueName, false, false, false,
        new Dictionary<string, object?>
        {
            // 设置默认 TTL 60秒
            { "x-message-ttl", 60000 },
            // 死信交换机
            { "x-dead-letter-exchange", "sample.dl.exchange" },
            // 死信路由键
            { "x-dead-letter-routing-key", "sample.dl.order" }
        });

    var props = new BasicProperties()
    {
        // 设置消息的TTL为30秒
        Expiration = "30000"
    };

    // 发送100条订单消息
    for (int i = 0; i < 100; i++)
    {
        var message = $"用户订单:{i},时间:{DateTime.Now:T}";
        var body = Encoding.UTF8.GetBytes(message);
        await channel.BasicPublishAsync("", orderQueueName,true, props , body);
        Console.WriteLine($"消息已发送:{message} ");
    }
}

// 消费者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "192.168.2.5",
        Port = 5672,
        UserName = "admin",
        Password = "admin"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明死信交换机
    const string deadLetterExchangeName = "sample.dl.exchange";
    await channel.ExchangeDeclareAsync(deadLetterExchangeName, ExchangeType.Direct);

    // 声明死信队列
    const string deadLetterQueueName = "sample.dl.queue";
    await channel.QueueDeclareAsync(deadLetterQueueName, false, false, false);

    // 绑定死信队列到死信交换机
    await channel.QueueBindAsync(deadLetterQueueName, deadLetterExchangeName, "sample.dl.order");

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($"收到过期消息:{message},当前时间:{DateTime.Now:T}");

        // 处理过期的订单逻辑
        await channel.BasicAckAsync(ea.DeliveryTag, false);

    };
    await channel.BasicConsumeAsync(deadLetterQueueName, false, consumer);
    Console.ReadLine();
}

3. 消息持久化机制

  • 消息的持久化:消息本身存储在磁盘
  • 队列的持久化:队列本身(元数据)存储
  • 交换机的持久化
// 发布者
public static async Task Run()
{
    var factory = new ConnectionFactory
    {
        HostName = "127.0.0.1",
        Port = 5572,
        UserName = "admin",
        Password = "admin",
        VirtualHost = "my_vhost"
    };

    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    // 声明持久化交换机
    await channel.ExchangeDeclareAsync("sample.exchange2", ExchangeType.Direct, true);

    // 声明持久化队列
    await channel.QueueDeclareAsync("sample.queue2", true, false, false);

    await channel.QueueBindAsync("sample.queue2", "sample.exchange2", "sample.queue");
            
    var message = "测试消息";
    var body = Encoding.UTF8.GetBytes(message);
            
    var props = new BasicProperties()
    {
        // 设置消息的持久化
        Persistent = true
    };
            
    await channel.BasicPublishAsync("sample.exchange2", "sample.queue", true, props, body);
    Console.WriteLine($"消息已发送:{message} ");
}

持久化的影响

  1. 性能影响:增加消息传递的延迟
  2. 磁盘空间
  3. 消息顺序

八、RabbitMQ集群

1. 普通集群

docker-compose部署普通集群

version: '3'

services:
  rabbitmq1:
    image: rabbitmq:3-management
    ports:
      - 5572:5672
      - 15672:15672
    environment:
      - RABBITMQ_ERLANG_COOKIE=examplecookie
      - RABBITMQ_NODENAME=rabbit@rabbitmq1
      - RABBITMQ_DEFAULT_VHOST=my_vhost
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin

  rabbitmq2:
    image: rabbitmq:3-management
    ports:
      - 5573:5672
      - 15673:15672
    environment:
      - RABBITMQ_ERLANG_COOKIE=examplecookie
      - RABBITMQ_NODENAME=rabbit@rabbitmq2
      - RABBITMQ_DEFAULT_VHOST=my_vhost
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin

  rabbitmq3:
    image: rabbitmq:3-management
    ports:
      - 5574:5672
      - 15674:15672
    environment:
      - RABBITMQ_ERLANG_COOKIE=examplecookie
      - RABBITMQ_NODENAME=rabbit@rabbitmq3
      - RABBITMQ_DEFAULT_VHOST=my_vhost
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin

特点:普通集群只同步了元数据,所以只能分担消息的压力。只实现了伸缩性(负载均衡),不能实现高可用

2. 镜像队列

实现镜像队列的方式:

  1. 命令行工具
  2. Web控制台上配置:在后台管理页面上手动配置,具体怎么配置可以自己百度。

特点

  • 节电之间不仅会同步元数据,连消息内容本身也会在镜像节点之间同步

  • 分布式复制机制:所有节电平等、没有主从

假如:RabbitMQ集群,客户端程序到底应该连接到哪个节点?

客户端:代理组件Keepalived+HAProxy