分布式事务解决方案:从CAP理论到Seata实战
分布式事务解决方案从CAP理论到Seata实战一、分布式事务概述1.1 为什么需要分布式事务在微服务架构中单体数据库事务无法满足需求跨服务操作一次业务操作涉及多个服务分库分表数据分布在多个数据库实例多数据源不同服务使用不同数据库外部系统集成调用第三方支付、短信等服务1.2 CAP理论┌─────────────────────────────────────────────────────────────────────────┐ │ CAP三角 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ Consistency │ │ ▲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ╱ ╲ │ │ ▼ ▼ │ │ Availability ────────── Partition Tolerance │ │ │ │ 分布式系统只能同时满足以下两个特性 │ │ • C A: 放弃分区容错 │ │ • C P: 放弃可用性 │ │ • A P: 放弃一致性最终一致性 │ │ │ └─────────────────────────────────────────────────────────────────────────┘1.3 BASE理论特性说明实现方式Basically Available基本可用降级、熔断、限流Soft state软状态中间状态允许短期不一致Eventually consistent最终一致性异步补偿、重试机制二、事务模式概述2.1 事务模式对比模式一致性复杂度性能适用场景2PC/3PC强一致高低对一致性要求极高TCC最终一致高中业务可干预Saga最终一致中高长流程业务本地消息表最终一致中高可接受轮询MQ事务消息最终一致中中异步场景2.2 模式选择指南┌─────────────────────────────────────────────────────────────────────┐ │ 模式选择决策树 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 开始选择 │ │ │ │ │ ▼ │ │ ┌────────────────┐ │ │ │ 是否需要强一致 │ │ │ └───────┬────────┘ │ │ ┌──────┴──────┐ │ │ │ Yes │ No │ │ ▼ ▼ │ │ ┌───────────┐ ┌───────────┐ │ │ │ 2PC/3PC │ │ 是否业务可控│ │ │ └───────────┘ └───────┬─────┘ │ │ ┌─────┴─────┐ │ │ │ Yes │ No │ │ ▼ ▼ │ │ ┌───────────┐ ┌───────────┐ │ │ │ TCC │ │ 长流程业务│ │ │ └───────────┘ └───────┬─────┘ │ │ ┌─────┴─────┐ │ │ │ Yes │ No │ │ ▼ ▼ │ │ ┌───────────┐ ┌───────────┐ │ │ │ Saga │ │ MQ事务消息 │ │ │ └───────────┘ └───────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘三、Seata介绍3.1 Seata架构┌─────────────────────────────────────────────────────────────────────────┐ │ Seata架构 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ 应用A │ │ 应用B │ │ 应用C │ │ 应用D │ │ │ │TMRM │ │TMRM │ │TMRM │ │TMRM │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ │ │ │ └──────────────┼──────────────┼──────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────┐ │ │ │ TC(事务协调器) │ │ │ │ Transaction │ │ │ │ Coordinator │ │ │ └───────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────┐ │ │ │ Seata Server │ │ │ │ (AT/TCC/Saga) │ │ │ └───────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘3.2 Seata部署配置# docker-compose.yml version: 3.8 services: seata: image: seataio/seata-server:1.7.0 ports: - 8091:8091 environment: - STORE_MODEdb - SEATA_CONFIG_NAMEfile:/root/seata-config/registry - JVM_XMS512m - JVM_XMX512m volumes: - ./config:/root/seata-config - seata-data:/root depends_on: - mysql mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: seata ports: - 3306:3306 volumes: - mysql-data:/var/lib/mysql - ./script:/docker-entrypoint-initdb.d volumes: seata-data: mysql-data:3.3 Seata Server配置# registry.conf registry { type nacos nacos { application seata-server serverAddr 127.0.0.1:8848 namespace group SEATA_GROUP cluster default username nacos password nacos } } config { type nacos nacos { serverAddr 127.0.0.1:8848 namespace group SEATA_GROUP username nacos password nacos } }四、AT模式4.1 AT模式原理AT模式是Seata的核心模式自动处理SQL实现分布式事务阶段1注册分支 → 执行SQL → 记录undo log → 报告结果 阶段2TC判断 → 全局提交 → 异步删除undo log → 全局回滚 → 执行undo log回滚4.2 依赖配置dependency groupIdio.seata/groupId artifactIdseata-spring-boot-starter/artifactId version1.7.0/version /dependency4.3 应用配置seata: enabled: true application-id: ${spring.application.name} tx-service-group: my_tx_group service: vgroup-mapping: my_tx_group: default enable-degrade: false disable-global-transaction: false config: type: nacos nacos: server-addr: ${NACOS_HOST:127.0.0.1}:${NACOS_PORT:8848} namespace: ${NACOS_NAMESPACE:} group: SEATA_GROUP username: ${NACOS_USERNAME:nacos} password: ${NACOS_PASSWORD:nacos} registry: type: nacos nacos: server-addr: ${NACOS_HOST:127.0.0.1}:${NACOS_PORT:8848} namespace: ${NACOS_NAMESPACE:} group: SEATA_GROUP username: ${NACOS_USERNAME:nacos} password: ${NACOS_PASSWORD:nacos}4.4 AT模式使用Service Slf4j public class OrderService { GlobalTransactional(name create-order, rollbackFor Exception.class) public Order createOrder(CreateOrderRequest request) { log.info(开始创建订单XID: {}, RootContext.getXID()); // 1. 创建订单 Order order orderRepository.save(Order.create(request)); log.info(订单创建成功订单ID: {}, order.getId()); // 2. 扣减库存远程调用 inventoryClient.deductStock(request.getProductId(), request.getQuantity()); log.info(库存扣减成功); // 3. 扣减账户余额远程调用 accountClient.deductBalance(request.getUserId(), order.getTotalAmount()); log.info(余额扣减成功); return order; } }五、TCC模式5.1 TCC模式原理TCC模式需要业务代码实现Try、Confirm、Cancel三个阶段Try阶段预留资源锁定业务数据 Confirm阶段确认使用预留资源 Cancel阶段释放预留资源5.2 TCC接口定义LocalTCC public interface AccountTccService { /** * Try阶段预留账户余额 */ TwoPhaseBusinessAction( name deductAccount, commitMethod confirm, rollbackMethod cancel ) boolean tryDeduct( BusinessActionContextParameter(paramName userId) String userId, BusinessActionContextParameter(paramName amount) BigDecimal amount ); /** * Confirm阶段确认扣减 */ boolean confirm(BusinessActionContext context); /** * Cancel阶段取消扣减释放预留 */ boolean cancel(BusinessActionContext context); }5.3 TCC实现Service Slf4j public class AccountTccServiceImpl implements AccountTccService { Autowired private AccountRepository accountRepository; Autowired private AccountFreezeRepository freezeRepository; Override public boolean tryDeduct(String userId, BigDecimal amount) { log.info(TCC Try阶段预留账户余额userId{}, amount{}, userId, amount); // 1. 检查账户余额是否足够 Account account accountRepository.findByUserId(userId); if (account.getBalance().compareTo(amount) 0) { log.warn(账户余额不足userId{}, balance{}, required{}, userId, account.getBalance(), amount); return false; } // 2. 创建冻结记录 AccountFreeze freeze AccountFreeze.builder() .userId(userId) .freezeAmount(amount) .status(FreezeStatus.TRYING) .build(); freezeRepository.save(freeze); // 3. 预扣减余额乐观锁 int updated accountRepository.freezeBalance(userId, amount); return updated 0; } Override public boolean confirm(BusinessActionContext context) { String userId context.getActionContext(userId, String.class); BigDecimal amount context.getActionContext(amount, BigDecimal.class); log.info(TCC Confirm阶段确认扣减userId{}, amount{}, userId, amount); // 更新冻结记录状态 freezeRepository.updateStatusByUserId(userId, FreezeStatus.CONFIRMED); return true; } Override public boolean cancel(BusinessActionContext context) { String userId context.getActionContext(userId, String.class); BigDecimal amount context.getActionContext(amount, BigDecimal.class); log.info(TCC Cancel阶段取消扣减userId{}, amount{}, userId, amount); // 1. 解冻余额 accountRepository.unfreezeBalance(userId, amount); // 2. 更新冻结记录状态 freezeRepository.updateStatusByUserId(userId, FreezeStatus.CANCELLED); return true; } }5.4 TCC使用Service Slf4j public class OrderService { Autowired private OrderRepository orderRepository; Autowired private AccountTccService accountTccService; GlobalTransactional(name create-order, rollbackFor Exception.class) public Order createOrder(CreateOrderRequest request) { log.info(创建订单开始); // 1. 创建订单 Order order Order.create(request); orderRepository.save(order); // 2. TCC扣减账户余额 boolean deducted accountTccService.tryDeduct( request.getUserId(), order.getTotalAmount() ); if (!deducted) { throw new BusinessException(余额不足); } return order; } }六、Saga模式6.1 Saga模式原理Saga模式将长事务拆分为多个本地事务通过补偿机制实现最终一致┌─────────────────────────────────────────────────────────────────────┐ │ Saga长事务流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 订单服务 ──▶ 库存服务 ──▶ 支付服务 ──▶ 物流服务 │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ 创建订单 扣减库存 扣减余额 创建物流 │ │ │ │ │ │ │ │ │ │ │ │ │ │ 异常◀────────┴────────────┴────────────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ 取消订单 ───▶ 释放库存 ───▶ 退还余额 ───▶ 取消物流 │ │ │ └─────────────────────────────────────────────────────────────────────┘6.2 Saga状态机配置{ Name: orderSaga, Comment: 订单创建Saga, StartState: CreateOrder, States: { CreateOrder: { Type: ServiceTask, ServiceName: order-service, ServiceMethod: createOrder, CompensateState: CancelOrder, Next: DeductInventory, Status: { Success: SU, Fail: FA, Compensating: CO, Compensated: CL } }, CancelOrder: { Type: ServiceTask, ServiceName: order-service, ServiceMethod: cancelOrder, Next: End }, DeductInventory: { Type: ServiceTask, ServiceName: inventory-service, ServiceMethod: deductInventory, CompensateState: RestoreInventory, Next: DeductBalance, Status: { Success: SU, Fail: FA, Compensating: CO, Compensated: CL } }, RestoreInventory: { Type: ServiceTask, ServiceName: inventory-service, ServiceMethod: restoreInventory, Next: End }, DeductBalance: { Type: ServiceTask, ServiceName: account-service, ServiceMethod: deductBalance, CompensateState: RefundBalance, Next: End, Status: { Success: SU, Fail: FA, Compensating: CO, Compensated: CL } }, RefundBalance: { Type: ServiceTask, ServiceName: account-service, ServiceMethod: refundBalance, Next: End } } }6.3 Saga服务实现Service Slf4j public class OrderSagaService { Autowired private StateMachineEngine stateMachineEngine; public void startOrderSaga(Order order) { SagaRequest request SagaRequest.builder() .orderId(order.getId()) .userId(order.getUserId()) .productId(order.getProductId()) .quantity(order.getQuantity()) .amount(order.getTotalAmount()) .build(); StateMachineInstance instance stateMachineEngine.startWithToken( orderSaga, null, request ); log.info(Saga started: instanceId{}, instance.getId()); } } Service public class OrderService { Autowired private OrderRepository orderRepository; public void createOrder(Order order) { order.setStatus(OrderStatus.CREATED); orderRepository.save(order); } public void cancelOrder(Long orderId) { orderRepository.findById(orderId) .ifPresent(order - { order.setStatus(OrderStatus.CANCELLED); orderRepository.save(order); }); } }七、本地消息表模式7.1 原理通过本地消息表实现最终一致性┌─────────────────────────────────────────────────────────────────────┐ │ 本地消息表模式 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 业务操作 │ │ 消息表 │ │ 消息发送 │ │ │ │ 写入消息 │ ──▶ │ 存储消息 │ ──▶ │ 发送MQ │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌──────────────┐ │ │ │ │ │ 消息确认 │ │ │ │ │ └──────┬───────┘ │ │ │ ▼ │ │ │ │ ┌──────────────┐ │ │ │ │ │ 定时任务 │ ◀────────────┘ │ │ │ │ 重试发送 │ │ │ │ └──────────────┘ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 本地事务 │ │ │ │ 提交/回滚 │ │ │ └──────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘7.2 消息表设计CREATE TABLE IF NOT EXISTS outbox_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, aggregate_type VARCHAR(64) NOT NULL, aggregate_id VARCHAR(64) NOT NULL, event_type VARCHAR(64) NOT NULL, payload TEXT NOT NULL, status VARCHAR(32) NOT NULL DEFAULT PENDING, retry_count INT DEFAULT 0, max_retry INT DEFAULT 3, next_retry_time DATETIME, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_created (status, created_at), INDEX idx_aggregate (aggregate_type, aggregate_id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;7.3 实现Service Slf4j public class OutboxMessageService { Autowired private OutboxMessageRepository repository; Autowired private RabbitTemplate rabbitTemplate; Transactional public void saveAndPublish(String aggregateType, String aggregateId, String eventType, Object payload) { // 1. 生成消息 String messageId UUID.randomUUID().toString(); // 2. 保存消息到本地表 OutboxMessage message OutboxMessage.builder() .messageId(messageId) .aggregateType(aggregateType) .aggregateId(aggregateId) .eventType(eventType) .payload(toJson(payload)) .status(MessageStatus.PENDING) .retryCount(0) .maxRetry(3) .nextRetryTime(LocalDateTime.now()) .build(); repository.save(message); } Scheduled(fixedDelay 1000) public void processPendingMessages() { ListOutboxMessage pending repository.findPendingMessages(100); for (OutboxMessage message : pending) { try { // 发送消息到MQ rabbitTemplate.convertAndSend( message.getEventType(), message.getAggregateId(), message.getPayload() ); // 更新状态 message.setStatus(MessageStatus.SENT); repository.save(message); } catch (Exception e) { handleFailure(message, e); } } } private void handleFailure(OutboxMessage message, Exception e) { message.setRetryCount(message.getRetryCount() 1); if (message.getRetryCount() message.getMaxRetry()) { message.setStatus(MessageStatus.FAILED); log.error(Message send failed after max retries: {}, message.getMessageId(), e); } else { message.setNextRetryTime(LocalDateTime.now().plusMinutes(1 message.getRetryCount())); log.warn(Message send failed, will retry: {}, message.getMessageId(), e); } repository.save(message); } }八、MQ事务消息8.1 RocketMQ事务消息Service Slf4j public class OrderTransactionService { Autowired private RocketMQTemplate rocketMQTemplate; Autowired private OrderRepository orderRepository; public void createOrder(Order order) { // 发送事务消息 Transaction transaction rocketMQTemplate.sendMessageInTransaction( order-topic, MessageBuilder.withPayload(order).build(), order ); } } Component Slf4j public class OrderTransactionListener implements RocketMQLocalTransactionListener { Autowired private OrderRepository orderRepository; Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Order order (Order) arg; try { // 执行本地事务 order.setStatus(OrderStatus.CREATED); orderRepository.save(order); log.info(Local transaction executed: orderId{}, order.getId()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error(Local transaction failed: orderId{}, order.getId(), e); return RocketMQLocalTransactionState.ROLLBACK; } } Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String orderId (String) msg.getHeaders().get(orderId); Order order orderRepository.findById(orderId).orElse(null); if (order ! null) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.UNKNOWN; } }九、实战案例9.1 分布式扣款场景Service Slf4j public class PaymentService { GlobalTransactional(name payment-tx, rollbackFor Exception.class) public PaymentResult pay(PaymentRequest request) { log.info(开始支付userId{}, amount{}, request.getUserId(), request.getAmount()); // 1. 校验账户余额 Account account accountRepository.findByUserId(request.getUserId()); if (account.getBalance().compareTo(request.getAmount()) 0) { throw new InsufficientBalanceException(余额不足); } // 2. 冻结金额 account.freeze(request.getAmount()); accountRepository.save(account); // 3. 调用支付网关 PaymentGatewayResult gatewayResult paymentGateway.pay(request); // 4. 更新支付状态 Payment payment Payment.create(request, gatewayResult); payment.setStatus(PaymentStatus.SUCCESS); paymentRepository.save(payment); // 5. 扣除余额 account.deductBalance(request.getAmount()); account.unfreeze(request.getAmount()); accountRepository.save(account); log.info(支付成功paymentId{}, payment.getId()); return PaymentResult.success(payment.getId()); } }9.2 分布式扣库存场景Service public class InventoryService { GlobalTransactional(name inventory-tx, rollbackFor Exception.class) public void deductStock(String productId, int quantity) { log.info(扣减库存productId{}, quantity{}, productId, quantity); Inventory inventory inventoryRepository.findByProductId(productId) .orElseThrow(() - new InventoryNotFoundException(productId)); if (inventory.getStock() quantity) { throw new InsufficientStockException(productId); } inventory.setStock(inventory.getStock() - quantity); inventory.setFrozenStock(inventory.getFrozenStock() quantity); inventoryRepository.save(inventory); log.info(库存扣减成功productId{}, remaining{}, productId, inventory.getStock()); } }十、最佳实践10.1 事务模式选择场景推荐模式原因强一致性场景2PC/Seata AT需要强一致保证简单跨服务操作Seata TCC业务可控性能好长流程业务Saga支持复杂流程编排异步消息场景MQ事务消息解耦异步处理低成本实现本地消息表实现简单无需中间件10.2 注意事项注意事项说明幂等性所有操作必须幂等防止重复执行超时处理设置合理的超时时间异常处理做好异常捕获和日志记录监控告警监控异常事务及时处理事务隔离合理设置隔离级别避免脏读10.3 监控配置management: endpoints: web: exposure: include: health,metrics,prometheus endpoint: health: show-details: always seata: metrics: enabled: true registry-type: compact exporter-list: prometheus十一、总结分布式事务是微服务架构中的核心挑战通过本文的介绍你可以CAP/BASE理论理解分布式一致性的理论基础事务模式对比2PC、TCC、Saga、本地消息表、MQ事务消息Seata架构AT、TCC、Saga三种模式详解AT模式实战自动处理SQL的分布式事务TCC模式实战业务可控的三阶段提交Saga模式实战长事务的编排与补偿本地消息表低成本实现最终一致性MQ事务消息RocketMQ事务消息实现选择合适的分布式事务方案需要综合考虑一致性要求、业务复杂度、性能开销和运维成本。