RabbitMQ-死信队列(golang)

news/2024/11/15 6:09:23 标签: rabbitmq, golang, 开发语言, 后端, 中间件

1、概念

      死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。

2、死信产生的原因

文心一言的回答如下:

  1. 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
  2. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
  3. 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
  4. 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。

总结来说,主要原因就三个,消息被拒绝、消息过期、队列满

在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。

3、死信队列使用实践

3.1 消息过期

设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。

关键点:设置正常队列属性,ttl5s过期:

// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

全部代码如下: 

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

队列信息包括绑定的死信队列信息、ttl等信息如下:

运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。

3.2 队列满

当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。

设置队列长度属性代码如下:

args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

队列属性如下:

发送两条信息:

 继续发送第三个:

 测试代码:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

3.3 消息被拒绝

       消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。

客户端函数:ch.Nack,函数原型:

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
	ch.m.Lock()
	defer ch.m.Unlock()

	return ch.send(&basicNack{
		DeliveryTag: tag,
		Multiple:    multiple,
		Requeue:     requeue,
	})
}

入参含义如下: 

tag

这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 ch.Ackch.Nack 或 ch.Reject 时,必须提供这个标识符,以便RabbitMQ知道是对哪条消息进行确认或拒绝。

multiple

这是一个布尔值(bool),用于指示是否应该同时确认(或拒绝)多条消息。如果设置为 true,则RabbitMQ将认为从上一个被确认的消息开始(包括该消息),直到当前消息为止的所有未确认消息都被拒绝。这通常用于批量处理消息确认,但在使用 ch.Nack 时,它的作用更多是关于是否应该重新排队当前消息之后的消息(取决于RabbitMQ的配置和消息的属性)。

requeue

这也是一个布尔值(bool),用于指示被拒绝的消息是否应该被重新放入队列的末尾以便稍后重试。如果设置为 true,则消息将被重新排队;如果设置为 false,则消息将被丢弃(或者根据RabbitMQ的配置可能被发送到死信队列,如果配置了的话)。

测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:

之后使用如下代码进行消费:

package main

import (
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}

	// 声明正常队列
	// q, err := ch.QueueDeclare(
	// 	"normal_queue", // name
	// 	true,           // durable
	// 	false,          // delete when unused
	// 	false,          // exclusive
	// 	false,          // no-wait
	// 	nil,            // arguments
	// )
	// if err != nil {
	// 	fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
	// 	return
	// }
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		"normal_queue", // queue name
		"normal_queue", // routing key
		"my_exchange",  // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	msgs, _ := ch.Consume(
		"normal_queue", // queue
		"",             // consumer
		false,          // auto-ack
		true,           // exclusive
		false,          // no-local
		false,          // no-wait
		nil,            // args
	)
	go func() {
		for d := range msgs {
			// 模拟处理失败,全部放入死信队列
			ch.Nack(d.DeliveryTag, false, false)
		}
	}()
	time.Sleep(10 * time.Second)
}

运行代码后,3条消息全部进入到死信队列中:

 4、总结

      RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。


http://www.niftyadmin.cn/n/5752790.html

相关文章

微信小程序自定义顶部导航栏(适配各种机型)

效果图 1.pages.js,需要自定义导航栏的页面设置"navigationStyle": "custom" 2.App.vue,获取设备高度及胶囊位置 onLaunch: function () {// 系统信息const systemInfo uni.getSystemInfoSync()// 胶囊按钮位置信息const menuButtonInfo uni.…

thinkphp6 --数据库操作 增删改查

一、数据库连接配置 本地测试 直接在.env中修改,不用去config/database.php中修改 正式环境,在部署环境下数据库连接在 config 目录下的 database.php 文件中进行配置: 二、静态方式访问数据库 在tp6 中可以使用 Db 访问数据库,…

云运维基础

笔记内容侵权联系删除 云审计(CTS) 云审计云上资源变更均可被管控,实时系统性记录所有人的操作,无需手工统计。云审计服务支持将操作记录合并,周期性地生成事件文件实时同步转存到OBS存储桶,帮助用户实现…

ORA-00257: archiver error

ORA-00257: archiver error 归档满问题: 报错: SQL> conn admin/admin ERROR: ORA-00257: archiver error. Connect internal only, until freed. Warning: You are no longer connected to ORACLE. 检查空间: SQL> select name, tot…

uniapp h5 实现扫扫二维码

一、实现思路(网上搜的) 1.使用h5的navigator.mediaDevices.getUserMedia() 调起相机获取视频流 2.使用canvas截取二维码图片 3.使用qrcode解析二维码 二、源码 qrcode.js // qrcode.js var _aa {}; _aa._ab function (f, e) {var d qrcode.wid…

GNU/Linux - tar命令

1,Online GNU manual tar命令是一个古老的命令,在线帮助手册地址: GNU tar manual - GNU Project - Free Software Foundation GNU tar 1.35 这么一个简单命令,上面的在线手册却是非常的长。 2,Man命令 读取本地的man…

基于SpringBoot智慧社区管理平台

作者简介:Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,…

分布式光伏智慧平台建设现场 系统集成商如何盈利

在当今能源转型的大背景下,分布式光伏作为一种清洁、可再生的能源形式,正得到越来越广泛的应用。而分布式光伏运维管理系统的出现,则为分布式光伏电站的高效、稳定运行提供了有力保障。 一、分布式光伏的发展现状 随着环保意识的不断提高和…