Redis使用setnx实现分布式锁及其问题、优化
最近在工作中用到了分布式锁,然后查了很多分布式锁的实现方式。比较熟悉redis或者说,redis的用法比较简单,所以查了一下redis使用setnx实现分布式锁的方式。其中有一篇文章搜索到的次数最多,多到我不知道哪个是原创文章,就贴一下看到的链接吧blog.csdn.net/lihao21/art…
reids > setnx(key,value) //设置key.
redis > delete(key) //将key删除
当key存在的时候,设置会失败,返回-1。当key不存在的时候,设置成功,返回0。
实现方式一:
lock_key = "distribute_lock"
def get_lock():
while True:
lock = redis_client.setnx(lock_key,1)
if lock: # 设置成功
break
else:
time.sleep(0.5) #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。
return True
if get_lock():
# do your work,处理临界资源
redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。
可以看一下这个代码有什么问题?
问题出在了,最后锁的释放,假如在处理临界资源的过程中,进程挂掉了或者在执行删除操作的时候redis链接断掉了,那么这个分布式锁将永远得不到释放,进而产生死锁。所以接下来的优化是如果进程挂掉了,能够及时的释放锁,你想到了什么?超时机制。
实现方式二:
为了避免出现方式一的问题,所以加入了超时机制。
setnx(key, <current Unix time + lock timeout + 1>)
本质就是将key的value值设置成一个超时时间,按照方式一流程:
lock_key = "distribute_lock"
def get_lock():
while True:
lock = redis_client.setnx(lock_key,<now_time+ lock timeout + 1>) # 直接设置成功
if lock: # 设置成功
break
now_time = <current Unix time + lock timeout + 1>
lock_time_out = redis.client.get(lock_key)
if now_time > lock_time_out: #检查是否超时
# 如果已经超时
redis_client.delete(lock_key) # 将这个key删除,进行重新设置。
lock = redis_client.setnx(lock_key,<now_time+ lock timeout + 1>)
if lock: # 设置成功
break
else:
time.sleep(0.5) #如果没有获取成功,等待0.5继续获取,当然这个地方你也可以设置重试次数。
return True
if get_lock():
# do your work,处理临界资源
redis_client.delete(lock_key) //为防止死锁,处理完临界资源要及时释放锁。
继续思考一下,这又会出现什么问题?
- 假如进程p1获取到了锁,同时进程p2和p3在不断的检测锁是否已经超时。
- 然后p1进程挂掉了,没有及时删除锁。
- 过了一段时间,p2和p3同时检测到了这个锁已经超时,即程序中now_time > lock_time_out都成立。
- 首先p2将锁删除了,然后将锁设置了超时时间,致此p2获取到了锁。
- p3也检测到了锁超时了,只不过它执行速度比慢,直接将锁删除了,实际上p3删除的是p2设置的锁,这个时候问题就出现了,p3和p2都获取到了锁。
实现方式三
实现方式二的关键问题是什么?是p3在删除锁的时候,没有检查是否又有新的进程获取到了该锁。 为了避免这种情况,p3在执行set操作的时候,用这个命令:
getset(lock_key,now_time+lock timeout + 1)
这个命令会返回旧,然后设置成新的值。在set之前判断一下当前时间是否大于lock_key的旧值。如果大于,说明已经超时,获取到了锁。假如再次出现上面的情况,p2和p3同时检测到锁的超时,然后p2删除、并获取到了锁。p3执行getset操作,然后当前时间和lock_key的旧值(p2设置的)比较,当前时间小于旧值。获取锁失败,继续下一轮的等待。
其次最后删除的时候,也不能像前几次一样直接删除。要先判断一下,当前时间小于锁的超时时间的话在删除。避免删除其他进程设置的锁。程序如下:
def get_lock():
LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'distribute_lock'
# 获取锁
while lock != 1:
now = int(time.time())
lock_timeout = now + LOCK_TIMEOUT + 1
lock = redis_client.setnx(lock_key, lock_timeout)
if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
break
else:
time.sleep(0.001)
if get_lock():
# dou your lock.处理临界资源()
# 释放锁
now = int(time.time())
if now < lock_timeout:
redis_client.delete(lock_key)
继续看,有什么问题?
-
为什么要+1 current Unix time + lock timeout + 1 ? +1是因为在 Redis 2.4 版本中,过期时间的延迟在 1 秒钟之内 —— 也即是,就算 key 已经过期,但它还是可能在过期之后一秒钟之内被访问到,而在新的 Redis 2.6 版本中,延迟被降低到 1 毫秒之内。
-
这段代码问题在哪里?1) 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。 2)当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。3) 锁不具备拥有者标识,即任何客户端都可以解锁。
(1)首先第一个问题是存在的,但是一般对于同一个分布群的项目,时间肯定是同步的,而且你见过哪台机器时间是用本地时间的?一般都是联网同步互联网全球设定的时区时间。 (2)Redis是单线程的,所以不存在你说的这种情况。是不可能同时执行的。即使客户端A,B,C 在同一时刻(精确到纳秒)发送了getset给redis。redis也会按照队列顺序依次执行。因此绝对保证只有一个客户端获得锁并进行业务处理最后释放锁,其他客户端一定会返回一个大于当前时间的结果,从而导致或许锁失败,也就不可能出现其他任何客户端能够解锁。你可能会说,“同时”。但是你说“同时”的时候,我就知道,兄弟你对redis的底层没有了解到这一层面,redis是强制单线程的。除非你改人家源码,就算你改人家源码,如果你改了人家的单线程设计理念,就等于说你不赞同redis作者的理念。然而,redis的单线程,却没有影响它的高性能。
- 实践中我发现的问题.
if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout))
会出现bug,因为当锁被删除之后,如果正好处于redis_client.get(lock_key)检查的时候,这个时候锁被删除了,所以redis_client.get(lock_key)为None,int(None)会出现类型转换错误。 然后这个时候如果为None的话,可以直接赋值为0,这样也不会出现问题,因为本身这个锁就被删除了,和时间超时是一样的,所以提出了优化。
if lock == 1 or (now > int(redis_client.get(lock_key) or 0)) and now > int(redis_client.getset(lock_key, lock_timeout) or 0)
其次为了便于使用,我还优化成了一个类:
class DistributeLock(object):
def __init__(self, lock_key=None, lock_timeout=2):
"""
:param lock_key: 分布式锁的key
:param lock_timeout: 锁的超时时间
"""
self.LOCK_TIMEOUT = lock_timeout
self.lock = 0
self.lock_timeout = 0
self.lock_key = lock_key # # 分布式锁的key
self.redis_client = #初始化redis客户端。
def __enter__(self):
# 获取锁
while self.lock != 1:
now = int(time.time())
self.lock_timeout = now + self.LOCK_TIMEOUT + 1
lock = self.redis_client.setnx(self.lock_key, self.lock_timeout)
if lock == 1 or (now > int(self.redis_client.get(self.lock_key) or 0)) and now > int(
self.redis_client.getset(self.lock_key, self.lock_timeout) or 0):
break
else:
time.sleep(0.001)
def __exit__(self, ex_type, ex_value, traceback):
# 释放锁
now = int(time.time())
if now < self.lock_timeout:
self.redis_client.delete(self.lock_key)
# 使用方式:
with DistributeLock(lock_key="your_distribute_lock",lock_timeout=3):
# do your work.
do_work() # 不用在关心锁的获取或者释放。
其实以上,还有有些问题:就是没有考虑redis挂掉或者主从切换的情况,后续再更新。
关注我,让我们一起成长。
转载自:https://juejin.cn/post/6900575031718313991