Go-Redsync包底层源码实现
背景
redsync包
是一个基于 Redis 实现的分布式锁的 Go 语言库。它基于Redlock算法
可以在多个Redis节点上实现一个具有容错性的分布式锁。
包内实现了,重试机制(获取锁),支持多个reids节点,分布式锁,续租等功能。
Redlock (红锁)算法:是在多个Redis节点之间实现锁机制。它采用主节点过半机制,即获取锁或释放锁成功的标志是在过半的节点上操作成功。例如,如果有5个Redis主节点,那么至少需要在3个节点上成功获取锁才算成功。这种机制有效地解决了单点失败的问题,并提高了系统的稳定性和可靠性,详情可看cloud.tencent.com。
使用
下面是初始化,加锁,释放锁使用例子:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
"github.com/go-redis/redis/v8"
)
func main() {
// 创建 Redis 客户端连接
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务地址
DB: 0, // 使用默认的数据库编号
})
// 使用 Redsync 创建一个分布式锁的实例
pool := goredis.NewPool(redisClient) // 创建连接池
rs := redsync.New(pool)
// 定义锁的名称
lockName := "my_distributed_lock"
// 创建一个锁对象
lock := rs.NewMutex(lockName, redsync.WithTTL(10*time.Second))
// 尝试获取锁
if err := lock.Lock(); err != nil {
log.Fatal(err)
}
fmt.Println("Lock acquired")
// 执行业务逻辑
// ...
// 模拟业务逻辑处理时间
time.Sleep(5 * time.Second)
// 完成业务逻辑后释放锁
if err := lock.Unlock(); err != nil {
log.Fatal(err)
}
fmt.Println("Lock released")
}
看看它的源码实现:
实现
分布式锁的实现原理是采用redis的setnx +过期时间
来实现的;
运用Redlock算法
来解决多个redis,获取释放锁成功是否的判断依据;
初始化
通过Mutex
结构来构建实现流程的参数配置,下面可以看见Mutex
配置的含义,可以看到锁的过期时间默认为8秒
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
if m.shuffle {
randomPools(m.pools)
}
return m
}
type Mutex struct {
// name 是锁的名称,用于标识不同的锁实例。
name string
// expiry 是锁的过期时间,即锁在被自动释放之前的有效时间。
expiry time.Duration
// tries 是尝试获取锁的次数,用于重试机制。
tries int
// delayFunc 是延迟函数,用于在尝试获取锁之间等待一段时间。
delayFunc DelayFunc
// driftFactor 是时钟漂移因子,用于计算锁的最小有效时间。
driftFactor float64
// timeoutFactor 是超时因子,用于计算在尝试获取锁时的最大等待时间。
timeoutFactor float64
// quorum 是最小锁数量,即在多个 Redis 实例中需要成功获取锁的最小数量。
quorum int
// genValueFunc 是生成值的函数,用于生成唯一的锁值。
//每一次加锁都会生成一个随机数,加锁成功会附值到value上,
//当释放锁的时候会对值进行对比,防止误删的情况
genValueFunc func() (string, error)
//每次加锁成功的唯一值
value string
// until 是锁的有效期截止时间。
until time.Time
// shuffle 表示是否在尝试获取锁之前对 Redis 实例进行随机排序。
shuffle bool
// failFast 表示是否在第一次获取锁失败时就立即返回错误。
failFast bool
// setNXOnExtend 表示在扩展锁的过期时间时是否使用 SETNX 命令。
setNXOnExtend bool
// pools 是 Redis 连接池的切片,用于存储与多个 Redis 实例的连接。
pools []redis.Pool
}
加锁
通过下面TryLock()
方法去尝试1次加锁或通过Lock()
方法去使用配置的重试次数
去加锁:
// TryLock only attempts to lock m once and returns immediately regardless of success or failure without retrying.
func (m *Mutex) TryLock() error {
return m.TryLockContext(context.Background())
}
// TryLockContext only attempts to lock m once and returns immediately regardless of success or failure without retrying.
func (m *Mutex) TryLockContext(ctx context.Context) error {
return m.lockContext(ctx, 1)
}
// Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) Lock() error {
return m.LockContext(context.Background())
}
// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) LockContext(ctx context.Context) error {
return m.lockContext(ctx, m.tries)
不管是那种方法,核心的方法流程是在lockContext
方法里处理:
// lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) lockContext(ctx context.Context, tries int) error {
if ctx == nil {
ctx = context.Background()
}
value, err := m.genValueFunc()
if err != nil {
return err
}
var timer *time.Timer
for i := 0; i < tries; i++ {
if i != 0 {
if timer == nil {
timer = time.NewTimer(m.delayFunc(i))
} else {
timer.Reset(m.delayFunc(i))
}
//防止要防止频繁重试,让重试拥有一个延期时间
select {
case <-ctx.Done():
timer.Stop()
// Exit early if the context is done.
return ErrFailed
case <-timer.C:
// Fall-through when the delay timer completes.
}
}
start := time.Now()
//多个reids进行加锁
n, err := func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
}()
//判断加锁成功节点是否大于一半节点数并且是否超过锁的过期时间
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
//加锁失败,需要把每个redis中加锁成功的节点释放锁
_, _ = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}()
if i == m.tries-1 && err != nil {
return err
}
}
return ErrFailed
}
实现流程:
- 检查上下文:首先检查传入的
context.Context
是否为nil
,如果是,则使用一个空的背景上下文context.Background()
。 - 生成锁的值:调用
m.genValueFunc()
生成一个唯一的锁值,这个值用于在多个 Redis 实例上设置锁。 - 重试循环:进入一个循环,尝试
tries
次获取锁。如果在第一次之外的重试,会根据重试次数i
使用m.delayFunc(i)
计算延迟时间,并等待这个延迟时间。 - 上下文超时检查:在每次重试之前,检查上下文是否已取消(
ctx.Done()
),如果是,则停止计时器并返回错误。 - 执行锁操作:调用一个匿名函数,该函数在超时上下文中尝试在所有 Redis 实例上异步执行锁操作。这个操作是通过
m.actOnPoolsAsync()
方法实现的,它内部会调用m.acquire()
方法尝试获取锁。 - 计算锁的有效期:如果成功在大多数 Redis 实例上获取到锁,并且从开始尝试获取锁到当前时间的时间差加上时钟漂移因子计算出的有效期还没过期,则认为获取锁成功。
- 设置锁的值和有效期:如果锁获取成功,设置
Mutex
结构体的value
和until
字段。 - 释放锁:如果在尝试过程中锁获取失败,会调用
m.release()
方法尝试释放所有 Redis 实例上的锁。 - 错误处理:如果在最后一次尝试后仍然失败,并且发生了错误,则返回这个错误。
在第5步执行锁的操作代码流程:
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
node int
statusOK bool
err error
}
ch := make(chan result, len(m.pools))
for node, pool := range m.pools {
go func(node int, pool redis.Pool) {
r := result{node: node}
//加锁操作
r.statusOK, r.err = actFn(pool)
ch <- r
}(node, pool)
}
var (
n = 0
taken []int
err error
)
for range m.pools {
r := <-ch
if r.statusOK {
n++
} else if r.err == ErrLockAlreadyExpired {
err = multierror.Append(err, ErrLockAlreadyExpired)
} else if r.err != nil {
err = multierror.Append(err, &RedisError{Node: r.node, Err: r.err})
} else {
taken = append(taken, r.node)
err = multierror.Append(err, &ErrNodeTaken{Node: r.node})
}
if m.failFast {
// fast retrun
if n >= m.quorum {
return n, err
}
// fail fast
if len(taken) >= m.quorum {
return n, &ErrTaken{Nodes: taken}
}
}
}
if len(taken) >= m.quorum {
return n, &ErrTaken{Nodes: taken}
}
return n, err
}
它会去多个redis节加锁,加锁成功n++
,失败叠加错误信息
这个acquire(xxx)
方法,我们就熟悉了,使用SetNX
命令去redis设置值,加了过期时间,防止死锁。
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
释放锁
释放锁是跟上面加锁中流程中的释放锁调用的是同一个方法actOnPoolsAsync
,方法的内容如下:
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, m.value)
})
if n < m.quorum {
return false, err
}
return true, nil
}
我们来看看release
是怎么操作redis的?
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
if status == int64(-1) {
return false, ErrLockAlreadyExpired
}
return status != int64(0), nil
}
var deleteScript = redis.NewScript(1, `
local val = redis.call("GET", KEYS[1])
if val == ARGV[1] then
return redis.call("DEL", KEYS[1])
elseif val == false then
return -1
else
return 0
end
`)
它是去执行的一个lua命令,它会去获取key对应的值,并判断值是否跟当前值相等,相等的话就删除key
当通过key的value值的判断来确保误删的情况,那什么时候会出现误删的情况呢?如图
当进程a加锁成功后,执行的业务代码过久,在a释放锁之前,锁就过期了,如果这个时候进程b去加锁,是会加锁成功的,如果a再去释放锁就会错误的释放掉b加的锁,出现了误删的情况。 为了防止误删的情况,会在删除锁之前进行一个value值的判断,确保每次加锁释放锁成功都是一个进程的操作。
续租锁
当我们不能预估业务执行的完成时间,那这个时候我们设置锁的超时时间就不好预估,这个时候,续租锁就可以解决这个问题。
封装续约方法,当我们在获取到锁后,就异步执行我们的业务代码,并且定时去执行redsync包提供的ExtendContext
方法进行续约
//LockWithTTL 锁的key ttl 过期时间 handle:业务函数
func LockWithTTL(ctx context.Context, key string, ttl time.Duration,
handle func(ctx context.Context)) error {
if ttl < 500*time.Millisecond {
return errors.New("ttl time less than 500 Millsecond")
}
if Redis == nil {
return errors.New("please init redis client")
}
//可以初始化多个redis
locker := redsync.New(goredis.NewPool(Redis))
mutex := locker.NewMutex(key, redsync.WithTries(1), redsync.WithExpiry(ttl))
if err := mutex.LockContext(ctx); err != nil {
return err
}
defer func() {
if _, err := mutex.UnlockContext(ctx); err != nil {
logrus.WithError(err).Errorln()
}
}()
//业务函数结束chan
waitChain := make(chan struct{})
//续约定时频率默认为500毫秒
checkDuration := 500 * time.Millisecond
ticker := time.NewTicker(checkDuration)
cancelCtx, cancel := context.WithCancel(ctx)
go func(waitChain chan<- struct{}, fn func(ctx context.Context)) {
//执行我们的业务程序
fn(cancelCtx)
//结束续约
close(waitChain)
ticker.Stop()
}(waitChain, handle)
for {
select {
case <-ticker.C:
//到期时间-当前时间 如果小于等于了续约的定时频率,那么当前周期就去续约
diff := mutex.Until().Sub(time.Now())
if diff <= checkDuration {
//redsync提供的续约方法
if _, err := mutex.ExtendContext(cancelCtx); err != nil {
logrus.WithError(err).Errorln()
cancel()
return err
}
}
case <-waitChain:
ticker.Stop()
return nil
case <-cancelCtx.Done():
return nil
}
}
}
我们看看ExtendContext
方法是怎么实现的?
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
start := time.Now()
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
})
if n < m.quorum {
return false, err
}
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if now.Before(until) {
m.until = until
return true, nil
}
return false, ErrExtendFailed
}
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
touchScript := touchScript
if m.setNXOnExtend {
touchScript = touchWithSetNXScript
}
status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
// extend failed: clean up locks
_, _ = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}()
return false, err
}
return status != int64(0), nil
}
//最后在redis执行的lua脚本
var touchWithSetNXScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
return 1
else
return 0
end
`)
//最后在redis执行的lua脚本
var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)
- 通过
actOnPoolsAsync
方式去每个redis执行touch
方法,返回执行成功的次数和err; touch
方法会根据setNXOnExtend
参数是否设置去执行对应的lua
脚本,setNXOnExtend
为false执行touchScript
脚本为true执行touchWithSetNXScript
;touchScript
作用:如果获取的value值跟当前的值一样,就去更新过期时间。touchWithSetNXScript
作用:获取的value值跟当前的值一样,就去更新过期时间,如果获取的value值跟当前的值不一样,会去设置key的值并且初始化过期时间,类似update或者insert的作用;
总结
redsync包
使用redlcoak
算法解决的多个redis的分布式锁;- 删除或者续约采用随机数的判断,实现了误删或者误续的情况;
- 增加重试机制,提高获取锁的性能;
- 通过续租方法,解决长时间运行的任务,导致锁过期的情况;
参考
【Redisson–红锁(Redlock)–使用/原理】cloud.tencent.com
转载自:https://juejin.cn/post/7384750303521292303