榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能
一、需求
如果我们将榜单的程序部署到了多个实例,那么有可能出现以下情景和问题:
-
同一时刻,多个实例同时执行热榜计算的任务,导致计算资源的浪费。
-
即使同一时刻只有一个实例计算热榜,但控制不住别的实例再去计算热榜,导致热榜的重复计算。
所以我们希望可以实现:一直都只有一个节点计算榜单任务。
二、方案设计与技术选型
2.1 方案设计
采用分布式锁的方案,确保整个分布式环境下,只有一个 goroutine
能够拿到锁。即,节点先抢分布式锁,如果抢到了分布式锁,那么就执行任务,否则就不执行。

如果抢到分布式锁之后,仅仅是执行一次榜单任务就释放锁,则还不能解决问题2。
为了解决问题2,我们可以考虑扩大锁的时间范围,即在启动的时候拿到锁,而后不管计算几次,都不会释放锁(让某个抢到分布式锁的节点专门执行计算榜单的业务)。

2.2 技术选型
本文基于一个开源的 Redis
的分布式锁实现
go get github.com/gotomicro/redis-lock@latest
下一篇文章介绍基于 Mysql
的分布式任务调度的实现(催更doge)。
三、代码实现
3.1 实现同一时刻只有一个节点计算榜单
(1)原 Job
层的 Run()
方法
func (r *RankingJob) Run() error {
// 每一次 job 的过期时间是 30s
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()
return r.svc.TopN(ctx)
}
(2)为 Job
层面添加分布式锁
package job
import (
"context"
rlock "github.com/gotomicro/redis-lock"
"refactor-webook/webook/internal/service"
"refactor-webook/webook/pkg/logger"
"time"
)
type RankingJob struct {
svc service.RankingService
timeout time.Duration
client *rlock.Client
key string
l logger.LoggerV1
}
func NewRankingJob(svc service.RankingService, timeout time.Duration, l logger.LoggerV1) *RankingJob {
return &RankingJob{svc: svc, timeout: timeout, key: "job:ranking", l: l}
}
func (r *RankingJob) Name() string {
return "RankingJob"
}
func (r *RankingJob) Run() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) // note 4s怎么计算出来的?即,计算获取锁的超时时间 = 超时次数 * 每一次重试的超时时长 + 若干重试间隔【注意:与job的过期时间无关】
defer cancel()
// note 获得分布式锁
lock, err := r.client.Lock(ctx, r.key,
// 锁内任务执行的过期时间,等于svc.TopN()的过期时间
r.timeout,
&rlock.FixIntervalRetry{
// 重试间隔为100ms
Interval: time.Millisecond * 100,
Max: 3,
// 每一次重试的超时时间为1s
}, time.Second)
if err != nil {
// 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
return err
}
// note 释放分布式锁
defer func() {
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
er := lock.Unlock(ctx)
// 不需要处理这个err,因为如果释放锁失败的话,锁也会因为过期而被释放的
if er != nil {
r.l.Warn("ranking job释放分布式锁失败", logger.Error(er))
}
}()
ctx, cancel = context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
defer cancel()
return r.svc.TopN(ctx)
}
3.2 实现扩大锁的时间范围
实现:上面的实现方式是每执行一次榜单计算就获取锁然后再释放(锁是一次性),现在进行优化,实现一次锁能执行多次榜单计算。
(1)实现
func (r *RankingJob) Run() error {
if r.lock == nil {
// note 抢分布式锁
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
lock, err := r.client.Lock(ctx, r.key,
// 锁内任务执行的过期时间,等于svc.TopN()的过期时间
r.timeout,
&rlock.FixIntervalRetry{
// 重试间隔为100ms
Interval: time.Millisecond * 100,
Max: 3,
// 每一次重试的超时时间为1s
}, time.Second)
if err != nil {
// 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
return err
}
// 将新获得的锁保存
r.lock = lock
// note 自动续约
go func() {
// 并不是非得一半就续约
er := lock.AutoRefresh(r.timeout/2, r.timeout)
if er != nil {
// 续约失败了
r.lock = nil
}
}()
}
// 锁不为空,原本就有锁
ctx, cancel := context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
defer cancel()
return r.svc.TopN(ctx)
}
(2)访问分布式锁的并发问题
因为是另开了一个 goroutine
来实现自动续约的,所以在访问 分布式锁lock
变量时会存在至少两个协程的单机的并发问题,所以引入 sync.Mutex
单机锁来对 lock
加锁。
func (r *RankingJob) Run() error {
// 为 r.lock 加一个本地的锁
r.localLock.Lock()
lock := r.lock
if lock == nil {
// note 抢分布式锁
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
lock, err := r.client.Lock(ctx, r.key,
// 锁内任务执行的过期时间,等于svc.TopN()的过期时间
r.timeout,
&rlock.FixIntervalRetry{
// 重试间隔为100ms
Interval: time.Millisecond * 100,
Max: 3,
// 每一次重试的超时时间为1s
}, time.Second)
if err != nil {
r.localLock.Unlock()
// 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
r.l.Warn("获取分布式锁失败", logger.Error(err))
return err
}
r.localLock.Unlock()
// 将新获得的锁保存
r.lock = lock
// note 自动续约
go func() {
// 并不是非得一半就续约
er := lock.AutoRefresh(r.timeout/2, r.timeout)
if er != nil {
// 续约失败了
r.localLock.Lock()
r.lock = nil
r.localLock.Unlock()
}
}()
}
// 锁不为空,原本就有锁
ctx, cancel := context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
defer cancel()
return r.svc.TopN(ctx)
}
四、补充
- 若是单机条件下保护共享资源免受并发访问的影响,就用
sync.Mutex
和sync.RWMutex
这两种锁。(下图外面的方框表示一个进程)。
所以,分布式锁和单机锁比起来,就是抢锁的从协程 goroutine
变成了 实例
。分布式锁之所以难,基本上都和网络有关(因为不知道没抢到锁的原因是请求没发出去还是没收到请求等)。

转载自:https://juejin.cn/post/7400195628847054884