Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
两阶段提交协议的演变:
过程:
举个栗子:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
下面来看下官方的两张图来加深下理解:
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
见官方图:
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
TCC 模式,不依赖于底层数据资源的事务支持:
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
官方图:
下载地址:
我这边下载的是v1.4.2版本,大家下载时需要注意下seata版本和springcloud alibaba的版本,根据自己的alibaba的版本选择对应的seata
给大家贴出组件版本关系对应:
首先需要创建 分支表、全局表、锁表
创建sql如下
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(32),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified` DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(128),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;
SEATA AT 模式需要 UNDO_LOG 表
创建sql如下:
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
conf目录下找到registry.conf文件
首先将type类型改为 nacos,type默认file (这种方式需要把源码的file.conf文件复制到项目中,比较麻烦不推荐) ,然后修改seata的注册中心的相关配置
其次修改seata的配置中心的相关配置,同样type类型改为nacos
下面还需要修改seata的DB类型
我们在conf目录下找到file.conf文件
mode类型改为db
然后修改自己的数据库配置
在bin目录下找到 seata-server.bat 双击启动
看到日志输出 Server started 应该就启动成功了
查看nacos注册中心
观察服务列表,发现seata服务已经成功注册
对nacos还不了解的可以看这里
我们简单模拟下用户从下单到扣减库存的流程,来看看seata在项目中是如何应用的
先看下我的项目的整体模块架构
springcloud版本
8 8 2021.0.1 2021.0.1.0
因为我的项目中已经有order的相关服务了, 为了故事的延续性我在建一个仓储的服务用来扣减库存
想参考我的项目架构的同学可以点击下面的地址
mdx-shop gitee地址
创建一个maven模块
为服务添加启动类配置文件和seata依赖等
seata依赖
com.alibaba.cloud spring-cloud-starter-alibaba-seata ${spring-cloud-alibaba.version}
仓储服务application.yml文件
server:port: 9092spring:application:name: mdx-shop-storagecloud:nacos:discovery:server-addr: localhost:8848namespace: mdxgroup: mdxdatasource:type: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://localhost:3306/mdx_storage?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8driverClassName: com.mysql.cj.jdbc.Driverusername: rootpassword: Bendi+Ceshi+jpa:show-sql: true #打印执行的sql语句,false则不打印sqlproperties:hibernate:ddl-auto: nonedialect: org.hibernate.dialect.MySQL5InnoDBDialectopen-in-view: trueseata:tx-service-group: my_test_tx_groupenabled: trueregistry:type: nacosnacos:application: mdx-seata-server #注册在nacos服务名server-addr: localhost:8848group : mdxnamespace: mdx #注册在nacos命名空间
我们为仓储服务和订单服务分别创建数据库
数据库自己提前建好
// 仓储
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (`id` int(11) NOT NULL AUTO_INCREMENT,`commodity_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`count` int(11) NULL DEFAULT 0,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of storage_tbl
-- ----------------------------
INSERT INTO `storage_tbl` VALUES (1, 'S123434455666777', 10);// 订单
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (`id` int(11) NOT NULL AUTO_INCREMENT,`user_id` varchar(255) DEFAULT NULL,`commodity_code` varchar(255) DEFAULT NULL,`count` int(11) DEFAULT 0,`money` int(11) DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
这里只贴出仓储服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们使用Jpa来实现
依赖参考
org.springframework.boot spring-boot-starter-data-jpa
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/*** @author : jiagang* @date : Created in 2022/7/1 11:25*/
@SpringBootApplication
@EnableFeignClients
@EnableAutoDataSourceProxy
public class MdxShopStorageApplication {public static void main(String[] args) {SpringApplication.run(MdxShopStorageApplication.class, args);}
}
controller
/*** @author : jiagang* @date : Created in 2022/7/1 18:42*/
@RestController
@RequestMapping("/storage")
public class StorageController {@Autowiredprivate StorageService service;@GetMapping("/deduct")public CommonResponse deduct(String commodityCode, int count){service.deduct(commodityCode, count);return CommonResponse.success();}
}
接口
/*** @author : jiagang* @date : Created in 2022/7/1 18:40*/
public interface StorageService {/*** 扣除存储数量*/void deduct(String commodityCode, int count);
}
实现类
/*** @author : jiagang* @date : Created in 2022/7/1 18:42*/
@Service
public class StorageServiceImpl implements StorageService {@Autowiredprivate StorageRepository storageRepository;/*** 扣减库存* @param commodityCode* @param count*/@Overridepublic void deduct(String commodityCode, int count) {StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode);if (storageTbl == null){throw new BizException("storageTbl is null");}// 这里先不考虑超卖的情况storageTbl.setCount(storageTbl.getCount() - count);// 使用jpa 存在就更新storageRepository.save(storageTbl);}
}
数据层
/*** @author : jiagang* @date : Created in 2023/1/16 15:44*/
@Repository
public interface StorageRepository extends JpaRepository {/*** 通过商品code查询库存* @param commodityCode* @return*/@QueryStorageTbl findByCommodityCode(String commodityCode);
}
这里只贴出订单服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们同样使用Jpa来实现
application.yml 配置文件
server:port: 9091spring:application:name: mdx-shop-ordercloud:nacos:discovery:server-addr: localhost:8848namespace: mdxgroup: mdxdatasource:type: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://localhost:3306/mdx_order?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8driverClassName: com.mysql.cj.jdbc.Driverusername: rootpassword: Bendi+Ceshi+jpa:show-sql: true #打印执行的sql语句,false则不打印sqlproperties:hibernate:ddl-auto: nonedialect: org.hibernate.dialect.MySQL5InnoDBDialectopen-in-view: trueseata:tx-service-group: my_test_tx_groupenabled: trueregistry:type: nacosnacos:application: mdx-seata-server #注册在nacos服务名server-addr: localhost:8848group : mdxnamespace: mdx #注册在nacos命名空间feign:sentinel:enabled: true
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/*** @author : jiagang* @date : Created in 2022/7/1 11:25*/
@SpringBootApplication
@EnableFeignClients
@EnableAutoDataSourceProxy
public class MdxShopOrderApplication {public static void main(String[] args) {SpringApplication.run(MdxShopOrderApplication.class, args);}
}
controller
/*** @author : jiagang* @date : Created in 2022/7/1 18:42*/
@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate OrderService orderService;/*** 用户下单接口* @param userId* @param commodityCode* @return*/@PostMapping("createOrder")public CommonResponse createOrder(String userId, String commodityCode){return CommonResponse.success(orderService.createOrder(userId,commodityCode));}
}
接口
/*** @author : jiagang* @date : Created in 2022/7/1 18:40*/
public interface OrderService {/*** 下单接口* @param userId 用户id* @param commodityCode 商品代码* @return*/String createOrder(String userId, String commodityCode);}
实现类
/*** @author : jiagang* @date : Created in 2022/7/1 18:42*/
@Service
public class OrderServiceImpl implements OrderService {@Resourceprivate OrderRepository orderRepository;@Resourceprivate StorageFeign storageFeign;/*** 下单接口* @param userId 用户id* @param commodityCode 商品代码* @return*/@Overridepublic String createOrder(String userId, String commodityCode) {try {System.out.println("事务id---------------------->" + RootContext.getXID());// 创建订单OrderTbl orderTbl = new OrderTbl();orderTbl.setUserId(userId);orderTbl.setCommodityCode(commodityCode);orderTbl.setCount(1); // 假设为1件orderTbl.setMoney(10); // 假设为十元// 保存订单orderRepository.save(orderTbl);// 保存订单成功后扣减库存storageFeign.deduct(commodityCode,orderTbl.getCount());return "success";}catch (Exception e){throw new BizException("创建订单失败");}}}
数据层
/*** @author : jiagang* @date : Created in 2023/1/16 15:44*/
@Repository
public interface OrderRepository extends JpaRepository {
}
feign接口
对微服务之前使用feign来调用还熟悉的同学可以点下面的链接
springcloud alibaba微服务 – openfeign的使用(保姆级)
/*** @author : jiagang* @date : Created in 2022/7/4 10:26*/
@FeignClient(value = "mdx-shop-storage")
@Component
public interface StorageFeign {/*** 扣减库存* @param commodityCode* @param count* @return*/@GetMapping("storage/deduct")String deduct(@RequestParam String commodityCode,@RequestParam Integer count);}
在进行测试之前,我们先来看下业务逻辑,我们使用postman来调用下单接口进行下单,接口地址为 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777 (POST请求) ,然后下单接口保存订单,并通过feign接口调用仓储服务扣减库存。
正常流程下用户下单,订单数据库增加订单,仓储数据库为下单的商品扣减库存。
首先看一下订单数据库order_tbl表和仓储数据库storage_tbl表
订单表没有数据
仓储表有一条商品,库存为10
正常情况下,下单之后(我们只买一件商品)订单增加一条数据,仓储的S123434455666777商品库存减1 为 9
POST 请求调用 下单接口 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
postman提示成功
看一下数据库
订单新增成功
库存减为9
先将数据库订单表清空,仓储表库存继续设置为10(这里不操作也行,大家记住之前的状态就可以了)
我们在仓储服务的扣减库存的方法中手动写一个异常,异常如下
System.out.println(1 / 0);
/*** 扣减库存* @param commodityCode* @param count*/@Overridepublic void deduct(String commodityCode, int count) {System.out.println("事务id---------------------->" + RootContext.getXID());StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode);if (storageTbl == null){throw new BizException("storageTbl is null");}// 模拟异常System.out.println(1 / 0);// 这里先不考虑超卖的情况storageTbl.setCount(storageTbl.getCount() - count);// 使用jpa 存在就更新storageRepository.save(storageTbl);}
此时,再来调用下单接口
http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
可以看到服务报错,提示创建订单失败
再来观察一下数据库
发现订单依然创建成功,但是库存缺没有减少,还是10,这就导致了用户下单成功了,但是没给人减库存,造成数据不一致,可能会发生超卖。
先将数据库订单表清空(这里不操作也行,大家记住之前的状态就可以了)
为了解决上面的问题,我们为创建订单方法增加seata的分布式注解
@GlobalTransactional
/*** 下单接口* @param userId 用户id* @param commodityCode 商品代码* @return*/@Override@GlobalTransactionalpublic String createOrder(String userId, String commodityCode) {try {System.out.println("事务id---------------------->" + RootContext.getXID());// 创建订单OrderTbl orderTbl = new OrderTbl();orderTbl.setUserId(userId);orderTbl.setCommodityCode(commodityCode);orderTbl.setCount(1); // 假设为1件orderTbl.setMoney(10); // 假设为十元// 保存订单orderRepository.save(orderTbl);// 保存订单成功后扣减库存storageFeign.deduct(commodityCode,orderTbl.getCount());return "success";}catch (Exception e){throw new BizException("创建订单失败");}}
加上注解之后重启服务继续调用下单接口
可以看到创建订单失败
然后再来观察下数据库
发现订单表没有此商品的订单,库存也没变,那就表示事务已经成功回滚了,不会再出现订单创建成功了单库存没减的情况。
这篇文章连查资料、测试、发现问题、解决问题花了两天时间,创作不易,点个赞吧👍
最后的最后送大家一句话
白驹过隙,沧海桑田,与君共勉
上一篇:C语言中的void*是什么?
下一篇:Java集合进阶——Map