likes
comments
collection
share

Redis实现分布式锁

作者站长头像
站长
· 阅读数 17

Redis分布式锁

悲观锁、乐观锁、排它锁、什么 读写锁、只不过是一个名词、具体还得看他加锁的过程以及实现方式、如SynchronizedReentrantLock、都是一种在多线程中为了保证数据资源不被其他线程破坏。

但是这两种方式都是保证了单机情况下的多线程数据安全、如一个jvm中、这两种的加锁方式都不能在多台机器部署运行的情况下使用、如我们使用nginx配置负载均衡、分别在886/887端口进行请求的分配、这样的情况下SynchronizedReentrantLock都是只能在单个jvm中生效、所以我们可以采用Redis来实现加锁的过程以及方式。

Redis set...nx

例子

是通过set key value nx 互斥命令来实现的、如果key存在就插入失败、如果key不存在就插入成功、这样做的话第一次进入加锁成功后面其他线程在进的话就加锁失败了就直接返回请勿频繁操作、最后代码执行完成后我们在将key删除作为解锁。

@Slf4j
@RestController
public class LockController {
  @Resource
  private RedisTemplate<String, Object> redisTemplate;
  @RequestMapping("/lock1")
  public String lock1(@RequestParam("account") Long account) {
    String lockName = "sync_lock.user_" + account;
    Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockName, "lock");
    if (!isLock) {
      return "error";
    }
    try {
        System.out.println("正在执行业务代码");
        TimeUnit.SECONDS.sleep(5);
    } finally {
      redisTemplate.delete(lockName);
    }
    return "ok";
  }
}

抛出问题

1、忘记删除锁怎么办?

忘记删除锁会导致死锁、导致后续请求永远不会执行该请求中的方法。

解决方法在加锁的时候设置过期时间、即使不解锁到达指定时间也会删除锁。

redisTemplate.opsForValue().setIfAbsent(lockName, "lock", 10, TimeUnit.SECONDS);

2、锁的过期时间设置较短、锁到期被释放会怎么样?

锁过期时间设置为10秒、假设用户1有多个线程抢占执行、线程1拿到锁执行业务时间比较久、锁在10后超时释放了但是业务还在执行、这时候线程2来获取锁肯定是能获取到锁(因为上一把锁被超时释放了)、这时候线程1执行完成后执行redisTemplate.delete(lockName)导致把线程2的锁也删掉了、这时候又来一个线程3也能获取到锁、由此可以看出设置超时时间较短会出现误删的情况。

解决方法:在加锁的时候设置锁的value值、最后解锁的时候先获取锁的内容和当前请求中生成设置的value值和redis中锁的内容比较是否一样、一样就将锁删除掉。

String value = (String)redisTemplate.opsForValue().get(lockName);
if (lockName.equals(value)) {
    redisTemplate.delete(lockName);
}

2.1 极端情况

这个如果出现极端情况还是不能保证解锁的原子性、还是会出现误删的情况、在比较是当前这个线程1的value值判断成立时、准备要删除key的时候、结果堵塞了导致删不了、直到锁到达了过期时间、导致另一个线程2拿到锁也去执行该业务、突然线程1堵塞突然通了、导致删除了线程2的锁、又来一个线程3时锁已经被线程1给删除了、结果线程3也在执行该业务、这样还是会导致误删其他的线程的锁。

解决方法:就是让两个步骤合并成一个、使用Lua脚本、达到判断和删除的一个原子性。

redisTemplate.execute(new DefaultRedisScript<>(
  "if(ARGV[1] == redis.call('get',KEYS[1])) then\n" +
  "    return redis.call('del',KEYS[1])\n" +
  "end", Long.class), Collections.singletonList(lockName), lockName);

最终版实现代码

@RequestMapping("/lock1")
public String lock1(@RequestParam("account") Long account) throws InterruptedException {
  String lockName = "sync_lock.user_" + account;
  String lockValue = UUID.randomUUID().toString();
  Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockName, lockValue, 10, TimeUnit.SECONDS);
  if (!isLock) {
    return "error";
  }
  try {
    System.out.println("正在执行业务代码");
    TimeUnit.SECONDS.sleep(5);
  } finally {
    redisTemplate.execute(new DefaultRedisScript(
      "if(ARGV[1] == redis.call('get',KEYS[1])) then\n" +
      "    return redis.call('del',KEYS[1])\n" +
      "end", Long.class), Collections.singletonList(lockName), lockValue);
  }
  return "ok";
}

加锁过程

Redis实现分布式锁

set...nx小结

以上还是能满足大部分加锁的需求、但是要可重入锁达不到、以及锁重试机制等......这种需求还是满足不了、可以使用Redisson中封装的加锁来实现。

Redisson

Redisson是在redis基础实现的一个分布式工具的集合、其中就包括分布式锁。 文档地址

基础使用

pom依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.7</version>
</dependency>

Redisson客户端

@Bean
public RedissonClient redissonClient(){
  // 配置redis信息
  Config config = new Config();
  config.useSingleServer().setAddress("redis://127.0.0.1:6379");
  // 创建 RedissonClient 对象
  return Redisson.create(config);
}

简单使用例子

@Autowired
private RedissonClient redissonClient;

@RequestMapping("/lock3")
public String lock3(@RequestParam("account") Long account) throws InterruptedException {
  RLock lock = redissonClient.getLock("sync_lock_" + account);
  if (!lock.tryLock()) {
    return "error";
  }
  try {
    System.out.println("正在执行业务代码");
    TimeUnit.SECONDS.sleep(3);
  } finally {
    lock.unlock();
  }
  return "ok";
}

使用Redisson来做为加锁也是一个不错的选择、可以弥补redis set...nx这种方式加锁的不足、比如解决了锁可重入锁重试机制更新key有效期

小结

Redisson只写了一个简单的加锁例子、底层源码还是需要花时间看它的其他功能是如何实现再去总结一遍、而使用redis set...nx这种加锁只是强制性加锁、加锁成功就是成功、加锁失败就是失败、没有考虑什么重入机制、重试机制、key定时更新有效期........等一些功能的实现。