RabbitMQ

架构图

rabbitmq_exchange_types

核心概念

Producer(生产者)

  • 作用:发送消息到 RabbitMQ 的应用程序。
  • 关键行为

    • 指定消息的 routing_key(路由键)和 exchange(交换机)。
    • 可设置消息属性(如持久化、优先级、过期时间)。
  • 示例场景:用户注册后,订单系统发送一条“订单创建”消息。

Consumer(消费者)

  • 作用:从队列中获取消息并处理。
  • 关键行为

    • 监听指定的队列。
    • 手动确认(ACK)或拒绝(NACK)消息,确保可靠性。
  • 示例场景:库存系统监听队列,收到“订单创建”消息后扣减库存。

Queue(队列)

  • 作用:存储消息的缓冲区,消息会一直存在队列中,直到被消费或过期。
  • 核心特性

    • 持久化:队列可持久化到磁盘,防止 RabbitMQ 重启后丢失。
    • 独占队列:仅允许一个消费者连接(适用于临时队列)。
    • TTL(Time-To-Live):消息可设置自动过期时间。

Exchange(交换机)

  • 作用:接收生产者消息,并根据规则将消息路由到队列。
  • 四种类型

    类型规则典型场景
    Direct精确匹配 routing_key(如 order.create点对点通信(如订单处理)
    Fanout忽略 routing_key,广播到所有绑定的队列群发通知(如日志广播)
    Topic通配符匹配 routing_key(如 order.* 匹配 order.create多消费者订阅不同事件
    Headers根据消息头(headers)匹配,而非 routing_key(性能较低,较少使用)复杂路由逻辑

Binding(绑定)

  • 作用:定义交换机和队列之间的关联规则。
  • 关键参数routing_key(用于 Direct/Topic 交换机)或 headers(用于 Headers 交换机)。
  • 示例:将队列 inventory 绑定到交换机 ordersrouting_key="order.create"

Channel(信道)

  • 作用:在 TCP 连接上建立的轻量级虚拟连接,减少资源消耗。
  • 为什么需要:避免为每个线程创建独立的 TCP 连接(高性能场景必需)。

Virtual Host(虚拟主机)

  • 作用:逻辑隔离,类似命名空间。不同 vhost 可配置独立的队列、交换机和权限。
  • 示例/prod 用于生产环境,/test 用于测试环境。

vhost

应用场景

异步任务处理

  • 场景:耗时操作(如发送邮件、生成报表)异步化,避免阻塞主流程。
  • 示例

    • 用户注册后,生产者发送“发送欢迎邮件”消息到队列。
    • 消费者(邮件服务)从队列获取消息并发送邮件。
  • 优势:提升系统响应速度,解耦核心业务与非关键任务。

应用解耦

  • 场景:系统间通过消息通信,而非直接调用(如订单系统与库存系统)。
  • 示例

    • 订单系统创建订单后,发送消息到 order_created 队列。
    • 库存系统、物流系统分别监听队列,各自处理逻辑。
  • 优势:任一系统宕机不影响其他系统,避免级联故障。

流量削峰

  • 场景:突发流量下,用队列缓冲请求(如秒杀活动)。
  • 示例

    • 用户请求先写入队列,消费者按系统能力逐步处理。
    • 设置队列最大长度,超限后拒绝请求。
  • 优势:避免系统过载,平稳处理高峰流量。

事件驱动架构(EDA)

  • 场景:微服务间通过事件通知状态变化(如订单支付完成)。
  • 示例

    • 支付服务发送 payment_completed 事件到交换机。
    • 订单服务、积分服务通过不同队列订阅事件。
  • 优势:服务间无强依赖,易于扩展。

分布式系统协调

  • 场景:跨服务数据一致性(如 Saga 事务)。
  • 示例

    • 订单服务发起事务,发送“预留库存”消息。
    • 若库存服务失败,发送“补偿”消息回滚订单。
  • 优势:替代复杂的分布式事务(如 2PC)。

工作模式

Hello World

模式描述
最简单的消息队列模式,一个生产者、一个队列、一个消费者。生产者将消息发送到队列,消费者从队列中获取消息。

适用场景

  • 单一生产者与单一消费者的简单场景
  • 学习RabbitMQ的基础模式

重要参数

  • queue:队列名称,必须指定
  • durable:队列是否持久化(true/false),决定RabbitMQ重启后队列是否还存在
  • exclusive:是否排他队列(true/false),排他队列仅对声明它的连接可见
  • auto_delete:当最后一个消费者断开后是否自动删除队列(true/false)
  • arguments:额外的队列参数(Map类型)

Work Queues

模式描述
一个生产者、一个队列、多个消费者。消息会被平均分配给多个消费者处理,每个消息只能被一个消费者消费。

适用场景

  • 任务分发场景,多个worker处理任务
  • 负载均衡场景

重要参数

  • prefetch_count:每个消费者最大未确认消息数(QoS设置),控制消费者负载均衡
  • acknowledgement:消息确认机制(自动确认/手动确认)
  • no_ack:是否不需要确认(false表示需要手动确认)
  • message_ttl:消息存活时间(毫秒),过期消息会被丢弃或转入死信队列

Publish/Subscribe

模式描述
生产者将消息发送到交换机(exchange),交换机将消息广播到所有绑定的队列,每个队列的消费者都能收到消息。

适用场景

  • 广播通知场景
  • 事件驱动架构中的事件发布

重要参数

  • exchange:交换机名称
  • exchange_type:交换机类型(fanout表示发布订阅)
  • binding_key:绑定键(在fanout模式中通常为空)
  • durable:交换机是否持久化
  • auto_delete:交换机是否自动删除

Routing

模式描述
生产者将消息发送到direct类型的交换机,并指定路由键(routing key),交换机根据路由键将消息路由到匹配的队列。

适用场景

  • 根据消息属性进行选择性接收的场景
  • 需要根据特定条件过滤消息的场景

重要参数

  • exchange_type:必须为direct
  • routing_key:路由键,用于匹配队列绑定键
  • binding_key:队列绑定到交换机时指定的键
  • mandatory:当消息无法路由时是否返回给生产者(true/false)

Topics

模式描述
生产者将消息发送到topic类型的交换机,并指定路由键(可包含通配符),交换机根据路由键模式匹配将消息路由到对应队列。

适用场景

  • 需要基于多个标准进行消息路由的场景
  • 复杂的事件通知系统

重要参数

  • exchange_type:必须为topic
  • routing_key:带通配符的路由键(如"stock.usd.nyse")
  • binding_key:可包含通配符(*匹配一个单词,#匹配零或多个单词)
  • alternate-exchange:指定备用交换机,处理无法路由的消息

死信队列

死信队列(Dead Letter Exchange, DLX)是RabbitMQ中一种重要的消息处理机制,用于处理无法被正常消费的消息。

什么是死信队列

死信队列是指当消息在队列中变成"死信"(dead letter)后,可以被重新发送到另一个交换机(Exchange),这个交换机就是DLX。绑定DLX的队列称为死信队列。

消息变成死信的情况

消息在以下情况下会成为死信:

  1. 消费者拒绝消息

    • 消费者使用basic.rejectbasic.nack并且设置requeue=false
    • 这意味着消息不会被重新放回原队列
  2. 消息过期(TTL过期)

    • 消息设置了TTL(Time To Live)并且已经过期
    • 队列设置了TTL并且消息在队列中停留时间超过了TTL
  3. 队列达到最大长度

    • 队列设置了x-max-length参数并且队列已满
    • 新消息到达时,最老的消息会被转移到死信队列

死信队列的工作原理

  1. 首先需要创建一个普通的交换器(作为DLX)和一个队列(作为死信队列)
  2. 在原始队列上设置以下参数:

    • x-dead-letter-exchange: 指定死信交换器
    • x-dead-letter-routing-key: (可选)指定死信的路由键
  3. 当消息成为死信时,RabbitMQ会自动将其发布到指定的DLX
  4. DLX将消息路由到绑定的死信队列

死信队列的使用场景

  1. 消息处理失败的重试机制

    • 将处理失败的消息发送到死信队列
    • 可以设置延迟后重新尝试处理
  2. 延迟队列的实现

    • 结合TTL实现延迟消息处理
    • 消息先进入一个有TTL的队列,过期后转入死信队列被消费
  3. 异常消息处理

    • 收集所有无法处理的消息进行统一分析
    • 避免消息丢失或阻塞正常队列
  4. 消息审计

    • 记录所有被拒绝或过期的消息

代码声明死信队列

args := amqp.Table{
    "x-dead-letter-exchange": "my-dlx-exchange", // 死信交换机
    "x-dead-letter-routing-key": "dlx-routing-key", // 可选
}
_, err := ch.QueueDeclare(
    "my-queue", // 队列名
    true,       // 持久化
    false,      // 自动删除
    false,      // 排他
    false,      // 不等待
    args,       // 参数
)

延迟队列

什么是延迟队列

延迟队列(Delayed Queue)是一种特殊的消息队列,它允许消息在指定的延迟时间之后才被消费者获取和处理。这种机制在需要定时任务、延迟通知等场景下非常有用。

延迟队列的几种实现方式

RabbitMQ本身没有直接提供延迟队列的功能,但可以通过以下几种方式实现:

  1. TTL + 死信队列(DLX):最常用的实现方式
  2. rabbitmq-delayed-message-exchange 插件:官方提供的延迟消息插件
  3. 定时轮询:不推荐,效率低

TTL + DLX

原理

  1. 为消息设置TTL(Time To Live,生存时间)
  2. 当消息过期后,会被转发到死信交换机(DLX)
  3. 死信交换机将消息路由到实际的消费队列

优缺点

  • 优点:实现简单,不需要插件
  • 缺点:不支持任意时间的延迟,队列中前面的消息会阻塞后面的消息

插件

原理

RabbitMQ官方提供了一个插件,可以直接实现延迟队列功能,无需使用死信队列的复杂设置。

安装插件

  1. 下载插件:rabbitmq-delayed-message-exchange(注意版本匹配)
  2. 将插件放入RabbitMQ的plugins目录
  3. 启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

优缺点

  • 优点:支持任意时间的延迟,使用灵活
  • 缺点:需要安装插件,高负载下可能有性能问题

数据库定时轮询

这种方法不依赖RabbitMQ特性,而是通过外部系统实现:

  1. 将消息存入数据库并记录预期执行时间
  2. 定时任务轮询数据库,将到期的消息发送到RabbitMQ
  3. 消费者从RabbitMQ获取消息

优缺点

  • 优点:实现精确,可控性强
  • 缺点:系统复杂度高,依赖外部存储

应用场景

  1. 订单超时关闭:下单后30分钟未支付,自动取消订单
  2. 延迟通知:注册后24小时发送提醒邮件
  3. 定时任务:每天固定时间执行报表生成
  4. 重试机制:任务失败后延迟一段时间再重试
  5. 预约系统:提前预约,到指定时间触发

Go封装rabbitmq

rabbitmq.go

package rabbitmq

import "github.com/streadway/amqp"

const (
    ExchangeDirect = "direct"
    ExchangeTopic  = "topic"
    ExchangeFanout = "fanout"
)

type Option struct {
    RabbitmqUrl      string
    ExchangeName     string
    ExchangeType     string
    RoutingKey       string
    QueueName        string
    BindingKey       string
    ConsumerTag      string
    ConsumerWorker   int
    ConsumerPrefetch int
    Dlx              string // 死信交换机
    DlxRoutingKey    string
}

func exchangeDeclare(channel *amqp.Channel, option Option) error {
    return channel.ExchangeDeclare(
        option.ExchangeName,
        option.ExchangeType,
        true,
        false,
        false,
        false,
        nil,
    )
}

func queueDeclare(channel *amqp.Channel, option Option) (amqp.Queue, error) {
    args := amqp.Table{}
    if option.Dlx != "" && option.DlxRoutingKey != "" {
        args = amqp.Table{
            "x-dead-letter-exchange":    option.Dlx, // 死信交换机
            "x-dead-letter-routing-key": option.DlxRoutingKey,
        }
    }
    return channel.QueueDeclare(
        option.QueueName,
        true,
        false,
        false,
        false,
        args,
    )
}

func queueBind(channel *amqp.Channel, option Option) error {
    return channel.QueueBind(
        option.QueueName,
        option.BindingKey,
        option.ExchangeName,
        false,
        nil,
    )
}

publisher.go

package rabbitmq

import (
    "github.com/pkg/errors"
    "github.com/streadway/amqp"
)

type Publisher struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    Option  Option
}

func NewPublisher(option Option) *Publisher {
    conn, err := amqp.Dial(option.RabbitmqUrl)
    if err != nil {
        panic(err)
    }

    publisher := &Publisher{
        conn:   conn,
        Option: option,
    }
    err = publisher.initChannel()
    if err != nil {
        publisher.Close()
        panic(err)
    }
    return publisher
}

func (publisher *Publisher) Close() error {
    if publisher.channel != nil {
        if err := publisher.channel.Close(); err != nil {
            return err
        }
    }
    if publisher.conn != nil {
        if err := publisher.conn.Close(); err != nil {
            return err
        }
    }
    return nil
}

func (publisher *Publisher) initChannel() error {
    channel, err := publisher.conn.Channel()
    if err != nil {
        return err
    }
    if publisher.Option.ExchangeName != "" {
        err = exchangeDeclare(channel, publisher.Option)
        if err != nil {
            return err
        }
    }
    if publisher.Option.QueueName != "" {
        _, err = queueDeclare(channel, publisher.Option)
        if err != nil {
            return err
        }
    }
    if publisher.Option.BindingKey != "" {
        err = queueBind(channel, publisher.Option)
        if err != nil {
            return err
        }
    }

    publisher.channel = channel
    return nil
}

func (publisher *Publisher) PublishToExchange(message string) error {
    if publisher.Option.ExchangeName == "" {
        return errors.New("Exchange name is empty")
    }
    if publisher.Option.RoutingKey == "" {
        return errors.New("routing key is empty")
    }

    err := publisher.channel.Publish(
        publisher.Option.ExchangeName,
        publisher.Option.RoutingKey,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        },
    )
    if err != nil {
        return err
    }
    return nil
}

consumer.go

package rabbitmq

import (
    "fmt"
    "log"
    "sync"

    "github.com/streadway/amqp"
)

type Consumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    Option  Option
}

func NewConsumer(option Option) *Consumer {
    conn, err := amqp.Dial(option.RabbitmqUrl)
    if err != nil {
        panic(err)
    }
    if option.ConsumerWorker <= 0 {
        option.ConsumerWorker = 1
    }

    consumer := &Consumer{
        conn:   conn,
        Option: option,
    }
    err = consumer.initChannel()
    if err != nil {
        consumer.Close()
        panic(err)
    }
    return consumer
}

func (consumer *Consumer) Close() error {
    if consumer.channel != nil {
        if err := consumer.channel.Close(); err != nil {
            return err
        }
    }
    if consumer.conn != nil {
        if err := consumer.conn.Close(); err != nil {
            return err
        }
    }
    return nil
}

func (consumer *Consumer) initChannel() error {
    channel, err := consumer.conn.Channel()
    if err != nil {
        return err
    }
    if consumer.Option.QueueName != "" {
        _, err = queueDeclare(channel, consumer.Option)
        if err != nil {
            return err
        }
    }
    if consumer.Option.ExchangeName != "" {
        err = exchangeDeclare(channel, consumer.Option)
        if err != nil {
            return err
        }
    }
    if consumer.Option.BindingKey != "" {
        err = queueBind(channel, consumer.Option)
        if err != nil {
            return err
        }
    }

    prefetch := 1
    if consumer.Option.ConsumerPrefetch > 0 {
        prefetch = consumer.Option.ConsumerPrefetch
    }
    if err = channel.Qos(prefetch, 0, false); err != nil {
        return err
    }

    consumer.channel = channel
    return nil
}

func (consumer *Consumer) Consume(handler func(msg []byte) error) error {
    wg := sync.WaitGroup{}
    workerNum := consumer.Option.ConsumerWorker
    for i := 0; i < workerNum; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            err := consumer.consumeMessage(index, handler)
            if err != nil {
                log.Println(err.Error())
            }
        }(i)
    }
    wg.Wait()
    return nil
}

func (consumer *Consumer) consumeMessage(index int, handler func(msg []byte) error) error {
    channel := consumer.channel

    deliveries, err := channel.Consume(
        consumer.Option.QueueName,
        fmt.Sprintf("%s_%d", consumer.Option.ConsumerTag, index+1),
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    for d := range deliveries {
        err = handler(d.Body)
        if err != nil {
            log.Println(err.Error())
        }
        err = d.Ack(false)
        if err != nil {
            return err
        }
    }
    return nil
}
最后修改:2025 年 07 月 01 日
如果觉得我的文章对你有用,请随意赞赏