likes
comments
collection
share

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

作者站长头像
站长
· 阅读数 101

通过Upstash可以免费创建一个Redis/Kafka集群,每天可以在免费的套餐上使用最多10k个请求。

Redis 是一项深受喜爱的技术,已在大多数产品中使用。虽然开始使用 Redis 并将其集成到代码库中非常简单,但它提供了一些您以前可能没有见过的有用功能。 在本文中,我们讨论 Redis 中的管道、事务和 Lua 脚本,以及如何使用它们来提高性能和可靠性。

使用Redis来缓存文章

​ 想象一个场景,我们有一个受欢迎的博客,需要缓存帖子以每秒处理许多请求。假设我们有一个job,从数据库中检索所有帖子,并在 Redis 丢失内存或由于任何其他原因为空时加载到redis中。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

​ 它尽可能简单,但我们可以采用不同的方法来解决这个问题。最简单的方法是用帖子一张一张地填充缓存,每个帖子有一个请求。

首先,让我们创建一个简单的函数来模拟数据库的行为:

type Post struct {
   Slug    string // the unique identifier
   Content string
}

func GetLastNPostsFromDatabase(n int) []Post {
   var posts []Post
   for i := 0; i < n; i++ {
      posts = append(posts, Post{
         Slug:    fmt.Sprintf("post-%d-slug", i),
         Content: fmt.Sprintf("Some random content for the post #%d", i),
      })
   }
   return posts
}

然后,我们可以将这些帖子一一保存:

func FillCacheWithPostsOneByOne(ctx context.Context, rdb *redis.Client, posts []Post) error {
   for _, post := range posts {
      // save each post one by one
      if err := rdb.Set(ctx, fmt.Sprintf("post:%s", post.Slug), post.Content, 0).Err(); err != nil {
         return err
      }
   }
   return nil
}

​ 如您所见,FillCacheWithPostsOneByOne 函数需要 Redis 客户端。要连接到 Redis,您可以使用 Docker 或独立安装或 Upstash 的免费 Redis 集群在您的计算机中安装 Redis。

​ 如果您想使用本地 Redis 实例,可以使用以下代码进行连接:

rdb := redis.NewClient(&redis.Options{})

或者,如果您使用 Upstash,您可以轻松地从仪表板复制连接代码。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

func main() {
   rdb := ...

   startTime := time.Now()
   posts := GetLastNPostsFromDatabase(100)
   if err := FillCacheWithPostsOneByOne(context.Background(), rdb, posts); err != nil {
      log.Printf("error while filling the cache: %v\n", err)
   }
   fmt.Println("took ", time.Since(startTime))
}

​ 如果运行代码,根据您的计算机距离 Redis 服务器的距离,大约需要 100 毫秒,这并不是一个大数字。然而,在本例中,我们假设数据库中有 100 个帖子。如果我们有一百万怎么办?如果每个请求大约需要 1 毫秒,则大约需要 16 分钟。

是什么让它变慢

  • Redis实例和客户端之间的延迟
  • Redis必须为每个请求进行系统调用
  • 客户端的I/O开销

以下是这些问题的一些解决方案:

Pipelines 一次性发送所有请求

​ Redis 允许通过一个请求一次性发送所有请求。这种方法称为管道。简而言之,您可以创建一个请求列表,获取和/或设置请求,并将它们一起发送到 Redis 实例,Redis 执行完所有请求后,将结果发送回客户端。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

正如您所看到的,Redis 立即接收所有命令,执行它们,然后一起返回所有结果。它非常适合这个用例!

func FillCacheWithPostsInBatches(ctx context.Context, rdb *redis.Client, posts []Post) error {
   _, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
      for _, post := range posts {
         // save each post one by one
         if err := pipe.Set(ctx, fmt.Sprintf("post:%s", post.Slug), post.Content, 0).Err(); err != nil {
            return err
         }
      }
      return nil
   })
   return err
}

​ 使用 go-redis,您可以使用 Pipelined 方法并在函数中编写代码,也可以使用 Pipeline 方法,该方法返回一个管道实例,可用于向管道添加新命令,然后执行。

​ 结果令人着迷!对于 100 个帖子,需要 7 毫秒。对于 1k 个帖子,需要 7ms,对于 10k 个帖子,只需要 24ms。让我们将其与最后一种方法进行比较:

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

​ 对于 100 万个帖子,只需大约 2 秒即可完成,而不是 16 分钟。这是一个巨大的进步。如果由于任何原因,缓存变空,它几乎可以立即被填满。

在这个特定场景中,我可以使用 MSet 来代替。 Redis 主要支持使用 M 命令(例如 MSet、MGet 和 HMset)运行具有多个值的同一命令。但是,您无法在一个请求中运行不同的命令,而是可以使用管道。

Pipeline 存在的限制

  • 如果您需要某个键的值,您会在执行所有命令后收到它。您无法实现需要某些键的值来修改其他键的值的逻辑。我们将在本文的后面部分讨论这个问题。
  • 管道不是事务。其他Redis客户端可以发出请求并在管道的命令之间运行。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

Transation 执行过程中不能执行其他命令

​ Redis 是一个单核应用程序。在大多数情况下,您并不关心原子性,因为 Redis 本质上是原子性的,并且提供了一些很棒的命令,例如 INCRBY,因此您不需要获取、更改和设置可能导致不一致的值。

​ 事务允许一步执行一组命令。 Redis 按顺序执行这组命令,并且不会执行其间其他请求中的其他命令。

​ 事务的一大好处是 WATCH 命令。一般来说,如果 WATCHed key 在事务期间没有更改,您可以要求 Redis 应用更改。

​ 让我们讨论一个新场景。想象一下我们需要限制页面的查看次数。例如,我们希望一页仅被查看十次。想想像赠品这样的事情,只有前十个人才能看到代码。我们如何通过 Redis 和事务来实现这一点?

使用Watch进行乐观锁

监视“WATCH”键以检测对它们的更改。如果在“EXEC”命令之前至少修改了一个监视的键,则整个事务将中止,并且“EXEC”返回一个 Null 回复以通知事务失败。

一般来说,这意味着我们可以有一个监视键的列表,在获取它们并运行逻辑后,如果它们没有改变,我们可以用事务保存逻辑结果。否则,交易将会失败。

func MakeNewPage(ctx context.Context, rdb *redis.Client, slug string, viewLimit int) error {
   if err := rdb.Set(ctx, fmt.Sprintf("page:%s:views", slug), 0, 0).Err(); err != nil {
      return fmt.Errorf("error while saving page default view: %v", err)
   }
   if err := rdb.Set(ctx, fmt.Sprintf("page:%s:viewLimit", slug), viewLimit, 0).Err(); err != nil {
      fmt.Errorf("error while setting page view limit: %v", err)
   }
   return nil
}


func CheckIfCanVisitPageWithoutTransaction(ctx context.Context, rdb *redis.Client, slug string) (bool, error) {
   limit, err := rdb.Get(ctx, fmt.Sprintf("page:%s:viewLimit", slug)).Int()
   if err != nil {
      return false, fmt.Errorf("error while getting page view limit: %v", err)
   }

   currentViews, err := rdb.Get(ctx, fmt.Sprintf("page:%s:views", slug)).Int()
   if err != nil {
      return false, fmt.Errorf("error while getting page's current views: %v", err)
   }

   // the page has reached its view limit
   if currentViews >= limit {
      return false, nil
   }

   // adding the new view
   if err := rdb.Set(ctx, fmt.Sprintf("page:%s:views", slug), currentViews+1, 0).Err(); err != nil {
      // if an error happens the view has not been added, and we don't show the user the page
      return false, fmt.Errorf("error while saving page default view: %v", err)
   }
   return true, nil
}

​ 然后,通过这个函数,我们可以检查用户是否可以看到该页面。它接收查看次数限制和页面的当前查看次数,如果用户可以看到该页面,它将查看次数增加一。

但是这种实现是并发不安全的。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

​ 想象一下两个用户同时访问该页面。两个用户最终都会访问该页面,最终的访问次数将为 10,而不是 11。

​ 我们怎样才能预防这个问题呢?实际上我们有两种方法来解决这个问题。

1、我们可以在一个lua脚本中运行整个逻辑。Redis中的Lua脚本以原子方式运行。因此第二个用户等到第一个用户完成计算后,访问次数将为11,第二个用户将看不到优惠码。

2、我们可以同时运行两者,但是在执行过程中,如果访问键值的值发生更改,Redis不会设置新的值并终止事务。

使用Watch修复查看次数限制

​ 所以,首先我们需要定义Redis需要watch的key。查看计数器键是导致问题的关键,因此,我们将观察这个键。然后,在接收到所有密钥并完成计算后,我们使用事务来保存最终值。

func CheckIfCanVisitPageWithoutTransaction(ctx context.Context, rdb *redis.Client, slug string) (bool, error) {
   viewLimitKey := fmt.Sprintf("page:%s:viewLimit", slug)
   viewsKey := fmt.Sprintf("page:%s:views", slug)

   canView := false
   return canView, rdb.Watch(ctx, func(tx *redis.Tx) error {
      // using tx instead of rdb ensures if those values has changed, the transaction will fail
      limit, err := tx.Get(ctx, viewLimitKey).Int()
      if err != nil {
         return fmt.Errorf("error while getting page view limit: %v", err)
      }

      currentViews, err := tx.Get(ctx, viewsKey).Int()
      if err != nil {
         return fmt.Errorf("error while getting page's current views: %v", err)
      }

      // the page has reached its view limit
      if currentViews >= limit {
         return nil
      }

      _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
         // adding the new view
         if err := pipe.Set(ctx, viewsKey, currentViews+1, 0).Err(); err != nil {
            // if an error happens the view has not been added, and we don't show the user the page
            return fmt.Errorf("error while saving page default view: %v", err)
         }
         return nil
      })

      if err != nil {
         return fmt.Errorf("error while executing the pipeline: %v", err)
      }
      canView = true
      return nil

   }, viewsKey)

}

​ 正如您所看到的,我将viewsKey传递给了watch方法,并且在回调中,我使用了TxPipelined,它将命令包装在Redis中的MULTI/EXEC块中。因此,总而言之,在从 Redis 获取值之前,我指定了视图键,然后在获取键并完成一些工作之后,我使用事务将新值保存在 Redis 中,如果事务位于 WATCH 命令内,则它如果该值发生更改,将会失败。

现在,让我们模拟一个 WATCHed key 被更改的场景。

package main

import (
   "context"
   "fmt"
   "github.com/redis/go-redis/v9"
   "sync"
   "time"
)

func MakeNewPage(ctx context.Context, rdb *redis.Client, slug string, viewLimit int) error {
   if err := rdb.Set(ctx, fmt.Sprintf("page:%s:views", slug), 0, 0).Err(); err != nil {
      return fmt.Errorf("error while saving page default view: %v", err)
   }
   if err := rdb.Set(ctx, fmt.Sprintf("page:%s:viewLimit", slug), viewLimit, 0).Err(); err != nil {
      fmt.Errorf("error while setting page view limit: %v", err)
   }
   return nil
}

func CheckIfCanVisitPageWithoutTransaction(ctx context.Context, rdb *redis.Client, slug string) (bool, error) {
   viewLimitKey := fmt.Sprintf("page:%s:viewLimit", slug)
   viewsKey := fmt.Sprintf("page:%s:views", slug)

   canView := false
   return canView, rdb.Watch(ctx, func(tx *redis.Tx) error {
      // using tx instead of rdb ensures if those values has changed, the transaction will fail
      limit, err := tx.Get(ctx, viewLimitKey).Int()
      if err != nil {
         return fmt.Errorf("error while getting page view limit: %v", err)
      }

      currentViews, err := tx.Get(ctx, viewsKey).Int()
      if err != nil {
         return fmt.Errorf("error while getting page's current views: %v", err)
      }

      <-time.After(time.Second) // added manual delay

      // the page has reached its view limit
      if currentViews >= limit {
         return nil
      }

      _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
         // adding the new view
         if err := pipe.Set(ctx, viewsKey, currentViews+1, 0).Err(); err != nil {
            // if an error happens the view has not been added, and we don't show the user the page
            return fmt.Errorf("error while saving page default view: %v", err)
         }
         return nil
      })

      if err != nil {
         return fmt.Errorf("error while executing the pipeline: %v", err)
      }
      canView = true
      return nil

   }, viewsKey)

}

func main() {
   rdb := redis.NewClient(&redis.Options{})

   if err := MakeNewPage(context.Background(), rdb, "test-1", 10); err != nil {
      panic(err)
   }

   var wg sync.WaitGroup
   wg.Add(2)
   go func() {
      can, err := CheckIfCanVisitPageWithoutTransaction(context.Background(), rdb, "test-1")
      if err != nil {
         panic(err)
      }
      fmt.Println("Can #1", can)
      wg.Done()
   }()
   go func() {
      <-time.After(time.Millisecond * 500) // to ensure the first one will change the second one's value
      can, err := CheckIfCanVisitPageWithoutTransaction(context.Background(), rdb, "test-1")
      if err != nil {
         panic(err)
      }
      fmt.Println("Can #2", can)
      wg.Done()
   }()
   wg.Wait()

}

返回结果:

Can #1 true
panic: error while executing the pipeline: redis: transaction failed

可以看到,交易失败成功!然而,该页面只有一个查看次数,用户必须能够看到该页面。我们不能返回500。该代码需要重试。

func TransactionWithRetry(callback func() error, maxRetries int) error {
   retries := 0
   for {
      err := callback()
      // the transaction executed successfully
      if err == nil {
         return nil
      }

      if errors.Is(err, redis.TxFailedErr) {
         retries++
         fmt.Println("> retry happened.")
         if retries > maxRetries {
            return ErrMaxRetriesReached
         }
         continue
      }

      // something unexpected happened
      return err

   }
}

​ 要了解发生的情况,您可以使用 MONITOR 命令来记录每个命令。这就是正在发生的事情:

1696485643.305621 [0 172.17.0.1:46678] "watch" "page:test-1:views"
1696485643.306484 [0 172.17.0.1:46678] "get" "page:test-1:viewLimit"
1696485643.307711 [0 172.17.0.1:46678] "get" "page:test-1:views"
1696485644.311047 [0 172.17.0.1:46678] "multi"
1696485644.311065 [0 172.17.0.1:46678] "set" "page:test-1:views" "1"
1696485644.311069 [0 172.17.0.1:46678] "exec"
1696485644.312144 [0 172.17.0.1:46678] "unwatch"

​ 了解正在发生的事情很重要。正如您所看到的,代码分为两个范围:WATCH 和 MULTI/EXEC。

Golang 中的 Redis 性能和原子性:pipelines、事务和 Lua 脚本

​ 正如我之前提到的,只有 MULTI/EXEC 之间的命令组才会阻止 Redis 执行其他命令。因此,代码的其他部分可以与其他 Redis 命令同时执行。这意味着Redis可以具有更好的整体性能。但是,正如下一部分所讨论的,如果您使用 Lua 脚本,则在 Lua 脚本完全执行之前,Redis 不会响应其他请求。

完全原子性的Lua脚本

​ 现在我们了解了 Redis 中事务的工作原理,我们可以讨论 Lua 脚本并将它们与 MULTI/EXEC 进行比较。一般来说,我们将Lua脚本发送到Redis服务器一次,然后我们可以使用不同的参数调用该脚本来执行。除了Lua脚本的完全原子性之外,它还可以减少由于在Redis实例内部执行而导致的复杂逻辑的延迟。然而,对于简单的使用,它比使用内置命令慢得多。因此,始终尝试使用内置命令或 Redis 事务寻找替代解决方案,然后如果有意义,请使用 Lua 脚本。

​ Lua 脚本将在执行其他命令时阻止 Redis 执行它们!像 PostgreSQL 这样的数据库通过其多版本并发控制(MVCC)系统提供对事务并发执行的支持。每笔交易都独立于其数据运行。然而,Redis是单核的,可以同时运行一个命令。因此,从本质上讲,Lua 脚本会使用该核心,直到执行完成。

让我们将视图限制逻辑转换为Lua脚本(script.lua):

local viewLimit = redis.call("GET", "page:" .. KEYS[1] .. ":viewLimit")
local currentViews = redis.call("GET", "page:" .. KEYS[1] .. ":views")
-- convert them to a number. redis.call("GET") returns a string
viewLimit = tonumber(viewLimit)
currentViews = tonumber(currentViews)
-- the viewing limit has been reached
if currentViews >= viewLimit then
    return "no"
end
-- the user can view the page, let's add a new view to the views key
redis.log(redis.LOG_WARNING, currentViews)
redis.call("SET", "page:" .. KEYS[1] .. ":views", currentViews + 1)
return "yes"

​ 在您的脚本中,您可以使用“redis”API 访问 Redis。请注意“redis.call”返回一个字符串。您可以使用“tonumber”函数将其转换为数字。

func main() {
   rdb := redis.NewClient(&redis.Options{})

   if err := MakeNewPage(context.Background(), rdb, "test-1", 10); err != nil {
      panic(err)
   }

   scriptContent, _ := os.ReadFile("./script.lua")
   script := redis.NewScript(string(scriptContent))

   var wg sync.WaitGroup
   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func(i int) {
         can, err := script.Run(context.Background(), rdb, []string{"test-1"}).Result()
         if err != nil {
            panic(err)
         }
         fmt.Printf("can #%d = %v\n", i, can)
         wg.Done()
      }(i)
   }
   wg.Wait()

}

​ 我通过调用“NewScript”函数创建了一个新脚本。 Redis 的好处是您不需要每次执行脚本时都上传脚本。 go-redis 包将脚本转换为 sha 哈希值,第一次上传脚本后,它使用该哈希值执行后续命令。

两种方式对比

  • Lua script 花了100ms
  • Transaction with retry 花了4s

​ 对于Lua脚本,我们用一条命令运行脚本,Redis实例内的脚本可以访问数据,而无需任何往返时间。另一方面,事务必须向 Redis 服务器发出七个不同的请求,这增加了很多延迟:

1696488282.976886 [0 172.17.0.1:49286] "watch" "page:test-1:views"
1696488282.977527 [0 172.17.0.1:49286] "get" "page:test-1:viewLimit"
1696488282.978079 [0 172.17.0.1:49286] "get" "page:test-1:views"
1696488283.989333 [0 172.17.0.1:49286] "multi"
1696488283.989357 [0 172.17.0.1:49286] "set" "page:test-1:views" "2"
1696488283.989360 [0 172.17.0.1:49286] "exec"
1696488283.990405 [0 172.17.0.1:49286] "unwatch"

​ 这就是go-redis 限流器为什么使用Lua脚本来实现的主要原因,您可以在这里读到更多内容。

结论

总之,Redis 提供了一系列可以极大增强应用程序的性能和可靠性的功能。了解何时以及如何使用管道、事务和 Lua 脚本可以使我们在项目中充分发挥 Redis 的潜力。

转载自:https://juejin.cn/post/7337928395610390562
评论
请登录