likes
comments
collection
share

封装优雅的缓存组件库(Redis 缓存与内存缓存)

作者站长头像
站长
· 阅读数 25

为什么需要缓存

如果接口qps不高、接口内部逻辑没有密集IO/高频热点数据,另外就是数据变动非常频繁,我理解并不需要缓存。需要缓存一般是下面几点。

  1. 提高应用程序的性能:通过存储常用数据在内存中(内存缓存/分布式缓存),避免了每次请求时都从慢速存储设备(如磁盘)中读取数据,提高了应用程序的响应时间。

  2. 减少数据库负载:频繁访问数据库会给数据库带来压力,使用缓存可以减少对数据库的查询次数,从而减轻数据库的负担。

  3. 提高用户体验:快速响应和减少用户的等待时间。

需求背景

按照我平时的使用习惯需求比较简单,如下

  1. 支持多类型缓存,比如:分布式缓存(redis、memcached)、内存等缓存等,设计方便后续扩展;

  2. 支持多种缓存组合使用,比如内存缓存和分布式缓存组合,内存缓存命中直接返回数据,否则查询分布式缓存,直到找到对应数据,然后再回写缓存;

  3. 使用方数据结构不同,所以,需要支持多种数据结构,需要使用范型;

  4. 若缓存 Miss,从数据库/下游回查数据,需要支持自动查询数据库/下游接口,并且回写缓存。(为什么会有这个需求?看下面伪代码,这种方式对业务方来说太繁琐了并且研发同学如果不留意可能会忘记回写缓存);

先从缓存中查询
if 缓存命中{
    return
}

if 缓存miss{
    查询数据库/下游
    if 成功{
        写回缓存
        return
    }
}

5.  分布式组件支持监控、自动重连机制;

6.  支持日志、metrics等能力,这期先不支持。

类图

简单画了一个类图,只画了核心几个类,我用了适配器模式和责任链模式,后面详细讲解。 封装优雅的缓存组件库(Redis 缓存与内存缓存)

关键描述

  1. Adaptor 定义适配器接口,被子类实现,常见子类:redis适配器子类、内存适配器子类等,主是对缓存的适配,接口很简单只有三个,理解成增、删、改就可以了,后面可以扩展更多适配器。

  2. 缓存接口 ICache,定义了几个外部方法(方法定义很克制,没有必要定义太多)Get、Set、Del、GetAndSet(这个比较重要,一般业务开发中用的多),留意,有一个实现类(MultiCache[T any])含义是多级缓存,是对分布式缓存和内存缓存的编排,用了责任链模式。

  3. 其它类都是被引用的类,不太重要,这里不赘述。

核心代码

完整代码参考:GitHub - PycMono/go-cache: 基于三方库的二次实现

缓存适配器

适配器接口(Idaptor)
import (
	"context"
	"time"
)

// IAdaptor 接口转换器
type IAdaptor interface {
	Set(ctx context.Context, params map[string][]byte, expire time.Duration) error
	Get(ctx context.Context, k []string) (map[string][]byte, error)
	Del(ctx context.Context, k []string) error
}

接口定义非常克制,就3个方法,不用定义太多,完全够用了。

分布式缓存适配器实现类

以 redis 为案例,我主要分 redis Client、IAdaptor 接口实现 2 部分来讲。

 Client

意如名称,redis 连接客户端,主要负责连接redis服务器、监控、重连机制。

// Config 配置文件
type Config struct {
	name         string        // app name
	addr         string        // redis addr,例如 127.0.0.1:6379
	password     string        // redis password
	db           int           // redis db
	poolSize     int           // redis pool size
	poolTimeout  time.Duration // redis 超时(单位秒,默认为0)
	readTimeout  time.Duration // redis 超时(单位秒,默认为0)
	writeTimeout time.Duration // redis 超时(单位秒,默认为0)
}
// ... 省略一些代码

func (c Config) build() (*redis.Options, error) {
	if len(c.name) == 0 {
		return nil, fmt.Errorf("name为空")
	}
	if len(c.addr) == 0 {
		return nil, fmt.Errorf("addr为空")
	}
	if len(c.password) == 0 {
		return nil, fmt.Errorf("password为空")
	}
	if c.db == 0 {
		return nil, fmt.Errorf("db为空")
	}
	if c.poolSize == 0 {
		c.poolSize = 30
	}

	return &redis.Options{
		Addr:         c.addr,
		ClientName:   c.name,
		Password:     c.password,
		DB:           c.db,
		WriteTimeout: c.writeTimeout,
		PoolSize:     c.poolSize,
		PoolTimeout:  c.poolTimeout,
		ReadTimeout:  c.readTimeout,
	}, nil
}

type Client struct {
	redisClient redis.UniversalClient
	conf        *Config
	ctx         context.Context
	sm          sync.RWMutex
}

func NewRedisClient(conf *Config) (*Client, error) {
	ctx := context.Background()
	redisClient, err := connect(ctx, conf)
	if err != nil {
		return nil, err
	}

	c := &Client{
		conf:        conf,
		ctx:         ctx,
		redisClient: redisClient, // 首次初始化不用加锁
	}
	go c.monitoring() // 监控

	return c, nil
}

func (c *Client) GetRedisClient() redis.UniversalClient {
	c.sm.RLock()
	defer c.sm.RUnlock()

	return c.redisClient
}

// monitoring 重连监控,无限循环,检查redis客服端是否断开连接,如果断开重新连接
func (c *Client) monitoring() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
			return
		}
	}()

	for {
		// 先休眠30秒
		time.Sleep(30 * time.Second)
		if c.ping() {
			continue
		}

		fmt.Println("redis 异常断开,正在尝试重连~~~~~")
		c.reConnect()
	}
}

func (c *Client) ping() bool {
	_, err := c.redisClient.Ping(c.ctx).Result()
	return err == nil
}

// reConnect 重连
func (c *Client) reConnect() {
	redisClient, err := connect(c.ctx, c.conf)
	if err != nil {
		fmt.Println("redis 重连失败...")
	}

	// 尝试关闭历史连接
	c.redisClient.Close()

	c.sm.Lock()
	defer c.sm.Unlock()
	c.redisClient = redisClient
}

// connect 建立redis连接
func connect(ctx context.Context, conf *Config) (*redis.Client, error) {
	opt, err := conf.build()
	if err != nil {
		return nil, err
	}

	redisClient := redis.NewClient(opt)
	_, err = redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}

	// 设置 redisClient name(app name),方便定位问题
	if err = redisClient.Process(ctx, redis.NewStringCmd(ctx, "client", "setname", fmt.Sprintf("%s", conf.name))); err != nil {
		return nil, err
	}

	return redisClient, nil
}

redis 监控,我简单写了一个 monitoring 方法,死循环每隔 30s ping 一次 redis 服务器,若失败,则重连。

Redis 适配器

以 Redis 为案例,实现 IAdaptor 接口

type Cache struct {
	client *Client
}

func NewRedisAdaptor(client *Client) client.IAdaptor {
	return &Cache{client: client}
}

func (r *Cache) Set(ctx context.Context, params map[string][]byte, expire time.Duration) error {
	m := make(map[string]interface{})
	for k, v := range params {
		m[k] = string(v)
	}

	_, err := r.client.GetRedisClient().Pipelined(ctx, func(pipe redis.Pipeliner) error {
		for key, value := range m {
			err := pipe.Set(ctx, key, value, expire).Err()
			if err != nil {
				return err
			}
		}
		return nil
	})
	return err
}

func (r *Cache) Del(ctx context.Context, k []string) error {
	out := r.client.GetRedisClient().Del(ctx, k...)
	return out.Err()
}

func (r *Cache) Get(ctx context.Context, k []string) (map[string][]byte, error) {
	pipe := r.client.GetRedisClient().Pipeline()
	for _, key := range k {
		_, _ = pipe.Get(ctx, key).Result()
	}
	cmds, err := pipe.Exec(ctx)
	if err != nil && !errors.Is(err, redis.Nil) {
		return nil, err
	}

	out := make(map[string][]byte)
	for _, cmd := range cmds {
		args := cmd.Args()
		if v, ok := args[1].(string); ok {
			err = cmd.Err()
			if errors.Is(err, redis.Nil) {
				continue
			}
			if err != nil {
				return nil, err
			}

			if cmd, ok := cmd.(*redis.StringCmd); ok {
				str := cmd.Val()
				out[v] = []byte(str)
			}
		}
	}
	return out, nil
}

Redis 适配器实现 3 个接口,Del 方法比较简单,不多赘述。Get 和 Set 方法均采用管道的方式,对管道不熟悉的看下 Redis 文档命令篇就会了。

缓存编排管理器

缓存接口(ICache),我定义为缓存编排器接口,主要是负责对缓存适配器的编排。接口非常简单,仅有下面几个方法,需要注意的是 GetAndSet、GetAndSetSingle 2 个方法,主要是提供获取缓存并且设置缓存能力。

type ICache[T any] interface {
	Set(ctx context.Context, params map[string]T) error
	Get(ctx context.Context, keys []string) (map[string]T, error)
	GetAndSet(ctx context.Context, keys []string, f func(keys []string) (map[string]T, error)) (map[string]T, error)
	GetAndSetSingle(ctx context.Context, k string, f func(k string) (T, boolerror)) (T, boolerror)
	Del(ctx context.Context, keys []stringerror
}
缓存编排器实现类

我以多级缓存为案例(更复杂一些),编排 Redis 缓存和内存缓存,流程如下

  1. 优先查内存缓存,内存缓命中则返回业务方;

  2. 内存缓存 miss 再查 Redis 缓存,Redis 缓存命中则返回,并且回写内存缓存;

  3. Redis 缓存 miss 再查业务下游/数据库;

  4. 数据库/业务下游查回数据,回写 Redis 缓存和内存缓存,ps:数据库/业务下游未成功查询数据根据实际情况是否写入缓存标志。

// MultiCacheOptions 可选参数
type MultiCacheOptions struct {
	Base
	EnableLog bool          // 是否输出日志
	WriteNil  bool          // 缓存miss是否写入nil防止缓存穿透,默认不写入
	Expire    time.Duration // 过期时间
}

// MultiCache 多级缓存
type MultiCache[T any] struct {
	// 多级缓存适配器
	// 后续处理逻辑是根据数组的顺序遍历,建议把离用户最近的缓存设置到下标0的位置,依次排列。需注意,一定要保证离用户最近的缓存有数据
	// 假设缓存顺序,内存缓存、redis缓存、缓存 miss 透传数据库
	// 若内存缓存未miss,直接返回
	// 若内存缓存miss,从redis中查询,redis 缓存miss 再从数据库中查询,一定要回写内存缓存
	handlers []client.IAdaptor
	opts     *MultiCacheOptions // 基础配置
	sf       singleflight.Group
	// 还可以增加一些中间件比如日志输出、埋点之类的
}

func NewMultiCache[T any](opts *MultiCacheOptions, handlers ...client.IAdaptor) ICache[T] {
	return &MultiCache[T]{
		handlers: handlers,
		opts:     opts,
	}
}

func (c *MultiCache[T]) Set(ctx context.Context, params map[string]T) error {
	kv := make(map[string][]byte)
	for k, v := range params {
		key := c.opts.buildKey(k)
		b, err := sonic.Marshal(v)
		if err != nil {
			return err
		}
		kv[key] = b
	}

	for _, v := range c.handlers {
		err := v.Set(ctx, kv, c.opts.Expire)
		if err != nil {
			return err
		}
	}

	return nil
}

func (c *MultiCache[T]) Get(ctx context.Context, keys []string) (map[string]T, error) {
	var (
		tmpKeys = c.opts.buildKeys(keys)
	)

	// 多级缓存查询
	// 思路:第一个client先查找,若miss,将miss的key集合投递下一个client查找,直到所有client查找完成,或者keys全部找到
	var (
		kvMap     = make(map[string][]byte)
		missKeys  = tmpKeys
		preClient client.IAdaptor
	)
	for _, cli := range c.handlers {
		if len(missKeys) == 0 {
			break // 退出循环
		}

		tmpKvMap, err := cli.Get(ctx, missKeys)
		if err != nil {
			return nil, err
		}
		for k, v := range tmpKvMap {
			kvMap[k] = v
		}

		missKeys = []string{} // 重新设置值
		for _, key := range tmpKeys {
			if _, ok := tmpKvMap[key]; !ok {
				missKeys = append(missKeys, key)
			}
		}

		if len(tmpKvMap) == 0 || preClient == nil {
			preClient = cli
			continue
		}

		err = preClient.Set(ctx, tmpKvMap, c.opts.Expire) // 如果1级缓存miss了,2级缓存加载后,回写1级缓存
		if err != nil {
			fmt.Println(err)
		}
		preClient = cli
	}

	// 处理数据返回
	out := make(map[string]T)
	for k, v := range kvMap {
		var obj T
		err := sonic.Unmarshal(v, &obj)
		if err != nil {
			return nil, err
		}

		key := c.opts.splitKey(k) // 切割key
		out[key] = obj
	}

	return out, nil
}

// GetAndSet 缓存 miss,支持调用f函数从其它db中获取数据
func (c *MultiCache[T]) GetAndSet(ctx context.Context, k []string, f func(k []string) (map[string]T, error)) (map[string]T, error) {
	kvMap, err := c.Get(ctx, k)
	if err != nil {
		return nil, err
	}

	var (
		missKeys = []string{}
	)
	for _, v := range k {
		if _, ok := kvMap[v]; ok {
			continue
		}
		missKeys = append(missKeys, v)
	}
	if len(missKeys) == 0 {
		return kvMap, nil
	}

	tmpKvMap, err := f(missKeys)
	if err != nil {
		return nil, err
	}
	for k, v := range tmpKvMap {
		kvMap[k] = v
	}

	// 检查外部数据源数据查询是否一致
	if c.opts.WriteNil && len(missKeys) != len(tmpKvMap) {
		for _, v := range missKeys {
			if _, ok := tmpKvMap[v]; ok {
				continue
			}
			var obj T
			tmpKvMap[v] = obj
		}
	}
	if len(tmpKvMap) > 0 {
		err = c.Set(ctx, tmpKvMap)
		if err != nil {
			// todo 打印日志就好了,不影响后续流程,下次请求再次尝试加载到缓存
			fmt.Println(err)
		}
	}

	return kvMap, nil
}

func (c *MultiCache[T]) GetAndSetSingle(ctx context.Context, k string, f func(k string) (T, bool, error)) (T, bool, error) {
	var (
		val T
		ok  bool
	)

	kvMap, err := c.Get(ctx, []string{k})
	if err != nil {
		return val, false, err
	}
	val, ok = kvMap[k]
	if ok {
		return val, true, nil
	}

	// 缓存miss 从外部查询
	if f == nil {
		return val, false, nil
	}

	// 单飞查询
	_, err, _ = c.sf.Do(k, func() (interface{}, error) {
		val, ok, err = f(k)
		if err != nil {
			return nil, err
		}
		return val, nil
	})
	if err != nil {
		return val, false, err
	}

	// 写入缓存
	var (
		tmpKvMap = make(map[string]T)
	)

	// 写入缓存条件:1、数据存在;2、数据不存在并且WriteNil为true
	if ok || (c.opts.WriteNil && !ok) {
		tmpKvMap[k] = val
		err = c.Set(ctx, tmpKvMap)
		if err != nil {
			fmt.Println(err)
		}
	}

	return val, ok, nil
}

func (c *MultiCache[T]) Del(ctx context.Context, k []string) error {
	var (
		tmpKeys = c.opts.buildKeys(k)
	)
	for _, v := range c.handlers {
		err := v.Del(ctx, tmpKeys)
		if err != nil {
			return err
		}
	}

	return nil
}

代码不多大概230行,抽几个比较简单的来讲

a. 查询并为用单飞,单飞是对同一个 id 多次查询操作;

b. 若缓存 miss ,对 miss keys 特殊处理,调用参数函数查询数据库/业务下游接口;

c. 数据查询成功/失败回写缓存。

下面看看如何使用,初始化 Adaptor。

func getRedisAdaptor() client.IAdaptor {
	conf := redis.Config{}.WithName("test").
		WithDB(10).
		WithAddr(":6379").
		WithPassword("12345").
		WithPoolSize(100)
	redisClient, err := redis.NewRedisClient(&conf)
	if err != nil {
		panic(err)
	}

	return redis.NewRedisAdaptor(redisClient)
}

func getMemAdaptor() client.IAdaptor {
	conf := mem.Config{}.WithCacheSize(1024 * 1024 * 1024).WithGCPercent(20)

	return mem.NewMemoryAdaptor(mem.NewMemCache(&conf))
}

增、删、改、查

func main() {
	type Person struct {
		Name string `json:"name"`
		Age  int    `json:"age"`
	}
	cache := goCache.NewMultiCache[*Person](&goCache.MultiCacheOptions{
		Base:      goCache.Base{Prefix: "demo"},
		EnableLog: false,
		WriteNil:  false,
		Expire:    time.Minute * 10,
	}, getMemAdaptor(), getRedisAdaptor())

	var (
		m = make(map[string]*Person)
	)
	m["xxxxx_test1"] = &Person{
		Name: "李四",
		Age:  20,
	}
	err := cache.Set(context.TODO(), m)
	if err != nil {
		panic(err)
	}

	kvMap, err := cache.Get(context.TODO(), []string{"12344pyc-test1"})
	if err != nil {
		panic(err)
	}
	b, _ := sonic.Marshal(kvMap)
	fmt.Println(string(b))

	time.Sleep(time.Second * 10)
	err = cache.Del(context.TODO(), []string{"12344pyc-test1"})
	if err != nil {
		panic(err)
	}

	kvMap, err = cache.Get(context.TODO(), []string{"12344pyc-test1"})
	if err != nil {
		panic(err)
	}
	b, _ = sonic.Marshal(kvMap)
	fmt.Println(string(b))
}

好了,整体讲完啦

关于缓存与数据库不一致问题

很多、很多、很多文章都在讲缓存与数据库不一致的问题,我先解释下缓存的场景

  1. 对业务下游数据缓存:比如 A 应用调用 B 应用,B 应用就是业务下游。这种情况可能比较少,比如:组织架构被所有系统依赖,QPS 非常高,某些特定情况下客户端做了缓存。但是,删除缓存非常复杂,B 应用将数据变更如何通知 A 应用问题,常见的做法是广播(抛一个事件到 MQ),使用缓存客户端均消费 MQ 数据删除数据,容易出现缓存一致性问题,一般不建议这么做;

  2. 对数据库缓存数据:一般情况增、删、改、查均在一个应用内。所以,缓存的设置和删除都比较简单。

不建议大家对业务下游数据缓存,除非固定不变的数据。比如:租户信息,开户之后不会变化。

缓存与数据库不一致问题盘点下来应该就是下面几种情况造成的。

  1. 先更新缓存,再更新数据库;

  2. 先更新数据库,再更新缓存;

  3. 先删除缓存,再更新数据库;

  4. 先更新数据库,再删除缓存。

分析下每种方案的最坏情况

  1. 缓存更新成功,数据库更新失败。导致缓存中数据是最新的,数据库还是历史数据,缓存中明显就是脏数据了,并且后期很难和数据库保持一致,一般不用。

  2. 先更新数据库,再更新缓存。数据库更新成功,缓存更新失败,查询结果是历史数据。勉强能接受,但是一般也不会用,QPS 高的情况下,可能会导致 Redis 被多次更新。

  3. 先删除缓存,再更新数据库。第一步失败和第二步失败都不会造成缓存和数据库一致性问题。但,在 QPS 高的情况,第二步删除数据库的同时,可能存在有线程将历史数据查询后回写缓存造成数据不一致问题,一般不用。

  4. 先更新数据库,再删除缓存。数据库更新成功,缓存删除失败,数据不一致了,这种是最常用的方案,提供几种方案解决一致性问题

    a. QPS 低的场景可以用事务,第二步删除失败回滚数据库就可以了,QPS 高的场景不适合;

    b. 用重试机制,如果删除缓存失败,尝试多次即可,比如:每次删除失败间隔3s,删除3次任然失败则不处理了。

    c. 异步方案,更新投递 MQ 或采集数据库更新日志(binlog、oplog等投递 MQ,消费 MQ 删除缓存。

异步方案听上去很牛逼,但是谁用谁坑不建议使用,主要是有下面几个问题:

  1. 不能灰度,跟基建有关;

  2. 链路复杂,更新数据库-->采集日志投递 mq-->消费 mq 删除缓存,线上出现数据一致性问题,整个链路都排查一通,可能引入兄弟团队协助排查;

  3. 对后面接手业务的同学不友好,后续同学接手快,设计越简单越好。

我并不建议大家用异步方案,别给后面同学留坑,删除数据用重试机制,另外缓存都设置过期时间(根据业务来定时间长短),保证数据最终一致性,营销自动化 QPS 每秒 400-500 左右没出现一次缓存一致性问题。另外,想想整个业务团队都用异步方案,多少 MQ?每个业务方都有一条 MQ 链路,太复杂了。

总结

  1. 本文讲了封装 Redis 基础组件思路,主要以简单为主,尽量别提供一对乱七八糟的操作,用了一些设计模式,比如:构建者模式、适配器模式、责任链模式。

  2. 缓存不一致问题,常见方案是先更新数据库,再删除缓存,另外缓存设置过期时间保证数据最终一致性,建议不使用 MQ 方案。

转载自:https://juejin.cn/post/7373504907460329481
评论
请登录