延时队列常使用在某些业务场景,例如订单支付超时、接收到外卖后自动确认完成订单、定时任务、促销过期等,使用延时队列可以简化系统的设计和开发、提高系统的可靠性和可用性、提高系统的性能。下面介绍使用RabbitMQ的延时消息队列,使用之前先要让RabbitMQ支持延时队列。
在docker安装单机版rabbitMQ
docker-compose.yaml配置文件内容如下:
version: '3' services: rabbitmq: image: rabbitmq:3.12-management container_name: rabbitmq hostname: rabbitmq-service restart: always ports: - 5672:5672 - 15672:15672 volumes: - $PWD/data:/var/lib/rabbitmq - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest RABBITMQ_DEFAULT_VHOST: /
rabbitMQ默认不支持延时消息队列类型,需要另外安装插件来实现:
- enabled_plugins 是设置默认开启的插件,内容为
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]
- rabbitmq_delayed_message_exchange-3.12.0.ez 是延时队列插件。
启动rabbitmq:
docker-compose up -d
可以在浏览器访问管理后台 http://localhost:15672 ,用户名和密码都是guest
。
点击菜单【exchange】--> 【Add a new exchange】-->【Type】,在下拉列表中看到x-delayed-message
类型的话,说明已经支持延时队列了。
使用延时队列需要指定具体某一种消息类型(direct、topic、fanout、headers),下面以direct类型的延时消息队列为例。
生产端示例代码
package main import ( "context" "fmt" "strconv" "time" amqp "github.com/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:guest@127.0.0.1:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := amqp.Dial(url) checkErr(err) defer conn.Close() ctx := context.Background() queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) q, err := NewProducer(queueName, conn, exchange) checkErr(err) defer q.Close() for i := 1; i <= 5; i++ { body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i) err = q.Publish(ctx, time.Second*5, []byte(body)) // 发送消息 checkErr(err) time.Sleep(time.Second) } } // Exchange 交换机 type Exchange struct { Name string // exchange名称 Type string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message RoutingKey string // 路由key XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers } // NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Producer 生产者对象 type Producer struct { queueName string exchange *Exchange conn *amqp.Connection ch *amqp.Channel } // NewProducer 实例化一个生产者 func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) { // 创建管道 ch, err := conn.Channel() if err != nil { return nil, err } // 声明交换机类型 err = ch.ExchangeDeclare( exchange.Name, // 交换机名称 exchange.Type, // x-delayed-message true, // 是否持久化 false, // 是否自动删除 false, // 是否公开,false即公开 false, // 是否等待 amqp.Table{ "x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers }, ) if err != nil { _ = ch.Close() return nil, err } // 声明队列,如果队列不存在则自动创建,存在则跳过创建 q, err := ch.QueueDeclare( queueName, // 消息队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性(仅创建它的程序才可用) false, // 是否阻塞处理 nil, // 额外的属性 ) if err != nil { _ = ch.Close() return nil, err } // 绑定队列和交换机 err = ch.QueueBind( q.Name, exchange.RoutingKey, exchange.Name, false, nil, ) if err != nil { _ = ch.Close() return nil, err } return &Producer{ queueName: queueName, conn: conn, ch: ch, exchange: exchange, }, nil } // Publish 发送消息 func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error { err := p.ch.PublishWithContext( ctx, p.exchange.Name, // exchange name p.exchange.RoutingKey, // key false, // mandatory 如果为true,根据自身exchange类型和routingKey规则无法找到符合条件的队列会把消息返还给发送者 false, // immediate 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者 amqp.Publishing{ DeliveryMode: amqp.Persistent, // 如果队列的声明是持久化的,那么消息也设置为持久化 ContentType: "text/plain", Body: body, Headers: amqp.Table{ "x-delay": int(delayTime / time.Millisecond), // 延迟时间: 毫秒 }, }, ) if err != nil { return err } fmt.Printf("[send]: %s\n", body) return nil } // Close 关闭生产者 func (p *Producer) Close() { if p.ch != nil { _ = p.ch.Close() } } func checkErr(err error) { if err != nil { panic(err) } }
消费端示例代码
package main import ( "context" "fmt" "os" "os/signal" "syscall" "time" amqp "github.com/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:guest@127.0.0.1:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := amqp.Dial(url) checkErr(err) defer conn.Close() ctx := context.Background() queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) c, err := NewConsumer(ctx, queueName, exchange, conn) checkErr(err) c.Consume() // 消费消息 defer c.Close() fmt.Println("exit press CTRL+C") interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) <-interrupt fmt.Println("exit consume messages") } // Exchange 交换机 type Exchange struct { Name string // exchange名称 Type string // exchange类型,支持direct、topic、fanout、headers、x-delayed-message RoutingKey string // 路由key XDelayedMessageType string // 延时消息类型,支持direct、topic、fanout、headers } // NewDelayedMessageExchange 实例化一个delayed-message类型交换机,参数delayedMessageType 消息类型direct、topic、fanout、headers func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Consumer 消费者 type Consumer struct { ctx context.Context queueName string conn *amqp.Connection ch *amqp.Channel delivery <-chan amqp.Delivery exchange *Exchange } // NewConsumer 实例化一个消费者 func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) { // 创建管道 ch, err := conn.Channel() if err != nil { return nil, err } // 声明交换机类型 err = ch.ExchangeDeclare( exchange.Name, // 交换机名称 exchange.Type, // 交换机的类型,支持direct、topic、fanout、headers true, // 是否持久化 false, // 是否自动删除 false, // 是否公开,false即公开 false, // 是否等待 amqp.Table{ "x-delayed-type": exchange.XDelayedMessageType, // 延时消息的类型direct、topic、fanout、headers }, ) if err != nil { _ = ch.Close() return nil, err } // 声明队列,如果队列不存在则自动创建,存在则跳过创建 q, err := ch.QueueDeclare( queueName, // 消息队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否具有排他性(仅创建它的程序才可用) false, // 是否阻塞处理 nil, // 额外的属性 ) if err != nil { _ = ch.Close() return nil, err } // 绑定队列和交换机 err = ch.QueueBind( q.Name, exchange.RoutingKey, exchange.Name, false, nil, ) if err != nil { _ = ch.Close() return nil, err } // 为消息队列注册消费者 delivery, err := ch.ConsumeWithContext( ctx, queueName, // queue 名称 "", // consumer 用来区分多个消费者 true, // auto-ack 是否自动应答 false, // exclusive 是否独有 false, // no-local 如果设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者 false, // no-wait 是否阻塞 nil, // args ) if err != nil { _ = ch.Close() return nil, err } return &Consumer{ queueName: queueName, conn: conn, ch: ch, delivery: delivery, exchange: exchange, }, nil } // Consume 接收消息 func (c *Consumer) Consume() { go func() { fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey) for d := range c.delivery { // 处理消息 fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body) // _ = d.Ack(false) // 如果auto-ack为false时,需要手动ack } }() } // Close 关闭 func (c *Consumer) Close() { if c.ch != nil { _ = c.ch.Close() } } func checkErr(err error) { if err != nil { panic(err) } }
总结
上面介绍了rabbitMQ延时消息队列简单使用示例,在实际使用中,连接rabbitMQ应该有网络断开重连功能。
rabbitMQ需要依赖插件rabbitmq_delayed_message_exchange,目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万以上)的场景,另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。
如果你采用了 Delayed Message 插件这种方式来实现,对于消息可靠性要求非常高,在发送消息之前可以先保存到 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。
这是在github.com/rabbitmq/amqp091-go
基础上封装的 rabbitmq 库,开箱即用各种消息类型(direct
, topic
, fanout
, headers
, delayed message
, publisher subscriber
)。
到此这篇关于RabbitMQ延时消息队列在golang中的使用详解的文章就介绍到这了,更多相关go RabbitMQ延时队列内容请搜索好代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持好代码网!