likes
comments
collection
share

一文了解Java分布式锁

作者站长头像
站长
· 阅读数 3

本文主要介绍了Java中常用的分布式锁解决方案。

一文了解Java分布式锁

在介绍分布式锁之前,需要先弄清楚什么是分布式锁。分布式锁主要来解决部署在不同服务器上的共享资源被并发访问时的安全问题。在单机程序中,可以直接使用JDK自带的加锁方式synchronizedReentrantLock,但是在项目中一般不会只部署一台服务器,不同的服务器中的JVM是不能被共享的,因此JDK的自带加锁方式无法起到作用。

分布式锁保证了并发访问共享资源时,只有一个服务获取到锁,能够对资源进行修改。其它服务需要等待获取锁成功才能进行后续逻辑。

一文了解Java分布式锁

1、基于Mysql实现(基本不用)

1.1 悲观锁

操作数据库时,直接锁住整张表,当前线程执行完毕后释放锁。

select ... for update;

具体操作:可以建一张专门用于加锁的表,获取锁向表中插入数据,释放锁将记录删除即可。

1.2 乐观锁

增加一个version字段,每次更新修改,都会自增加一。

例如:为id是1的用户余额加10

select version,balance from account where user_id ='1';

进行更新时,where条件附带上版本号:

update account set balance = balance+10 ,version = version+1 where version 

= #{version} and user_id ='1';

如果更新失败,则循环上面两步。

2、基于Redis实现

2.1 SETNX

setnx 的含义就是 SET if Not Exists,该指定有两个参数key和value,当key不存在时才设置成功返回1,如key成功则设置失败返回0。

 setnx(key, value)

用于加锁中,就可以通过设置key的值是否存在来判断加锁是否成功。

  • 加锁

setnx("锁id","UUID值")

  • 解锁

del("锁id") 把锁id删除即可以解锁成功,当锁id没有被删除时,其它线程不能加锁成功

  • 锁超时

如果锁定的线程在执行任务时锁定,无法显式解锁,则此资源将被永久锁定,其他线程不应再进入。因此必须锁有个超时时间,可以通过expire(key,value)指令给指定的key设置过期时间。

问题一:setnx和expire的非原子性

setnx加锁成功后,expire还没来得及执行,redis节点就挂了,会造成key无过期时间。

redis2.6. 12或更高版本在set指令中添加了可选参数,伪代码如下:

set(key,1,30,NX ) 这样就代替了setnx指令,可以保证指令的原子性。

问题二:del锁误删除

一个极端场景,线程A成功锁定,设置的超时时间为10秒。如果线程A因为任何原因运行缓慢,并且10秒后仍无法运行,则锁定将过期并自动释放。

此时线程B得到了锁,随后,线程A执行完了任务,线程A接着执行del指令来释放锁。但这时候线程B还没执行完,线程A实际上删除的是线程B加的锁。

问题三:加锁的并发性问题

线程A和线程B同时获取锁,线程A首先获取到锁后,执行一段时间后锁到期了,此时线程A的业务还没处理完,锁就被线程B获取到。

需要添加一个机制,保证业务未处理完时,锁的时间可以得到更新,当业务处理完后自动释放锁。

2.2 Redisson

redisson的整体加锁流程如图所示,redisson通过引入watch dog机制,每隔一段时间去检查当前线程是否还持有锁,若持有锁则会刷新锁时间,可以弥补redis使用setnx这种加锁时间不灵活的缺陷。由于redisson的底层使用了netty框架,因此加锁和解锁主要介绍的是主要逻辑,最后的请求执行逻辑不详细展开。

一文了解Java分布式锁

Redisson加锁流程图

Redisson的坐标依赖



<dependency>

    <groupId>org.redisson</groupId>

    <artifactId>redisson-spring-boot-starter</artifactId>

    <version>3.15.6</version>

</dependency>

2.2.1 基本原理

  • 基本使用方法如下:


 @RestController

public class SellController {


  @Autowired
 private RedissonClient redissonClient;


    @GetMapping("/sell")

    public Object sellProduct() {

        RLock lock = redissonClient.getLock("lock::user");

        lock.lock();

        try {

            int count = DataUtils.getCount() - 1;

            if (count > 0) {

                DataUtils.setCount(count);

                System.out.println("redisson分布式锁:" + count);

            } else {

                System.out.println("库存不足......");

            }

        } finally {

            lock.unlock();

        }

        return "redis";

    }

}
  • 基本结构
RLock lock = redissonClient.getLock("lock::user");

lock.lock();

try {

    // doSometing();

} finally {

    lock.unlock();

}

先看一下redisson的顶级加锁接口定义,从图中我们可以看到RLock 继承了 JDK 源码 JUC 包下的 Lock 接口, 同时也继承了 RLockAsync。Lock 接口定义了加锁解锁的规范,RLockAsync从名字可以看出,加锁是可以异步完成的。

一文了解Java分布式锁

2.2.1.1 加锁流程

  • 加锁代码 lock.lock();加锁逻辑的流程如下图所示。

从下面的加锁流程可以看出,加锁的调用链是lock()->lock(-1,null,fasle)->tryAcquire(-1,leaseTime,unit,threadID)。

一文了解Java分布式锁

核心代码逻辑在RedissonLock#lock下,其中三个参数的作用如下:

  • leaseTime: 加锁到期时间, -1 使用默认值 30 秒
  • unit :时间单位, 毫秒、秒、分钟、小时...
  • interruptibly:是否可被中断标示

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {

        // 获取当前线程ID

        long threadId = Thread.currentThread().getId();

        // 尝试获取锁

        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);

        // 加锁成功,过期时间为空

 if (ttl == null) {

            return;

        }

        // 订阅分布式锁,解锁时会通知

        RFuture<RedissonLockEntry> future = subscribe(threadId);

        if (interruptibly) {

            commandExecutor.syncSubscriptionInterrupted(future);

        } else {

            commandExecutor.syncSubscription(future);

        }



        try {

           // 自旋获取锁

            while (true) {

                // 再次获取锁

                ttl = tryAcquire(-1, leaseTime, unit, threadId);

                // 加锁成功,过期时间为空

 if (ttl == null) {

                    break;

                }

                // 锁过期时间大于0 ,则进行带过期时间的阻塞获取锁

 if (ttl >= 0) {

                    try {

                        // 获取不到锁,阻塞 semaphore 解锁时释放信号量

                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

                    } catch (InterruptedException e) {

                        if (interruptibly) {

                            throw e;

                        }

                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

                    }

                    // 过期时间小于0,则死等。分可中断和不可中断

                } else {

                    if (interruptibly) {

                        future.getNow().getLatch().acquire();

                    } else {

                        future.getNow().getLatch().acquireUninterruptibly();

                    }

                }

            }

        } finally {

            // 最后解除锁的订阅

            unsubscribe(future, threadId);

        }

 //        get(lockAsync(leaseTime, unit));

 }

真正执行加锁逻辑在tryAcquireAsync方法中,其中通过异步获取锁的方式获取锁的剩余时间,核心代码如下,加锁成功后会开启一个定时任务来更新加锁时间



private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {

    RFuture<Long> ttlRemainingFuture;

    if (leaseTime != -1) {

        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);

    } else {

    // 尝试异步获取锁,获取成功则返回空,否则返回锁的剩余过期时间

        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,

                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    }

    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {

        if (e != null) {

            return;

        }



        // 获取到锁

 if (ttlRemaining == null) {

           

            if (leaseTime != -1) {

                internalLockLeaseTime = unit.toMillis(leaseTime);

            } else {

                // 这里使用定时任务刷新加锁时间

                scheduleExpirationRenewal(threadId);

            }

        }

    });

    return ttlRemainingFuture;

}

tryAcquire(-1,leaseTime,unit,threadID)方法的调用链流程如图所示。

一文了解Java分布式锁

底层请求redis加锁的逻辑是通过lua脚本实现,lua脚本加锁的逻辑如下:

一文了解Java分布式锁

lua脚本的流程如图:

一文了解Java分布式锁

其中部分参数含义如下:

  • KEYS[1] :表示设置加锁的key名称,上面的lock::user
  • ARGV[1]: 这个是过期时间, 自己测试的, 单位毫秒
  • ARGV[2]: UUID + 线程 ID

2.2.1.2 解锁流程

解锁的逻辑是调用lock.unlock();相对来说比较简单,跟进这个方法后,我们可以看到是通过异步解锁的方式解锁。

一文了解Java分布式锁

真正的解锁逻辑代码在unlockAsync(threadId)方法中,代码如下。解锁成功后,会取消自动续时,并返回解锁成功

一文了解Java分布式锁

底层执行解锁的逻辑是unlockInnerAsync(threadId)方法,代码如下,也是通过执行lua脚本的方式。

一文了解Java分布式锁

lua脚本解锁的流程如下图所示。

一文了解Java分布式锁

其中参数介绍如下:

  • KEYS[1]: 表示设置加锁的key名称,上面的lock::user
  • KEYS[2]:发布消息订阅的管道名称
  • ARGV[1]: 发布的消息内容
  • ARGV[2]: (锁过期时间)
  • ARGV[3]: 线程标识ID名称

2.2.2 存在问题

Redis主从异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。

暂时无法在文档外展示此内容

锁没同步到从节点,Redis从节点挂了。其它线程就可以加锁成功,这样就会对系统的一致性问题造成影响。这主要的原因在于,Redis是主节点加锁成功后直接返回给客户端,不会去管是否把消息同步到从节点。

2.3 RedLock

为了解决Redis主从异步复制导致的分布式锁不一致的问题,Redis引入了一个新的分布式锁算法RedLock。官网关于RedLock锁的介绍如下:

一文了解Java分布式锁

大致翻译:

在分布式算法中,假设我们有 N 个 Redis 主节点。 这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。 我们已经描述了如何在单个实例中安全地获取和释放锁。 我们理所当然地认为算法会在单个实例中使用这种方法来获取和释放锁。 在我们的示例中,我们设置了 N=5,这是一个合理的值,因此我们需要在不同的计算机或虚拟机上运行 5 个 Redis 主节点,以确保它们以几乎独立的方式失败。

为了获取锁,客户端执行以下操作:

1.它以毫秒为单位获取当前时间。

2.它尝试顺序获取所有 N 个实例中的锁,在所有实例中使用相同的键名和随机值。 在第 2 步中,当在每个实例中设置锁时,客户端使用一个比总锁自动释放时间更小的超时来获取它。 例如,如果自动释放时间为 10 秒,则超时可能在 ~ 5-50 毫秒范围内。 这可以防止客户端长时间保持阻塞状态,试图与已关闭的 Redis 节点通信:如果实例不可用,我们应该尽快尝试与下一个实例通信。

3.客户端通过从当前时间中减去步骤 1 中获得的时间戳来计算获取锁所用的时间。当且仅当客户端能够在大多数实例(至少 3 个)中获取锁,并且获取锁所用的总时间小于锁有效时间,则认为该锁被获取。

4.如果获得了锁,则其有效时间被认为是初始有效时间减去经过的时间,如步骤 3 中计算的那样。

5.如果客户端由于某种原因获取锁失败(或者它无法锁定 N/2+1 个实例或有效时间为负),它将尝试解锁所有实例(即使是它认为没有锁定的实例)能够锁定)。

整个算法的核心思想:

不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功。

在Redisson框架中已经实现了RedLock算法,代码实现起来也比较容易,主要是需要搭建N台完全独立的Redis节点。代码示例如下:

RLock lock1 = redisson1.getLock("lock1");

RLock lock2 = redisson2.getLock("lock2");

RLock lock3 = redisson3.getLock("lock3");

RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);

 // traditional lock method

redLock.lock();

 // or acquire lock and automatically unlock it after 10 seconds

redLock.lock(10, TimeUnit.SECONDS);

 // or wait for lock aquisition up to 100 seconds

 // and automatically unlock it after 10 seconds

boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);

if (res) {  

    try {   

        ...   

    } 

    finally {     

      redLock.unlock();  

    }

 }

如果在项目中需要使用RedLock来提高系统分布式锁的可靠性,一定要看一下下面两篇文章。是一个分布式架构师对于RedLock算法的质疑,以及Redis作者的回应。

3、基于zookeeper实现

Zookeeper可以解决Redis主从结构下的加锁存在不一致性问题,最主要的原因在于,zookeeper采用的是ZAB原子广播协议,必须等leader节点把加锁成功的消息同步给follower节点之后才会返回给客户端加锁成功,因此可以解决分布式锁不一致的问题。

3.1 基本原理

  • 加锁:在zookeeper下创建有序临时节点,检查当前节点是否是最小节点,若是则加锁成功。若不是则监控前一个节点,等前一个节点释放锁后再去获取锁。
  • 解锁:删除当前的有序临时节点。

暂时无法在文档外展示此内容

ZK原生代码实现:

  • DistributeLock 分布式锁实现


public class DistributeLock {



    String connectString = "172.28.109.47:2181";

    int sessionTimeout = 2000;

    ZooKeeper zk = null;



    String currentMode = null;



    private String waitPath;



    private CountDownLatch connectLatch = new CountDownLatch(1);

    private CountDownLatch awaitLatch = new CountDownLatch(1);





    public DistributeLock() throws IOException, InterruptedException, KeeperException {

        //连接上zk

 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override

 public void process(WatchedEvent watchedEvent) {

                //zk正常连接服务器

 if (watchedEvent.getState()==Event.KeeperState.SyncConnected){

                    connectLatch.countDown();

                    System.out.println("zk connected...");

                }



                if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){

                    awaitLatch.countDown();

                }

            }

        });



        //等待zk连接成功后执行下面的代码

 connectLatch.await();



        Stat stat = zk.exists("/locks", false);



        if (stat==null){ //节点为空,说明是头一次上锁,需要创建上锁节点

 String node = zk.create("/locks", "locks".getBytes(),

                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        }

    }



    public void lock() throws InterruptedException, KeeperException {



         currentMode  = zk.create("/locks/" +"seq-",null,

                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("currentMode:"+currentMode);

         //获取locks下的所有节点,用来判断当前节点是否是最小节点,从而是否加锁

 List<String> list = zk.getChildren("/locks", false);



        if (list.size()==1){

            //加锁成功

 return;

        }else {



            Collections.sort(list);



            // 获取节点名称 seq-00000000

 String thisNode = currentMode.substring("/locks/".length());



            int index = list.indexOf(thisNode);



            if (index==-1){

                System.out.println("数据异常");

            }else if (index==0){

                return;

            }else {

                //需要监听上一个节点

 waitPath= "/locks/" + list.get(index - 1);

                System.out.println("waitPath:"+waitPath);

                zk.getChildren(waitPath,true);

                awaitLatch.await();

                return;

            }

        }

    }



    public void unlock() throws InterruptedException, KeeperException {

        zk.delete(currentMode,-1);

    }



}
  • 分布式锁测试


public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

    final DistributeLock lock1 = new DistributeLock();



    final DistributeLock lock2 = new DistributeLock();



    new Thread(new Runnable() {

        @Override

 public void run() {

            try {

                lock1.lock();

                System.out.println("线程1 启动,获取到锁");

                Thread.sleep(5 * 1000);



                lock1.unlock();

                System.out.println("线程1 释放锁");

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (KeeperException e) {

                e.printStackTrace();

            }

        }

    }).start();



    new Thread(new Runnable() {

        @Override

 public void run() {



            try {

                lock2.lock();

                System.out.println("线程2 启动,获取到锁");

                Thread.sleep(5 * 1000);



                lock2.unlock();

                System.out.println("线程2 释放锁");

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (KeeperException e) {

                e.printStackTrace();

            }

        }

    }).start();





}

3.2 Curator框架

  • 问题

Zookeeper原生客户端底层需要注意很对细节开发工作,包括连接重连、反复注册Watcher等,开发较为复杂,一般都会使用较为成熟的客户端框架,如Curator框架。

Curator对比zookeeper原生API

  • 原生API的超时重连,需要手动操作,而Curator封装了很多重连策略,自动重连
  • 原生API不支持递归创建节点,Curator可以递归创建节点
  • 是对原生API的进一步封装,功能更多,更容易使用
  • Curator 是Fluent的API风格

依赖引入,一般引入curator-recipes就可以使用。



 <!--    curator相关依赖-->

 <!--    1.对zookeeper的底层api的一些封装-->

 <dependency>

      <groupId>org.apache.curator</groupId>

      <artifactId>curator-framework</artifactId>

      <version>4.3.0</version>

    </dependency>

 <!--    2.封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等-->

 <dependency>

      <groupId>org.apache.curator</groupId>

      <artifactId>curator-recipes</artifactId>

      <version>4.3.0</version>

    </dependency>

 <!--    3.提供一些客户端的操作,例如重试策略等-->

 <dependency>

      <groupId>org.apache.curator</groupId>

      <artifactId>curator-client</artifactId>

      <version>4.3.0</version>

    </dependency>

测试



public class DistributeLock {



    public static void main(String[] args) {

        //consistency 一致性

 //available  可用性

 //partition tolerance 分区容错性



 // 创建分布式锁1

 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");



        new Thread(new Runnable() {

            @Override

 public void run() {

                try {

                    lock1.acquire();

                    System.out.println("lock1 加锁");

                    lock1.acquire();

                    System.out.println("lock1 再加锁");

                    Thread.sleep(3000);

                    System.out.println("lock1 解锁");

                    lock1.release();

                    System.out.println("lock1 再解锁");

                    System.out.println("-------------");

                    lock1.release();

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }).start();



        new Thread(new Runnable() {

            @Override

 public void run() {

                try {

                    lock2.acquire();

                    System.out.println("lock2 加锁");

                    lock2.acquire();

                    System.out.println("lock2 再加锁");

                    Thread.sleep(3000);

                    System.out.println("lock2 解锁");

                    lock2.release();

                    System.out.println("lock2 再解锁");

                    lock2.release();

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }).start();





    }



    private static CuratorFramework getCuratorFramework() {



        //重试次数

 ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);



        //创建zk的curator客户端连接

 CuratorFramework client = CuratorFrameworkFactory.builder()

                                                        .connectString("172.21.29.35:2181")

                                                        .connectionTimeoutMs(2000)

                                                        .sessionTimeoutMs(20000)

                                                        .retryPolicy(policy).build();



        // 启动客户端

 client.start();



        System.out.println("zookeeper 启动成功");

        return client;

    }

}

使用Curator实现分布式锁,只需要连接上zookeeper后,使用InterProcessMutex类提供的API就可以完成加锁(acquire() )和解锁(release() )操作,流程变得非常简单,而且可以很容易的实现锁的重入。

4、使用场景分析

4.1 基于Mysql

优点

基于数据库的这种实现方式很简单

缺点

  1. 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能
  2. 不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据
  3. 没有锁失效机制
  4. 不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取

数据库的方式实现分布式锁的方式基本不会在生产环境下使用。

4.2 基于Redis

推荐使用redisson的方式使用分布式锁,虽然在主从配置的场景下锁存在一定的概率失效,出现不一致的问题,但是这种概率很小。而且任何系统都不存在完全没有失败的概率,当机器存在问题后,可以人工干预的方式恢复。

RedLock一般也不推荐使用,毕竟也不是100%的安全,而且使用这个锁需要的Redis实例资源也是比较多的,如果有些场景需要非常强的一致性可以考虑使用zookeeper。

4.3 基于zookeeper

zookeeper实现的分布式锁具有较强的一致性,但是正因为这种一致性,导致了zookeeper的并发性能没有redis高。因此对于能够容忍系统出现一定概率失败的场景,优先推荐使用redisson。

redis和 zookeeper 比较

redis分布式锁和zk分布式锁的侧重点是不同的,这是redis和zk本身的定位决定的,redis分布式锁侧重高性能,zk分布式锁侧重高可靠性。所以一般项目中redis分布式锁和zk分布式锁的选择,是基于业务来决定的。如果你的业务需要保证加锁的可靠性,不能出错,那么zk分布式锁就比较符合你的要求;如果你的业务对于加锁的可靠性没有那么高的要求,那么redis分布式锁是个不错的选择。