likes
comments
collection
share

Redis 分布式锁 解决在多实例下数据并发问题 ~

作者站长头像
站长
· 阅读数 41

背景

分布式是发展的趋势,随着信息化高度发达,业务量也是在呈几何式的上升,我们都会面临的就是项目分布式化,但同样的分布式也会碰见很多令人很头疼的问题,分布式不仅需要我们横向的拆分我们的细分业务,还需要为了高可用和高并发,横向的扩容和复制分裂实例。我们公司最近去年新开的一个大型项目,基本架构如图:

Redis 分布式锁 解决在多实例下数据并发问题 ~

这就需要我们考虑的问题有几点:

  • 资源竞争问题: 多个节点需要同时访问共享资源,可能导致竞争条件和数据不一致性。
  • 重复操作问题: 由于网络延迟或其他原因,可能导致任务被重复执行。
  • 协同工作问题: 多个节点需要协同工作以完成某个任务,可能导致协调困难。
  • 任务调度问题: 在分布式任务调度系统中,同一任务可能被多个节点同时执行。
  • 分布式事务问题: 多个节点涉及到分布式事务时,可能导致事务的不一致性。

问题

具体问题

假如我们用户中心有一亿用户,然后现在为了服务的高可用,我们弹性扩容实例。现在有一个需求是需要给每个人按照创建时间生成一个顺序编号,并用编号去完成一些任务。

问题分析

基于我们的架构我们可以弹性扩容100台实例(别管成本),让一百台实例全部去生成编号,生成完成之后异步去完成任务。又有问题了:如何保证每个用户的编号的顺序呢?

解决方案

我们碰见问题就得认真思考,如何解决这个问题?

我先提出一个不成熟的方案。基于Mysql的自增主键来实现这个并发问题。我们可以在数据库新创建一张资源争抢表,id主键自增,有两个字段 一个自增id、 一个容器ip。然后所有实例while循环同时去往这张表新增数据(自己容器ip),新增完之后查询这张表全部数据,判断id = 1 的数据是否是自己的容器ip,如果是,就相当于拿到一把锁,去执行自己的任务,确定自己的任务之后将任务交给新开的线程,然后清空这张表并将主键的自增初始化为1(等于释放了这把锁)。然后重新进入抢id为1的大军中继续争夺资源。这样就实现了编号的顺序和性能。

Redis 分布式锁 解决在多实例下数据并发问题 ~

能解决吗?看起来可以,但是这恐怖的IO看着就头皮发麻,但是问题也最终是解决了。还有更好的方法吗?有的。

Redis 分布式锁

1、分布式锁的概念

分布式锁是用来控制分布式系统中互斥访问共享资源的一种手段(进程之间共享资源),避免多线程并行访问导致结果不可控。基本的实现原理和单进程锁是一致的,通过一个共享标识来确定唯一性,对共享标识进行修改时能够保证原子性和和对锁服务调用方的可见性。由于分布式环境需要考虑各种异常因素,为实现一个靠谱的分布式锁服务引入了一定的复杂度。

分布式锁一般需要能够保证以下几点:

  1. 同一时刻只能有一个线程持有锁
  2. 锁能够可重入
  3. 不会发生死锁
  4. 具备阻塞锁特性,且能够及时从阻塞状态被唤醒
  5. 锁服务保证高性能和高可用
  6. 锁数据本身的安全性

2、Redis 分布式锁 概念

Redis分布式锁是一种基于Redis的机制,用于在分布式系统中实现资源的互斥访问。它通过Redis的原子性操作和分布式特性,解决了在多个节点上同时进行的进程或线程之间的并发控制问题。常见的实现方式包括使用SETNX(SET if Not eXists)命令或者SET命令的带有EXNX选项的组合。

3、关键词

  • 锁的持有者: 获取锁的进程或线程。
  • 锁的标识: 用于标识锁的唯一键,通常是一个字符串。
  • 锁的超时时间: 锁的自动释放时间,防止锁被持有者一直不释放而导致死锁。

4、基本步骤

尝试获取锁:使用SETNX命令或者SET命令的NX选项,尝试在Redis中创建一个键,作为锁的标识。如果成功创建,表示获取了锁。

SET lock_key unique_identifier NX EX lock_timeout

锁的超时机制: 为了防止锁的持有者在异常情况下无法释放锁,通常会设置锁的超时时间,确保即使锁的持有者未能正常释放锁,锁也会在一定时间后自动释放。 释放锁: 当任务完成时,通过删除锁的标识来释放锁。

DEL lock_key

5、使用RedisTemplate实现分布式锁(简单实现)

import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;

public class RedisDistributedLock {

    private RedisTemplate<String, String> redisTemplate;
    private String lockKey;
    private int expireTime; // 锁的过期时间,单位秒

    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate, String lockKey, int expireTime) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.expireTime = expireTime;
    }

    public boolean lock() {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", expireTime, TimeUnit.SECONDS);
        return result != null && result;
    }

    public void unlock() {
        redisTemplate.delete(lockKey);
    }
}

使用:

import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;

public class MyService {

    private RedisTemplate<String, String> redisTemplate;
    private RedisDistributedLock lock;

    public MyService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.lock = new RedisDistributedLock(redisTemplate, "my_lock", 30);
    }

    public void myMethod() {
        try {
            if (lock.lock()) {
                // 执行需要加锁的操作
                System.out.println("Lock acquired. Do something...");
            } else {
                System.out.println("Failed to acquire lock. Another process holds the lock.");
            }
        } finally {
            lock.unlock();
        }
    }
}

在上述代码中,RedisDistributedLock类的构造函数接受一个RedisTemplate实例、锁的键名和过期时间。lock方法使用setIfAbsent方法来实现分布式锁,如果返回值是true,表示加锁成功,然后通过expire方法设置锁的过期时间。unlock方法使用delete方法释放锁。

6、使用Redis分布式锁的好处

  1. 互斥性保证: 分布式锁确保在同一时刻只有一个节点能够持有锁,从而避免了多个节点同时对共享资源进行修改的问题,保证了互斥性。
  2. 协同工作: 当多个节点需要协同工作时,分布式锁可以确保在同一时刻只有一个节点能够执行关键代码块,协调节点之间的工作流程,确保任务按照预期顺序执行。
  3. 避免竞争条件: 在分布式系统中,多个节点可能竞争对某个资源进行操作,分布式锁能够避免竞争条件,从而避免数据不一致性和逻辑错误。
  4. 防止重复执行: 分布式锁可以用于确保在给定时间内只有一个节点能够执行特定任务,防止由于网络延迟等原因导致的任务重复执行。
  5. 分布式事务协调: 在分布式系统中,多个节点涉及到分布式事务时,分布式锁可以协调节点之间的事务执行顺序,确保事务的一致性。
  6. 限流和控制并发: 分布式锁可以用于限制对某个资源或服务的并发访问,实现流量控制和资源管理,防止系统过载。
  7. 容错性和高可用性: 使用Redis分布式锁可以充分利用Redis的高可用性和持久性特性,提高系统的容错性。
  8. 自动释放锁: 通过设置锁的超时时间,即使锁的持有者因为异常情况无法主动释放锁,锁也能在一定时间后自动释放,避免死锁问题。

SpringBoot 如何使用 Redis 分布式锁

使用redis的watch命令进行实现

引入jar包

<dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
     <version>2.9.0</version>
 </dependency>

具体实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import java.util.List;


public class RedisWatchTest extends  Thread {

    private String auctionCode;
    public RedisWatchTest
(String auctionCode) {
        super(auctionCode);
        this.auctionCode = auctionCode;
    }
    private static int bidPrice = 100;

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + "主线程运行开始!");
        //更改key为a的值
        Jedis jedis=new Jedis("127.0.0.1",6379);
        jedis.set("goodsprice","0");
        System.out.println("输出初始化值:"+jedis.get("goodsprice"));
        jedis.close();
        RedisWatchTest thread1 = new RedisWatchTest("A001");
        RedisWatchTest thread2  = new RedisWatchTest("B001");
        thread1.start();
        thread2.start();
        try{
            thread1.join();
            thread2.join();
       }catch(InterruptedException e){
           e.printStackTrace();
       }
        System.out.println(Thread.currentThread().getName() + "主线程运行结束!");
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "线程运行开始 ");
        Jedis jedis=new Jedis("127.0.0.1",6379);
        try {
            if(Thread.currentThread().getName()=="B001"){
                sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //监视KEY
        jedis.watch("goodsprice");
        //A先进
        String v =  jedis.get("goodsprice");
        Integer iv = Integer.valueOf(v);
        //条件都给过
        if(bidPrice > iv){
            Transaction tx = jedis.multi();// 开启事务
            Integer bp = iv + 100;
            //出价成功,事务未提交
            tx.set("goodsprice",String.valueOf(bp));
            System.out.println("子线程" + auctionCode + "set成功");
            try {
                if(Thread.currentThread().getName()=="A001"){
                    sleep(2000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            List<Object> list = tx.exec();
            if (list == null ||list.size()==0) {
                System.out.println("子线程" + auctionCode + ",出价失败");
            }else{
                System.out.println("子线程"+this.auctionCode +", 出价:"+ jedis.get("goodsprice") +",出价时间:"
+ System.nanoTime());
            }
        }else{
            System.out.println("出价低于现有价格!");
        }
        jedis.close();
        System.out.println(Thread.currentThread().getName() + "线程运行结束");
    }

}

运行结果

main主线程运行开始!  
输出初始化值:0  
A001线程运行开始  
B001线程运行开始  
子线程B001set成功  
子线程A001set成功  
子线程A001,出价失败  
A001线程运行结束  
子线程B001, 出价:100,出价时间:63023805246506  
B001线程运行结束  
main主线程运行结束!

使用redis的setnx命令进行实现

具体代码

import redis.clients.jedis.Jedis;

import java.util.Collections;

public class RedisSetNXTest extends  Thread{

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    private String auctionCode;
    public RedisSetNXTest
            (String auctionCode) {
        super(auctionCode);
        this.auctionCode = auctionCode;
    }
    private static int bidPrice = 100;

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + "主线程运行开始!");
        //更改key为a的值
        Jedis jedis=new Jedis("127.0.0.1",6379);
        jedis.set("goodsprice","0");
        System.out.println("输出初始化值:"+jedis.get("goodsprice"));
        jedis.close();
        RedisSetNXTest thread1 = new RedisSetNXTest("A001");
        RedisSetNXTest thread2  = new RedisSetNXTest("B001");
        thread1.start();
        thread2.start();
        try{
            thread1.join();
            thread2.join();
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "主线程运行结束!");
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "线程运行开始 ");
        Jedis jedis=new Jedis("127.0.0.1",6379);
        try {
            if(Thread.currentThread().getName()=="B001"){
                sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //让A先拿到锁
        boolean isOk=  tryGetDistributedLock(jedis, "goods_lock", Thread.currentThread().getName() , 10000);

        try {
            if(Thread.currentThread().getName()=="A001"){
                sleep(2000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if(isOk) {
            System.out.println("子线程"+this.auctionCode +"拿到锁");
            String v =  jedis.get("goodsprice");
            Integer iv = Integer.valueOf(v);
            //条件都给过
            if(bidPrice > iv){

                Integer bp = iv + 100;
                //出价成功,事务未提交
                jedis.set("goodsprice",String.valueOf(bp));
                System.out.println("子线程"+this.auctionCode +", 出价:"+ jedis.get("goodsprice") +",出价时间:"
                        + System.nanoTime());

            }else{
                System.out.println("出价低于现有价格!");
            }
            boolean isOk1=  releaseDistributedLock(jedis, "goods_lock", Thread.currentThread().getName());
            if(isOk1){
                System.out.println("子线程"+this.auctionCode +"释放锁");
            }

        }else{

            System.out.println("子线程" + auctionCode + "未拿到锁");
        }
        jedis.close();
        System.out.println(Thread.currentThread().getName() + "线程运行结束");
    }
    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public  boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections
                .singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }
}

可以看到,我们加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。
  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  • 第五个为time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。

总结

一定要多思考,如果人永远待在舒适圈的话,人永远不会成长。共勉

觉得作者写的不错的,值得你们借鉴的话,就请点一个免费的赞吧!这个对我来说真的很重要。૮(˶ᵔ ᵕ ᵔ˶)ა