Go项目(rocketmq)
创始人
2025-06-01 12:47:12
0

文章目录

  • 简介
  • 场景
  • 技术选型
  • rocketmq
    • 概念
    • 消息类型
  • go-client
  • 集成
    • CreateOrder
    • inventory
    • 库存归还

简介

  • 这篇简单介绍下 mq 的应用场景和 racketmq 技术

场景

  • 消息队列是一种“先进先出”的数据结构,应用场景主要包含以下3个方面
  • 一、应用解耦
    • 系统的耦合性越高,容错性就越低
    • 以电商应用为例,用户创建订单后,如果耦合调用(接连调用各服务并等待返回状态码)库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验
    • 使用消息队列解耦,即使某个系统发生故障,需要几分钟才能修复,但这并不影响订单服务
  • 二、流量削峰
    • 应用系统如果遇到请求流量的瞬间猛增,有可能会将系统压垮
    • 有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验
  • 三、数据分发
    • Producer 不需要关心谁来使用数据,只需要将数据发送到消息队列,Consumer 直接在消息队列中直接获取数据即可
    • 比如某个系统下线了(维护),我并不需要着急改 A 的逻辑,还是照常产生消息到 MQ,消不消费是 D 的问题
      1
  • 总而言之,有了 MQ,系统之间松散了,干起活来不用那么着急了
  • 当然,并不是没有缺点
    • 系统可用性降低:引入的外部依赖越多,系统稳定性越差;一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用?
    • 系统复杂度提高:MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用;如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
    • 一致性问题:A 系统处理完业务,通过MQ给 B、C、D 三个系统发消息数据,如果B系统、C系统处理成功,D系处理失败,如何保证消息处理的一致性?(分布式事务)

技术选型

  • 常见的 MQ 技术
    1
    2
  • 中小型软件公司,建议选 RabbitMQ,一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便;另一方面,中小型软件公司数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 排除(如果只是做日志,闭眼睛选这个);但是,虽然 RabbitMQ 是开源的,然而国内少有开发 erlang 的程序员,所幸它的社区十分活跃,可以解决开发过程中常见的 bug
  • 大型软件公司,根据具体使用在 rocketmq 和 kafka 之间二选一;一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量;针对 rocketMQ,大型软件公司也可以抽出人力进行定制化开发,毕竟国内有能力改 JAVA 源码的人,还是相当多的
  • 我们这里选择 rocketmq,因为它对分布式架构的支持很好,能极大提高可用性
    • MQ 技术学哪个都一样,因为功能是类似的,很容易转换
    • 阿里的有些技术会放弃维护,但 rocketmq 已经交给 Apache 管理,所以不必担心,未来发展趋势看好

rocketmq

  • 安装
    • 准备了 docker compose 脚本,避免环境问题
      version: '3.5'
      services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
      
    • rmqconsole 类似 kibana,访问 http://ip:8080 可以通过界面管理 MQ
    • 要挂载的 conf 目录,broker.conf;brokerIP1 一定要改成自己虚拟机的 IP
      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      #  limitations under the License.# 所属集群名字
      brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
      # 在 broker-b.properties 使用: broker-b
      brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
      brokerId=0# nameServer地址,分号分割
      # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
      # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
      brokerIP1=192168.109.128# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
      defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
      autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
      autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
      listenPort=10911# 删除文件时间点,默认凌晨4点
      deleteWhen=04# 文件保留时间,默认48小时
      fileReservedTime=120# commitLog 每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
      # redeleteHangedFileInterval=120000
      # 检测物理文件磁盘空间
      diskMaxUsedSpaceRatio=88
      # 存储路径
      # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
      # commitLog 存储路径
      # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
      # 消费队列存储
      # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
      # 消息索引存储路径
      # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
      # checkpoint 文件存储路径
      # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
      # abort 文件存储路径
      # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
      # 限制的消息大小
      maxMessageSize=65536# flushCommitLogLeastPages=4
      # flushConsumeQueueLeastPages=2
      # flushCommitLogThoroughInterval=10000
      # flushConsumeQueueThoroughInterval=60000# Broker 的角色
      # - ASYNC_MASTER 异步复制Master
      # - SYNC_MASTER 同步双写Master
      # - SLAVE
      brokerRole=ASYNC_MASTER# 刷盘方式
      # - ASYNC_FLUSH 异步刷盘
      # - SYNC_FLUSH 同步刷盘
      flushDiskType=ASYNC_FLUSH# 发消息线程池数量
      # sendMessageThreadPoolNums=128
      # 拉消息线程池数量
      # pullMessageThreadPoolNums=128
      
    • 启动成功后可以先体验一下发送和消费消息,这里都是基于 topic 的
      1
    • 安装上面的做法启动如果还报 Java 异常,先 Ctrl-C 重启一下试试,别着急查

概念

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息(数据是存在这的);举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类(主题);一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • 这张图展示了各组件的关系,broker 相当于服务器
    2

消息类型

  • 按照发送特点分
    • 同步发送
      • 同步发送,线程阻塞,投递 completes 后阻塞结束
      • 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
      • 投递 completes 不代表发送成功,要 check SendResult.sendStatus 来判断是否投递成功
      • SendStatus 里面有发送状态的枚举,同步的消息投递会有一个状态返回值
        public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
        }
        
      • 注:发送同步消息且 Ack 为 SEND_OK,只代表该消息成功写入了 MQ 中,并不代表该消息被Consumer 消费了
    • 异步发送
      • 异步调用,当前线程一定要等待异步线程回调结束再关闭 Producer,因为是异步的,不会阻塞,提前关闭 producer 会导致未回调链接就断开了
      • 异步消息不 retry,投递失败回调 onException() 方法,只有同步消息才会 retry,源码参考DefaultMQProducerImpl.class
      • 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后启动转码服务,转码完成后发送转码结果(向MQ发送)等,不必一直等待转码结果是否发送成功,会有回调链,保证消息已发送(不丢失)
    • 单向发送
      • 消息不可靠,性能高,只负责往服务器(broker)发送一条消息,不会重试也不关心是否发送成功
      • 此方式发送消息的过程耗时非常短,一般在微秒级别
    • 概括对比一下
      1
  • 按照使用功能特点分
    • 或者说按照使用场景分、按照需求分
    • 普通消息
      • 普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景
      • 每个 broker 都相当于一个 queue,broker 内保证 FIFO,但一个 topic 可能在多个 broker 上,所以从整体来看是默认无序的
    • 顺序消息
      • 分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的 FIFO,但很多时候全局消息的实现代价很大,所以就出现了分区顺序消息
      • 分区顺序消息的概念可以如下图所示
        2
      • 通过对消息的 key 进行 hash,相同 hash 的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的
    • 延时消息
      • 延迟的机制是在服务端(broker)实现的,也就是 Broker 收到了消息,但是经过一段时间以后才发送(才能被消费);有 push/pull 两种消费方式
      • 服务器按照 1-N 定义了如下级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
      • 若要发送定时消息,在应用层初始化 Message 消息对象之后(这都是broker里的操作,producer只需要传参),调用Message.setDelayTimeLevel(intlevel) 方法来设置延迟级别,按照序列取相应的延迟级别,例如 level=2,则延迟为 5s
      • 发送消息的时候如果消息设置了 DelayTimeLevel,该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC 这个 Topic 里面
      • 根据 DelayTimeLevel 选择对应的 queue(broker),不需要每个 broker 都有延时各种 level 的能力
    • 事务消息
      • 看看官方文档吧,上面说的这几种消息类型都有介绍
  • 上面只是从不同角度划分了消息种类,但彼此之间是有交集的,比如某个 topic 下的这条消息可以是异步发送的普通消息,我们只需要关注自己的需求选择合适的方式,不必太关注类型划分

go-client

  • go 操作 rocketmq 的客户端
  • 同步发送 SendSync,普通消息;这里涉及一个重要的概念:NameServer
    package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )func main() {// rmqnamesrv 的端口 9876p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}res, err := p.SendSync(context.Background(), primitive.NewMessage("vshop", []byte("this is Roy!")))if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}}
    
    1
    • 注:会返回一个 ID msgIds=C0A87E022710000000005a1e38900001
  • 消费/订阅消息(push)
    • 这里要设置 groupname,因为我们是 consumer 集群,其中一台机器消费了消息,其他就不需要再消费,这就需要这组机器加入到同一个 group,相同 group 的机器再来只会消费下一个消息
      2
    • 只推送一份消息不就行了吗???
    • 订阅 NewPushConsumer
      package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
      )func main() {c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.109.128:9876"}),consumer.WithGroupName("shop"),)if err := c.Subscribe("vshop", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值: %v \n", msgs[i])// Message=[topic=vshop, body=this is Roy!, Flag=0, properties=map[CONSUME_START_TIME:1679116202537 MAX_OFFSET:1 MIN_OFFSE//T:0 UNIQ_KEY:C0A87E022710000000005a1e38900001], TransactionId=]}return consumer.ConsumeSuccess, nil}); err != nil {fmt.Println("读取消息失败")}_ = c.Start()// 不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
      }
      
    • 通过主线程 sleep 让 consumer 不退出,只要给这个 topic push 了消息,这边就能收到
    • 被消费了的消息还是能看到,detail 中会看到 consumerGroup
      3
  • 发送延时消息
    • 可以启动 consumer 观察是否延时了;注:topic 要对应上,一切基于 topic
      package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
      )func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}msg := primitive.NewMessage("vshop", []byte("this is a delay message3"))// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.WithDelayTimeLevel(4)res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}//使用场景:支付的时候, 淘宝, 12306, 购票, 超时归还 - 定时执行逻辑//我可以去写一个轮询, 轮询的问题: 1. 多久执行一次轮询 30分钟//在12:00执行过一次, 下一次执行就是在 12:30的时候 但是12:01的时候下了单, 12:31就应该超时 但现在13:00时候才能超时//那我1分钟执行一次, 比如我的订单量没有这么大,1分钟执行一次, 其中29次查询都是无用, 而且你还还会轮询mysql//rocketmq的延迟消息, 1. 时间一到就执行, 2. 消息中包含了订单编号,你只查询这种订单编号
      }
      
    • 后续完善订单服务的超时归还库存要使用延时消息
  • 发送事务消息
    • 底层原理就是上一篇中提到的基于可靠消息的最终一致性方案,可以先回顾一下
    • SendMessageInTransaction 会发送半消息,然后阻塞等待本地事务的结果
    • 这里模拟了三种返回结果,注意理解
      package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
      )type OrderListener struct{}func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {fmt.Println("开始执行本地逻辑")time.Sleep(time.Second * 10)fmt.Println("执行本地事务结束")// 1.执行本地事务成功// return primitive.CommitMessageState // expect:成功发送commit消息,控制台能看到消息// 2.执行本地事务失败// return primitive.RollbackMessageState 	// expect:成功发送rollback消息,half message被删除,控制台看不到消息// 3.本地执行逻辑无缘无故失败 代码异常 宕机fmt.Println("执行本地逻辑失败")return primitive.UnknowState // 模拟不返回状态;等待一定时间后,开始 check
      }// 即使服务挂掉,下次启动后还是会接着回查,严谨
      // 因为 MQ 里存着 half message,没有 commit,也没有 rollback,于是就会拿着 ID 回查对应事务
      func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {fmt.Println("本地事务状态未知,rocketmq 回查")time.Sleep(time.Second * 5)return primitive.CommitMessageState // 回查本地事务发现已成功执行,可以 commit
      }func main() {p, err := rocketmq.NewTransactionProducer(&OrderListener{},producer.WithNameServer([]string{"192.168.109.128:9876"}),)if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}// 底层先发送了 half message;success后再去执行本地事务 ExecuteLocalTransaction,// 等本地事务结束// 		1. 根据返回状态发送 commit/rollback//		2. 一直没返回状态,调用 CheckLocalTransaction 检测本地事务的状态,再次 commit/rollback// 这里会阻塞,情况1时,阻塞到已发送完整消息/已删除半消息,情况2时,阻塞到本地事务返回 UnknowState,剩下的交给回查// 具体细节还需要看源码res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TransTopic", []byte("this is successful transaction message1")))fmt.Println("state:", res.State) // 1 2 3if err != nil {fmt.Printf("返回状态失败: %s\n", err)} else {fmt.Printf("返回状态成功: %s\n", res.String())}time.Sleep(time.Hour)if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
      }
      
    • 后续会用到事务消息实现分布式事务
    • 这里要保证订阅者消费了消息(响应了请求),具体怎么实现后续会介绍
  • Tips:任何系统,日志都需要单独设计并测试各种可能情况,是排查问题的关键

集成

  • 接下来将分布式事务和超时退还机制集成到订单微服务,在实践中深化理解
  • 先设计大致的架构,考虑库存扣减中可能遇到的情况,或者说分析数据不一致的可能原因
    • 在订单服务本地,要操作:新建订单信息、新建订单商品信息、删除购物车记录,这部分可以放在 MySQL 本地事务保证数据一致性
    • 扣减库存成功
      • 本地事务失败,能否在事务 rollback 中调用库存归还呢?不可以,服务挂掉、服务器宕机、本地代码异常等会导致无法执行到归还库存
    • 扣减库存失败,直接回滚本地事务,但这就要把调用库存服务放在事务中,如果遇到网络拥塞或库存服务宕机,重试次数达到上限,本地事务无法执行成功
    • 这部分就是把服务挂掉、网络拥塞、服务器宕机、本地代码异常这些问题尝试放到各个步骤中,看架构能否解决问题或存在哪些不足
  • 尝试用上一篇中介绍的分布式事务方案解决问题
    • tcc(虽然合适,但实现较为复杂,go语言还没有完善的框架)
      1
    • 基于可靠消息的最终一致性方案
      • 前面说过,需要保证订阅者消费了消息,因为本地 commit 后不能再回滚,库存服务必须执行成功
        2
      • 一般情况下,可以通过解决代码 bug 保证消息被消费,即使宕机,启动后还是可以继续消费
      • 但是库存服务特殊,因为有库存不足的情况!或者概括为:这个方案对于资源有限制的场景是需要改造的,不能直接套用
    • 改进
      • 只需调整一下业务逻辑,如图,先发送半消息准备归还库存,再调用库存服务(扣减),如果库存不足,后续的本地事务不会开始,从而解决了资源有限制的问题
        3
      • 如果本地事务执行成功,则 rollback 调 reback 的 half message;如果失败,就 commit,执行库存归还
      • 同样有回查,如果能查到已支付订单(或未超时),说明不需要库存归还(rollback),如果没有订单信息(或已超时),commit 归还库存
      • 这里还应该发送延时消息,也就是实现超时归还机制;但对同一订单,库存服务只能消费归还消息(回查commit的)或延时消息(同一时刻出现这两个commit的几率不大)
    • 接下来具体实现这一改进方案

CreateOrder

  • 在 order_srv/handler/order.go 函数 CreateOrder 中发送事务消息
    // 更新订单表
    order := model.OrderInfo{OrderSn:      GenerateOrderSn(req.UserId), // 一定要放在这里生成,归还逻辑会用到Address:      req.Address,SignerName:   req.Name,SingerMobile: req.Mobile,Post:         req.Post,User:         req.UserId,
    }
    // 应该在消息中具体指明一个订单的具体的商品的扣减情况
    jsonString, _ := json.Marshal(order)
    // 1. 先发送库存归还半消息,再调用 ExecuteLocalTransaction
    _, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString))
    if err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")
    }
    if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)
    }return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
    
  • 这个函数的大部分逻辑移到 ExecuteLocalTransaction
  • 还需要实现指向结构体 OrderListener 的另一个方法 CheckLocalTransaction,用于回查
    • Q:查到订单号,也应该查看订单状态呀,可能是未支付未超时,直接回滚岂不是没机会归还了?
    • 后续添加发送延时消息的逻辑,到时再梳理逻辑
  • 这部分需要看代码仔细理解,并结合前面的理论知识,比较考验逻辑思维

inventory

  • 库存服务部分 inventory_srv/handler/inventory.go
  • 首先,需要在 main.go 订阅订单服务生成的 topic
    if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {fmt.Println("读取消息失败")
    }
    
  • 因为逻辑较为复杂,库存归还函数 AutoReback 放在 handler 中单独定义,重点是解决重复归还的问题
  • 主要是保证归还接口的幂等性,方案是:新建一张表, 这张表记录了详细的订单扣减细节,以及归还细节
  • 在 model 设计并生成表
    // 便于返回商品详情
    type GoodsDetail struct {Goods int32Num   int32
    }
    type GoodsDetailList []GoodsDetailtype StockSellDetail struct {OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique;"`	// 要指定索引名称Status  int32           `gorm:"type:varchar(200)"` //1 表示已扣减 2. 表示已归还Detail  GoodsDetailList `gorm:"type:varchar(200)"`
    }func (StockSellDetail) TableName() string {return "stockselldetail"
    }
    
  • 测试表
    package mainimport ("crypto/md5""encoding/hex""fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger""gorm.io/gorm/schema""io""log""shop_srvs/inventory_srv/model""os""time"
    )func genMd5(code string) string{Md5 := md5.New()_, _ = io.WriteString(Md5, code)return hex.EncodeToString(Md5.Sum(nil))
    }func main() {dsn := "root:root@tcp(192.168.109.128:3306)/shop_inventory_srv?charset=utf8mb4&parseTime=True&loc=Local"newLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), // io writerlogger.Config{SlowThreshold: time.Second,   // 慢 SQL 阈值LogLevel:      logger.Info, // Log levelColorful:      true,         // 禁用彩色打印},)// 全局模式db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{NamingStrategy: schema.NamingStrategy{SingularTable: true,},Logger: newLogger,})if err != nil {panic(err)}//_ = db.AutoMigrate(&model.Inventory{}, &model.StockSellDetail{})//插入一条数据//orderDetail := model.StockSellDetail{//	OrderSn: "imooc-bobby",//	Status:  1,//	Detail:  []model.GoodsDetail{{1,2},{2,3}},//}//db.Create(&orderDetail)var sellDetail model.StockSellDetaildb.Where(model.StockSellDetail{OrderSn:"vshop-roy"}).First(&sellDetail)fmt.Println(sellDetail.Detail)
    }
    
  • 接下来定义自动归还的逻辑
    • Q:consumer.ConsumeRetryLater 的底层逻辑是怎样的?如何重新消费这条消息?
    • 推理:应该是收集 return 了 ConsumeRetryLater 的消息,重新调用 AutoReback
  • 测试修改的代码
    • 让本地事务执行出错(返回 codes.Internal),测试库存归还,通过 UT 也可以完成,但是需要 mock,会麻烦一些
    • 让本地事务返回 primitive.UnknowState,测试回查函数;注:如果我们的业务逻辑正常,回查函数一般只会在极端情况下才会用到
    • Q:未知状态包括哪些?本地事务执行时间过长是否会触发回查?还是说必须返回一个异常?可能不是最后 return 的,而是本地事务的代码出问题,ExecuteLocalTransaction 报错,也会回查
    • 触发回查后还会等待一段时间,才执行 CheckLocalTransaction

库存归还

  • 还是放在本地事务的代码中,发送延时消息
  • 这里要判断,可能 30min 内已支付,在 main 监听 topic,取消订单逻辑放在 order/handler
    • 操作订单表 TRADE_CLOSED
    • 归还库存,直接给 order_reback 这个 topic 发消息就行
    • 那购物车表、商品订单表怎么归还呢?TODO
  • 这里要注意,不要使用 p.Shutdown 关闭 Producer,会报错
    • 通过 NewPushConsumer 追踪源码,可以看到里面有个函数 GetOrNewRocketMQClient
    • 通过 NewProducer/NewTransactionProducer,还是用了这个函数
    • 源码用的 clientMap 是一个协程同步的 map,传入的 ClientID 可以看到是进程号
      func (c *rmqClient) ClientID() string {id := c.option.ClientIP + "@"if c.option.InstanceName == "DEFAULT" {id += strconv.Itoa(os.Getpid())} else {id += c.option.InstanceName}if c.option.UnitName != "" {id += "@" + c.option.UnitName}return id
      }
      
    • 也就是说,任何地方创建的 C/P,因为没有指定 ID,都是进程号,这个 Map 就会 Load 已经存在的这个 client,只要在任意地方 Shutdown 了,都会导致全部的 C/P 关闭
      • 也就是说,一个 client 下可以有多个 C/P
      • 这种实现便于统一管理,也有应用场景
    • 我们这里解决报错的办法有两个:1.不用 Shutdown,2.在新建时,options 中指定 ClientID
  • 项目代码方面,这章主要是给实现分布式事务的 code 加了些注释
    • git commit

相关内容

热门资讯

不开心的句子 发表不开心的心... 不开心的句子 发表不开心的心情句子  在平时的学习、工作或生活中,大家都接触过很多优秀的句子吧,句子...
幽默风趣的句子 幽默风趣的句子(精选100句)  在生活、工作和学习中,大家都看到过许多经典的句子吧,从语气上分,句...
孝敬老人最美的句子 孝敬老人最美的句子大全  在日常的学习、工作、生活中,大家一定没少看到经典的句子吧,不同的句子在语言...
稳重成熟的句子 稳重成熟的句子  在日常学习、工作或生活中,大家都经常接触到句子吧,从表达的角度说,句子是最基本的表...
描写开心的句子 描写开心的句子(精选120句)  无论是身处学校还是步入社会,大家都看到过许多经典的句子吧,不同的句...
感谢姐姐的短句暖心感恩姐姐的... 感谢姐姐的短句暖心2022感恩姐姐的句子  在平凡的学习、工作、生活中,大家总少不了接触一些耳熟能详...
刻骨铭心的爱情句子 有关刻骨铭心的爱情句子  1、爱情不在于一朝一夕,需要的是刻骨铭心。  2、回首往事,是谁欠谁的债,...
自我提神醒脑的句子 自我提神醒脑的句子(精选185句)  在平平淡淡的日常中,大家都收藏过令自己印象深刻的句子吧,句子可...
草房子好句 草房子好句  1、立在炉上的那只黑色的瓦罐,造型土气,但似乎又十分讲究,粗朴的身子,配了一只弯曲得很...
火树银花造句 火树银花造句  一、火树银花简介  意思是形容张灯结彩或大放焰火的灿烂夜景。出自唐·苏味道《正月十五...
友谊留言句子   友情的真挚永远比不上真金。真金能卖友情不能。友情,就好比一条简单的线。无论你在什么地方,发生什么...
短而精的句子   短而精的句子  1、爱是一种需要不断被人证明的虚妄,就像烟花需要被点燃才能看到辉煌一样。  2、...
简单的排比句 简单的排比句  在学习、工作或生活中,大家总少不了接触一些耳熟能详的句子吧,从语气上分,句子可以分为...
优美文艺句子 通用优美文艺句子48句  黑白头像下隐藏着一颗破碎的心和一个等待的人。下面是小编搜索整理的优美文艺句...
对人生路迷茫的句子 对人生路迷茫的句子  迷茫,让我们的生活像水一样平乏无味却又无处不在,久而久之,渗透出汩汩水流,汇而...
描写周边环境的句子 描写周边环境的句子(精选50句)  在生活、工作和学习中,大家都接触过很多优秀的句子吧,不同的句子在...
高中好词佳句摘抄 高中好词佳句摘抄大全  摘抄是指从文刊、文件等资料里阅读的时候 ,把语言优美,值得品析,值得学习的词...
真心思念一个人的句子 真心思念一个人的句子1、无论如何谢谢你,七年里,你见证了我的一切。2、我在怀念,没有想念的意思。3、...
描写母爱伟大的经典句子 描写母爱伟大的经典句子(精选75句)  无论在学习、工作或是生活中,大家最不陌生的就是句子了吧,句子...
新年祝福语句子 新年祝福语句子(精选160句)  在学习、工作乃至生活中,大家或多或少都会用到过祝福语吧,祝福语是人...