likes
comments
collection
share

Zookeeper分布式锁

作者站长头像
站长
· 阅读数 30

Zookeeper分布式锁

Zookeeper分布式锁原理

  • 核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点
  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面所有的子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非lock所有节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册监听器,监听删除事件。
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点 是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

InterProcessMutex为例进行演示:

InterProcessLock lock = new InterProcessMutex(client,"/lock");
try {
    //尝试获取锁
    if (lock.acquire(5, TimeUnit.SECONDS)) {
        System.out.println("获取锁成功,执行。。。");
    }
} catch (Exception e) {
    throw new RuntimeException(e);
} finally {
    //释放锁
    try {
        lock.release();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Curator分布式锁具体实现

获取锁

以上述演示代码为例

进入acquire方法:

//不带超时时间参数
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

//带超时时间参数
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
    return internalLock(time, unit);
}

acquire方法内调用了internalLock方法,如果传入的参数是-1,则会一直阻塞直到加锁成功。

进入internalLock方法:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    
    Thread currentThread = Thread.currentThread();
    
    //获取当前线程的lockData
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // 重入锁
        lockData.lockCount.incrementAndGet();
        return true;
    }

    //尝试获取锁
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

先获取当前线程,然后从threadData中获取当前线程对应的lockData,如果lockData为空,表示第一次获取锁,则执行attemptLock方法尝试获取。

进入attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}
  1. 首先通过driver.createsTheLock方法创建节点:

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    

    从createsTheLock方法中可以看到,创建的节点类型为CreateMode.EPHEMERAL_SEQUENTIAL,即临时顺序节点。

  2. 创建完节点后,执行internalLockLoop方法来获取锁

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
    
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                //获取锁
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
    
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
    

    通过getSortedChildren方法,获取所有子节点,并按照序号排序;

    通过driver.getsTheLock方法判断当前节点是否可以获取锁,返回PredicateResults对象,该对象保存是否加锁成功的信息。

    如果加锁成功,则创建一个LockData对象,保存到threadData中,key为当前线程对象。

实现可重入获取锁

回到internalLock方法,如果判断lockData不为空,则表示当前线程已拥有该锁,此时将lockData中保存的lockCount值加1,表示再次获取到锁;

LockData定义:

private static class LockData
{
    final Thread owningThread;
    final String lockPath;
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owningThread, String lockPath)
    {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

该类封装了获取锁的线程,锁的节点路径,获取锁的次数;

加锁失败后阻塞等待获取锁

回到internalLockLoop方法,如果predicateResults.getsTheLock()返回false,表示获取锁失败。此时,对已获取锁的节点设置监听器,监听节点删除事件。

client.getData().usingWatcher(watcher).forPath(previousSequencePath);

调用Object.wait方法,进入线程等待,如果有超时时间,在等待时间结束后,再次执行while循环,尝试获取锁,如果仍然获取失败,则退出并删除节点。

在等待过程中,如果监听器监听到节点删除,则回调process方法,唤醒该线程继续执行while循环,尝试获取。

监听器watcher定义:

private final Watcher watcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        client.postSafeNotify(LockInternals.this);
    }
};

default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
{
    return runSafe(() -> {
        synchronized(monitorHolder) {
            monitorHolder.notifyAll();
        }
    });
}

执行锁释放

执行release方法:

public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */

    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}

拿到当前线程对应的lockData对象,如果为空,表示根本没有获取过锁;

如果锁的获取次数减1(表示释放一次)后仍大于0,表示没有释放完成;

如果锁的获取次数减1后小于0,抛出异常;

如果锁的获取次数减1后等于0,表示已经完全释放,此时执行releaseLock方法,删除之前创建的临时有序节点(此时会触发节点删除事件,通知所有监听该节点的监听器执行回调方法)。

final void releaseLock(String lockPath) throws Exception
{
    client.removeWatchers();
    revocable.set(null);
    deleteOurPath(lockPath);
}

最后从threadData中移除当前线程的lockData。

总结

  1. 从上述代码可以看到,基于临时有序节点获取锁的方式实现为公平锁;
  2. Curator的可重入锁是在本地进行管理的,没有保存在Zookeeper服务器进行管理;
转载自:https://juejin.cn/post/7236694100215447611
评论
请登录