Featured image of post 分布式锁 + 接口幂等 + 分布式事务 + 消息可靠性

分布式锁 + 接口幂等 + 分布式事务 + 消息可靠性

4 大分布式核心组件实战:Redisson 分布式锁(可重入/公平/读写锁)、Redis + Lua 接口幂等(注解+AOP+CacheLock)、Seata 分布式事务(TC/TM/RM + AT/TCC/SAGA/XA)、消息队列延迟方案 + 重复消费解决

本文写于 2023 年 9 月——Redisson 已经成为分布式锁事实标准,Seata AT 模式广泛落地,RocketMQ / RabbitMQ 可靠性投递也趋于成熟。

一、分布式锁

1.1 为什么需要分布式锁

ZooKeeper 有 Znode 顺序节点,数据库有表级锁和乐观/悲观锁,Redis 有 SETNX,殊途同归——最终还是要回到互斥上来

实现一致性性能复杂度推荐场景
MySQL 行锁不推荐
ZooKeeper 临时顺序节点强一致场景
Redis SETNX + Lua最终通用
Redisson最终Java 生态首选

1.2 Redisson 分布式锁

引入依赖

1
2
3
4
5
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.5</version>
</dependency>

配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Configuration
public class MyRedissonConfig {
    
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        // 单机模式
        config.useSingleServer()
              .setAddress("redis://127.0.0.1:6379")
              .setDatabase(0);
        // 集群模式
        // config.useClusterServers()
        //       .addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002");
        return Redisson.create(config);
    }
}

伪代码

1
2
3
4
5
6
7
8
9
// 1. 设置分布式锁
RLock lock = redisson.getLock("orderLock");
// 2. 占锁
lock.lock();
// 3. 执行业务
createOrder();
deductStock();
// 4. 释放锁
lock.unlock();

1.3 Redisson 锁的高级特性

锁类型API用途
可重入锁getLock(key)同一线程可多次 lock
公平锁getFairLock(key)按请求顺序排队
读写锁getReadWriteLock(key)读共享 / 写互斥
联锁(MultiLock)getMultiLock(lock1, lock2, ...)多资源原子加锁
红锁(RedLock)getRedLock(lock1, lock2, ...)跨机房强一致(谨慎使用
信号量getSemaphore(key)限流
闭锁getCountDownLatch(key)协调多线程

1.4 接口防重(基于 Redisson)

思路:用 Redisson 的分布式锁给"幂等性"做兜底——同一用户 1 秒内多次点击只处理第一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public boolean isRepeatSubmit(String userId, String bizType) {
    String key = "repeat:" + bizType + ":" + userId;
    RLock lock = redisson.getLock(key);
    try {
        // 尝试加锁,最多等 0 秒,锁 1 秒过期
        return lock.tryLock(0, 1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        return false;
    }
}

二、接口幂等性

2.1 什么是接口幂等

幂等性:多次调用产生的结果与一次调用相同。

典型场景

  • 订单创建(重复点击 → 多个订单)
  • 支付回调(网络重试 → 多次扣款)
  • 表单提交(重复提交 → 重复数据)

2.2 基于 Redis + Lua 的接口幂等

思路:用 SETNX 抢锁,抢到就执行业务,LUA 保证释放锁时只删自己加的(避免误删别人的锁)。

Lua 释放锁脚本

1
2
3
4
5
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end

2.3 完整实现(注解 + AOP + CacheLock)

第一步:Redis 依赖

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

第二步:自定义注解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheLock {
    /** key 前缀 */
    String prefix() default "";
    /** 过期秒数,默认 5 秒 */
    int expire() default 5;
    /** 超时时间单位 */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
    /** Key 分隔符 */
    String delimiter() default ":";
}

第三步:AOP 切面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Aspect
@Component
public class LockCheckAspect {
    
    /** Lua:释放锁时校验 value,避免误删 */
    private static final String RELEASE_LOCK_LUA_SCRIPT = 
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "  return redis.call('del', KEYS[1]) " +
        "else return 0 end";
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Pointcut("@annotation(cn.example.api.config.aop.CacheLock)")
    public void pointCut() {}
    
    @Around("pointCut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        // 1. 获取业务唯一 key(如用户手机号)
        String phone = getUserPhone();
        
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        CacheLock cacheLock = method.getAnnotation(CacheLock.class);
        String prefix = cacheLock.prefix();
        if (StringUtils.isBlank(prefix)) {
            throw new GlobalException("CacheLock prefix can't be null");
        }
        
        // 2. 拼接 key
        String lockKey = prefix + cacheLock.delimiter() + phone;
        String uuid = cn.hutool.core.lang.UUID.fastUUID().toString();
        
        try {
            // 3. 抢锁:SETNX + 过期时间
            boolean success = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, uuid, cacheLock.expire(), cacheLock.timeUnit());
            if (!success) {
                throw new CustomDeniedException("请勿重复提交");
            }
            
            // 4. 执行业务
            return joinPoint.proceed();
        } finally {
            // 5. 释放锁(Lua 原子操作,避免误删)
            DefaultRedisScript<Long> redisScript = 
                new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
            redisTemplate.execute(redisScript, Collections.singletonList(lockKey), uuid);
        }
    }
}

第四步:Controller

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@RestController
@RequestMapping("/charge")
@AllArgsConstructor
public class ChargeController {
    
    @PostMapping("/startCharge")
    @CacheLock(prefix = "recharge")
    public Result startCharge(@RequestBody @Validated ChargeQuery query) {
        return chargeService.startCharge(query);
    }
}

2.4 幂等性方案对比

方案实现成本性能适用场景
数据库唯一索引强幂等(如订单号)
Redis SETNX + Lua通用
状态机业务流(订单状态扭转)
Token 机制防重提交
乐观锁 / version更新场景

三、Seata 分布式事务

3.1 什么是 Seata

Seata(Simple Extensible Autonomous Transaction Architecture) 是阿里开源的分布式事务解决方案,致力于在微服务架构下提供 高性能 + 简单易用 的分布式事务服务。

官网:https://seata.io/zh-cn/

4 种事务模式

  • AT(Auto Transaction,默认 + 最常用
  • TCC(Try-Confirm-Cancel)
  • SAGA(长事务)
  • XA(强一致)

3.2 三大核心组件

角色简称作用
Transaction CoordinatorTC事务协调器(Seata Server),维护全局/分支事务状态,驱动提交/回滚
Transaction ManagerTM事务管理器(业务侧),定义全局事务范围,发起开始/提交/回滚决议
Resource ManagerRM资源管理器(业务侧数据库),管理分支事务资源

部署架构

  • TC:以 Seata Server 形式独立部署
  • TM + RM:以 Seata Client 形式集成在微服务中

3.3 工作流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
TM                    TC                      RM (Service-A)        RM (Service-B)
│                      │                            │                       │
│ 1. begin global tx   │                            │                       │
│ ──────────────────> │                            │                       │
│ 2. 创建 XID = 192.168│                            │                       │
│ <────────────────── │                            │                       │
│                      │                            │                       │
│ 3. RPC call ──────────────────────────────────────> │                       │
│    (XID 透传)         │                            │                       │
│                      │                            │ 4. register local     │
│                      │ <────────────────────────── │   branch to XID       │
│                      │                            │                       │
│                      │ 5. RPC call ──────────────────────────────────────>  │
│                      │                            │                       │
│                      │ <────────────────────────────────────── 6. register │
│                      │                            │                       │
│ 7. commit / rollback │                            │                       │
│ ──────────────────> │                            │                       │
│                      │ 8. 调度所有分支 ──────────> │ ────────────────────> │
│                      │   提交/回滚                │                       │
│                      │ <────────────── 9. 上报结果 │ <──────────────────── │

流程详解

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功后,TC 会针对这个全局事务生成全局唯一的 XID
  2. XID 通过服务的调用链传递到其他服务
  3. RM 向 TC 注册一个分支事务,并将其纳入 XID 对应全局事务的管辖
  4. TM 根据 TC 收集的各个分支事务的执行结果,向 TC 发起全局事务提交或回滚决议
  5. TC 调度 XID 下管辖的所有分支事务完成提交或回滚操作

3.4 AT 模式(最常用)

特点:与 AT 模式相比,AT 模式可以应对大多数业务场景,且基本可以做到无业务入侵,开发人员能够有更多精力关注于业务逻辑开发。

核心机制

  1. 解析 SQL → 生成前镜像 + 后镜像 + 回滚日志(undo log)
  2. 提交本地事务,释放本地锁
  3. 申请全局锁(Seata Server 协调)
  4. 根据全局决议:
    • 提交:异步删除回滚日志
    • 回滚:用后镜像反向恢复为前镜像

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Service
public class OrderService {
    
    @GlobalTransactional  // 关键注解
    public void createOrder(Order order) {
        // 1. 扣库存(远程调用)
        storageService.deduct(order.getProductId(), order.getCount());
        // 2. 创建订单
        orderMapper.insert(order);
        // 3. 扣账户余额(远程调用)
        accountService.debit(order.getUserId(), order.getMoney());
    }
}

业务代码只需加 @GlobalTransactional 注解,Seata 自动处理分布式事务

3.5 4 种模式对比

模式一致性性能业务侵入适用场景
AT最终通用(首选
TCC高性能 + 强一致
SAGA最终长事务 / 跨服务多步骤
XA数据库支持 XA

四、消息队列可靠性

4.1 延迟消息的 4 种实现

场景

  • 网上直播授课时,在课程开始前 15 分钟通知所有学生准备上课
  • 订单提交成功后1 小时内未支付,订单需要及时关闭并且释放对应商品的库存
  • 用户超过 15 天未登录时,给该用户发送召回推送
  • 工单提交后超过 24 小时未处理,向相关责任人发送催促处理的提醒

5 种实现方案

方案复杂度精度推荐
Redis 有序集合秒级短期延迟
RabbitMQ 插件 rabbitmq_delayed_message_exchange毫秒级短期延迟
ActiveMQ 内置毫秒级老项目
RocketMQ 内置毫秒级生产首选
定时任务 + 数据库扫描分钟级长延迟(> 小时)

4.2 RocketMQ 延迟消息实战

1
2
3
4
5
6
Message msg = new Message("OrderTopic", "orderTag", 
    JSON.toJSONBytes(order));
// 1 = 1s, 2 = 5s, 3 = 10s, 4 = 30s, 5 = 1m, 6 = 2m, 7 = 3m, 8 = 4m, ...
// 18 = 2h, ... 0 = 不延迟
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);

4.3 定时任务 + 数据库(长延迟)

1
2
3
4
5
6
7
8
-- 订单表加字段
ALTER TABLE t_order ADD COLUMN close_status INT DEFAULT 0;
ALTER TABLE t_order ADD COLUMN expire_time DATETIME;

-- 每分钟扫描过期订单
SELECT id FROM t_order 
WHERE close_status = 0 AND expire_time < NOW() 
LIMIT 100;
1
2
3
4
5
6
7
@Scheduled(cron = "0 * * * * ?")
public void closeExpiredOrders() {
    List<Long> ids = orderMapper.selectExpiredIds(100);
    for (Long id : ids) {
        orderService.closeOrder(id);
    }
}

4.4 消息重复消费解决方案

问题根源:生产端为了保证消息发送成功,可能会重复推送(直到收到成功 ACK),会产生重复消息。

生产端:成熟的 MQ Server 框架一般会想办法解决,避免存储重复消息(比如空间换时间,存储已处理过的 message_id),给生产端提供一个幂等性的发送消息接口

消费端:靠业务幂等性保证。

幂等方案实现
唯一 ID + 数据库消费前查 message_id 是否已处理
Redis SETNXmessage_id 作为 key,重复消费被 Redis 拒绝
业务状态机订单状态从 已创建已支付,重复支付被业务拒绝
乐观锁 / version每次更新带 version 字段

消费端代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-consumer")
public class OrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        String msgId = order.getMsgId();
        // 幂等:检查是否已处理
        Boolean processed = redisTemplate.opsForValue()
            .setIfAbsent("msg:" + msgId, "1", 7, TimeUnit.DAYS);
        if (Boolean.FALSE.equals(processed)) {
            log.info("重复消息,跳过:{}", msgId);
            return;
        }
        
        // 执行业务
        orderService.process(order);
    }
}

五、4 大方案对比与组合

问题推荐方案备选
分布式锁RedissonZK 临时节点
接口幂等Redis SETNX + Lua + 业务状态数据库唯一索引
分布式事务Seata ATRocketMQ 事务消息
消息可靠性RocketMQ 事务消息 + 消费端幂等RabbitMQ confirm + 本地消息表

完整链路组合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
用户下单
@CacheLock (Redis SETNX 防重提交)
@GlobalTransactional (Seata AT 分布式事务)
   ├── 1. 扣库存 RPC
   ├── 2. 创建订单 DB
   └── 3. 扣账户 RPC
RocketMQ 事务消息发到下游
   ├── 库存系统消费(幂等)
   ├── 物流系统消费(幂等)
   └── 延迟消息:30 分钟后检查未支付订单关闭

六、面试常问

Q:Redisson 分布式锁怎么续期? A:Redisson 默认开启 Watch Dog(看门狗)——加锁成功后,每 lockWatchdogTimeout / 3(默认 10s)自动续期到 30s。业务执行完调 unlock() 终止看门狗。如果业务执行超过 30s,看门狗会自动续期不会让锁过期

Q:Redis 主从切换时分布式锁会失效吗?怎么解决? A:会——主从复制是异步的,Master 写入锁后还没同步到 Slave 就宕机,Slave 升级为新 Master 但没有这个锁。解法:① Redisson RedLock(部署到多个独立 Master 节点);② 业务幂等兜底(更推荐)。

Q:Seata AT 模式的全局锁和本地锁有什么区别? A:

  • 本地锁SELECT ... FOR UPDATE):事务执行期间持有,保证本事务内不并发写
  • 全局锁(Seata Server 协调):事务提交前持有,保证其他全局事务不会并发修改同一行
  • AT 模式先释放本地锁,再申请全局锁——这是它性能高的关键(写隔离

Q:MQ 消息为什么一定会重复?怎么彻底解决? A:网络不可靠 → 生产端重试 → 重复消息。彻底解决只能靠消费端业务幂等,因为没有任何 MQ 能保证"恰好一次"投递。RocketMQ 事务消息能保证"至少一次 + 业务幂等"。

七、小结

  • Redisson 分布式锁:Java 生态首选,可重入 / 公平 / 读写锁 + Watch Dog 自动续期
  • Redis + Lua 接口幂等:注解 + AOP 一行代码搞定重复提交
  • Seata AT 模式:无业务侵入的分布式事务,4 种模式中最常用
  • MQ 可靠性:生产端事务消息 + 消费端幂等 = 至少一次 + 幂等 = 恰好一次(业务层)
使用 Hugo 构建
主题 StackJimmy 设计