[TOC]

1.1. 第一章 事务基础概念

1.1.1. 1、事务的回顾

【1】事务的定义

     是数据库操作的最小工作单元,是作为单个逻辑工作单元执行的一系列操作,这些操作作为一个整体一起向系统提交,要么都执行、要么都不执行;事务是一组不可再分割的操作集合 

【2】事务的ACID原则

事务具有4个基本特性:原子性、一致性、隔离性、持久性。也就是我们常说的ACID原则

  • 原子性(Atomicity):

    一个事务已经是一个不可再分割的工作单位。事务中的全部操作要么都做;要么都不做
    
    例如:A和B两个人一共1000元,A给B转账100元,A付款100元,B收款100元,
         A的付款行为和B的收款行为要么都成功,要么都失败
    
  • 一致性(Consistency):

    事务的执行使数据从一个状态转换为另一个状态,但是对于整个数据的完整性保持稳定。 
    
    例如:A和B两个人一共1000元,无论A,B两人互相转账多少次,A和B两个人总额都应该是1000元
    
  • 隔离性(Isolation):

    事务允许多个用户对同一个数据进行并发访问,而不破坏数据的正确性 和完整性。同时,并行事务的修改必须与其他并行事务的修改相互独立。 
    
    例如:万达影院有《叶问4》电影票100张,允许所有人同时去淘票票上购票,当第100张电影票被A,B,C3人同时购买,如果A拿到第100张电影票,但是还在犹豫要不要付钱,则B,C必须等待A的决定,如果A决定付钱,B.C就无法抢到票,如果A超时不付钱,则这第100张电影票回归票池,从新分配。
    
  • 持久性(Durability): 一个事务一旦提交,它对数据库中数据的改变会永久存储起来。其他操作不会对它产生影响

    例如:万达影院有《叶问4》电影票100张,100张电影票销售完毕,对于每个购买者来说,他的购买记录已经产生,即使退票,他之前的购买记录也不会消失。
    

【3】隔离级别

【3.1】为什么要事务隔离?

如果没有定义事务隔离级别:

  • 脏读

    ​ 在一个事务中读取到了另外一个事务修改的【未提交的数据】,而导致多次读取同一个数据返回的结果不一致 (必须要解决的)

例如:
    1.事务1,小明的原工资为1000, 财务人员将小明的工资改为了8000【但未提交事务】
    2.事务2,小明读取自己的工资 ,发现自己的工资变为了8000,欢天喜地!
    3.事务1,财务发现操作有误,回滚了操作,小明的工资又变为了1000
      像这样,小明读取的工资数8000是一个脏数据
  • 不可重复读

    ​ 在一个事务中读取到了另外一个事务修改的【已提交的数据】,而导致多次读取同一个数据返回的结果不一致

例如:
    1.事务1,小明读取了自己的工资为1000,操作还没有完成 
    2.事务2,这时财务人员修改了小明的工资为2000,并提交了事务.
    3.事务1,小明再次读取自己的工资时,工资变为了2000
  • 幻读(虚读):

    ​ 一个事务读取了几行记录后,另一个事务插入一些记录,幻读就发生了。再后来的查询中,第一个事务就会发现有些原来没有的记录

例如:   
   1.事务1,财务统计所有工资为5000的员工有10人。
   2.事务2,人事向user表插入了一条员工记录,工资也为5000
   3.事务1,财务再次读取所有工资为5000的员工 共读取到了11条记录,明明刚刚是10人啊?产生幻觉了?
【3.2】spring事务隔离级别

事务隔离就是帮助我们解决:脏读、不可重复读、幻读(虚读)

隔离级别由低到高【读未提交】=>【读已提交】=>【可重复读】=>【序列化操作】

隔离级别 说明 脏读 不可重复读 幻读
ISOLATION_DEFAULT spring默认数据库的隔离级别 -- -- --
ISOLATION_READ_UNCOMMITTED 读未提交
ISOLATION_READ_COMMITTED 读已提交 ×
ISOLATION_REPEATABLE_READ 可重复读 × ×
ISOLATION_SERIALIZABLE 序列化操作 × × ×

对大多数数据库来说就是:READ_COMMITTED(读已提交)

MySQL默认采用:REPEATABLE_READ(可重复读),

Oracle采用:READ__COMMITTED()

Spring的隔离级别默认数据库的隔离级别:ISOLATION_DEFAULT

1.1.2. 2、事务分类讨论

【1】本地事务

​ 本地事务是关系型数据库中,由一组SQL组成的一个执行单元,该单元要么整体成功,要么整体失败。它的缺点就是:仅支持单库事务,并不支持跨库事务。

image-20210108162120348

【2】分布式事务

可随着业务量的不断增长,单体架构渐渐扛不住巨大的流量,此时就需要对数据库、表做分库分表处理,将应用SOA 服务化拆分。也就产生了订单中心、用户中心、库存中心等,由此带来的问题就是业务间相互隔离,每个业务都维护着自己的数据库,数据的交换只能进行RPC 调用。

当用户再次下单时,需同时对订单库 order、账户库 account、交易库 Trading 进行操作,可此时我们只能保证自己本地的数据一致性,无法保证调用其他服务的操作是否成功,所以为了保证整个下单流程的数据一致性,就需要分布式事务介入

image-20210108163049126

1.1.3. 3、分布式基础理论

【1】CAP原理

一句话概括CAP:在分布式系统中,即使网络故障,服务出现瘫痪,整个系统的数据保持一致性

image-20210103165841687

【举个栗子】

image-20210108163226400

美团下单到派单,并且扣除优惠券100元

张三在美团上点了外卖,然后下订单,然后在通知外卖小哥接单。思考三个问题:

1:如何体现C数据的一致性

整个分布式系统中,一致性体现这笔订单,必须通知外卖小哥送单。必须扣除100元的优惠券。

2:如果体现A可用性

在整个分布式系统中,可用性体现在张三下订单的时候,如果送单服务或卡券服务瘫痪了,这时候不能影响张三下单

3:如何体现P分区容错性

整个分布式系统中,分区容错主要体现中张三下单的时候,突然订单服务和卡券服务之间的网络突然断开了,但是不能影响张三下订单。

image-20210108163257502

当前服务之间出现网络故障的情况下:

1:如何保证订单服务和卡券服务高可用

2:下一笔订单同时扣除100元优惠券如何实现:

分布式系统解决方案:AP

AP:==牺牲一致性,保证高可用==,(保证订单服务可以正常访问,保证卡券服务可以正常访问,是牺牲了数据的一致性),张三下单成功,但是不扣除100元优惠券。在这种情况下:张三下订单成功后再去查看100元优惠券,居然还存在。

如何解决呢?一般做法是:当网络恢复正常的情况下,订单服务重试请求卡券服务,再扣除100元优惠券。使用消息队列来做。

CP:==牺牲可用性,保证数据一致性==,即保证数据的强一致性,当张三来下单的时候,提示:系统维护中 等服务间的网络恢复正常后,张三再来下单。

CA:可以实现码?当然是不可以的,因为网络故障是一定会存在的,因为我们没办法去控制网络。你没办法去控制网络。也就是P必须要容忍。不要P分区容错性,即不允许网络出现故障,这是不可能实现的。所以在分布式系统中,是不存在CA的,即使单体系统也做不到CA,因为单体系统也会出现单一故障问题。你可能说我可以用集群,但是一旦做了集群就由网络问题

【2】BASE原理

image-20210104091312080

BASE 是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency),核心思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性。

基本可用(Basically Available)

分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。

这里的关键词是“部分”和“核心”,具体选择哪些作为可以损失的业务,哪些是必须保证的业务,是一项有挑战的工作。例如,对于一个用户管理系统来说,“登录”是核心功能,而“注册”可以算作非核心功能。因为未注册的用户本来就还没有使用系统的业务,注册不了最多就是流失一部分用户,而且这部分用户数量较少。如果用户已经注册但无法登录,那就意味用户无法使用系统。例如,充了钱的游戏不能玩了、云存储不能用了……这些会对用户造成较大损失,而且登录用户数量远远大于新注册用户,影响范围更大。

软状态(Soft State)

允许系统存在中间状态,而该中间状态不会影响系统整体可用性。这里的中间状态就是 CAP 理论中的数据不一致。

最终一致性(Eventual Consistency)

系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。

这里的关键词是“一定时间” 和 “最终”,“一定时间”和数据的特性是强关联的,不同的数据能够容忍的不一致时间是不同的。举一个微博系统的例子,用户账号数据能在 1 分钟内就达到一致状态。因为用户在 A 节点注册或者登录后,1 分钟内不太可能立刻切换到另外一个节点,但 10 分钟后可能就重新登录到另外一个节点了,而用户发布的微博,可以容忍 30 分钟内达到一致状态。因为对于用户来说,看不到某个明星发布的微博,用户是无感知的,会认为明星没有发布微博。“最终”的含义就是不管多长时间,最终还是要达到一致性的状态。

【小结】

BASE 理论本质上是对 CAP 的延伸和补充,更具体地说,是对 CAP 中 AP 方案的一个补充。前面在剖析 CAP 理论时,提到了其实和 BASE 相关的两点:==其实base理论就是告诉我们:虽然在分布式开发中,存在网络问题,我们虽然达不到一致性,但是可以通过一些技术和手段让我们数据达到最终一致性。==

CAP 理论是忽略延时的,而实际应用中延时是无法避免的。

1.2. 第二章 分布式事务的解决方案【重点】

​ 实现分布式事务的方案比较多,常见的比如基于XA 协议的2PC、3PC,基于业务层的TCC,还有应用消息队列 + 消息表实现的最终一致性方案,还有今天要说的Seata 中间件,下边看看各个方案的优缺点。

1.2.1. 1、刚性事务

【1】2PC【基于XA协议】

基本要求:

XA协议是需要数据库支持的,我们不会采用传统数据源支持,而是需要使用支持XA的数据源,mysql5.6以后对这个XA事务的支持比较友好

使用事务管理器的时候,需要配置支持XA事务的事务管理期

【1.1】流程图解

两阶段提交【2PC】,对业务侵⼊很小,它最⼤的优势就是对使⽤⽅透明,用户可以像使⽤本地事务⼀样使⽤基于 XA 协议的分布式事务,能够严格保障事务 ACID 特性。

image-20210108164140989

可 2PC的缺点也是显而易见,它是一个强一致性的同步阻塞协议,事务执⾏过程中需要将所需资源全部锁定,也就是俗称的 刚性事务。所以它比较适⽤于执⾏时间确定的短事务,整体性能比较差。

一旦事务协调者宕机或者发生网络抖动,会让参与者一直处于锁定资源的状态或者只有一部分参与者提交成功,导致数据的不一致。因此,在⾼并发性能⾄上的场景中,基于 XA 协议的分布式事务并不是最佳选择。

image-20210108164227125

【1.2】案例编写

image-20210108152637079

pom.xml文件

transaction-xa继承itheitma-transaction-parent模块,居然POM文件如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>itheitma-transaction-parent</artifactId>
        <groupId>com.itheima.transaction</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>transaction-xa</artifactId>

    <name>transaction-xa</name>

    <dependencies>
        <!--springboot的web支持-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--lombok支持-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--swagger2支持-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
        </dependency>

        <!--  knife4j版接口文档 访问/doc.html      -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
        </dependency>

        <!--MySQL支持-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--druid的配置-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>

        <!--springboot关于mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>

        <!--代码生成器模板引擎 相关依赖-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
        </dependency>

        <!--springboot的freemarker支持-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

        <!-- sharding-jdbc -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        </dependency>

        <!-- 使用XA事务时,需要引入此模块 -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-transaction-xa-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

        <!-- 拷贝对象 -->
        <dependency>
            <groupId>ma.glasnost.orika</groupId>
            <artifactId>orika-core</artifactId>
            <version>${orika-core.version}</version>
        </dependency>

        <!--工具包-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons.lang3.version}</version>
        </dependency>

        <!--springboot的测试支持-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

    </dependencies>

    <build>

    </build>
</project>

application.yml文件

这里我们配置:3个数据源、彼此之间互不干扰,具体内容如下

#服务配置
server:
  #端口
  port: 8080
  #服务编码
  tomcat:
    uri-encoding: UTF-8
#spring相关配置
spring:
  #应用配置
  application:
    #应用名称
    name: transaction-xa
  #开启jta事务的支持
  jta:
    atomikos:
      properties:
        log-base-dir: D:/home/transaction-xa-atomikos
        log-base-name: ${spring.application.name}
  #数据源配置
  shardingsphere:
    datasource:
      names: order-db,account-db,storage-db
      order-db:
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/t_order-db?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
        username: root
        password: root
      account-db:
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/t_account-db?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
        username: root
        password: root
      storage-db:
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/t_storage-db?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
        username: root
        password: root
    sharding:
      tables:
        tab_account:
          actualDataNodes: account-db.tab_account
        tab_order:
          actualDataNodes: order-db.tab_order
        tab_storage:
          actualDataNodes: storage-db.tab_storage
    props:
      sql.show: true
#mubatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml

image-20210108153252237

OrderServiceImpl

在下列代码中:我们需要吧下订单业务,收付款业务、扣库存业务、都放在一个事务单元中完成,他所依赖的是基于JTA【atomikos】的事务,并且依赖于sharding-jdbc的XA事务类型的支持

package com.itheima.transaction.service.impl;

import com.itheima.transaction.pojo.Order;
import com.itheima.transaction.mapper.OrderMapper;
import com.itheima.transaction.service.IAccountService;
import com.itheima.transaction.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itheima.transaction.service.IStorageService;
import com.itheima.transaction.utils.SnowflakeIdWorker;
import org.apache.shardingsphere.transaction.annotation.ShardingTransactionType;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

/**
 * @Description:订单表 服务实现类
 */
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    @Autowired
    IAccountService accountService;

    @Autowired
    IStorageService storageService;

    @Autowired
    SnowflakeIdWorker snowflakeIdWorker;

    @Override
    @Transactional
    @ShardingTransactionType(TransactionType.XA)
    public Boolean payOrder(Long goodsId,
                            Long goodsNum,
                            Long payeeId,
                            Long payerId,
                            BigDecimal money) {
        Boolean flagAccount = true;
        Order order = Order.builder()
                .productOrderNo(snowflakeIdWorker.nextId())
                .goodsId(goodsId)
                .goodsNum(goodsNum)
                .payerId(payerId)
                .payerName("李四")
                .payeeId(payeeId)
                .payeeName("王婆")
                .realAmount(money)
                .build();
        flagAccount = save(order);
        if (!flagAccount){
            throw  new RuntimeException("下订单出错");
        }
        //李四买瓜
        flagAccount = accountService.updateAccountBalance(payerId, new BigDecimal("200").negate());
        if (!flagAccount){
            throw  new RuntimeException("支付出错");
        }
        //王婆收钱
        flagAccount = accountService.updateAccountBalance(payeeId, new BigDecimal("200"));
        if (!flagAccount){
            throw  new RuntimeException("收钱出错");
        }
        flagAccount = storageService.updateStorageResidue(goodsId,goodsNum);
        if (!flagAccount){
            throw  new RuntimeException("扣库存出错");
        }
        return flagAccount;
    }
}

案例测试:

使用TransactionXAStart启动项目

数据源初始化完成

image-20210108155111787

JTA事务管理器初始化完成

image-20210108155214426

访问:http://127.0.0.1:8080/transaction/order/pay-order/1

开启新事务事务

image-20210108155418305

执行保存订单时候,把当前事务加入到分布式事务中

image-20210108155539781

李四买瓜,修改李四余额,吧当前事务加入到分布式事务中

image-20210108155622061

王婆收钱,修改玩牌余额,继续沿用李四的数据源,此时此数据源是沿用的,对于事务的支持也沿用

image-20210108155953692

修改库存:把当前事务加入到分布式事务

image-20210108160045264

最终完成,完成第二段提交,分布式事务提交

image-20210108160134084

【1.3】XA事务小结
性能问题:所有参与者在事务提交阶段处于同步阻塞状态,占用系统资源,容易导致性能瓶颈。

可靠性问题:如果协调者存在单点故障问题,或出现故障,提供者将一直处于锁定状态。

数据一致性问题:在阶段 2 中,如果出现协调者和参与者都挂了的情况,有可能导致数据不一致。

优点:尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域。

缺点:实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景。

【2】3PC【了解】

三段提交【3PC】是二阶段提交【2PC】的一种改进版本 :

为解决两阶段提交协议的阻塞问题,上边提到两段提交,当协调者崩溃时,参与者不能做出最后的选择,就会一直保持阻塞锁定资源

2PC 中没有协调者有超时机制,3PC 在协调者和参与者中都引入了超时机制,协调者出现故障后,参与者就不会一直阻塞。而且在第一阶段和第二阶段中又插入了一个准备阶段,保证了在最后提交阶段之前各参与节点的状态是一致的。

image-20210108164517092

虽然 3PC用超时机制,解决了协调者故障后参与者的阻塞问题,但与此同时却多了一次网络通信,性能上反而变得更差,也不太推荐。

1.2.2. 2、柔性事务

【1】最终一致性【基于RabbitMQ】

需要保证以下三要素:

  1. 确认生产者一定要将数据投递到MQ服务器中(采用MQ消息发送确认机制)
  2. MQ消费者消息能够正确消费消息,采用手动ACK模式(==注意重试幂等性问题==)
  3. 如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)
【1.1】流程图解

​ 消息事务其实就是基于消息中间件的两阶段提交,将本地事务和发消息放在同一个事务里,保证本地操作和发送消息同时成功。 下单扣库存原理图

image-20210108171856762

【1.2】案例编写

image-20210108172216454

【1.2.1】生产者【下订单】

Rabbitmq配置:信息如下

package com.itheima.transaction.config;

import com.itheima.transaction.contants.RabbitContants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig.java
 * @Description rabbitMq配置
 */
@Configuration
public class RabbitMqConfig {

    // 1.定义扣库存队列
    @Bean
    public Queue storageQueue() {
        return new Queue(RabbitContants.QUEUE_STORAGE);
    }

    // 2.定义订单补偿队列
    @Bean
    public Queue orderCompensationQueue() {
        return new Queue(RabbitContants.QUEUE_ORDER_COMPENSATION);
    }

    // 2.定义事务交换机
    @Bean
    public Exchange transactionTopicExchange() {
        return new TopicExchange(RabbitContants.EXCHANGE_TRANSACTION_TOPIC);
    }

    // 3.扣库存队列与交换机绑定
    @Bean
    Binding bindingStorageQueue() {
        return BindingBuilder
                .bind(storageQueue())
                .to(transactionTopicExchange())
                .with(RabbitContants.ROUTINGKEY_PAY_ORDER).noargs();
    }

    // 3.订单补偿队列与交换机绑定
    @Bean
    Binding bindingOrderCompensationQueue() {
        return BindingBuilder
                .bind(orderCompensationQueue())
                .to(transactionTopicExchange())
                .with(RabbitContants.ROUTINGKEY_PAY_ORDER).noargs();
    }
}

生产发送接口:OrderSender实现RabbitTemplate.ConfirmCallback,从写confirm,用于确定当前消息发送到交换机或队列中

package com.itheima.transaction.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.itheima.transaction.contants.RabbitContants;
import com.itheima.transaction.pojo.RabbitMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/***
 * @description 订单扣库存消息发送
 * @return
 */
@Slf4j
@Component
public class OrderSender implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /***
     * @description 订单下单发送消息的方法
     * @param rabbitMessage 发送的消息
     * @param rabbitMessage 订单信息
     * @return
     */
    public void sendMessage(RabbitMessage rabbitMessage){
        log.info("发送消息给库存服务");
        // 构建回调返回的数据
        CorrelationData correlationData = new CorrelationData(JSONObject.toJSONString(rabbitMessage));
        //设置投递规则
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        // 发送消息
        rabbitTemplate.convertAndSend(
                RabbitContants.EXCHANGE_TRANSACTION_TOPIC,
                RabbitContants.ROUTINGKEY_PAY_ORDER,
                rabbitMessage,correlationData);
    }
    /**
     * 订单发送交换机确认回调的处理方法
     * 如果ack为true,则表示当前的消息已经发送到的交换机
     * 俄国ack为false,则进行再次投递,直到投递成功
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String errorInfo) {
        String rabbitMessage = correlationData.getId();
        if (ack) {
            log.info("订单消息发送交换机确认成功{}",rabbitMessage);
        } else {
            log.warn("订单消息发送交换机确认失败:{},错误:{}",rabbitMessage,errorInfo);
            log.info("订单消息重新投递{}",rabbitMessage);
            sendMessage(JSONObject.parseObject(rabbitMessage,RabbitMessage.class));
        }
    }
}
package com.itheima.transaction.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.itheima.transaction.contants.RabbitContants;
import com.itheima.transaction.contants.RedisConstant;
import com.itheima.transaction.pojo.Order;
import com.itheima.transaction.pojo.OrderVo;
import com.itheima.transaction.pojo.RabbitMessage;
import com.itheima.transaction.service.IOrderService;
import com.itheima.transaction.utils.BeanConv;
import com.itheima.transaction.utils.EmptyUtil;
import com.rabbitmq.client.Channel;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @ClassName RabbitmqListener.java
 * @Description 订单补单监听
 */
@Component
@RabbitListener(queues = RabbitContants.QUEUE_ORDER_COMPENSATION)
public class OrderListener {

    @Autowired
    IOrderService orderService;

    @Autowired
    RedissonClient redissonClient;

    @RabbitHandler
    public void process(RabbitMessage rabbitMessage, Channel channel, Message message) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        OrderVo orderVo = JSONObject.parseObject(rabbitMessage.getConten(), OrderVo.class);
        RLock lock = redissonClient.getLock(RedisConstant.LOCK_PREFIX+orderVo.getProductOrderNo().toString());
        try {
            //幂等性处理
            if (lock.tryLock(RedisConstant.REDIS_WAIT_TIME,RedisConstant.REDIS_LEASETIME, TimeUnit.MILLISECONDS)) {
                //查询订单是否存在
                QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
                queryWrapper.eq(StringUtils.camelToUnderline(Order.Fields.productOrderNo),orderVo.getProductOrderNo());
                Order orderResult = orderService.getOne(queryWrapper);
                boolean flag = true;
                //不存在则补单
                if (EmptyUtil.isNullOrEmpty(orderResult)){
                    orderVo.setId(null);
                    Order order = BeanConv.toBean(orderVo, Order.class);
                    flag = orderService.save(order);
                }
                if (flag){
                    //签收消息
                    channel.basicAck(deliveryTag, false);
                }else {
                    throw  new RuntimeException("未正常处理补单业务,进入重试:"+orderVo.toString());
                }
            }
        }catch (Exception ex){
            // 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。
            // 设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行
            channel.basicReject(deliveryTag, true);
        }finally {
            lock.unlock();
        }
    }
}

OrderListener:此监听主要处理,当发送RabbitMQ成功,但是由于生产端数据库网络波动,导致丢单,此处会从新发送。【注意:幂等性处理】

package com.itheima.transaction.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.itheima.transaction.contants.RabbitContants;
import com.itheima.transaction.contants.RedisConstant;
import com.itheima.transaction.pojo.Order;
import com.itheima.transaction.pojo.OrderVo;
import com.itheima.transaction.pojo.RabbitMessage;
import com.itheima.transaction.service.IOrderService;
import com.itheima.transaction.utils.BeanConv;
import com.itheima.transaction.utils.EmptyUtil;
import com.rabbitmq.client.Channel;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @ClassName RabbitmqListener.java
 * @Description 订单补单监听
 */
@Component
@RabbitListener(queues = RabbitContants.QUEUE_ORDER_COMPENSATION)
public class OrderListener {

    @Autowired
    IOrderService orderService;

    @Autowired
    RedissonClient redissonClient;

    @RabbitHandler
    public void process(RabbitMessage rabbitMessage, Channel channel, Message message) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        OrderVo orderVo = JSONObject.parseObject(rabbitMessage.getConten(), OrderVo.class);
        RLock lock = redissonClient.getLock(RedisConstant.LOCK_PREFIX+orderVo.getProductOrderNo().toString());
        try {
            //幂等性处理
            if (lock.tryLock(RedisConstant.REDIS_WAIT_TIME,RedisConstant.REDIS_LEASETIME, TimeUnit.MILLISECONDS)) {
                //查询订单是否存在
                QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
                queryWrapper.eq(StringUtils.camelToUnderline(Order.Fields.productOrderNo),orderVo.getProductOrderNo());
                Order orderResult = orderService.getOne(queryWrapper);
                boolean flag = true;
                //不存在则补单
                if (EmptyUtil.isNullOrEmpty(orderResult)){
                    orderVo.setId(null);
                    Order order = BeanConv.toBean(orderVo, Order.class);
                    flag = orderService.save(order);
                }
                if (flag){
                    //签收消息
                    channel.basicAck(deliveryTag, false);
                }else {
                    throw  new RuntimeException("未正常处理补单业务,进入重试:"+orderVo.toString());
                }
            }
        }catch (Exception ex){
            // 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。
            // 设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行
            channel.basicReject(deliveryTag, true);
        }finally {
            lock.unlock();
        }
    }
}
【1.2.2】消费者【扣库存】

transaction-mq-customer的application.yml中设置,手动签收,且设置重试

server:
  port: 8081
spring:
  application:
    name: transaction-mq-product
  redis:
    redisson:
      config: classpath:singleServerConfig.yaml
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/t_storage-db?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      username: root
      password: root
  rabbitmq:
    host: 192.168.112.129
    port: 5672
    virtual-host: /itheima
    username: admin
    password: pass
    listener:
      simple:
        #手动签收
        acknowledge-mode: manual
        #并发消费者初始化值
        concurrency: 10
        #并发消费者的最大值
        max-concurrency: 20
        #每个消费者每次监听时可拉取处理的消息数量
        prefetch: 5
        retry:
          #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          enabled: true
          #最大重试次数
          max-attempts: 2
          #重试间隔时间(单位毫秒)
          initial-interval: 20000
          #重试最大时间间隔(单位毫秒):当前时间间隔<max-interval(重试最大时间间隔)
          max-interval: 1200000
          #应用于前一重试间隔的乘法器:当前时间间隔=上次重试间隔*multiplier。
          multiplier: 1
#mybatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml

OrderListener:设计消费的幂等性,如果业务执行失败,是将数据重新丢回队列里,那么下次还会消费这消息

package com.itheima.transaction.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.itheima.transaction.contants.RabbitContants;
import com.itheima.transaction.contants.RedisConstant;
import com.itheima.transaction.pojo.OrderVo;
import com.itheima.transaction.pojo.RabbitMessage;
import com.itheima.transaction.service.IStorageService;
import com.rabbitmq.client.Channel;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @ClassName RabbitmqListener.java
 * @Description 扣款监听
 */
@Component
@RabbitListener(queues = RabbitContants.QUEUE_STORAGE)
public class OrderListener {

    @Autowired
    IStorageService storageService;

    @Autowired
    RedissonClient redissonClient;

    @RabbitHandler
    public void process(RabbitMessage rabbitMessage, Channel channel, Message message) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        OrderVo orderVo = JSONObject.parseObject(rabbitMessage.getConten(), OrderVo.class);
        RLock lock = redissonClient.getLock(RedisConstant.LOCK_PREFIX+"storage:"+orderVo.getProductOrderNo().toString());
        try {
            //幂等性处理
            if (lock.tryLock(RedisConstant.REDIS_WAIT_TIME,RedisConstant.REDIS_LEASETIME, TimeUnit.MILLISECONDS)) {
                Boolean flag = storageService.updateStorageResidue(orderVo.getGoodsId(), orderVo.getGoodsNum());
                if (flag){
                    //签收消息
                    channel.basicAck(deliveryTag, false);
                }else {
                    throw  new RuntimeException("未正常处理扣库存业务,进入重试:"+orderVo.toString());
                }
            }
        }catch (Exception ex){
            // 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。
            // 设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行
            channel.basicReject(deliveryTag, true);
        }finally {
            lock.unlock();
        }
    }
}

【2】TCC机制【基于seata】

image-20210110205855582

TCC 分布式事务模型需要业务系统提供三段业务逻辑:

  1. 初步操作 Try:完成所有业务检查,预留必须的业务资源。
  2. 确认操作 Confirm:真正执行的业务逻辑,不做任何业务检查,只使用 Try 阶段预留的业务资源。因此,只要 Try 操作成功,Confirm 必须能成功。另外,Confirm 操作需满足幂等性,保证一笔分布式事务能且只能成功一次。
  3. 取消操作 Cancel:释放 Try 阶段预留的业务资源。同样的,Cancel 操作也需要满足幂等性。
【2.1】Seata的TCC模型

TC :事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。

TM:控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。

RM:控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

image-20210110214542317

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑
【2.2】余额付款模型

image-20210110213057723

扣款场景:

A 转账 30 元给 B。账户 A 的余额中有 100 元,需要扣除其中 30 元,对应TCC设计如下:

第一阶段Try:需要检查并预留业务资源,因此,我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,然后预留余额里的业务资源,即扣除 30 元。

第二阶段 Confirm :需要把冻结-30元,进行平账,且提交事务

第二阶段 Cancel:需要把冻结-30元返还给账户,进行业务回滚

【2.3】余额收款模型

image-20210111101347476

收款场景:

B收款,为其可用余额添加30元

第一阶段 Try :接口里不能直接给账户加钱,如果这个时候给账户增加了可用余额,那么在一阶段执行完后,账户里的钱就可以被使用了。但是一阶段执行完以后,有可能是要回滚的。因此,真正加钱的动作需要放在 Confirm 接口里。对于加钱这个动作

第二阶段 Confirm :释放冻结30元,做实际加钱动作,提交事务

第三阶段Cancel:需要把冻结30元返还给,进行业务回滚

【2.4】案例实现

场景描述:A给B转账,A付款、B收款、然后扣除库存,==此处省略:seata-seaver服务的搭建过程==

image-20210110215628094

在transaction-seata-tcc-interface项目中添加SeataWebMvcConfig管理XID传递

package com.itheima.transaction.config;

import com.alibaba.cloud.seata.web.SeataHandlerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;

/**
 * @ClassName UserWebMvcConfig.java
 * @Description webMvc高级配置
 */
@Configuration
public class SeataWebMvcConfig extends WebMvcConfigurationSupport {

    /***
     * @description 解决XID传递问题
     * @return
     * @return: com.alibaba.cloud.seata.web.SeataHandlerInterceptor
     */
    @Bean
    public SeataHandlerInterceptor seataHandlerInterceptor(){
        return new SeataHandlerInterceptor();
    }

    /**
     * @Description 拦截器
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(seataHandlerInterceptor()).addPathPatterns("/**");
    }

    /**
     * 资源路径 映射
     */
    @Override
    protected void addResourceHandlers(ResourceHandlerRegistry registry) {
        /**
         * 支持webjars
         */
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");
        /**
         * 支持swagger
         */
        registry.addResourceHandler("swagger-ui.html")
                .addResourceLocations("classpath:/META-INF/resources/");
        super.addResourceHandlers(registry);
    }

}
【2.4.1】生产者【支付服务】

application.yml

server:
  port: 8080
spring:
  application:
    name: transaction-seata-tcc-product
  redis:
    redisson:
      config: classpath:singleServerConfig.yaml
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/t_account-db?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      username: root
      password: root
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.112.128:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: project_tx_group
#mybatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml
seata:
  tx-service-group: project_tx_group
  enabled: true
  application-id: ${spring.application.name}
  enable-auto-data-source-proxy: true
  service:
    #这里的名字与file.conf中vgroup_project_tx_group = "default"相同
    vgroup-mapping:
      project_tx_group: default
    #这里的名字与file.conf中default.grouplist = "127.0.0.1:8091"相同
    grouplist:
      default: 192.168.112.128:9200
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos

TradeController服务入口,这里再doTeade方法上添加 @GlobalTransactional,管理所有事务

package com.itheima.transaction.web;


import com.itheima.transaction.service.TradeService;
import com.itheima.transaction.utils.SnowflakeIdWorker;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;

/**
 * @Description:订单表 前端控制器
 */
@RestController
public class TradeController {

    @Autowired
    TradeService tradeService;


    @RequestMapping("do-trade/{goodsId}/{goodsNum}/{payeeId}/{payerId}/{money}")
    //添加全局事务
    @GlobalTransactional
    public Boolean doTeade(
            @PathVariable("goodsId") Long goodsId,
            @PathVariable("goodsNum") Long goodsNum,
            @PathVariable("payeeId") Long payeeId,
            @PathVariable("payerId") Long payerId,
            @PathVariable("money")BigDecimal money){
        Boolean flag = tradeService.tryTrade(goodsId, goodsNum, payeeId, payerId, money);
        return flag;
    }
}

TradeServiceImpl:业务实现,调用、付款、收款、远程的扣库存服务

package com.itheima.transaction.service.impl;

import com.itheima.transaction.mapper.AccountMapper;
import com.itheima.transaction.service.PaymentService;
import com.itheima.transaction.service.ReceiptService;
import com.itheima.transaction.service.TradeService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName TradeServiceImpl.java
 * @Description 交易实现
 */
@Service
@Slf4j
public class TradeServiceImpl implements TradeService {

    @Autowired
    PaymentService paymentService;

    @Autowired
    ReceiptService receiptService;

    @Autowired
    RestTemplate restTemplate;

    @Override
    public Boolean tryTrade(
            Long goodsId,
            Long goodsNum,
            Long payeeId,
            Long payerId,
            BigDecimal money) {
        //付款
        boolean payment = paymentService.tryPayment(null,payerId, money);
        if (!payment){
            throw new RuntimeException("付款失败");
        }
        //收款
        boolean receipt = receiptService.tryReceipt(null,payeeId, money);
        if (!receipt){
            throw new RuntimeException("收款失败");
        }
        //扣库存
        Boolean storage = restTemplate.getForObject("http://127.0.0.1:8081/do-storage"+"/"+goodsId+"/"+goodsNum, Boolean.class, "");
        if (!storage){
            throw new RuntimeException("扣库存失败");
        }
        return storage;
    }

}

PaymentService:付款接口

@LocalTCC:开启TCC托管接口

@BusinessActionContextParameter:方法上下文传递参数

package com.itheima.transaction.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

/**
 * @ClassName PaymentService.java
 * @Description 支付操作
 */
@LocalTCC
public interface PaymentService {

    /***
     * @description 付款接口
     * @param payerId 付款人
     * @param money 交易金额
     * @return: java.lang.Boolean
     */
    @TwoPhaseBusinessAction(
            name="payment-flow",
            commitMethod = "confirmPayment",
            rollbackMethod = "cancelPayment")
    Boolean tryPayment(
            BusinessActionContext actionContext,
            @BusinessActionContextParameter(paramName = "payerId") Long payerId,
            @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    public Boolean confirmPayment(BusinessActionContext actionContext);

    public Boolean cancelPayment(BusinessActionContext actionContext);


}

ReceiptService:收款接口

package com.itheima.transaction.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

/**
 * @ClassName ReceiptService.java
 * @Description 收款接口
 */
@LocalTCC
public interface ReceiptService {

    /***
     * @description 收款接口
     * @param payeeId 收款人
     * @param money 交易金额
     * @return: java.lang.Boolean
     */
    @TwoPhaseBusinessAction(
            name="receipt-flow",
            commitMethod = "confirmReceipt",
            rollbackMethod = "cancelReceipt")
    Boolean tryReceipt(
            BusinessActionContext actionContext,
            @BusinessActionContextParameter(paramName = "payeeId")  Long payeeId,
            @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    public Boolean confirmReceipt(BusinessActionContext actionContext);

    public Boolean cancelReceipt(BusinessActionContext actionContext);

}
【2.4.2】消费者【扣库存服务】

application.yml

server:
  port: 8081
spring:
  application:
    name: transaction-seata-tcc-customer
  redis:
    redisson:
      config: classpath:singleServerConfig.yaml
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/t_storage-db?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      username: root
      password: root
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.112.128:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: project_tx_group
#mybatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml
seata:
  tx-service-group: project_tx_group
  enabled: true
  application-id: ${spring.application.name}
  enable-auto-data-source-proxy: true
  service:
    #这里的名字与file.conf中vgroup_project_tx_group = "default"相同
    vgroup-mapping:
      project_tx_group: default
    #这里的名字与file.conf中default.grouplist = "127.0.0.1:8091"相同
    grouplist:
      default: 192.168.112.128:9200
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos

StorageController:库存服务

package com.itheima.transaction.web;

import com.itheima.transaction.service.IStorageService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StorageController.java
 * @Description 库存服务
 */
@RestController
@RequestMapping
public class StorageController {

    @Autowired
    IStorageService storageService;

    @RequestMapping("/do-storage/{goodsId}/{goodsNum}")
    //开启全局事务
    @GlobalTransactional
    public Boolean doStorage(
            @PathVariable("goodsId") Long goodsId,@PathVariable("goodsNum") Long goodsNum){
       return storageService.tryUpdateResidue(null,goodsId,goodsNum);
    }
}

IStorageService

package com.itheima.transaction.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.itheima.transaction.pojo.Storage;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

/**
 * @Description:库存表 服务类
 */
@LocalTCC
public interface IStorageService extends IService<Storage> {

    /***
     * @description 修改库存
     * @param goodsId 商品ID
     * @param goodsNum 购买数量
     * @return: java.lang.Boolean
     */
    @TwoPhaseBusinessAction(
            name="pay-flow",
            commitMethod = "confirmUpdateResidue",
            rollbackMethod = "cancelUpdateResidue")
    public Boolean tryUpdateResidue(
            BusinessActionContext actionContext,
            @BusinessActionContextParameter(paramName = "goodsId") Long goodsId,
            @BusinessActionContextParameter(paramName = "goodsNum") Long goodsNum);

    public Boolean confirmUpdateResidue(BusinessActionContext actionContext);

    public Boolean cancelUpdateResidue(BusinessActionContext actionContext);

}
【2.5】空回滚设计

​ ==空回滚就是对于一个分布式事务,在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功,而不去执行具体的取消业务==

处理方式,

第一阶段Try:

image-20210110214040654

第二阶段Cancel:

image-20210110214145425

==注意:此方案也不是100%解决,空回滚问题,只是在一定程度上避免,只要存在网络链接,则必然存在波动==

【3】AT机制【基于seata】

TC :事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。

TM:控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。

RM:控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

image-20210131163523049

【3.1】第一阶段

此阶段是信息收集,加入的undo_log的阶段

image-20210110222304616

在本地事务提交前,各分支事务需向 全局事务协调者 TC 注册分支 ( Branch Id) ,为要修改的记录申请 全局锁 ,要为这条数据加锁,利用 SELECT FOR UPDATE 语句。而如果一直拿不到锁那就需要回滚本地事务。TM 开启事务后会生成全局唯一的 XID,会在各个调用的服务间进行传递。

【3.2】第二阶段

第二阶段:此阶段是根据各分支的决议做提交或回滚:

​ 决议是全局提交,此时各分支事务已提交并成功,这时 全局事务协调者(TC) 会向分支发送第二阶段的请求。收到 TC 的分支提交请求,该请求会被放入一个异步任务队列中,并马上返回提交成功结果给 TC。异步队列中会异步和批量地根据 Branch ID 查找并删除相应 UNDO LOG 回滚记录。

分支提交模型:

image-20210111165239225

​ 如果决议是全局回滚,过程比全局提交麻烦一点,RM 服务方收到 TC 全局协调者发来的回滚请求,通过 XIDBranch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

分支回滚模型:

image-20210110233946392

【3.3】案例实现

==场景描述:A给B转账,A付款、B收款、然后扣除库存==

image-20210110232233765

在transaction-seata-at-interface项目中添加SeataWebMvcConfig管理XID传递

package com.itheima.transaction.config;

import com.alibaba.cloud.seata.web.SeataHandlerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;

/**
 * @ClassName UserWebMvcConfig.java
 * @Description webMvc高级配置
 */
@Configuration
public class SeataWebMvcConfig extends WebMvcConfigurationSupport {

    /***
     * @description 解决XID传递问题
     * @return
     * @return: com.alibaba.cloud.seata.web.SeataHandlerInterceptor
     */
    @Bean
    public SeataHandlerInterceptor seataHandlerInterceptor(){
        return new SeataHandlerInterceptor();
    }

    /**
     * @Description 拦截器
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(seataHandlerInterceptor()).addPathPatterns("/**");
    }

    /**
     * 资源路径 映射
     */
    @Override
    protected void addResourceHandlers(ResourceHandlerRegistry registry) {
        /**
         * 支持webjars
         */
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");
        /**
         * 支持swagger
         */
        registry.addResourceHandler("swagger-ui.html")
                .addResourceLocations("classpath:/META-INF/resources/");
        super.addResourceHandlers(registry);
    }

}
【3.3.1】生产者【支付服务】

application.yml

server:
  port: 8080
spring:
  application:
    name: transaction-seata-at-product
  redis:
    redisson:
      config: classpath:singleServerConfig.yaml
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/t_account-db?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      username: root
      password: root
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.112.128:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: project_tx_group
#mybatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml
seata:
  tx-service-group: project_tx_group
  enabled: true
  application-id: ${spring.application.name}
  enable-auto-data-source-proxy: true
  service:
    #这里的名字与file.conf中vgroup_project_tx_group = "default"相同
    vgroup-mapping:
      project_tx_group: default
    #这里的名字与file.conf中default.grouplist = "127.0.0.1:8091"相同
    grouplist:
      default: 192.168.112.128:9200
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos

TradeController服务入口,这里再doTeade方法上添加 @GlobalTransactional,管理所有事务

package com.itheima.transaction.web;


import com.itheima.transaction.service.TradeService;
import com.itheima.transaction.utils.SnowflakeIdWorker;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;

/**
 * @Description:订单表 前端控制器
 */
@RestController
public class TradeController {

    @Autowired
    TradeService tradeService;


    @RequestMapping("do-trade/{goodsId}/{goodsNum}/{payeeId}/{payerId}/{money}")
    //添加全局事务
    @GlobalTransactional(rollbackFor = Exception.class)
    public Boolean doTeade(
            @PathVariable("goodsId") Long goodsId,
            @PathVariable("goodsNum") Long goodsNum,
            @PathVariable("payeeId") Long payeeId,
            @PathVariable("payerId") Long payerId,
            @PathVariable("money")BigDecimal money){
        Boolean flag = tradeService.tryTrade(goodsId, goodsNum, payeeId, payerId, money);
        return flag;
    }
}
package com.itheima.transaction.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

/**
 * @ClassName TransactionService.java
 * @Description 交易操作
 */
public interface TradeService {

    /***
     * @description 交易接口
     * @param goodsId 商品ID
     * @param goodsNum 购买数量
     * @param payeeId 收款人
     * @param payerId 付款人
     * @param money 交易金额
     * @return: java.lang.Boolean
     */
    Boolean tryTrade(
            Long goodsId,
            Long goodsNum,
            Long payeeId,
            Long payerId,
            BigDecimal money);
}

TradeServiceImpl

package com.itheima.transaction.service.impl;

import com.itheima.transaction.mapper.AccountMapper;
import com.itheima.transaction.service.PaymentService;
import com.itheima.transaction.service.ReceiptService;
import com.itheima.transaction.service.TradeService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName TradeServiceImpl.java
 * @Description 交易实现
 */
@Service
@Slf4j
public class TradeServiceImpl implements TradeService {

    @Autowired
    PaymentService paymentService;

    @Autowired
    ReceiptService receiptService;

    @Autowired
    RestTemplate restTemplate;

    @Override
    public Boolean tryTrade(
            Long goodsId,
            Long goodsNum,
            Long payeeId,
            Long payerId,
            BigDecimal money) {
        //付款
        boolean payment = paymentService.tryPayment(payerId, money);
        if (!payment){
            throw new RuntimeException("付款失败");
        }
        //收款
        boolean receipt = receiptService.tryReceipt(payeeId, money);
        if (!receipt){
            throw new RuntimeException("收款失败");
        }
        //扣库存
        Boolean storage = restTemplate.getForObject("http://127.0.0.1:8081/do-storage"+"/"+goodsId+"/"+goodsNum, Boolean.class, "");
        if (!storage){
            throw new RuntimeException("扣库存失败");
        }
        return storage;
    }

}
【3.4.2】消费者【扣库存服务】

application.yml

server:
  port: 8081
spring:
  application:
    name: transaction-seata-at-customer
  redis:
    redisson:
      config: classpath:singleServerConfig.yaml
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/t_storage-db?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      username: root
      password: root
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.112.128:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: project_tx_group
#mybatis配置
mybatis-plus:
  # MyBaits 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.itheima.springboot.pojo
  # 该配置请和 typeAliasesPackage 一起使用,如果配置了该属性,则仅仅会扫描路径下以该类作为父类的域对象 。
  type-aliases-super-type: com.itheima.transaction.basic.BasicPojo
  configuration:
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # 驼峰下划线转换
    map-underscore-to-camel-case: true
    use-generated-keys: true
    default-statement-timeout: 60
    default-fetch-size: 100
  global-config:
    db-config:
      #主键类型(雪花ID)
      id-type: assign_id
    #机器 ID 部分(影响雪花ID)
    worker-id: 1
    #数据标识 ID 部分(影响雪花ID)
    datacenter-id: 1
logging:
  config: classpath:logback.xml
seata:
  tx-service-group: project_tx_group
  enabled: true
  application-id: ${spring.application.name}
  enable-auto-data-source-proxy: true
  service:
    #这里的名字与file.conf中vgroup_project_tx_group = "default"相同
    vgroup-mapping:
      project_tx_group: default
    #这里的名字与file.conf中default.grouplist = "127.0.0.1:8091"相同
    grouplist:
      default: 192.168.112.128:9200
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos
  registry:
    type: nacos
    nacos:
      group: SEATA_GROUP
      server-addr: 192.168.112.128:8848
      namespace: public
      username: nacos
      password: nacos

StorageController:库存服务

package com.itheima.transaction.web;

import com.itheima.transaction.service.IStorageService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StorageController.java
 * @Description 库存服务
 */
@RestController
@RequestMapping
public class StorageController {

    @Autowired
    IStorageService storageService;

    @RequestMapping("/do-storage/{goodsId}/{goodsNum}")
    //开启全局事务
    @GlobalTransactional(rollbackFor = Exception.class)
    public Boolean doStorage(
            @PathVariable("goodsId") Long goodsId,@PathVariable("goodsNum") Long goodsNum){
       return storageService.tryUpdateResidue(goodsId,goodsNum);
    }
}

IStorageService

package com.itheima.transaction.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.itheima.transaction.pojo.Storage;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

/**
 * @Description:库存表 服务类
 */
public interface IStorageService extends IService<Storage> {

    /***
     * @description 修改库存
     * @param goodsId 商品ID
     * @param goodsNum 购买数量
     * @return: java.lang.Boolean
     */
    public Boolean tryUpdateResidue(Long goodsId, Long goodsNum);


}

StorageServiceImpl

package com.itheima.transaction.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itheima.transaction.mapper.StorageMapper;
import com.itheima.transaction.pojo.Storage;
import com.itheima.transaction.service.IStorageService;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

/**
 * @Description:库存表 服务实现类
 */
@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper, Storage> implements IStorageService {

    @Autowired
    StorageMapper storageMapper;

    @Autowired
    RedissonClient redissonClient;

    @Override
    public Boolean tryUpdateResidue(Long goodsId, Long goodsNum) {
        Integer storage = storageMapper.tryUpdateResidue(goodsId, goodsNum);
        boolean flag = storage == 1;
        return flag;
    }

}

results matching ""

    No results matching ""