2.分布式事务-MQ

分布式事务产生原因?

分布式系统拆分导致的

存储拆分

分库分表

image-20240910151240942

服务拆分

image-20240910151827142

每个服务有自己的库,不能跨库访问

用户再下单时,创建订单和扣减库存,需要同时对订单DB和库存DB进行操作。两步操作必须同时成功,否则就会造成业务混乱,可此时我们只能保证自己服务的数据一致性,无法保证调用其他服务的操作是否成功,所以为了保证整个下单流程的数据一致性,就需要分布式事务介入。

分布式系统中,通常谈论一致性,都是最终一致性,因为即时一致性是不切实际的

场景

背景知识:MQ能够实现 at least once(消费者自己处理幂等)

image-20240910154832741
  1. A作为生产者,B作为消费者
  2. A执行动作1,然后发送消息,驱动B执行动作2(动作2幂等)

问题:

  1. 如果B服务是业务失败(库存不足),如何回滚A的动作?
  2. A (1)执行动作 (2)发送消息, 如何保证这两个动作的原子性? (重点问题)

解决方案

尝试解决

可能的解决方案:基于本地事务包裹消息投递操作的实现方式,对应执行步骤如下:

image-20240910155737406

致命问题:

  1. 在和数据库交互的本地事务中,夹杂了和第三方组件的 IO 操作,可能存在引发长事务的风险
  2. 执行消息投递时,可能因为超时或其他意外原因,导致出现消息在事实上已投递成功,但 producer 获得的投递响应发生异常的问题,这样就会导致本地事务被误回滚的问题
  3. 在执行事务提交操作时,可能发生失败. 此时事务内的数据库修改操作自然能够回滚,然而 MQ 消息一经发出,就已经无法回收了

本地消息表

非可靠消息+本地消息事物表+定时任务校对

  1. 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失(不可靠消息)。

  2. 主动方需要本地事务写入 业务数据+消息数据

  3. 定期校对:定时查看处于中间态的消息,重新投送

    image-20240910175403560

优点:

  • 逻辑简单、不依赖特定消息中间件
  • 引入消息表,占用资源,影响数据库性能

MQ事务消息

  1. 本地事物不能包裹消息发送,所以先发一个半消息
  2. 核心是对半消息+check接口判断本地事务的状态(MQ主动发起询问)
  • 本地事务失败,或者直接宕机,check保证消息被推送到失败状态
  • 本地事务执行成功,然后宕机,check同样保证消息能被推送到成功状态(特别阶段情况下,比如上游一直没恢复还是会不一致)
  • check的机制有点像本地任务表中的定时任务

image-20240830214006481

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilderImpl();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}

缺点:不具备回滚的能力

TCC

组成

  1. 应用方 Application:指的是需要使用到分布式事务能力的应用方,即这套 TCC 框架服务的甲方
  2. TCC 组件 TCC Component:指的是需要完成分布式事务中某个特定步骤的子模块. 这个模块通常负责一些状态数据的维护和更新操作,需要对外暴露出 Try、Confirm 和 Cancel 三个 API
    • Try:锁定资源,【冻结】资源,保留后续变化的可能性 gpt参考
      • 订单服务:添加一个try状态
      • 库存服务:添加一个预占库存列 预占库存列++; 库存–;同时插入冻结流水记录 用于回滚
    • Confirm:对 Try 操作进行二次确认,将记录中的【冻结】态改为【成功】态
      • try->success
      • 预占库存列–; 流水修改try->success
    • Cancel:对 Try 操作进行回滚,将记录中的【冻结】状消除或者改为【失败】态. 其底层对应的状态数据会进行回滚
      • try->cancel
      • 预占库存– ;库存++ ;流水修改try->cancel
  3. 事务协调器 TX Manager:负责统筹分布式事务的执行:
    • 实现 TCC Component 的注册管理功能
    • 负责和 Application 交互,提供分布式事务的创建入口,给予 Application 事务执行结果的响应
    • 串联 Try -> Confirm/Cancel 的两阶段流程. 在第一阶段中批量调用 TCC Component 的 Try 接口,根据其结果,决定第二阶段是批量调用 TCC Component 的 Confirm 接口还是 Cancel 接口
    • 对于每一个事物,都有一个tx id并且记录到 事物日志表,以及明细表记录每一个tcc组件状态

image-20240910162811798

执行流程

  1. Application 调用 TX Manager 的接口,创建一轮分布式事务;声明,这次操作涉及到的 TCC Component 范围,包括 订单组件、账户组件和库存组件;传递好,用于和每个 TCC Component 交互的请求参数( TX Manager 调用 Component Try 接口时需要传递)
  2. TX Manager 分配一个全局唯一的事务主键 Transaction ID,记录日志表
  3. TX Manager 分别调用订单、账户、库存组件的 Try 接口,流水表记录每一个try的结果
    1. 某一个失败,执行Cancel;在cancel都响应后设置日志表状态【失败】
    2. 都成功,执行Confirm ;在Confirm 都响应后设置日志表状态【成功】
  4. 根据try结构,返回给application最终结果(不等待CC的结果,只看try)

一致性保证:轮询

TX Manager 轮询重试 + TCC Component 幂等去重(携带tx id)

定时任务:轮询所有未被更新为【成功/失败】对应终态的事务,推进到最终状态

  • 如果try已经存在失败,则再补偿性调用Cancel 操作
  • 如果try都成功,则补偿性地批量调用Confirm操作
  • 如果try还在进行中,则检查时间,如果太长按照失败处理

image-20240910170713079

问题

悬挂问题:try操作拥堵了,cancel操作先到达了,如果 cancel -> try 会导致后续try无法释放

解决:支持空回退:记录下cancel操作Transaction ID,之后try到达了直接忽略

优缺点

优点:

  • 支持回滚
  • 成功率高
    • 进行了try 说明网络不错
    • try 保证了资源的充分
    • 轮询重试保证补偿

缺点:

  • 最终一致性
  • 严格上:如果confirm或cancel始终失败,还是会不一致
  • 实现成本高,需要TCC组件格式

下单场景

现在假设我们需要维护一个电商后台系统,需要处理来自用户的支付请求. 每当有一笔支付请求到达,我们需要执行下述三步操作,并要求其前后状态保持一致性:

  • 在订单模块中,创建出这笔订单流水记录
  • 在账户模块中,对用户的账户进行相应金额的扣减
  • 在库存模块中,对商品的库存数量进行扣减

image-20240910163518305

image-20240910163539671

核心就是订单表添加try状态,库存表添加预占库存概念

1. 订单模块

Try 阶段:

  • 订单模块的 Try 操作主要是创建订单流水,但此时订单状态还不会设置为“已确认”。

  • 在这个阶段,系统会预创建一个订单记录,订单状态可以标记为“处理中”

  • 同时,订单模块可以记录操作的上下文信息(如用户、商品、金额等)以便后续 Confirm 或 Cancel 使用。

  • 操作

    • 写入订单流水表,标记订单状态为“处理中”。
    • 返回成功结果,并携带订单的流水号供后续操作使用。

Confirm 阶段:

  • 当所有模块的 Try 操作都成功时,订单模块会进入 Confirm 阶段。

  • 在 Confirm 阶段,订单模块需要确认订单,将订单状态由“处理中”更新为“已确认”。

  • 操作

    • 将订单状态从“处理中”改为“已确认”。
    • 写入操作日志,更新订单的最终状态。

Cancel 阶段:

  • 如果任意一个模块的 Try 操作失败,订单模块会执行 Cancel 操作,取消订单

  • 在 Cancel 阶段,订单模块需要将“处理中”的订单作废或删除。

  • 操作

    • 将订单状态从“处理中”更新为“已取消”。
    • 记录取消原因和操作日志。

2. 账户模块


3. 库存模块

Try 阶段:

  • 库存模块的 Try 操作是预扣减库存,这意味着暂时锁定所需的库存量,确保后续可以顺利完成库存扣减。

  • 此时并不会真正减少商品库存,而是将库存设置为冻结状态。

  • 操作

    • 检查商品库存是否充足,充足则冻结相应的库存。同时插入冻结流水
    • 将冻结的库存数量记录到库存系统,确保其他订单无法占用这部分库存。
    • 返回冻结成功的结果,并记录库存操作流水号。

Confirm 阶段:

  • 当所有模块的 Try 操作都成功时,库存模块进入 Confirm 阶段,执行实际的扣减库存操作

  • 在这个阶段,库存模块会将冻结的库存真正扣减,减少商品的可用库存。

  • 操作

    • 将商品的冻结库存数量正式扣除,减少可用库存。
    • 更新库存状态,移除冻结记录。
    • 记录库存扣减成功的操作日志。

Cancel 阶段:

  • 如果任意一个模块的 Try 操作失败,库存模块会执行 Cancel 操作,解冻库存,恢复冻结的库存数量。
  • 操作
    • 将冻结的库存数量恢复到可用库存中。
    • 更新库存的冻结记录,确保库存回到 Try 操作之前的状态。
    • 记录取消操作日志。

完整流程示例

假设用户 A 在系统中下单购买商品 B,总价为 100 元,库存需要扣减 1 件:

  1. Try 阶段
    • 订单模块创建订单流水,状态为“处理中”。
    • 库存模块冻结商品 B 的 1 件库存。(库存-1 冻结库存+1)
  2. Confirm 阶段
    • 订单模块将订单状态更新为“已确认”。
    • 库存模块从库存中正式扣减 1 件商品 B。 (冻结库存-1)
  3. Cancel 阶段(假设某个服务的 Try 阶段失败或网络问题导致整体操作失败):
    • 订单模块将订单状态更新为“已取消”。
    • 库存模块解冻商品 B 的 1 件库存,还原库存状态。 (库存+1 冻结库存-1)

目前其实3个方案都加入了一个定时的任务,取轮询状态推进到最终态

两阶段提交2PC

简单而言:参与者(participant)用来管理资源,协调者(coordinator)用来协调事务状态

两段提交(2PC - Prepare & Commit)是指两个阶段的提交:

  • 第一阶段: 准备阶段;
    • 协调者向所有参与者发送 REQUEST-TO-PREPARE
    • 当参与者收到REQUEST-TO-PREPARE 消息后, 它向协调者发送消息PREPARED或者NO,表示事务是否准备好;如果发送的是NO,那么事务要回滚;
  • 第二阶段: 提交阶段。
    • 协调者收集所有参与者的返回消息, 如果所有的参与者都回复的是PREPARED, 那么协调者向所有参与者发送COMMIT 消息;否则,协调者向所有回复PREPARED的参与者发送ABORT消息;
    • 参与者如果回复了PREPARED消息并且收到协调者发来的COMMIT消息,或者它收到ABORT消息,它将执行提交或回滚,并向协调者发送DONE消息以确认。

image-20240910152819014

二阶段提交看似能够提供原子性的操作,但它存在着严重的缺陷:

  • 网络抖动导致的数据不一致:第二阶段中协调者向参与者发送commit命令之后,一旦此时发生网络抖动,导致一部分参与者接收到了commit请求并执行,可其他未接到commit请求的参与者无法执行事务提交。进而导致整个分布式系统出现了数据不一致。
  • 超时导致的同步阻塞问题:2PC中的所有的参与者节点都为事务阻塞型,当某一个参与者节点出现通信超时,其余参与者都会被动阻塞占用资源不能释放。
  • 单点故障的风险:由于严重的依赖协调者,一旦协调者发生故障,而此时参与者还都处于锁定资源的状态,无法完成事务commit操作。虽然协调者出现故障后,会重新选举一个协调者,可无法解决因前一个协调者宕机导致的参与者处于阻塞状态的问题。

三阶段提交3PC

3PC的三个阶段分别是CanCommit、PreCommit、DoCommit

  • CanCommit:协调者向所有参与者发送CanCommit命令,询问是否可以执行事务提交操作。如果全部响应YES则进入下一个阶段。
  • PreCommit:协调者向所有参与者发送PreCommit命令,询问是否可以进行事务的预提交操作,参与者接收到PreCommit请求后,如参与者成功的执行了事务操作,则返回Yes响应,进入最终commit阶段。一旦参与者中有向协调者发送了No响应,或因网络造成超时,协调者没有接到参与者的响应,协调者向所有参与者发送abort请求,参与者接受abort命令执行事务的中断。
  • DoCommit:在前两个阶段中所有参与者的响应反馈均是YES后,协调者向参与者发送DoCommit命令正式提交事务,如协调者没有接收到参与者发送的ACK响应,会向所有参与者发送abort请求命令,执行事务的中断。

参考