likes
comments
collection
share

【Redis】【Go】分布式锁和续租:Redis分布式锁与Redsync源码解读

作者站长头像
站长
· 阅读数 21

1.为什么需要用到分布式锁?

在一个分布式系统中,多个服务器可能会并发地访问和修改同一份数据。在这种情况下,如果没有适当的同步机制,可能会出现数据不一致的问题。为了解决这个问题,我们可以使用分布式锁。

分布式锁是一种同步机制,它可以在分布式系统中的多个节点间确保对共享资源的互斥访问。在具体操作之前,服务器会请求获取一个锁,如果获取成功,那么该服务器就有权利去执行对应的操作,其他的服务器在此期间则不能进行该操作。操作完成后,服务器会释放该锁,这样其他的服务器就可以获取锁来进行自己的操作。

2.分布式锁使用

本文使用redSync库来演示

2.1快速开始

package main

import (
	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	// 使用go-redis创建一个连接池,redsync会使用这个连接池与Redis进行通信
	// 这个连接池也可以是任何实现了`redis.Pool`接口的连接池
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	pool := goredis.NewPool(client) // 或者,pool := redigo.NewPool(...)

	// 创建一个redsync实例,用于获取互斥锁
	rs := redsync.New(pool)

	// 通过使用相同的名字获取一个新的互斥锁,想要获取同一个锁的所有实例都需要使用相同的名字
	mutexname := "my-global-mutex"
	mutex := rs.NewMutex(mutexname)

	// 获取我们的互斥锁的锁,这个操作成功后,没有人能获取同一个锁(同一个互斥锁名字)直到我们解锁它
	if err := mutex.Lock(); err != nil {
		panic(err)
	}

	// 在这里执行需要锁的工作
    ...
	

	// 释放锁,这样其他进程或线程就可以获取锁了
	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}

3.redSync源码解读

刚刚我们轻松实现了分布式的加锁和释放,现在我们来看看RedSync是怎么实现的。

3.1redSync加锁

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
    //尝试在Redis中设置一个键值对,键是锁的名称,值是传入的value,过期时间是锁的有效期
	reply, err := conn.SetNX(m.name, value, m.expiry)
	if err != nil {
		return false, err
	}
	return reply, nil
}

总的来说,这个 acquire 方法尝试在 Redis 中设置一个值,如果成功设置了值,那么表示成功获取了锁,否则表示锁已经被其他人获取。

3.2.分布式锁释放

说起锁释放,我们可能会遇到什么问题呢?

假设一个进程获取了一个锁,并开始执行一些操作。如果这些操作需要的时间超过了锁的过期时间,那么锁就会被自动释放。这时,另一个进程可能会获取到这个锁并开始执行它的操作。当第一个进程完成它的操作后,它可能会尝试释放锁,但这时锁已经被第二个进程获取,因此会导致第二个进程的锁被误删。

所以,每次获取锁时,都生成一个唯一的值作为锁的标识,只有拥有正确标识的进程才能删除锁。这样即使锁超时,也不会误删其他进程的锁。

func genValue() (string, error) {
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(b), nil
}
  1. func genValue() (string, error) : 这是一个生成随机值的函数,它返回一个字符串和一个错误。
  2. b := make([]byte, 16) : 创建了一个长度为16的字节切片。
  3. _, err := rand.Read(b) :使用 rand.Read 函数填充字节切片 b,使其包含16个随机字节。如果在生成随机字节时发生错误,那么返回错误。
  4. return base64.StdEncoding.EncodeToString(b), nil: 这行代码将字节切片 b 转换为一个 Base64 编码的字符串,并返回这个字符串和 nil 错误。

总的来说,这个函数生成一个随机的、长度为16的字节切片,然后将这个字节切片转换为一个 Base64 编码的字符串。这个字符串可以用作 Redis 分布式锁的唯一标识,保证每次获取锁时都生成一个唯一的值,以防止误删问题。

接下来我们看一下删除的代码

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
       return false, err
   }
    defer conn.Close()
    status, err := conn.Eval(deleteScript, m.name, value)
    if err != nil {
       return false, err
   }
    return status != int64(0), nil
}
var deleteScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("DEL", KEYS[1])
	else
		return 0
	end
`)

status, err := conn.Eval(deleteScript, m.name, value)

运行 Lua 脚本 deleteScript 来删除 Redis 中的锁。这个脚本接收锁的名字(m.name)和锁的值(value)作为参数。这个脚本的目的是确保只有当 Redis 中的锁的值和 value 匹配时才删除锁,这是为了避免误删问题。如果在运行脚本时发生错误,那么返回错误。

Redis 使用单线程模型,Lua 脚本在 Redis 中执行时是原子的。这意味着在 Lua 脚本执行期间,不会有其他的 Redis 命令被执行。

3.3分布式锁续租机制

在分布式系统中,一个进程可能会获取一个锁,然后开始执行一些耗时的操作。这个锁会设置一个过期时间,以防止进程在执行操作时崩溃,导致锁无法被释放,阻塞其他进程获取锁。

然而,如果这个进程的操作时间超过了锁的过期时间,那么锁会被自动释放,其他进程可能会获取到这个锁并开始执行它的操作。当原进程完成操作,尝试释放锁时,就会出现问题,因为锁已经被其他进程获取。

这时候,续租机制就起到作用了。续租机制是通过一个后台进程或者线程,定期检查锁的状态。如果锁即将到达过期时间,而关联的进程还在执行操作,那么就会更新锁的过期时间,即"续租"。这样,即使进程的操作时间超过了最初的过期时间,也不会导致锁被自动释放,保证了锁的安全性。

我们来看一下redSync是如何实现的续租

// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
	start := time.Now()
	n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
		return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
	})
	if n < m.quorum {
		return false, err
	}
	now := time.Now()
	until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
	if now.Before(until) {
		m.until = until
		return true, nil
	}
	return false, ErrExtendFailed
}

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
    //toachScript为续租lua脚本,下文会介绍
	status, err := conn.Eval(touchScript, m.name, value, expiry)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}
  1. *func (m Mutex) ExtendContext(ctx context.Context) (bool, error) : ExtendContext 方法接收一个 context.Context 对象,并返回一个布尔值和一个错误。这个方法的目的是尝试为 Redis 中的锁续租。
  2. start := time.Now() : 记录续租操作开始的时间。
  3. n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {...}) : 这段代码通过异步方式对所有的 Redis 连接池执行 touch 操作。touch 操作就是尝试更新 Redis 中的锁的过期时间。如果在 touch 操作过程中发生错误,将返回错误。
  4. if n < m.quorum {...} : 这个检查确保成功更新过期时间的 Redis 实例数量大于或等于预设的法定数(默认是总数/2+1)(quorum)。如果没有达到法定数,那么续租操作失败,返回 false 和 错误。
  5. now := time.Now() 和 *until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)m.driftFactor))) : 计算新的锁过期时间。
  6. if now.Before(until) {...} : 如果当前时间早于新的过期时间,那么更新锁的过期时间并返回 truenil 错误,表示续租操作成功。

总的来说,这个函数尝试为 Redis 中的锁续租,只有当成功更新过期时间的 Redis 实例数量达到法定数,且新的过期时间晚于当前时间,才认为续租操作成功。

//续租脚本
var touchScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("PEXPIRE", KEYS[1], ARGV[2])
	else
		return 0
	end
`)

主要逻辑如下:

  1. 使用 redis.call("GET", KEYS[1]) 获取锁对应的值。这里的 KEYS[1] 是 Lua 脚本的第一个键参数,通常是锁的名字。
  2. 使用 == ARGV[1] 检查获取到的锁的值是否和预期的值相等。这里的 ARGV[1] 是 Lua 脚本的第一个值参数,通常是锁的值。
  3. 如果锁的值和预期的值相等,那么使用 redis.call("PEXPIRE", KEYS[1], ARGV[2]) 更新锁的过期时间。这里的 ARGV[2] 是 Lua 脚本的第二个值参数,通常是新的过期时间。
  4. 如果锁的值和预期的值不相等,那么返回 0

在 Go 代码中,redis.NewScript(1, ...) 创建了一个新的 Lua 脚本对象。1 是 Lua 脚本的键参数数量,后面的字符串是 Lua 脚本的代码。

这个 Lua 脚本主要用于在 Redis 中为锁续租,即更新锁的过期时间。这个操作需要是原子的,以防止在检查和更新过期时间之间的时间窗口内,锁的值被其他进程修改。

4.手写一个自动续租实现

Redsync 没有提供自动续租功能,续租需要用户自己写。

这里使用goRoutine和channel实现了一个简易版的自动续租,提供个读者参考。

package main

import (
	"fmt"
	"time"

	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})

	pool := goredis.NewPool(client)

	rs := redsync.New(pool)

	mutexname := "my-global-mutex"
	mutex := rs.NewMutex(mutexname, redsync.WithExpiry(10*time.Second))

	if err := mutex.Lock(); err != nil {
		panic(err)
	}
	//创建一个channel,用来通知续租goroutine任务已经完成
	done := make(chan bool)

	// 开启一个goroutine,周期性地续租锁
	go func() {
		ticker := time.NewTicker(5 * time.Second) // 按照需求调整
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				ok, err := mutex.Extend()
				if err != nil {
					fmt.Println("Failed to extend lock:", err)
				} else if !ok {
					fmt.Println("Failed to extend lock: not successes")
				}
			case <-done:
				return
			}
		}
	}()

	// 执行需要锁的工作
	time.Sleep(30 * time.Second) 
	//通知goRoutine停止续租
	close(done) 

	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}

这段代码首先创建一个go-redis客户端和连接池,并使用它们创建一个redsync实例。然后,创建一个带有10秒过期时间的互斥锁并尝试获取它。在获取锁后,开启一个goroutine来周期性地续租锁。当需要锁的工作完成后,关闭done channel以通知续租goroutine停止,并释放锁。

如果对你有帮助的话,希望可以给本文点个赞,谢谢啦~~~

转载自:https://juejin.cn/post/7233284282964770871
评论
请登录