RabbitMQ
架构图

核心概念
Producer(生产者)
- 作用:发送消息到 RabbitMQ 的应用程序。
关键行为:
- 指定消息的
routing_key(路由键)和exchange(交换机)。 - 可设置消息属性(如持久化、优先级、过期时间)。
- 指定消息的
- 示例场景:用户注册后,订单系统发送一条“订单创建”消息。
Consumer(消费者)
- 作用:从队列中获取消息并处理。
关键行为:
- 监听指定的队列。
- 手动确认(ACK)或拒绝(NACK)消息,确保可靠性。
- 示例场景:库存系统监听队列,收到“订单创建”消息后扣减库存。
Queue(队列)
- 作用:存储消息的缓冲区,消息会一直存在队列中,直到被消费或过期。
核心特性:
- 持久化:队列可持久化到磁盘,防止 RabbitMQ 重启后丢失。
- 独占队列:仅允许一个消费者连接(适用于临时队列)。
- TTL(Time-To-Live):消息可设置自动过期时间。
Exchange(交换机)
- 作用:接收生产者消息,并根据规则将消息路由到队列。
四种类型:
类型 规则 典型场景 Direct 精确匹配 routing_key(如order.create)点对点通信(如订单处理) Fanout 忽略 routing_key,广播到所有绑定的队列群发通知(如日志广播) Topic 通配符匹配 routing_key(如order.*匹配order.create)多消费者订阅不同事件 Headers 根据消息头(headers)匹配,而非 routing_key(性能较低,较少使用)复杂路由逻辑
Binding(绑定)
- 作用:定义交换机和队列之间的关联规则。
- 关键参数:
routing_key(用于 Direct/Topic 交换机)或headers(用于 Headers 交换机)。 - 示例:将队列
inventory绑定到交换机orders,routing_key="order.create"。
Channel(信道)
- 作用:在 TCP 连接上建立的轻量级虚拟连接,减少资源消耗。
- 为什么需要:避免为每个线程创建独立的 TCP 连接(高性能场景必需)。
Virtual Host(虚拟主机)
- 作用:逻辑隔离,类似命名空间。不同 vhost 可配置独立的队列、交换机和权限。
- 示例:
/prod用于生产环境,/test用于测试环境。

应用场景
异步任务处理
- 场景:耗时操作(如发送邮件、生成报表)异步化,避免阻塞主流程。
示例:
- 用户注册后,生产者发送“发送欢迎邮件”消息到队列。
- 消费者(邮件服务)从队列获取消息并发送邮件。
- 优势:提升系统响应速度,解耦核心业务与非关键任务。
应用解耦
- 场景:系统间通过消息通信,而非直接调用(如订单系统与库存系统)。
示例:
- 订单系统创建订单后,发送消息到
order_created队列。 - 库存系统、物流系统分别监听队列,各自处理逻辑。
- 订单系统创建订单后,发送消息到
- 优势:任一系统宕机不影响其他系统,避免级联故障。
流量削峰
- 场景:突发流量下,用队列缓冲请求(如秒杀活动)。
示例:
- 用户请求先写入队列,消费者按系统能力逐步处理。
- 设置队列最大长度,超限后拒绝请求。
- 优势:避免系统过载,平稳处理高峰流量。
事件驱动架构(EDA)
- 场景:微服务间通过事件通知状态变化(如订单支付完成)。
示例:
- 支付服务发送
payment_completed事件到交换机。 - 订单服务、积分服务通过不同队列订阅事件。
- 支付服务发送
- 优势:服务间无强依赖,易于扩展。
分布式系统协调
- 场景:跨服务数据一致性(如 Saga 事务)。
示例:
- 订单服务发起事务,发送“预留库存”消息。
- 若库存服务失败,发送“补偿”消息回滚订单。
- 优势:替代复杂的分布式事务(如 2PC)。
工作模式
Hello World
模式描述:
最简单的消息队列模式,一个生产者、一个队列、一个消费者。生产者将消息发送到队列,消费者从队列中获取消息。
适用场景:
- 单一生产者与单一消费者的简单场景
- 学习RabbitMQ的基础模式
重要参数:
queue:队列名称,必须指定durable:队列是否持久化(true/false),决定RabbitMQ重启后队列是否还存在exclusive:是否排他队列(true/false),排他队列仅对声明它的连接可见auto_delete:当最后一个消费者断开后是否自动删除队列(true/false)arguments:额外的队列参数(Map类型)
Work Queues
模式描述:
一个生产者、一个队列、多个消费者。消息会被平均分配给多个消费者处理,每个消息只能被一个消费者消费。
适用场景:
- 任务分发场景,多个worker处理任务
- 负载均衡场景
重要参数:
prefetch_count:每个消费者最大未确认消息数(QoS设置),控制消费者负载均衡acknowledgement:消息确认机制(自动确认/手动确认)no_ack:是否不需要确认(false表示需要手动确认)message_ttl:消息存活时间(毫秒),过期消息会被丢弃或转入死信队列
Publish/Subscribe
模式描述:
生产者将消息发送到交换机(exchange),交换机将消息广播到所有绑定的队列,每个队列的消费者都能收到消息。
适用场景:
- 广播通知场景
- 事件驱动架构中的事件发布
重要参数:
exchange:交换机名称exchange_type:交换机类型(fanout表示发布订阅)binding_key:绑定键(在fanout模式中通常为空)durable:交换机是否持久化auto_delete:交换机是否自动删除
Routing
模式描述:
生产者将消息发送到direct类型的交换机,并指定路由键(routing key),交换机根据路由键将消息路由到匹配的队列。
适用场景:
- 根据消息属性进行选择性接收的场景
- 需要根据特定条件过滤消息的场景
重要参数:
exchange_type:必须为directrouting_key:路由键,用于匹配队列绑定键binding_key:队列绑定到交换机时指定的键mandatory:当消息无法路由时是否返回给生产者(true/false)
Topics
模式描述:
生产者将消息发送到topic类型的交换机,并指定路由键(可包含通配符),交换机根据路由键模式匹配将消息路由到对应队列。
适用场景:
- 需要基于多个标准进行消息路由的场景
- 复杂的事件通知系统
重要参数:
exchange_type:必须为topicrouting_key:带通配符的路由键(如"stock.usd.nyse")binding_key:可包含通配符(*匹配一个单词,#匹配零或多个单词)alternate-exchange:指定备用交换机,处理无法路由的消息
死信队列
死信队列(Dead Letter Exchange, DLX)是RabbitMQ中一种重要的消息处理机制,用于处理无法被正常消费的消息。
什么是死信队列
死信队列是指当消息在队列中变成"死信"(dead letter)后,可以被重新发送到另一个交换机(Exchange),这个交换机就是DLX。绑定DLX的队列称为死信队列。
消息变成死信的情况
消息在以下情况下会成为死信:
消费者拒绝消息:
- 消费者使用
basic.reject或basic.nack并且设置requeue=false - 这意味着消息不会被重新放回原队列
- 消费者使用
消息过期(TTL过期):
- 消息设置了TTL(Time To Live)并且已经过期
- 队列设置了TTL并且消息在队列中停留时间超过了TTL
队列达到最大长度:
- 队列设置了
x-max-length参数并且队列已满 - 新消息到达时,最老的消息会被转移到死信队列
- 队列设置了
死信队列的工作原理
- 首先需要创建一个普通的交换器(作为DLX)和一个队列(作为死信队列)
在原始队列上设置以下参数:
x-dead-letter-exchange: 指定死信交换器x-dead-letter-routing-key: (可选)指定死信的路由键
- 当消息成为死信时,RabbitMQ会自动将其发布到指定的DLX
- DLX将消息路由到绑定的死信队列
死信队列的使用场景
消息处理失败的重试机制:
- 将处理失败的消息发送到死信队列
- 可以设置延迟后重新尝试处理
延迟队列的实现:
- 结合TTL实现延迟消息处理
- 消息先进入一个有TTL的队列,过期后转入死信队列被消费
异常消息处理:
- 收集所有无法处理的消息进行统一分析
- 避免消息丢失或阻塞正常队列
消息审计:
- 记录所有被拒绝或过期的消息
代码声明死信队列
args := amqp.Table{
"x-dead-letter-exchange": "my-dlx-exchange", // 死信交换机
"x-dead-letter-routing-key": "dlx-routing-key", // 可选
}
_, err := ch.QueueDeclare(
"my-queue", // 队列名
true, // 持久化
false, // 自动删除
false, // 排他
false, // 不等待
args, // 参数
)延迟队列
什么是延迟队列
延迟队列(Delayed Queue)是一种特殊的消息队列,它允许消息在指定的延迟时间之后才被消费者获取和处理。这种机制在需要定时任务、延迟通知等场景下非常有用。
延迟队列的几种实现方式
RabbitMQ本身没有直接提供延迟队列的功能,但可以通过以下几种方式实现:
- TTL + 死信队列(DLX):最常用的实现方式
- rabbitmq-delayed-message-exchange 插件:官方提供的延迟消息插件
- 定时轮询:不推荐,效率低
TTL + DLX
原理
- 为消息设置TTL(Time To Live,生存时间)
- 当消息过期后,会被转发到死信交换机(DLX)
- 死信交换机将消息路由到实际的消费队列
优缺点
- 优点:实现简单,不需要插件
- 缺点:不支持任意时间的延迟,队列中前面的消息会阻塞后面的消息
插件
原理
RabbitMQ官方提供了一个插件,可以直接实现延迟队列功能,无需使用死信队列的复杂设置。
安装插件
- 下载插件:
rabbitmq-delayed-message-exchange(注意版本匹配) - 将插件放入RabbitMQ的plugins目录
- 启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
优缺点
- 优点:支持任意时间的延迟,使用灵活
- 缺点:需要安装插件,高负载下可能有性能问题
数据库定时轮询
这种方法不依赖RabbitMQ特性,而是通过外部系统实现:
- 将消息存入数据库并记录预期执行时间
- 定时任务轮询数据库,将到期的消息发送到RabbitMQ
- 消费者从RabbitMQ获取消息
优缺点:
- 优点:实现精确,可控性强
- 缺点:系统复杂度高,依赖外部存储
应用场景
- 订单超时关闭:下单后30分钟未支付,自动取消订单
- 延迟通知:注册后24小时发送提醒邮件
- 定时任务:每天固定时间执行报表生成
- 重试机制:任务失败后延迟一段时间再重试
- 预约系统:提前预约,到指定时间触发
Go封装rabbitmq
rabbitmq.go
package rabbitmq
import "github.com/streadway/amqp"
const (
ExchangeDirect = "direct"
ExchangeTopic = "topic"
ExchangeFanout = "fanout"
)
type Option struct {
RabbitmqUrl string
ExchangeName string
ExchangeType string
RoutingKey string
QueueName string
BindingKey string
ConsumerTag string
ConsumerWorker int
ConsumerPrefetch int
Dlx string // 死信交换机
DlxRoutingKey string
}
func exchangeDeclare(channel *amqp.Channel, option Option) error {
return channel.ExchangeDeclare(
option.ExchangeName,
option.ExchangeType,
true,
false,
false,
false,
nil,
)
}
func queueDeclare(channel *amqp.Channel, option Option) (amqp.Queue, error) {
args := amqp.Table{}
if option.Dlx != "" && option.DlxRoutingKey != "" {
args = amqp.Table{
"x-dead-letter-exchange": option.Dlx, // 死信交换机
"x-dead-letter-routing-key": option.DlxRoutingKey,
}
}
return channel.QueueDeclare(
option.QueueName,
true,
false,
false,
false,
args,
)
}
func queueBind(channel *amqp.Channel, option Option) error {
return channel.QueueBind(
option.QueueName,
option.BindingKey,
option.ExchangeName,
false,
nil,
)
}publisher.go
package rabbitmq
import (
"github.com/pkg/errors"
"github.com/streadway/amqp"
)
type Publisher struct {
conn *amqp.Connection
channel *amqp.Channel
Option Option
}
func NewPublisher(option Option) *Publisher {
conn, err := amqp.Dial(option.RabbitmqUrl)
if err != nil {
panic(err)
}
publisher := &Publisher{
conn: conn,
Option: option,
}
err = publisher.initChannel()
if err != nil {
publisher.Close()
panic(err)
}
return publisher
}
func (publisher *Publisher) Close() error {
if publisher.channel != nil {
if err := publisher.channel.Close(); err != nil {
return err
}
}
if publisher.conn != nil {
if err := publisher.conn.Close(); err != nil {
return err
}
}
return nil
}
func (publisher *Publisher) initChannel() error {
channel, err := publisher.conn.Channel()
if err != nil {
return err
}
if publisher.Option.ExchangeName != "" {
err = exchangeDeclare(channel, publisher.Option)
if err != nil {
return err
}
}
if publisher.Option.QueueName != "" {
_, err = queueDeclare(channel, publisher.Option)
if err != nil {
return err
}
}
if publisher.Option.BindingKey != "" {
err = queueBind(channel, publisher.Option)
if err != nil {
return err
}
}
publisher.channel = channel
return nil
}
func (publisher *Publisher) PublishToExchange(message string) error {
if publisher.Option.ExchangeName == "" {
return errors.New("Exchange name is empty")
}
if publisher.Option.RoutingKey == "" {
return errors.New("routing key is empty")
}
err := publisher.channel.Publish(
publisher.Option.ExchangeName,
publisher.Option.RoutingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil {
return err
}
return nil
}consumer.go
package rabbitmq
import (
"fmt"
"log"
"sync"
"github.com/streadway/amqp"
)
type Consumer struct {
conn *amqp.Connection
channel *amqp.Channel
Option Option
}
func NewConsumer(option Option) *Consumer {
conn, err := amqp.Dial(option.RabbitmqUrl)
if err != nil {
panic(err)
}
if option.ConsumerWorker <= 0 {
option.ConsumerWorker = 1
}
consumer := &Consumer{
conn: conn,
Option: option,
}
err = consumer.initChannel()
if err != nil {
consumer.Close()
panic(err)
}
return consumer
}
func (consumer *Consumer) Close() error {
if consumer.channel != nil {
if err := consumer.channel.Close(); err != nil {
return err
}
}
if consumer.conn != nil {
if err := consumer.conn.Close(); err != nil {
return err
}
}
return nil
}
func (consumer *Consumer) initChannel() error {
channel, err := consumer.conn.Channel()
if err != nil {
return err
}
if consumer.Option.QueueName != "" {
_, err = queueDeclare(channel, consumer.Option)
if err != nil {
return err
}
}
if consumer.Option.ExchangeName != "" {
err = exchangeDeclare(channel, consumer.Option)
if err != nil {
return err
}
}
if consumer.Option.BindingKey != "" {
err = queueBind(channel, consumer.Option)
if err != nil {
return err
}
}
prefetch := 1
if consumer.Option.ConsumerPrefetch > 0 {
prefetch = consumer.Option.ConsumerPrefetch
}
if err = channel.Qos(prefetch, 0, false); err != nil {
return err
}
consumer.channel = channel
return nil
}
func (consumer *Consumer) Consume(handler func(msg []byte) error) error {
wg := sync.WaitGroup{}
workerNum := consumer.Option.ConsumerWorker
for i := 0; i < workerNum; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
err := consumer.consumeMessage(index, handler)
if err != nil {
log.Println(err.Error())
}
}(i)
}
wg.Wait()
return nil
}
func (consumer *Consumer) consumeMessage(index int, handler func(msg []byte) error) error {
channel := consumer.channel
deliveries, err := channel.Consume(
consumer.Option.QueueName,
fmt.Sprintf("%s_%d", consumer.Option.ConsumerTag, index+1),
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}
for d := range deliveries {
err = handler(d.Body)
if err != nil {
log.Println(err.Error())
}
err = d.Ack(false)
if err != nil {
return err
}
}
return nil
}