err+connection+timed+out

@井裕2375:【错误代码:ERR - CONNECTION - RESET】怎么解决?就是大多网页无法访问 -
饶些18785272826…… 解决方法如下: 解决方法一:组件注册 1、我们点下键盘的win+R,输入下图英文,点【确定】,如图. 2、我们就可以看到组件注册成功,如图. 解决方法二:刷新DNS缓存 1、最后我们还需要刷新DNS,点下键盘的win+R,搜索栏输入【...

@井裕2375:手机搜网址出现ERR - CONNECTION - RESET怎么解决? -
饶些18785272826…… 前往百度APP查看 回答 1、点击“开始”--打开“控制面板”2、打开“internet选项”-在“常规”选项卡里面点击“删除”按钮,勾选需要删除的记录缓存,然后点击“删除”.2、点击“”高级“选项卡还原高级设置”--点击“应用”.3、点击“重置”--在弹出窗口勾选“删除个人设置”--点击“重置”.4、重置完成后点击确定,关闭重新打开浏览器测试即可.

@井裕2375:无法显示此网页 错误代码: err - connection - reset怎么解决? -
饶些18785272826…… 1、DNS服务器出错,网页提示DNS错误则是指“网络服务商的服务器IP错误” 2、HTTP代理出错 电脑驱动的问题 3、Winsock被恶意篡改 电脑有垃圾或网页木马 建议下载一个腾讯电脑管家,在腾讯电脑管家的电脑诊所里输入“QQ能上但是网页打不开”等关键词,找到QQ能上网页打不开,选择一键修复即可.

@井裕2375:手机上出现net::ERR - CONNECTION - REFUSED,怎样解决? -
饶些18785272826…… 建议进行以下步骤操作: 1、检查SIM卡是否开通GPRS上网业务或被临时关闭. 使用手机数据上网功能,电话卡需开通数据流量上网业务.可以联系电话卡当地的网络供应商,开通数据流量上网业务. 2、检查“移动数据”开关是否开启. 用手指向下滑动屏幕顶帘,把“移动数据”点为绿色. 若以上操作后仍无法上网,建议重置手机上网参数:设置-移动网络-接入点名称-更多-重置为默认设置. 3、请更换其他电话卡尝试 4、也可以换个时间段或网络环境再尝试. 5、备份手机数据(电话簿、短信、图片等),将手机恢复出厂设置 若故障依旧,建议将手机送至就近的服务中心进行检测维修

@井裕2375:电脑访问网页出现这种错误代码什么意思ERR - CONNECTION - FAILED? -
饶些18785272826…… 错误导致连接失败

@井裕2375:无法显示此网页 错误代码: ERR - CONNECTION - ABORTED -
饶些18785272826…… 提示你要访问的网站可能因某些原因临时关闭,或者网址改变了. 如果电脑可以正常访问其他网址,那么这个就是网站问题,请确认你输入了正确的网址.

@井裕2375:有些网页打不开怎么办 错误101(net::ERR - CONNECTION - RESET:)连接已重置 -
饶些18785272826…… 错误 101 (net::ERR_CONNECTION_RESET) 的本身含义就是这个网站存在故障暂时无法访问,也就是说这个网站服务器被关闭或者是你的网络提供商将这个网站IP屏蔽,你可以尝试刷新这个页面,或者搜索这个网站以方便寻找更多的访问途径...

@井裕2375:错误 101 (net::ERR - CONNECTION - RESET)网站打不开为什么? -
饶些18785272826…… ERR_CONNECTION_RESET的解释是:ERR-错误CONNECTION-连接-RESET-重复 Google没有对此错误的解决方案因此会再次提示用户这个网站含有未知错误. 网页可能暂时无法连接,或者它已永久性地移动到了新网址. 上面这段信息是...

@井裕2375:错误101 (net::ERR - CONNECTION - RESET):连接已重置.大神们帮帮忙 -
饶些18785272826…… 1.就是网页本身的错误 2.如果每个网页打开都是这样,就是你的电脑有垃圾或网页木马(顽固型),建议到卖电脑那里重组系统. 3、电脑驱动的问题 在桌面空白处右键--属性--屏幕保护程序--电源--休眠--启用休眠——确定 网页显示错误可能是驱动没装!右键我的电脑--硬件--设备管理器,看看有没有黄色的问号或叹号,有的话装上相匹配的驱动程序. 4、如果网页提示DNS错误则是指“网络服务商的服务器IP错误”,有可能是操作系统未升级等原因引起,建议用人工指定IP地址和服务器地址试一下,如果不行还是重装系统吧 记得采纳啊

作者:linzl

1. 为啥要用MQ

为啥使用,因为他很牛逼

2.使用docker部署单机RabbitMQ、go客户端库

docker 镜像

https://hub.docker.com/_/rabbitmq

docker pull rabbitmq:3.8.10-management-alpine

说明:management代表是带管理后台

启动容器

docker run -d --name my-rmq -e RABBITMQ_DEFAULT_USER=linzl -e RABBITMQ_DEFAULT_PASS=123 -p 8081:15672 -p 5672:5672 rabbitmq:3.8.10-management-alpine

rabbitmq golang 客户端库

https://github.com/streadway/amqp

go get -u github.com/streadway/amqp

测试golang 链接 mq

package mainimport (\t"fmt"\t"github.com/streadway/amqp"\t"log")func main() {\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\tconnection, err := amqp.Dial(dsn)\tif err != nil {\t\tlog.Fatal(err)\t}\tdefer connection.Close()\tfmt.Println(connection)}

  1. 生产者创建channel发送消息给Exchange

  2. Exchange(有多种交换机)根据策略binding队列进行消息投递

  3. 队列具有推/拉模式

  4. 消费者使用channel获取消息,并确认接收或拒绝,重新入列给别的消费者

3. 用最简单的方式:生产者发送第一条消息

package mainimport (\t"fmt"\t"github.com/streadway/amqp"\t"log")func main() {\tconnection, err := amqp.Dial("amqp://linzl:[email protected]:5672")\tif err != nil {\t\tlog.Fatal(err)\t}\tdefer connection.Close()\t// 获取channel 连接\tchannelConn, err := connection.Channel()\tif err != nil {\t\tlog.Fatal(err)\t}\tdefer channelConn.Close()\t// 创建队列\tqueue, err := channelConn.QueueDeclare(\t\t"test_queue",\t\tfalse,\t\tfalse,\t\tfalse,\t\tfalse,\t\tnil,\t)\tif err != nil {\t\tlog.Fatal(err)\t}\t// 发生消息\terr = channelConn.Publish(\t\t"",\t\tqueue.Name,\t\tfalse,\t\tfalse,\t\tamqp.Publishing{\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(`test002`),\t\t},\t)\tif err != nil {\t\tlog.Fatal(err)\t}\tfmt.Println("简单生产一条消息成功")}

4.用最简单的方式:消费者读取消息

对连接mq 简单封装

package AppInitimport (\t"fmt"\t"github.com/streadway/amqp"\t"log")var (\terr error\tMQConn *amqp.Connection)func init()  {\tdsn := fmt.Sprintf("amqp://%s:%s@%s:%d","linzl","123","192.168.1.6",5672)\tMQConn, err = amqp.Dial(dsn)\tif err != nil {\t\tlog.Fatal(err)\t}\tlog.Println(MQConn.Major)}func GetMQ()*amqp.Connection {\treturn MQConn}

消费者

package mainimport (\t"fmt"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"log")func main() {\tmq := AppInit.GetMQ()\tdefer mq.Close()\tchannelConn, err := mq.Channel()\tif err != nil {\t\tlog.Fatal(err)\t}\tdefer channelConn.Close()\t// 消费\tmsgs, err := channelConn.Consume(\t\t"test_queue",\t\t"Consumer01",\t\tfalse,\t\tfalse,\t\tfalse,\t\tfalse,\t\tnil,\t)\tif err != nil {\t\tlog.Fatal(err)\t}\tfor msg := range msgs {\t\tmsg.Ack(false) // 确认\t\tfmt.Println(msg.DeliveryTag,string(msg.Body))\t}}

5.简单API过程、注册流程、MQ操作简单封装

案例用户注册

简单的API过程、注册流程、MQ操作简单封装

gin 框架构建用户注册api

userModel

package Usertype UserModel struct {\tUserID int64 `json:"user_id"`\tUserName string `json:"user_name"`}func NewUserModel() *UserModel {\treturn &UserModel{}}

user api

package mainimport (\t"encoding/json"\t"github.com/gin-gonic/gin"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"net/http"\t"time")func main()  {\tengine := gin.New()\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\t\tuser := User.NewUserModel()\t\terr := ctx.ShouldBindJSON(user)\t\tif err != nil {\t\t\tctx.JSON(400,err.Error())\t\t\treturn\t\t}\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\t\tif user.UserID > 0 {  // 假设入库成功\t\t\tbytes, _ := json.Marshal(user)\t\t\tLib.NewMQ().SendMessage(Lib.QUEUE_NEWUSER,string(bytes))\t\t}\t\tctx.JSON(200,user)\t})\tengine.Run(":6060")}

mq 操作简单封装

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser")type MQ struct {\tChannel *amqp.Channel}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// SendMessage 发生消息到mqfunc (this *MQ) SendMessage(queueName string, message string) error {\t_, err := this.Channel.QueueDeclare(queueName, false, false,\t\tfalse, false, nil)\tif err != nil {\t\treturn err\t}\treturn this.Channel.Publish("", queueName, false, false,\t\tamqp.Publishing{\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(message),\t\t})}

6.定义交换机:向2个队列同时发送消息(QueueBind)

Exchange

Direct Exchange 也叫做直接模式交换机。交换机和和一个队列绑定起来,并指定路由键, 交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser")type MQ struct {\tChannel *amqp.Channel}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// SendMessage 发生消息到mqfunc (this *MQ) SendMessage(queueName string, message string) error {\tqueue1, err := this.Channel.QueueDeclare(queueName, false, false,\t\tfalse, false, nil)\tif err != nil {\t\treturn err\t}\t// 假设是其他业务方用的队列\tqueue2, err := this.Channel.QueueDeclare(queueName+"other", false, false,\t\tfalse, false, nil)\tif err != nil {\t\treturn err\t}\t// 声明一个交换机\terr = this.Channel.ExchangeDeclare("UserExchange", "direct",\t\tfalse, false, false, false, nil)\tif err != nil {\t\treturn err\t}\t// 队列1与交换机绑定\terr = this.Channel.QueueBind(queue1.Name, "UserReg",\t\t"UserExchange", false, nil)\tif err != nil {\t\treturn err\t}\t// 队列2与交换机绑定\terr = this.Channel.QueueBind(queue2.Name, "UserReg",\t\t"UserExchange", false, nil)\tif err != nil {\t\treturn err\t}\treturn this.Channel.Publish("UserExchange", "UserReg", false, false,\t\tamqp.Publishing{\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(message),\t\t})}

7.整理和调整代码结构、初始化队列等

初始化队列

go-rabbitmq/Lib/QueueInit.go

package Libimport "fmt"// UserQueueInit 用户队列初始化..func UserQueueInit() error{\tmq := NewMQ()\tif mq == nil {\t\treturn fmt.Errorf("mq init err")\t}\tdefer mq.Channel.Close()\t// 声明交换机\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE, "direct", false,\t\tfalse, false, false, nil)\tif err != nil {\t\treturn fmt.Errorf("Exchange error:%s",err.Error())\t}\t// 声明队列及绑定\tqueues := fmt.Sprintf("%s,%s",QUEUE_NEWUSER,QUEUE_NEWUSER_OTHER01)\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE, USER_REG_ROUTER_KEY)\tif err != nil {\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\t}\treturn nil}

SendMessage改造

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log"\t"strings")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser"\t// 其他业务的新用户队列\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\t// 用户业务交换机\tUSER_EXCHANGE = "exchange-user"\t// 用户注册路由key\tUSER_REG_ROUTER_KEY = "router-key-userreg")type MQ struct {\tChannel *amqp.Channel}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\tqueueList := strings.Split(queues,",")\tfor _,queue := range queueList {\t\t// 声明队列\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\t\t\tfalse, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t\t// 绑定交换机和路由key\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t}\treturn nil}// SendMessage 发生消息到mqfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\treturn this.Channel.Publish(exchange, key, false, false,\t\tamqp.Publishing{\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(message),\t\t})}

拉起gin框架时初始化队列

package mainimport (\t"context"\t"encoding/json"\t"fmt"\t"github.com/gin-gonic/gin"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"log"\t"net/http"\t"os"\t"os/signal"\t"syscall"\t"time")func main()  {\terrchan := make(chan error)\tengine := gin.Default()\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\t\tuser := User.NewUserModel()\t\terr := ctx.ShouldBindJSON(user)\t\tif err != nil {\t\t\tctx.JSON(400,err.Error())\t\t\treturn\t\t}\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\t\tif user.UserID > 0 {  // 假设入库成功\t\t\tbytes, _ := json.Marshal(user)\t\t\tmq := Lib.NewMQ()\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\t\t\tif err != nil {\t\t\t\tlog.Println(err)\t\t\t\terrchan

8.客户端消费注册用户消息、确认消息

模拟消费

  1. 接收消息

  2. 模拟发生邮件

  3. ack 确认

消费

package mainimport (\t"encoding/json"\t"fmt"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"github.com/streadway/amqp"\t"time")func SendMail(msgs 

MQ.go 新增Consume方法

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log"\t"strings")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser"\t// 其他业务的新用户队列\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\t// 用户业务交换机\tUSER_EXCHANGE = "exchange-user"\t// 用户注册路由key\tUSER_REG_ROUTER_KEY = "router-key-userreg")type MQ struct {\tChannel *amqp.Channel}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\tqueueList := strings.Split(queues,",")\tfor _,queue := range queueList {\t\t// 声明队列\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\t\t\tfalse, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t\t// 绑定交换机和路由key\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t}\treturn nil}// SendMessage 发生消息到mqfunc (this *MQ) SendMessage(key string, exchange string,message string) error {\treturn this.Channel.Publish(exchange, key, false, false,\t\tamqp.Publishing{\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(message),\t\t})}func (this *MQ)Consume(queueName string, key string,callback func(

9. 多消费者消费消息、重新入列

消费者改造支持多消费者

package mainimport (\t"encoding/json"\t"flag"\t"fmt"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"github.com/streadway/amqp"\t"log"\t"time")func SendMail(msgs 

10.消费者限流:ACK后再收新消息

代码改造一下,使用协程消费

package mainimport (\t"encoding/json"\t"flag"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"github.com/streadway/amqp"\t"log"\t"time")func Send(c string, msg amqp.Delivery) error {\ttime.time.Second * 3)\tuserModel := &User.UserModel{}\tjson.Unmarshal(msg.Body, userModel)\tlog.Printf("消费者:%s,向userid=%d的用户发生邮件n",c,userModel.UserID)\tmsg.Ack(false)\treturn nil}func SendMail(msgs 

消费者限流

mq.Channel.Qos(2, 0, false)  第一个参数prefetchCount 可以限制,当接受prefetchCount 条消息后

只有ack 之后可以继续接收消费下一条消息,起到保护消费者的作用

mq.Channel.Qos(2, 0, false)

11. 开启模式、记录失败的消息

当生产消息时,由于mq 的网络问题或是其他问题,可能出现发送失败的情况

当有些敏感信息又不能失败,需要确保每一条消息都发送成功

因此mq 有一个机制就是可以开发 模式,当给mq发送消息时,如果成功会有一个ack 回执

ack 成功说明发送消息成功,ack 失败时需要记录日志(写到mysql 或redis)什么的进行重发

第一步

  1. 开启模式

// SetConfirm 设置模式func (this *MQ)Setconfirm() {\terr := this.Channel.confirm(false)\tif err != nil {\t\tlog.Println(err)\t}}

  1. 在MQ 的结构体添加属性

type MQ struct {\tChannel *amqp.Channel\tnotifyConfirm chan amqp.Confirmation}

// SetConfirm 设置模式func (this *MQ)Setconfirm() {\terr := this.Channel.confirm(false)\tif err != nil {\t\tlog.Println(err)\t}\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))}

发生消息后,当服务器确认此chan 会有数据传输过来

发生消息时调用SetConfirm 开启 模式 完整代码

MQ.go

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log"\t"strings")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser"\t// 其他业务的新用户队列\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\t// 用户业务交换机\tUSER_EXCHANGE = "exchange-user"\t// 用户注册路由key\tUSER_REG_ROUTER_KEY = "router-key-userreg")type MQ struct {\tChannel *amqp.Channel\tnotifyConfirm chan amqp.Confirmation}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\tqueueList := strings.Split(queues,",")\tfor _,queue := range queueList {\t\t// 声明队列\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\t\t\tfalse, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t\t// 绑定交换机和路由key\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t}\treturn nil}// SetConfirm 设置模式func (this *MQ)Setconfirm() {\terr := this.Channel.confirm(false)\tif err != nil {\t\tlog.Println(err)\t}\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\tgo this.Listenconfirm()}// ListenConfirm 监听消息func (this *MQ)Listenconfirm() {\tdefer this.Channel.Close()\tret := 

生产者

package mainimport (\t"context"\t"encoding/json"\t"fmt"\t"github.com/gin-gonic/gin"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"log"\t"net/http"\t"os"\t"os/signal"\t"syscall"\t"time")func main()  {\terrchan := make(chan error)\tengine := gin.Default()\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\t\tuser := User.NewUserModel()\t\terr := ctx.ShouldBindJSON(user)\t\tif err != nil {\t\t\tctx.JSON(400,err.Error())\t\t\treturn\t\t}\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\t\tif user.UserID > 0 {  // 假设入库成功\t\t\tbytes, _ := json.Marshal(user)\t\t\tmq := Lib.NewMQ()\t\t\t// 开启 模式\t\t\tmq.Setconfirm()\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\t\t\tif err != nil {\t\t\t\tlog.Println(err)\t\t\t\terrchan

12.监听消息入列回执:NotifyReturn的用法

mandatory参数

如果为true,在exchange正常且可以到达的情况下。

如果exchange+routeKey 无法投递给queue,那么MQ会将消息还给生产者

如果为false,则直接丢弃

模拟无法投递到exchange+routeKey 通过rabbitmq 管理后台,手动解绑(写多个队列的需要全部解绑)

package Libimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/AppInit"\t"github.com/streadway/amqp"\t"log"\t"strings")const (\t// 用户注册队列名称\tQUEUE_NEWUSER = "newuser"\t// 其他业务的新用户队列\tQUEUE_NEWUSER_OTHER01 = "newuser-other01"\t// 用户业务交换机\tUSER_EXCHANGE = "exchange-user"\t// 用户注册路由key\tUSER_REG_ROUTER_KEY = "router-key-userreg")type MQ struct {\tChannel *amqp.Channel\tnotifyConfirm chan amqp.Confirmation\t// NotifyReturn的用法\tnotifyReturn chan amqp.Return}func NewMQ() *MQ {\tchannel, err := AppInit.GetMQ().Channel()\tif err != nil {\t\tlog.Println(err)\t}\treturn &MQ{Channel: channel}}// DecQueuueAndBind 声明队列及绑定队列,多个队列用逗号隔开func (this *MQ)DecQueuueAndBind(queues string,exchange string,routerKey string) error {\tqueueList := strings.Split(queues,",")\tfor _,queue := range queueList {\t\t// 声明队列\t\tq, err := this.Channel.QueueDeclare(queue, false, false,\t\t\tfalse, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t\t// 绑定交换机和路由key\t\terr = this.Channel.QueueBind(q.Name, routerKey, exchange, false, nil)\t\tif err != nil {\t\t\treturn err\t\t}\t}\treturn nil}// SetConfirm 设置模式func (this *MQ)Setconfirm() {\terr := this.Channel.confirm(false)\tif err != nil {\t\tlog.Println(err)\t}\tthis.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))\tgo this.Listenconfirm()}// ListenConfirm 监听消息func (this *MQ)Listenconfirm() {\tdefer this.Channel.Close()\tret := 

SendMessage 之前需要 NotifyReturn

package mainimport (\t"context"\t"encoding/json"\t"fmt"\t"github.com/gin-gonic/gin"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/UserReg/Models/User"\t"log"\t"net/http"\t"os"\t"os/signal"\t"syscall"\t"time")func main()  {\terrchan := make(chan error)\tengine := gin.Default()\tengine.Handle(http.MethodPost,"/user", func(ctx *gin.Context) {\t\tuser := User.NewUserModel()\t\terr := ctx.ShouldBindJSON(user)\t\tif err != nil {\t\t\tctx.JSON(400,err.Error())\t\t\treturn\t\t}\t\tuser.UserID =time.Now().Unix() // 模拟用户注册入库\t\tif user.UserID > 0 {  // 假设入库成功\t\t\tbytes, _ := json.Marshal(user)\t\t\tmq := Lib.NewMQ()\t\t\t// 开启 模式\t\t\tmq.Setconfirm()\t\t\t// 监听return 需要在发送消息之前\t\t\tmq.NotifyReturn()\t\t\terr := mq.SendMessage(Lib.USER_REG_ROUTER_KEY, Lib.USER_EXCHANGE, string(bytes))\t\t\tif err != nil {\t\t\t\tlog.Println(err)\t\t\t\terrchan

13. 以用户注册为例产生的事务需求、延迟队列使用

基本实现

  1. 生产者注册成功之后发生消息

  2. 消息者接受消息后,调用邮件服务

  3. 调用失败。重新入列(要加个延迟时间,失败次数越多,延迟时间越长)

  4. 超过最大重试次数。就不发邮件了

安装插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

这是一个延迟交换机插件。省去我们自己写规则的麻烦

由于我们使用的是3.8.10。因此使用3.8.10对应的插件

拷贝plugins中,容器对应的目录是/opt/rabbitmq/plugins

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez my-rmq:/opt/rabbitmq/plugins

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

延迟队列使用

官方文档:

// ... elided code ...Map args = new HashMap();args.put("x-delayed-type", "direct");channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);// ... more code ...

// ... elided code ...byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");Map headers = new HashMap();headers.put("x-delay", 5000);AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");Map headers2 = new HashMap();headers2.put("x-delay", 1000);AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);// ... more code ...

因此go 代码改动

定义交互机的kind 应该使用x-delayed-message,args 用map[string]interface{}{"x-delayed-type":"direct"}

// UserDelayInit 创建用户延迟交换机func UserDelayInit() error {\tmq := NewMQ()\tif mq == nil {\t\treturn fmt.Errorf("UserDelayInit init error")\t}\tdefer mq.Channel.Close()\t// 声明交换机\terr := mq.Channel.ExchangeDeclare(USER_EXCHANGE_DELAY, "x-delayed-message", false, false,\t\tfalse, false, map[string]interface{}{"x-delayed-type":"direct"})\tif err != nil {\t\treturn fmt.Errorf("UserDelayInit ExchangeDeclare error")\t}\t// 声明队列名称及绑定\tqueues := fmt.Sprintf("%s",QUEUE_NEWUSER)\terr = mq.DecQueuueAndBind(queues, USER_EXCHANGE_DELAY, USER_REG_ROUTER_KEY)\tif err != nil {\t\treturn fmt.Errorf("DecQueuueAndBind error:%s",err.Error())\t}\treturn nil}

发送消息时需要设置Headers: map[string]interface{}{"x-delay":delay}, // 单位毫秒

// SendDelayMessage 发生延迟消息到mq// delay 单位是msfunc (this *MQ) SendDelayMessage(key string, exchange string,message string,delay int) error {\treturn this.Channel.Publish(exchange, key, true, false,\t\tamqp.Publishing{\t\t\tHeaders: map[string]interface{}{"x-delay":delay}, // 单位毫秒\t\t\tContentType: "text/plain",\t\t\tBody:        []byte(message),\t\t})}

生产部分

go func() {    err := Lib.UserQueueInit()    if err !=nil {        errchan

消费者不需要调整

消息者接收消息时会延迟delay 毫秒后收到

14. 记录消费者调用失败次数、逼格SQL技巧

首先建表 user_notify

 `user_notify` (  `user_id` int(11) NOT NULL,  `notify_num` int(11) NOT NULL DEFAULT '1',  `is_done` int(11) NOT NULL DEFAULT '0',  `updatetime` datetime NOT NULL,  PRIMARY KEY (`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

一旦调用邮件服务失败,写入这个表

使用MySQL库

扩展库:https://github.com/jmoiron/sqlx

安装

go get -u github.com/jmoiron/sqlx

mysql驱动:https://github.com/go-sql-driver/mysql

安装

go get -u github.com/go-sql-driver/mysql

mysql记录常规做法(伪代码)

开启事物    取出该条记录。    if 没有        insert info ...    else         if notify_num >=5 // 失败5次,程序里订阅             \t\t\tuser_money=user_money-:money \t\t\twhere user_name=:from =:money`\tresult, err := tx.NamedExec(sql1, tm)\tif err != nil {\t\treturn err\t}\taffected, err := result.RowsAffected()\tif err != nil {\t\treturn err\t}\t// 受影响行为0 代表木有扣款\tif affected == 0 {\t\terr = tx.Rollback() // 回滚\t\tif err != nil {\t\t\treturn err\t\t}\t\treturn fmt.Errorf("扣款失败1111")\t}\tsql2 := "(:from,:to,:money,NOW())"\t// 写日志表\tresult, err = tx.NamedExec(sql2, tm)\tif err != nil {\t\terr2 := tx.Rollback() // 回滚\t\tif err2 != nil {\t\t\treturn err\t\t}\t\treturn fmt.Errorf("扣款失败2222:%s",err.Error())\t}\taffected, err = result.RowsAffected()\tif err != nil {\t\terr = tx.Rollback() // 回滚\t\tif err != nil {\t\t\treturn err\t\t}\t\treturn fmt.Errorf("扣款失败3333",err.Error())\t}\t// 受影响行为0 代表木有扣款\tif affected == 0 {\t\terr = tx.Rollback() // 回滚\t\tif err != nil {\t\t\treturn err\t\t}\t\treturn fmt.Errorf("扣款失败,写日志表出错")\t}\ttx.Commit()\treturn nil}

a公司实际扣款操作

package mainimport (\t"fmt"\t"github.com/gin-gonic/gin"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\t"log"\t"net/http"\t"os"\t"os/signal"\t"syscall")func main() {\tengine := gin.Default()\tengine.Use(Trans.HandleErr())\tengine.POST("/", func(ctx *gin.Context) {\t\ttransModel := Trans.NewTransModel()\t\terr := ctx.ShouldBindJSON(&transModel)\t\tTrans.CheckErr(err,"ShouldBindJSON error:")\t\t// 执行转账\t\terr = Trans.TransMoney(transModel)\t\tTrans.CheckErr(err,"TransMoney error:")\t\tctx.JSON(200,gin.H{"result":transModel.String()})\t})\terrChan := make(chan error)\tserver := http.Server{\t\tAddr: ":6060",\t\tHandler: engine,\t}\tgo func() {\t\terr := server.ListenAndServe()\t\tif err != nil {\t\t\terrChan

18.A公司转账业务逻辑:记录日志后发送消息到mq

初始化mq 队列

func TransInit() error {\tmq := NewMQ()\tif mq == nil {\t\treturn fmt.Errorf("UserDelayInit init error")\t}\tdefer mq.Channel.Close()\terr := mq.Channel.ExchangeDeclare(TRANS_EXCHANGE, "direct", false,\t\tfalse, false, false, nil)\tif err != nil {\t\treturn fmt.Errorf("mq.Channel.ExchangeDeclare TRANS_EXCHANGE error:%s",err.Error())\t}\terr = mq.DecQueuueAndBind(TRANS_QUEUE, TRANS_EXCHANGE, TRANS_ROUTER_KEY)\tif err != nil {\t\treturn fmt.Errorf("trans DecQueuueAndBind error:%s",err.Error())\t}\treturn nil}

拉起框架时,起协程拉起mq

// 初始化队列\tgo func() {\t\terr := Lib.TransInit()\t\tif err != nil {\t\t\terrChan

发送消息到mq

engine.POST("/", func(ctx *gin.Context) {    transModel := Trans.NewTransModel()    err := ctx.ShouldBindJSON(&transModel)    Trans.CheckErr(err,"ShouldBindJSON error:")    // 执行转账    err = Trans.TransMoney(transModel)    Trans.CheckErr(err,"TransMoney error:")    // 写mq    mq := Lib.NewMQ()    jsonBytes, _ := json.Marshal(transModel)    err = mq.SendMessage(Lib.TRANS_ROUTER_KEY, Lib.TRANS_EXCHANGE, string(jsonBytes))    Trans.CheckErr(err,"发送消息队列失败了")    ctx.JSON(200,gin.H{"result":transModel.String()})})

19.A公司转账业务逻辑:定时”无脑”补偿机制(上)

不管发送成功与否

思路如下: 1、我们写个 “死循环”程序

2、定时取5秒或自定义秒内 :status==0 的数据,再发一次消息

3、设定定时任务。定时清理20秒内(或自定义)status==0的消息,把它改为status=2

定时任务 第三方库

看这里 https://github.com/robfig/cron

安装: go get github.com/robfig/cron/[email protected]

https://www.jtthink.com/course/play/2461

定时任务补偿代码

package mainimport (\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\t"github.com/robfig/cron/v3"\t"log")const failSql = ` STATUS=2 where \t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`var MyCron *cron.Cronfunc CronInit()error  {\tMyCron = cron.New(cron.WithSeconds())\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\treturn err}// 定时取消订单func FailTransLog()  {\t_, err := Trans.GetDB().(failSql)\tif err != nil {\t\tlog.Println(err)\t}\tlog.Println("更新成功")}func main() {\terrChan := make(chan error)\tgo func() {\t\terr := Trans.InitDB("a")\t\tif err != nil {\t\t\terrChan

20.A公司转账逻辑: 补偿机制之交易失败后“还钱 ”

两个任务

取消交易

 STATUS=2 where TIMESTAMPDIFF(SECOND,updatetime,now())>20 >2

_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\tif err != nil {\t\treturn err\t}

还钱

`,money from `translog` where `status`=2 0 limit 10
// 还钱_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)if err != nil {    return err}

SQL

首先加个字段: isback ,money from translog  where status=2 0 limit 10

这里面为了防止 数据不一致,都要依赖数据库事务

做个统一的事务提交

func clearTx(tx *sqlx.Tx) {\terr := tx.Commit()\tif err != nil && err != sql.ErrTxDone {\t\tlog.Println("tx err",err)\t}\tislock = false}

全部代码

package mainimport (\t"context"\t"database/sql"\t"github.com/jmoiron/sqlx"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\t"github.com/robfig/cron/v3"\t"log"\t"time")// 取消订单const failSql = ` STATUS=2 where \t\tTIMESTAMPDIFF(SECOND,updatetime,now())>30 >2`// 从日志表里取出status=2 并且isback=0 的进行还钱操作const backSql = "`,money from `translog` where `status`=2 0 limit 10"// 锁 防止上一个任务没执行完,下一个任务又开始了,产生脏读(只适用于单线程,通过变量控制锁)var islock = falsefunc clearTx(tx *sqlx.Tx) {\terr := tx.Commit()\tif err != nil && err != sql.ErrTxDone {\t\tlog.Println("tx err",err)\t}\tislock = false}// 还钱func BackMoney() {\tif islock {\t\tlog.Println("已经锁住了")\t\treturn\t}\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\tif err != nil {\t\tlog.Println("事务失败",err)\t\treturn\t}\tislock = true // 加锁\tdefer clearTx(txx) // 清理事物\ttime.time.Second * 8)\trows, err := txx.Queryx(backSql)\tif err != nil {\t\tlog.Println("Queryx err:",err)\t\ttxx.Rollback()\t}\tdefer rows.Close()\ttransModels := []Trans.TransModel{}\terr = sqlx.StructScan(rows, &transModels)\tif err != nil {\t\tlog.Println("StructScan err:",err)\t\ttxx.Rollback()\t}\t// 还钱操作\tfor _, row := range transModels {\t\t_, err = txx.(" user_money=user_money+? where user_name=?",\t\t\trow.Money, row.From)\t\tif err !=nil {\t\t\ttxx.Rollback()\t\t}\t\t_, err = txx.(" isback=1 where tid=?",row.Tid)\t\tif err !=nil {\t\t\ttxx.Rollback()\t\t}\t}}var MyCron *cron.Cronfunc CronInit()error  {\tMyCron = cron.New(cron.WithSeconds())\t_, err := MyCron.AddFunc("0/3 * * * * *", FailTransLog)\tif err != nil {\t\treturn err\t}\t// 还钱\t_, err = MyCron.AddFunc("0/4 * * * * *", BackMoney)\tif err != nil {\t\treturn err\t}\treturn err}// 定时取消订单func FailTransLog()  {\t_, err := Trans.GetDB().(failSql)\tif err != nil {\t\tlog.Println(err)\t}\tlog.Println("更新成功")}func main() {\terrChan := make(chan error)\tgo func() {\t\terr := Trans.InitDB("a")\t\tif err != nil {\t\t\terrChan

21.补偿机制之重发MQ消息、B公司记录日志

今天完成的任务是

取出交易时间在8秒内,且status=0的数据,进行MQ 重发

1、SQL如下 translog where TIMESTAMPDIFF(SECOND,updatetime,now())<=8 0

2、定时器设置:为 每隔2秒 处理。

B公司日志表

和A公司一样。 不需要IsBacktid 注意 不需要自增

b公司消费代码

package mainimport (\t"encoding/json"\t"flag"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\t"github.com/streadway/amqp"\t"log"\t"fmt")func saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\tfmt.Println(tm.Tid,tm.From,tm.Money)\tsql := "(?,?,?,?,now())"\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\tif err != nil {\t\tlog.Println("1111",err)\t}\tmsg.Ack(false)}func myconsumer(messages 

22.B公司业务逻辑:确认收钱

A和B 要约定个 回调地址(A是回调地址)   http://localhost:8080/callback-----A

参数:tid

SQL status=1 where tid=? 0

A 公司回调接口

// 回调接口engine.POST("/callback", func(ctx *gin.Context) {    tid := ctx.PostForm("tid")    sql := " `status`=1 where tid=? 0"    result, err := Trans.GetDB().(sql, tid)    affected, err2 := result.RowsAffected()    if err != nil || err2 != nil || affected != 1 {        ctx.String(200,"error")    } else {        ctx.String(200,"success")    }})

B公司使用mysql 事物保证日志及确认收钱及回调成功

B消费者 消费到记录后 执行两个过程 1) 插记录 2)把钱更新给用户 3) 回调接口 3步必须都成功。否则回滚数据库。

package mainimport (\t"context"\t"database/sql"\t"encoding/json"\t"flag"\t"github.com/jmoiron/sqlx"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Lib"\t"github.com/linzhenlong/golang-jt/go-rabbitmq/Trans"\t"github.com/streadway/amqp"\t"io/ioutil"\t"log"\t"fmt"\t"net/http"\t"strings")func clearTx(tx *sqlx.Tx) {\terr := tx.Commit()\tif err != nil && err != sql.ErrTxDone {\t\tlog.Println("clearTx error",err)\t}}func saveLog(tm *Trans.TransModel, msg amqp.Delivery) {\tfmt.Println(tm.Tid,tm.From,tm.Money)\tsql := "(?,?,?,?,now())"\t_, err := Trans.GetDB().(sql, tm.Tid,tm.From,tm.To,tm.Money)\tif err != nil {\t\tlog.Println("1111",err)\t}\tmsg.Ack(false)}func saveLogWithTx(tm *Trans.TransModel, msg amqp.Delivery)  {\ttxx, err := Trans.GetDB().BeginTxx(context.Background(), nil)\tif err != nil {\t\tlog.Println("BeginTxx error",err)\t\treturn\t}\tdefer clearTx(txx)\tsql := "(?,?,?,?,now())"\t_, err = txx.(sql, tm.Tid, tm.From, tm.To, tm.Money)\tif err != nil {\t\tlog.Println("(:order_no,:order_user,:order_time)", req)\t\t\tif err != nil {\t\t\t\tlog.Println(err)\t\t\t}\t\t}\t\tmsg.Ack(false)\t}}func main() {\tmq := Lib.NewMQ()\tdefer mq.Channel.Close()\terr := Trans.InitDB("a")\tif err != nil {\t\tlog.Fatal(err)\t}\tmq.Channel.Qos(2,0,false)\tmq.Consume(Lib.ORDER_QUEUE,"消费者1", saveOrder)}

查看文章精彩评论,请前往什么值得买进行阅读互动

","gnid":"98341c1f478db84bf","img_data":[{"flag":2,"img":[{"desc":"","height":"389","title":"","url":"http://p2.img.360kuai.com/dmfd/__60/t018d54c51ce390ac56.webp","width":"702"}]}],"original":0,"pat":"art_src_1,fts0,sts0","powerby":"pika","pub_time":1695891920000,"pure":"","rawurl":"http://zm.news.so.com/bab4d4d9485a5d4b43919ad4d896220e","redirect":0,"rptid":"526da860dcbebebb","rss_ext":[],"s":"t","src":"什么值得买","tag":[],"title":"go web+RabbitMQ实战速学

相关推荐

  • connection reset 101
  • www.sony.com.cn
  • err network change
  • err connection timed
  • err connection close
  • err-connection-failed
  • connection refused
  • err timed out
  • err connection aborted
  • connection timed out
  • www.paperyy.com
  • err connection refused
  • err proxy connection
  • proxy connection failed
  • err name not resolved
  • connection reset by peer
  • connect etimedout
  • err+connection+timed out
  • www.vivo.com
  • eer connection refused
  • err connection timed out
  • nginx connection reset
  • err-connection-aborted
  • error connection reset
  • err-connection-refused
  • 手机errtimedout解决办法
  • 本文由网友投稿,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
    若有什么问题请联系我们
    2024© 客安网