likes
comments
collection
share

ZooKeeper系列:实现分布式锁

作者站长头像
站长
· 阅读数 25

本文正在参加「金石计划 . 瓜分6万现金大奖」

锁是为了在多线程的场景中保证数据安全而增加的一种手段,Java中常用的有CountdownLatch,ReentrantLock等单应用中的锁,在现在处处都是分布式的场景需求下就不能满足了,所以就出现了分布式锁。

不同的物理节点有各自的线程,但是他们会访问同一个资源,但是不允许同一时刻访问,所以就有了分布式锁

例如

我们可以通过数据库编写sql来实现分布式锁,但是这种在高并发下性能会出问题,

还有常用的redis实现分布式锁,这个是我们用的最多的一种高性能高并发的实现方式。

今天介绍的一种是通过中间件zookeeper实现分布式锁,也是支持高性能高并发的。

想想实现一个锁想到哪些关键点 ?

争抢锁:只有一个人可以获取锁

获得锁的节点挂了,临时节点 会自动释放

获得锁的人,可以主动释放锁

锁被释放,删除 其他人怎么知道

主动轮训,监听心跳:存在延迟,节点多的情况话压力很大。

根据zk的节点看看是否满足锁

创建持久化节点

zk是创建节点保存数据的,相同节点只允许创建一次,所以我们可以通过成功创建节点实现获取锁的情况。

关键代码

zk.create("/lock",  threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

结果:

ZooKeeper系列:实现分布式锁

如图只有一个线程获取锁,其他线程都出现异常:NodeExists for /lock ,可以保证同一时刻只有一个线程获取锁。然后获得锁的线程逻辑执行结束后应该删除锁。

存在的问题:如果线程崩溃了,锁就无法释放了,最终导致死锁

持久化节点不行,持久化顺序节点自然也不行了

创建临时节点

临时节点:在客户端断开连接的时候就会自动删除

关键代码:

zk.create("/lock",  threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

CreateMode.EPHEMERAL意思就是创建临时节点 ,也就是当线程崩溃,无法主动释放锁的时候,会自动删除,避免死锁。

但是

还存在的问题:没有获取锁的线程会出现错误,则需要不断重试,通过死循环直到获取锁。

临时节点+watch

watch:设置监听回调,当监听的节点或其子节点有变更,则会通知客户端,可参考上一篇

关键代码

zk.create("/lock",  threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.getChildren("/testLock",true, this,"ajisun");

getChildren 获取父节点/testlock下的子节点并设置监听。当子节点/lock被删除 就会触发回调,再次创建节点。

通过watch 避免使用死循环设置堵塞,看似还不错哦。

但是

还还存在问题:所有客户端都去监听同一个父节点,当锁释放的时候,也会通知所有的客户端,带来的压力还是很大。

创建临时顺序节点+watch

临时顺序节点:每个线程都会创建一个临时且有序的节点,互相不冲突

代码如下,十个线程模拟十个客户端

public static void main(String[] args) throws KeeperException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock", 3000, new DefaultWatch().setCountDownLatch(countDownLatch));
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        CountDownLatch countDown = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            ZooKeeper finalZk = zk;
            new Thread() {
                @Override
                public void run() {
                    String threadId = Thread.currentThread().getId() + "";
                    try {
                        finalZk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDown.countDown();
                }
            }.start();
        }
        try {
            countDown.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<String> list = zk.getChildren("/", false);
        list.forEach(s -> System.out.println(s));
    }

CreateMode.EPHEMERAL_SEQUENTIAL意思就是创建临时顺序节点。

输出节点如下

lock0000000110
lock0000000114
lock0000000113
lock0000000112
lock0000000111
lock0000000107
lock0000000106
lock0000000105
lock0000000109
lock0000000108

每次只有一个客户端可以加锁成功,如果同时有100个客户端,当其中一个释放锁后,通知剩下99个客户端,然后99个客户端同时抢锁,其实只有一个会成功,剩下的98个只是陪跑的,做无用功,白白浪费系统资源。

既然每次只有一个会加锁成功,当一个客户端释放锁的时候,只通知一个客户端不就可以了吗。

怎么做到呢?

就是用到临时顺序节点这个特点,不在监听父节点,而是监听前一个节点。

首先创建节点成功后,获取父节点下的所有子节点,因为各个节点是有顺序,可以按照从小到大的顺序排列后,然后判断自己的节点是不是最小的,如果是则获取锁,不是则监听前一个节点。

关键代码如下

public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    ZooKeeper zk;
    String threadId;
    CountDownLatch cc = new CountDownLatch(1);
    String pathName;
  
  // set/get省略
    public void tryLock() {
        try {
            //   创建临时有序的锁
            zk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void unLock() {
        try {
            zk.delete(pathName, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
​
    @Override
    public void process(WatchedEvent event) {
        // 如果第一个锁释放了,只有第二个收到回调事件
        // 如果是其他的挂了,对应的后一个也能收到通知
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/", false, this, "bcd");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }
    // create call back
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (name != null) {
            System.out.println(threadId + "create node:" + name);
            pathName = name;
            // 获取所有创建的目录,即参与锁争夺的线程
            zk.getChildren("/", false, this, "bcd");
        }
    }
​
    /**
     * getChildren call back
     * pathName= /lock00000000003
     * children=[lock0000000002,lock0000000008,lock0000000005,lock0000000003]
     * 进入这个回调之后 说明已经创建节点成功,能够看的已经创建的所有节点
     */
​
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
​
        Collections.sort(children);
        // 所在位置
        int i = children.indexOf(pathName.substring(1));
​
        // 判断是不是第一个
        if (i == 0) {
            System.out.println(threadId + " first");
            cc.countDown();
        } else {
            // 不是第一个则监控前一个是否存在,如果前一个删除了需要回调我这个session
            zk.exists("/" + children.get(i - 1), this, this, "xyz");
        }
    }
​
    /**
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
​
    }
}

调用方:

    public void lock() {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String threadId = Thread.currentThread().getId() + "";
                    watchCallBack.setThreadId(threadId);
                    // 抢锁
                    watchCallBack.tryLock();
                    //干活
                    System.out.println(threadId + " working......");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //释放锁
                    watchCallBack.unLock();
                }
            }.start();
        }
    }

总结总结

zk是通过临时节点,避免死锁问题(session消失,节点消失,锁释放)

通过顺序节点,实现阻塞功能(临时顺序的节点数据)。

通过watch,watch前一个节点,最小的获得锁,一旦最小的锁释放,zk只会给下一个节点回调。避免了抢锁带来的不必要的损耗和压力。

我是纪先生,用输出倒逼输入而持续学习,持续分享技术系列文章,以及全网值得收藏好文,欢迎关注公众号,做一个持续成长的技术人。

转载自:https://juejin.cn/post/7163822115936796702
评论
请登录