likes
comments
collection
share

Redis分布式锁

作者站长头像
站长
· 阅读数 45

对于分布式微服务,服务部署在不同的服务器上,或者不同的Pod上,遇到需要锁的情况时,需要使用分布式锁。

一个靠谱的分布式锁需要具备的条件:

  • 独占性:任何时刻只有且仅有一个线程持有。
  • 高可用:若Redis集群环境下,不能因为某一个节点宕机而出现获取锁和释放锁失败的情况。高并发请求下,依旧保持一定性能。
  • 防死锁:杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案。
  • 重入性:同一个节点的同一个线程,如果获得锁之后,它也可以再次获取这个锁,不用重新加锁。
  • 不乱抢:不能unlock别的线程的锁,只能自己加的锁自己释放。

在接下来的实现过程中可以逐步对这些条件有所了解。

Redis的SET指令:

SET key value [NX|XX] [EX seconds][PX milliseconds]

设置键key中存储字符串类型的值value。

EX:设置key在多少秒之后过期。

PX:设置key在多少毫秒之后过期。

NX:当key不存在的时候,才创建key,效果等同于setnx。

XX:当key存在的时候,覆盖key。

Redis的HSET指令:

HSET key field value [field value ...]

将存储在key中的哈希数据的字段field的值设置为value。

代码实现

这个练习的源码地址,文章中只记录关键代码。

先实现一个扣减库存的接口:

type InventoryHandler struct{}

func NewInventoryHandler() InventoryHandler {
	return InventoryHandler{}
}

func (h *InventoryHandler) RegisterRoutes(server *gin.Engine) {
	server.POST("/inventory/sale", h.InventorySale)
}

func (h *InventoryHandler) InventorySale(c *gin.Context) {
	redisClient := dao.GetRedisClient()
	val, err := redisClient.Do(c, "get", constant.Inventory).Result()
	if err != nil {
		c.String(http.StatusInternalServerError, fmt.Sprintf("查询库存失败%v", err))
		return
	}

	inventory, err := strconv.Atoi(val.(string))
	if err != nil {
		c.String(http.StatusInternalServerError, fmt.Sprintf("库存转整数失败%v", err))
		return
	}
	if inventory <= 0 {
		c.String(http.StatusInternalServerError, "库存为空, 不可再扣减库存")
		return
	}
	inventory -= 1
	_, err = redisClient.Do(c, "set", constant.Inventory, inventory).Result()
	if err != nil {
		c.String(http.StatusInternalServerError, "扣减库存失败")
		return
	}
	fmt.Printf("扣减库存成功, 库存剩余:%v\n", inventory)
	c.String(http.StatusOK, "扣减库存成功")
}

为了方便,直接使用Redis存储库存数据,不再引入其他数据库,将数据存储在键inventory中。

当前代码是没有加锁的,因为gin框架的每一个request请求都会开一个goroutine来进行处理,所以在同一个Pod中是对并发请求进行的异步处理。

可以看到go中需要处理的错误很多,代码看起来不是很简洁,但是这可以避免代码中遗漏一些错误处理。

$ redis-cli -h 127.0.0.1 -p 6380
127.0.0.1:6380> set inventory 1000
OK
127.0.0.1:6380> get inventory
"1000"

再使用JMeter并发1000个POST请求:

Redis分布式锁

Redis分布式锁

从并发请求测试的统计结果来看,每个请求的平均处理时间较长,达到了4091毫秒,这是因为JMeter在本地电脑上开线程,模拟用户的并发操作本身就会占用一定资源,我的服务是部署在本地电脑上,请求处理也会对系统造成压力,再加上我的电脑本身资源有限,导致整体的处理时间变长。

Redis分布式锁

并发1000个请求,相当于1000个用户,每个用户都对库存-1,那么最终的库存应该为0才对,但实际上:

127.0.0.1:6380> get inventory
"987"

库存不是0,再从Docker桌面应用程序中看看服务的日志:

Redis分布式锁

库存的变化居然是增长的,很明显不正确。并行处理有很大的不确定性,以下这种情况就可能造成出现库存不减反增:

Redis分布式锁

所以必需要加分布式锁,确保在一个线程扣减库存的时候,其他线程无法读写库存。

分布式锁的作用:

  • 跨进程、跨服务,部署在不同服务器、不同Pod中的服务都能受到限制。
  • 解决超卖,比如上例中库存1000,卖了1000件却还剩987件,导致卖出的数量大于实际库存数量。
  • 防止缓存击穿,缓存击穿指的是大量请求同时查询一个key的时候,key刚好失效了,导致数据库突然接收大量请求,压力剧增。

接下来就逐步实现分布式锁,每个版本的代码进行并发测试的方式是一样的,只是因电脑资源有限,将并发请求数改为200,使用JMeter并发请求200次,如果最终库存的值为0,且请求没有报错,基本就能说明分布式锁是有效的。

v1

使用set nx加锁,用自旋实现重试。

package service

import (
	"fmt"
	"net/http"
	"rdl/internal/constant"
	"rdl/internal/dao"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
)

type InventoryHandler struct{}

func NewInventoryHandler() InventoryHandler {
	return InventoryHandler{}
}

func (h *InventoryHandler) RegisterRoutes(server *gin.Engine) {
	server.POST("/inventory/sale", h.InventorySale)
}

func (h *InventoryHandler) InventorySale(c *gin.Context) {
	redisClient := dao.GetRedisClient()

	// =================== ▽ 锁相关的主要代码 ▽ ======================
	lockName := constant.DistributedLock

	_, err := redisClient.Do(c, "set", lockName, "1", "nx").Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, "1", "nx").Result()
	}
	defer func() {
		_, _ = redisClient.Do(c, "del", lockName).Result()
	}()
	// =================== △ 锁相关的主要代码 △ ======================

	val, err := redisClient.Do(c, "get", constant.Inventory).Result()
	if err != nil {
		c.String(http.StatusInternalServerError, fmt.Sprintf("查询库存失败%v", err))
		return
	}

	inventory, err := strconv.Atoi(val.(string))
	if err != nil {
		c.String(http.StatusInternalServerError, fmt.Sprintf("库存转整数失败%v", err))
		return
	}
	if inventory <= 0 {
		c.String(http.StatusInternalServerError, "库存为空, 不可再扣减库存")
		return
	}
	inventory -= 1
	_, err = redisClient.Do(c, "set", constant.Inventory, inventory).Result()
	if err != nil {
		c.String(http.StatusInternalServerError, "扣减库存失败")
		return
	}
	fmt.Printf("扣减库存成功, 库存剩余:%v\n", inventory)
	c.String(http.StatusOK, "扣减库存成功")
}

代码完成后重新部署再进行测试,每次测试前重置库存数量:

127.0.0.1:6380> set inventory 200
OK
127.0.0.1:6380> get inventory
"200"

使用JMeter并发200个请求后,在JMeter中查看接口没有报错,再看库存的数量为0,说明分布式锁生效。

127.0.0.1:6380> get inventory
"0"

代码_, err := redisClient.Do(c, "set", lockKey, "1", "nx", "ex", "10").Result()也可以替换为redisClient.SetNX(c, lockKey, "1", 10*time.Second).Result()

后续只会展示锁相关的主要代码。

v2

v1版本的实现没有过期时间,如果在加锁之后,程序执行完成走之前,服务宕机了,没有走到解锁的部分,这个锁就会一直存在,其他服务无法拿到锁。加了过期时间之后,一个服务宕机不会影响其他服务,服务宕机没有解锁,到了一定时间这个锁会自动被删除,其他服务可以拿到锁。

设置缓存的默认过期时间时10秒。

	lockName := constant.DistributedLock

	_, err := redisClient.Do(c, "set", lockName, "1", "nx", "ex", 10).Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, "1", "nx", "ex", 10).Result()
	}
	defer func() {
		_, _ = redisClient.Do(c, "del", lockName).Result()
	}()

使用JMeter并发200个请求后,在JMeter中查看接口没有报错,再看库存的数量为0,分布式锁生效。

v3

v3版本的实现存在误删key的问题,可能会删除别的线程加的锁:

Redis分布式锁

需要限制自己只能删除自己的锁,不能删除别人的锁。在这之前的实现中,用随意的一个字符串1作为key的值,现在使用一个uuid表示键的值。解锁时判断key对应的值是否一致。

	lockName := constant.DistributedLock
	lockUuid := fmt.Sprintf("%v:%v", constant.DistributedLock, uuid.NewString())

	_, err := redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	}
	defer func() {
		// 判断是否为当前goroutine加的锁,自己只能删除自己的锁
		lval, _ := redisClient.Do(c, "get", lockName).Result()
		if lval == lockUuid {
			_, _ = redisClient.Do(c, "del", lockName).Result()
		}
	}()

使用JMeter并发200个请求后,在JMeter中查看接口没有报错,再看库存的数量为0,分布式锁生效。

v4

v3版本的实现中,释放锁时getdel是两个操作,不是原子操作。可能会存在潜在的并发问题,所以需要使用Lua脚本保证原子性。

官方文档Distributed Locks with Redis中,给到了删除分布式锁的Lua脚本:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

Lua脚本中索引从1开始。

在Redis中,eval指令可以解析脚本,将脚本包含的操作当成一个原子操作来执行。

lockName := constant.DistributedLock
	lockUuid := fmt.Sprintf("%v:%v", constant.DistributedLock, uuid.NewString())

	_, err := redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	}
	defer func() {
		unlockScript := `
			if redis.call("get",KEYS[1]) == ARGV[1] then
				return redis.call("del",KEYS[1])
			else
				return 0
			end
		`
		/*
			[]string{}数组和脚本里的KEYS对应
			从第4个参数开始到之后的参数和ARGV对应
		*/
		_, _ = redisClient.Eval(c, unlockScript, []string{lockName}, lockUuid).Int()
	}()

使用JMeter并发200个请求后,在JMeter中查看接口没有报错,再看库存的数量为0,分布式锁生效。

v5

v4版本的实现中没有实现可重入性,假如存在子函数中也需要加锁的情况,再从子函数中尝试获得锁,会造成死锁。所以需要使用可重入锁,可重入锁是指同一个线程,在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁,不会因为锁之前已经获取过还没释放而阻塞。

func (h *InventoryHandler) InventorySale(c *gin.Context) {
	redisClient := dao.GetRedisClient()

	// =================== ▽ 锁相关的主要代码 ▽ ======================
	lockName := constant.DistributedLock
	lockUuid := fmt.Sprintf("%v:%v", constant.DistributedLock, uuid.NewString())

	_, err := redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	}
	defer func() {
		unlockScript := `
			if redis.call("get",KEYS[1]) == ARGV[1] then
				return redis.call("del",KEYS[1])
			else
				return 0
			end
		`
		/*
			[]string{}数组和脚本里的KEYS对应
			从第4个参数开始到之后的参数和ARGV对应
		*/
		_, _ = redisClient.Eval(c, unlockScript, []string{lockName}, lockUuid).Int()
	}()

	// 需要加锁的子函数
	subFunc(c, redisClient, lockName, lockUuid)
	// =================== △ 锁相关的主要代码 △ ======================
	......
}

func subFunc(c context.Context, redisClient *redis.Client, lockName string, lockUuid string) {
	_, err := redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	// 如果没有设置成功,说明没有抢到锁,一定时间后进行重试
	for err != nil {
		time.Sleep(20 * time.Millisecond)
		_, err = redisClient.Do(c, "set", lockName, lockUuid, "nx", "ex", 10).Result()
	}
	defer func() {
		unlockScript := `
			if redis.call("get",KEYS[1]) == ARGV[1] then
				return redis.call("del",KEYS[1])
			else
				return 0
			end
		`
		/*
			[]string{}数组和脚本里的KEYS对应
			从第4个参数开始到之后的参数和ARGV对应
		*/
		_, _ = redisClient.Eval(c, unlockScript, []string{lockName}, lockUuid).Int()
	}()
}

要实现可重入锁,需要记录加锁的次数,如果存在需要加锁的子函数,就把加锁次数+1,存在多少个子函数就加多少把锁,解锁的时候也要把所有加的锁全解开。

目前使用的锁存储的值为字符串格式:

127.0.0.1:6379> set distributedLock distributedLock:uuid nx ex 10

一个字符串中无法存储两种信息,既要存uuid,又要存加锁次数,使用hash类型的数据:

127.0.0.1:6379> hset distributedLock distributedLock:uuid 1
(integer) 1
127.0.0.1:6379> hget distributedLock distributedLock:uuid
"1"

新建一个util/distributedLock.go文件,在文件中将加锁和解锁封装为方法。在util/lua目录下存放Lua脚本。

1.加锁的脚本util/lua/lock.lua

-- 锁不存在的时候,新增锁
if redis.call("exists", KEYS[1]) == 0 then
    redis.call("hset", KEYS[1], ARGV[1], 1)
    redis.call("expire", KEYS[1], ARG[2])
    return 1
 -- 锁存在,并且hash中存储了指定的键时,将加锁的计数增加1
 elseif redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
    redis.call("hincrby", KEYS[1], ARGV[1], 1)
    redis.call("expire", KEYS[1], ARG[2])
    return 1
 -- 锁存在,但是hash中不存在指定的键时,什么也不做
 else
    return 0
 end

因为hincrby可以实现新建和自增1,替代hset命令,所以可优化为如下代码:

-- 锁不存在的时候,新增锁
-- 锁存在,并且hash中存储了指定的键时,将加锁的计数增加1
if redis.call("exists", KEYS[1]) == 0 or redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
    redis.call("hincrby", KEYS[1], ARGV[1], 1)
    redis.call("expire", KEYS[1], ARGV[2])
    return 1
-- 锁存在,但是hash中不存在指定的键时,什么也不做
else
    return 0
end

2.解锁的脚本util/lua/unlock.lua

-- 因为Redis的很多指令常常返回0表示失败,返回1表示成功,为了与0和1区分开来,使用10和20
-- 如果锁不存在,直接返回
if redis.call("hexists", KEYS[1], ARGV[1]) == 0 then
    return 10
-- 如果锁存在,并且减1之后计数值已经变为0就删除键
elseif redis.call("hincrby", KEYS[1], ARGV[1], -1) == 0 then
    return redis.call("del", KEYS[1])
-- 如锁存在,但是减1之后计数值不为0,直接返回
else
    return 20
end

3.util/distributedLock.go文件的内容为:

package util

import (
	"context"
	_ "embed"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var (
	// 下面这行注释会将lua/lock.lua文件的内容解析为字符串存储到lockScript变量中
	//go:embed lua/lock.lua
	lockScript string
	//go:embed lua/unlock.lua
	unlockScript string
)

type RedisDistributedLock struct {
	c           context.Context
	redisClient *redis.Client
	lockName    string
	lockUuid    string
	expireTime  int
}

func NewRedisDistributedLock(c context.Context,
	redisClient *redis.Client,
	lockName string,
	lockUuid string,
	expireTime int) RedisDistributedLock {
	return RedisDistributedLock{
		c:           c,
		redisClient: redisClient,
		lockName:    lockName,
		lockUuid:    lockUuid,
		expireTime:  expireTime,
	}
}

func (l *RedisDistributedLock) Lock() {
	r, err := l.redisClient.Eval(l.c, lockScript, []string{l.lockName}, l.lockUuid, l.expireTime).Int()
	// 如果没有设置成功,说明没有抢到锁,就一段时间之后进行重试
	for r != 1 || err != nil {
		time.Sleep(20 * time.Millisecond)
		r, err = l.redisClient.Eval(l.c, lockScript, []string{l.lockName}, l.lockUuid, l.expireTime).Int()
	}
	fmt.Printf("%v 加锁\n", l.lockName)
}

func (l *RedisDistributedLock) Unlock() {
	_, err := l.redisClient.Eval(l.c, unlockScript, []string{l.lockName}, l.lockUuid).Int()
	if err != nil {
		fmt.Println("unlock error: ", err)
	}
	fmt.Printf("%v 解锁\n", l.lockName)
}

4.service/inventory.go文件中这样使用锁:

	lockName := constant.DistributedLock
	lockUuid := fmt.Sprintf("%v:%v", constant.DistributedLock, uuid.NewString())

	l := util.NewRedisDistributedLock(c, redisClient, lockName, lockUuid, 10)
	l.Lock()
	defer l.Unlock()

5.创建一个子函数,在子函数中也加锁。

func (h *InventoryHandler) InventorySale(c *gin.Context) {
	redisClient := dao.GetRedisClient()

	// =================== ▽ 锁相关的主要代码 ▽ ======================
	lockName := constant.DistributedLock
	lockUuid := fmt.Sprintf("%v:%v", constant.DistributedLock, uuid.NewString())

	l := util.NewRedisDistributedLock(c, redisClient, lockName, lockUuid, 10000)
	l.Lock()
	defer l.Unlock()

	subFunc(&l)
	// =================== △ 锁相关的主要代码 △ ======================

  ...
	fmt.Printf("扣减库存成功, 库存剩余:%v\n", inventory)
	c.String(http.StatusOK, "扣减库存成功")
}

func subFunc(l *util.RedisDistributedLock) {
	l.Lock()
	defer l.Unlock()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("subFunc 执行")
	subFunc1(l)
}

func subFunc1(l *util.RedisDistributedLock) {
	l.Lock()
	defer l.Unlock()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("subFunc1 执行")
}

请求1次接口打印的结果是:

Redis分布式锁

实现了重入性。

使用JMeter并发200个请求后,在JMeter中查看接口没有报错,再看库存的数量为0,分布式锁生效。

v6

添加自动续期功能,确保锁的过期时间大于业务执行的时间。

util/updateExpireTime.lua

-- 锁存在,并且hash中存储了指定的键,说明程序还在执行中,更新锁的过期时间
if redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
    return redis.call("expire", KEYS[1], ARGV[2])
else
    return 0
end

util/distributedLock.go

var (
	//go:embed lua/updateExpireTime.lua
	updateExpireTimeScript string
)

func (l *RedisDistributedLock) Lock() {
	r, err := l.redisClient.Eval(l.c, lockScript, []string{l.lockName}, l.lockUuid, l.expireTime).Int()
	// 如果没有设置成功,说明没有抢到锁,就一段时间之后进行重试
	for r != 1 || err != nil {
		time.Sleep(20 * time.Millisecond)
		r, err = l.redisClient.Eval(l.c, lockScript, []string{l.lockName}, l.lockUuid, l.expireTime).Int()
	}
	fmt.Printf("%v 加锁\n", l.lockName)
	// 加锁成功后,定时更新缓存过期时间
	go l.updateExpireTime()
}

func (l *RedisDistributedLock) updateExpireTime() {
	r, _ := l.redisClient.Eval(l.c, updateExpireTimeScript, []string{l.lockName}, l.lockUuid, l.expireTime).Int()
	d := time.Duration(2 * l.expireTime / 3)
	// r不为10表明程序还在执行中,每隔过期时间的2/3的时候,再执行一次更新过期时间操作
	if r != 10 {
		time.Sleep(d * time.Second)
		l.updateExpireTime()
	}
}

学习地址

Redis分布式锁:www.bilibili.com/video/BV13R… (视频中是使用Java实现的代码,本文用Go实现)

转载自:https://juejin.cn/post/7301242469936185395
评论
请登录