likes
comments
collection
share

榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

作者站长头像
站长
· 阅读数 8

一、需求

如果我们将榜单的程序部署到了多个实例,那么有可能出现以下情景和问题:

  1. 同一时刻,多个实例同时执行热榜计算的任务,导致计算资源的浪费。

    榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能
  2. 即使同一时刻只有一个实例计算热榜,但控制不住别的实例再去计算热榜,导致热榜的重复计算。

    榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

所以我们希望可以实现:一直都只有一个节点计算榜单任务榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

二、方案设计与技术选型

2.1 方案设计

采用分布式锁的方案,确保整个分布式环境下,只有一个 goroutine 能够拿到锁。即,节点先抢分布式锁,如果抢到了分布式锁,那么就执行任务,否则就不执行。

榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

如果抢到分布式锁之后,仅仅是执行一次榜单任务就释放锁,则还不能解决问题2

为了解决问题2,我们可以考虑扩大锁的时间范围,即在启动的时候拿到锁,而后不管计算几次,都不会释放锁(让某个抢到分布式锁的节点专门执行计算榜单的业务)。

榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

2.2 技术选型

本文基于一个开源的 Redis 的分布式锁实现

go get github.com/gotomicro/redis-lock@latest

下一篇文章介绍基于 Mysql 的分布式任务调度的实现(催更doge)。

三、代码实现

3.1 实现同一时刻只有一个节点计算榜单

(1)原 Job 层的 Run() 方法

func (r *RankingJob) Run() error {
    // 每一次 job 的过期时间是 30s
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    return r.svc.TopN(ctx)
}

(2)为 Job 层面添加分布式锁

package job

import (
    "context"
    rlock "github.com/gotomicro/redis-lock"
    "refactor-webook/webook/internal/service"
    "refactor-webook/webook/pkg/logger"
    "time"
)

type RankingJob struct {
    svc     service.RankingService
    timeout time.Duration
    client  *rlock.Client
    key     string
    l       logger.LoggerV1
}

func NewRankingJob(svc service.RankingService, timeout time.Duration, l logger.LoggerV1) *RankingJob {
    return &RankingJob{svc: svc, timeout: timeout, key: "job:ranking", l: l}
}

func (r *RankingJob) Name() string {
    return "RankingJob"
}

func (r *RankingJob) Run() error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) // note 4s怎么计算出来的?即,计算获取锁的超时时间 = 超时次数 * 每一次重试的超时时长 + 若干重试间隔【注意:与job的过期时间无关】
    defer cancel()
    // note 获得分布式锁
    lock, err := r.client.Lock(ctx, r.key,
       // 锁内任务执行的过期时间,等于svc.TopN()的过期时间
       r.timeout,
       &rlock.FixIntervalRetry{
          // 重试间隔为100ms
          Interval: time.Millisecond * 100,
          Max:      3,
          // 每一次重试的超时时间为1s
       }, time.Second)
    if err != nil {
       // 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
       return err
    }
    // note 释放分布式锁
    defer func() {
       ctx, cancel = context.WithTimeout(context.Background(), time.Second)
       defer cancel()
       er := lock.Unlock(ctx)
       // 不需要处理这个err,因为如果释放锁失败的话,锁也会因为过期而被释放的
       if er != nil {
          r.l.Warn("ranking job释放分布式锁失败", logger.Error(er))
       }
    }()

    ctx, cancel = context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
    defer cancel()
    return r.svc.TopN(ctx)
}

3.2 实现扩大锁的时间范围

实现:上面的实现方式是每执行一次榜单计算就获取锁然后再释放(锁是一次性),现在进行优化,实现一次锁能执行多次榜单计算。

(1)实现

func (r *RankingJob) Run() error {
    if r.lock == nil {
       // note 抢分布式锁
       ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
       defer cancel()
       lock, err := r.client.Lock(ctx, r.key,
          // 锁内任务执行的过期时间,等于svc.TopN()的过期时间
          r.timeout,
          &rlock.FixIntervalRetry{
             // 重试间隔为100ms
             Interval: time.Millisecond * 100,
             Max:      3,
             // 每一次重试的超时时间为1s
          }, time.Second)
       if err != nil {
          // 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
          return err
       }
       // 将新获得的锁保存
       r.lock = lock

       // note 自动续约
       go func() {
          // 并不是非得一半就续约
          er := lock.AutoRefresh(r.timeout/2, r.timeout)
          if er != nil {
             // 续约失败了
             r.lock = nil
          }
       }()
    }
    // 锁不为空,原本就有锁
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
    defer cancel()
    return r.svc.TopN(ctx)
}

(2)访问分布式锁的并发问题

因为是另开了一个 goroutine 来实现自动续约的,所以在访问 分布式锁lock 变量时会存在至少两个协程的单机的并发问题,所以引入 sync.Mutex 单机锁来对 lock 加锁。

func (r *RankingJob) Run() error {
    // 为 r.lock 加一个本地的锁
    r.localLock.Lock()
    lock := r.lock

    if lock == nil {
       // note 抢分布式锁
       ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
       defer cancel()
       lock, err := r.client.Lock(ctx, r.key,
          // 锁内任务执行的过期时间,等于svc.TopN()的过期时间
          r.timeout,
          &rlock.FixIntervalRetry{
             // 重试间隔为100ms
             Interval: time.Millisecond * 100,
             Max:      3,
             // 每一次重试的超时时间为1s
          }, time.Second)
       if err != nil {
          r.localLock.Unlock()
          // 不需要处理 err,若没抢到分布式锁lock,就认为其他实例抢到了
          r.l.Warn("获取分布式锁失败", logger.Error(err))
          return err
       }
       r.localLock.Unlock()
       // 将新获得的锁保存
       r.lock = lock

       // note 自动续约
       go func() {
          // 并不是非得一半就续约
          er := lock.AutoRefresh(r.timeout/2, r.timeout)
          if er != nil {
             // 续约失败了
             r.localLock.Lock()
             r.lock = nil
             r.localLock.Unlock()
          }
       }()
    }
    // 锁不为空,原本就有锁
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout) // 每一次 job 的过期时间是 30s
    defer cancel()
    return r.svc.TopN(ctx)
}

四、补充

  1. 若是单机条件下保护共享资源免受并发访问的影响,就用 sync.Mutexsync.RWMutex 这两种锁。(下图外面的方框表示一个进程)。 榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能

所以,分布式锁和单机锁比起来,就是抢锁的从协程 goroutine 变成了 实例。分布式锁之所以难,基本上都和网络有关(因为不知道没抢到锁的原因是请求没发出去还是没收到请求等)。

榜单模型(五):利用基于Redis的分布式锁实现榜单任务的调度一、需求 如果我们将榜单的程序部署到了多个实例,那么有可能
转载自:https://juejin.cn/post/7400195628847054884
评论
请登录