一、分布式事务实现方式
1.1、两阶段提交(2PC) 需要数据库厂商的支持,java组件有atomikos等
两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务
- 准备阶段
协调者询问参与者事务是否执行成功,参与者发回事务执行结果
- 提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务
需要注意的是,在准备阶段,参与者执行了事务,但还未提交。只有在提交阶段接收到协调者发来的通知之后,才进行提交或回滚
存在的问题:
- 同步阻塞:所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作
- 单点问题:协调者在 2PC 中起到非常大的作用,发生故障会造成很大影响。特别是在阶段二发生故障,所有参与者会一直处于等待状态,无法完成其它操作
- 数据不一致:在阶段二,如果协调者只发送了部分
commit
消息,此时网络发生异常,那么只有部分参与者接收到commit
消息,也就是说只有部分参与者提交了事务,使得系统数据不一致 - 太过于保守:任意一个节点失败就会导致整个事务失败,没用完善的容错机制
1.2、补偿事务(TCC)严选,阿里,蚂蚁金服
TCC 其实就是采用补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段
- Try:阶段主要是对业务系统做检测及资源预留
- Confirm:阶段主要是对业务系统做确认提交,Try 阶段执行成功并开始执行 Confirm 阶段时,默认 Confirm 阶段是不会出错的。即:只要 Try 成功,Confirm 一定成功
- Cancel:阶段主要是在业务执行异常时,需要回滚的状态下执行业务取消,预留资源释放
1.3、本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式
本地消息表对业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性
- 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中
- 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发
- 在分布式事务操作的另一方,从消息队列中读取一个消息,并执行消息中的操作
优点:一种非常经典的实现,避免了分布式事务,实现了最终一致性
缺点:消息表会耦合到业务系统中,如果没用封装好的解决方案,会有很多杂务需要处理
1.4、MQ 事务消息 异步场景,通用性较强,拓展性较高
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。
以阿里的 RocketMQ 为例,其思路大致为
- 第一阶段 Prepared 消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态
- 在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息,如果确认消息发送失败了,RabbitMQ 会定期扫描消息集群中的事务消息,这时候发现了 Prepared 消息,它会向消息发送者确认,所以生产方需要需要实现一个 ckeck 接口,RabbitMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败
优点:实现了最终一致性,不需要以来本地数据库事务
缺点:实现难度大,主流MQ不支持,RockerMQ 事务消息部分代码为开源
二、RabbitMQ 分布式事务实现
2.1、分布式案例
测试
2.1.1、创建数据库
- 建数据库
rabbitmq_dispatch
和rabbitmq_order
# 数据库 rabbitmq_dispatch
CREATE DATABASE `rabbit_dispatch`;
# 数据库 rabbitmq_order
CREATE DATABASE `rabbit_dispatch`;
# 在数据库 rabbit_dispatch 中创建表 hd_dispatch_order
CREATE TABLE `hd_dispatch_order` (
`dispatch_id` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
`order_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`status` int DEFAULT NULL,
`order_content` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`user_id` int DEFAULT NULL,
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
# 在数据库 rabbit_order 中创建表 hd_order
CREATE TABLE `hd_order` (
`order_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`user_id` int DEFAULT NULL,
`order_content` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
2.1.2、创建一个 SpringBoot 工程 dispatcher_service
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--MySQL连接驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--json数据格式转换-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
yml 配置文件如下
server:
port: 9094
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbit_dispatcher?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
rabbitmq:
addresses: localhost:5671,localhost:5672,localhost:5673
username: guest
password: guest
virtual-host: /
service
包下的 DispatchService 类
@Service
@Transactional(rollbackFor = Exception.class)
public class DispatchService {
@Resource
private JdbcTemplate jdbcTemplate;
/**
* 运单接收
*/
public void dispatch(String orderId) throws Exception {
// 定义存储 sql
String sqlStr = "insert into hd_dispatch_order(order_id,dispatch_id,status,order_content,user_id) values(?,?,?,?,?)";
// 添加运动记录
int count = jdbcTemplate.update(sqlStr, orderId, UUID.randomUUID().toString(), 0, "点了一份快餐", "1");
if (count != 1) {
throw new Exception("订单创建失败[数据库语句操作失败]");
}
}
}
controller
层代码
@RestController
@RequestMapping("dispatch")
public class DispatchController {
@Autowired
private DispatchService dispatchService;
/**
* 添加订单后,添加调度信息
*/
@GetMapping("/order")
public String lock(String orderId) throws Exception {
// 分配订单
dispatchService.dispatch(orderId);
return "success";
}
}
2.1.3、创建一个 SpringBoot 工程 order_service
引入依赖,同上
yml 配置文件如下
server:
port: 9093
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbit_order?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
rabbitmq:
addresses: localhost:5671,localhost:5672,localhost:5673
username: guest
password: guest
virtual-host: /
实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Serializable {
private String orderId;
private Integer userId;
private String orderContent;
private Date createTime;
public Order(String orderId, Integer userId, String orderContent) {
this.orderId = orderId;
this.userId = userId;
this.orderContent = orderContent;
}
}
业务层代码
@Service
public class OrderService {
@Autowired
private OrderDataBaseService orderDataBaseService;
// 创建订单
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) throws Exception {
// 订单信息
orderDataBaseService.savaOrder(order);
// 通过 http 接口发送订单信息到系统
String result = dispatchHttpApi(order);
if (!"success".equals(result)) {
throw new Exception("订单创建失败,由于运单创建失败");
}
}
private String dispatchHttpApi(Order order) {
// 发送 http 请求
String url = "http://localhost:9094/dispatch/order?orderId=" + order.getOrderId();
RestTemplate restTemplate = new RestTemplate();
return restTemplate.getForObject(url, String.class);
}
}
@Service
@Transactional(rollbackFor = Exception.class)
public class OrderDataBaseService {
@Resource
private JdbcTemplate jdbcTemplate;
public void savaOrder(Order order) throws Exception {
String sql = "insert into hd_order(order_id,user_id,order_content) values (?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
if (count != 1) {
throw new Exception("订单创建失败" );
}
// 在下订单可能会 rabbit 出现宕机,就导致消息没有放入 MQ,为了消息可靠生产,对消息做一次冗余
saveLocalMessage(order);
}
}
2.1.4、启动 dispatcher_service
3.1.5、order_service 测试添加订单
@SpringBootTest
public class CreateOrderTest {
@Autowired
private OrderService orderService;
@Test
public void createOrderTest() throws Exception {
Order order = new Order("1000001", 1, "点了一份快餐");
orderService.createOrder(order);
}
}
流程为,用户创建订单,派送服务接收到订单服务的请求,生成派送订单,此时流程没问题
3.1.6、问题
此时修改 dispatcher_service controller
接口,模拟业务耗时
@GetMapping("/order")
public String lock(String orderId) throws Exception {
if (orderId.equals("1000001")) {
// 模拟业务耗时,接口调用者会认为超时
TimeUnit.SECONDS.sleep(3);
}
// 分配订单
dispatchService.dispatch(orderId);
return "success";
}
修改 order_service 请求接口代码,模拟请求超时
private String dispatchHttpApi(Order order) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// 设置连接超时时间
factory.setConnectTimeout(3000);
// 处理超时时间
factory.setReadTimeout(2000);
// 发送 http 请求
String url = "http://localhost:9094/dispatch/order?orderId=" + order.getOrderId();
RestTemplate restTemplate = new RestTemplate(factory);
return restTemplate.getForObject(url, String.class);
}
可以看到此时请求超时,order_service 由于事务回滚,没有生成订单,然后 dispatcher_service 只是业务处理时间过长,实际 hd_dispatch_order 表中,已经生成派送订单,这就导致分布式事务不一致。
2.2、MQ 分布式事务实现
整体设计思路如下
分布式事务问题 - 可靠生产
可靠消费
2.2.1、可靠生产实现
在 rabbit_order 数据库创建表 message
CREATE TABLE `message` (
`order_id` int(11) DEFAULT NULL,
`status` varchar(20) DEFAULT NULL,
`order_content` varchar(100) DEFAULT NULL,
`unique_id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
由于这里需要用到 rabbitmq 的回调机制,需要修改 yml 配置文件,具体配置如下
server:
port: 9093
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbit_order?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
rabbitmq:
addresses: localhost:5671,localhost:5672,localhost:5673
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
业务层代码
@Service
public class MqOrderService {
@Autowired
private OrderDataBaseService orderDataBaseService;
@Autowired
private OrderMqService orderMqService;
public void createOrder(Order order) throws Exception {
// 订单信息插入
orderDataBaseService.savaOrder(order);
// 通过 http 接口发送订单信息到运单系统
orderMqService.sendMessage(order);
}
}
由于这里需要消息冗余,修改orderDataBaseService.savaOrder()
方法代码
@Service
@Transactional(rollbackFor = Exception.class)
public class OrderDataBaseService {
@Resource
private JdbcTemplate jdbcTemplate;
public void savaOrder(Order order) throws Exception {
String sql = "insert into hd_order(order_id,user_id,order_content) values (?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
if (count != 1) {
throw new Exception("订单创建失败" );
} // 在下订单可能会 rabbit 出现宕机,就导致消息没有放入 MQ,为了消息可靠生产,对消息做一次冗余
saveLocalMessage(order);
}
private void saveLocalMessage(Order order) throws Exception {
// 定义 sql String sql = "insert into message(order_id,order_content,status,unique_id) values (?,?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getOrderContent(), 0, 1);
if (count != 1) {
throw new Exception("message 出现异常");
} }
}
mq 包下代码
@Configuration
@Slf4j
public class OrderMqService {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(Order order) throws JsonProcessingException {
rabbitTemplate.convertAndSend("order_fanout_exchange", "", new ObjectMapper().writeValueAsString(order), new CorrelationData(order.getOrderId()));
}
// 该注解被用来修饰一个非静态的void方法,被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,
// 并且只会被服务器执行一次,PostConstruct在构造函数之后执行,init()方法之前执行
@PostConstruct
public void regCallback() {
// rabbitTemplate.setMandatory(true);
// 消息发送成功之后,给予生产者的消息回执,来确保生产者的可靠性
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("case: {}", ack);
// 如果 ack 为 true,代表消息已经收到
String orderId = correlationData.getId();
if (!ack) {
// 这里可能要进行其它方式存储
log.info("MQ 队列应答失败,orderId 是: {}",orderId);
return;
}
String sql = "update message set status = 1 where order_id = ?";
int count = 0;
try {
count = jdbcTemplate.update(sql, orderId);
} catch (DataAccessException e) {
log.info("本地消息状态修改失败,出现异常:{}",e.getMessage());
}
if (count == 1) {
log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
}
});
}
}
运行测试
@SpringBootTest
public class CreateOrderTest {
@Autowired
private MqOrderService mqOrderService;
@Test
public void mqCreateOrderTest() throws Exception {
Order order = new Order("1000001", 1, "点了一份快餐");
mqOrderService.createOrder(order);
}
}
可以看到,数据库订单和冗余数据都生产了,并且 rabbitmq 队列中也有数据,冗余数据的状态在回调中也被修改成功
2.2.2、可靠消费
dispathcer_service 项目中
mq 包下
@Service
@Slf4j
@RabbitListener(queues = {"order.queue"})
public class OrderMqConsumer {
@Autowired
private DispatchService dispatchService;
private int count = 1;
@RabbitHandler
public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
// 获取到消息队列的消息
log.info("收到 MQ 消息是: {}", orderMsg);
// 获取订单服务信息
Order order = new ObjectMapper().readValue(orderMsg, Order.class);
// 获取订单id
String orderId = order.getOrderId();
// 保存订单
dispatchService.dispatch(orderId);
}
}
运行 dispatcher_service,可以看到 rabbitmq 消费端获取到了数据,并且将派送订单保存到了数据库中。
由于在创建订单记录又或者在创建消息记录时,已经将消息推送到队列中了,但是数据库中的数据状态可能没有修改,这时需要创建一个定时任务,每个几分钟或几秒查询数据库的消息记录状态,确保已经推送到队列中的消息在数据库中完成了修改
添加 Message 实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
private Integer orderId;
private String status;
private String orderContent;
private Integer uniqueId;
}
添加定时任务类 TaskSchedule
@EnableScheduling
public class TaskSchedule {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "* 0/5 * * * ?")
public void sendMessage() {
// 查询所有状体啊为 0 的记录
String sql = "select * from message where status = 0";
List<Message> messages = jdbcTemplate.queryForList(sql, Message.class);
// 循环将消息推送到消息队列中
// messages.forEach(message -> rabbitTemplate.convertAndSend("order_faonout_exchange","", JsonUtil.obj2String(message),new CorrelationData()));
}
}
问题,在上述情况中,可能还是会出现问题,例如在保存订单时,可能会出现异常
2.3、可靠消费重试机制
由于在接受消息的时候出现了故障,会进入死循环,并触发 RabbitMQ 的重试机制,队列里的消息一直无法应答
解决方案有如下几种
- 控制重发的次数
- try + catch + 手动ack
- try + catch + 手动ack + 死信队列处理
但是,手动 ack 可能会导致消息丢失,这里直接用第三种方案替代第二种
2.3.1、控制重发的次数
在 dispatcher_service 项目中,修改 application.yml 文件
server:
port: 9094
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbit_dispatcher?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
rabbitmq:
addresses: localhost:5671,localhost:5672,localhost:5673
username: guest
password: guest
virtual-host: /
# 这里是手动开启 ack,让程序控制 MQ 消息的重发、删除和转移
listener:
simple:
# 手动 ack 机制,默认是 none
acknowledge-mode: manual
retry:
# 开启重试,默认为 false
enabled: true
# 最大重试次数,默认为 3
max-attempts: 10
# 重试间隔时间
initial-interval: 2000ms
# 消息确定机制
publisher-confirm-type: correlated
try + catch + 手动ack + 死信队列处理
在 order_service 中添加如下配置
@Configuration
public class DeadLetterMQConfig {
@Bean
public FanoutExchange deadExchange() {
return new FanoutExchange("dead_order_fanout_exchange", true, false);
}
@Bean
public Queue deadOrderQueue() {
return new Queue("dead.order.queue", true);
}
@Bean
public Binding deadQueueBinding() {
return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
}
// 将订单与死信队列绑定,需要将之前创建的队列和交换机删除
@Bean
public FanoutExchange orderFanoutExchange() {
return new FanoutExchange("order_fanout_exchange", true, false);
}
@Bean
public Queue orderQueue() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
return new Queue("order.queue", true, false, false, args);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderFanoutExchange());
}
}
在 dispatcher_service 项目中,在 mq 包下
OrderMqConsumer 模拟消费信息时出现异常
@Service
@Slf4j
@RabbitListener(queues = {"order.queue"})
public class OrderMqConsumer {
@Autowired
private DispatchService dispatchService;
private int count = 1;
@RabbitHandler
public void messageConsumer2(String orderMsg, Channel channel, CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
// 获取消息队列消息
log.info("order queue 收到 MQ 消息是: {}", orderMsg);
try {
// 获取订单信息
Order order = new ObjectMapper().readValue(orderMsg, Order.class);
// 获取订单 id
String orderId = order.getOrderId();
// 保存订单
dispatchService.dispatch(orderId);
// 模拟异常
System.out.println(1 / 0);
// 关闭 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 如果出现异常的情况下,则判断为无效应答,根据实际情况重发
// 参数1:消息的 tag,参数2:false 多条处理,参数3:requeue 重发
// false 不会重发,false 的话会扔掉消息,将消息转移到死信队列
// true 会循环重发,建议使用 true 的话,不要加 try / catch,否则会造成死循环,并且在配置文件中配置的重发次数失效
channel.basicNack(deliveryTag, false, false);
}
}
}
处理死信队列里的消息
@Slf4j
@Service
@RabbitListener(queues = {"dead.order.queue"})
public class DeadMqConsumer {
@Autowired
private DispatchService dispatchService;
@RabbitHandler
public void messageConsumer(String orderMsg, Channel channel,
CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
// 获取死信队列中的数据
log.info("dead letter queue 收到 MQ 消息是: {}", orderMsg);
try {
// 获取订单服务的信息
Order order = new ObjectMapper().readValue(orderMsg, Order.class);
// 获取订单 id
String orderId = order.getOrderId();
// 这里原本已经将消息存入数据库中,应该是执行更新操作,换成 update 操作
dispatchService.updateStatus(orderId);
// 关闭 ack 机制
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
log.info("人工干预");
log.info("发短信预警");
log.info("同时将消息转移到别的存储 DB");
// 移除死信队列中的消息
channel.basicNack(deliveryTag, false, false);
}
}
}