likes
comments
collection
share

caffeine_redis二级缓存实现Spring接口

作者站长头像
站长
· 阅读数 13

背景

项目最近在加缓存,因为项目中没有插入或更新操作,所有数据更新都是数据人员通过流程更新。所以使用切面自定义注解实现是比较合适的。参考:juejin.cn/post/722320…

现在在来实现通用型的二级缓存,就是项目里面查询、插入、更新什么都有。并且是实现 Spring 规范的接口。

介绍

在学习的过程中, 发现 JSR107/JCache 缓存,都没听说过,还是学习的时候了解下的。现在就不细讲了,主打的就是一个 Spring 整合。提供一系列接口规范,所有厂商都来实现它即可。和 JDBC 驱动一个样。

下图是 Spring Cache 抽象相关类的层次结构图:

caffeine_redis二级缓存实现Spring接口

  • CacheManager:缓存管理器,管理各种缓存(Cache)组件
  • Cache:为缓存的组件规范定义,包含缓存的各种操作。它有很多实现,比如 RedisCache、CaffeineCache等。

实现

既然上面的层次结构图清楚了,那么我们就逐个实现即可。

因为 Spring 中提供了 AbstractValueAdaptingCache 抽象接口,对 Cache 做了简单的封装,所以我们不用实现 Cache 接口,继承 AbstractValueAdaptingCache 就好了。

继承 AbstractValueAdaptingCache

因为涉及到 Cache、Redis 整合,所以属性肯定有这两个。然后为了在 yml 配置文件中可配置,写了个Properties 类

public class RedisCaffeineCache extends AbstractValueAdaptingCache {

    private Logger logger = LoggerFactory.getLogger(RedisCaffeineCache.class);
    private String cacheName;
    private String cachePrefix;
    private String topic;
    private RedisTemplate<Object, Object> redisTemplate;
    private Map<String, ReentrantLock> keyLockMap = new ConcurrentHashMap();
    private Cache<Object, Object> caffeineCache; //使用 Caffeine 作为本地缓存
    private RedisCaffeineProperties redisCaffeineProperties;
    @Autowired
    private RedisUtil redisUtil;

    public RedisCaffeineCache(String cacheName,
                              RedisTemplate<Object, Object> redisTemplate,
                              Cache<Object, Object> caffeineCache,
                              RedisCaffeineProperties redisCaffeineProperties) {
        super(redisCaffeineProperties.getAllowNull());
        this.cacheName = cacheName;
        this.redisTemplate = redisTemplate;
        this.cachePrefix = redisCaffeineProperties.getCachePrefix();
        this.caffeineCache = caffeineCache;
        this.redisCaffeineProperties = redisCaffeineProperties;
        this.topic = redisCaffeineProperties.getTopic();
    }
    xxxx

lookup

caffeine_redis二级缓存实现Spring接口

使用 @Cacheable 注解的时候,会执行这个查询逻辑。描述起来很简单,实现也不难

  • 从本地缓存 Caffeine 取数,有就直接返回
  • 从 Redis 取数,有就插入本地缓存,并返回
  • 都没有则运行程序本来的逻辑,并执行 put 方法

可以说 lookup 与 put 方法是相辅相成的。

    @Override
    protected Object lookup(Object key) {
        String redisKey = getRedisKey(key);
        Object value = caffeineCache.getIfPresent(key);
        if (value != null) {
            logger.debug("从本地缓存中获得key, the key is : {}", key);
            return value;
        }
        value = redisTemplate.opsForValue().get(redisKey);

        if (value != null) {
            logger.debug("从redis中获得值,将值放到本地缓存中, the key is : {}", redisKey);
            caffeineCache.put(key, value);
        }
        return value;
    }

put

当有更新操作或者像上面那样两个缓存都没有数据的时候会调用 put 方法。

  • 先保存 caffeine ,再保存 Redis ,如果为 null 就保存在 caffeine 不用保存 Redis ,防止缓存穿透。
  • 通知其他节点更新缓存。
    @Override
    public void put(Object key, Object value) {
        if (!super.isAllowNullValues() && value == null) {
            this.evict(key);
            return;
        }

        long expire = getExpire();
        logger.debug("put:{},expire:{}", getRedisKey(key), expire);
        redisTemplate.opsForValue().set(getRedisKey(key), toStoreValue(value), expire, TimeUnit.MINUTES);

        //缓存变更时通知其他节点清理本地缓存
        push(new CacheMessage(this.cacheName, key));
    }

evict

删除操作会调用,就是直接删除当前缓存数据,并通知其他节点删除

    @Override
    public void evict(Object key) {
        // 先清除redis中缓存数据,然后清除caffeine中的缓存,避免短时间内如果先清除caffeine缓存后其他请求会再从redis里加载到caffeine中
        redisTemplate.delete(getRedisKey(key));

        push(new CacheMessage(this.cacheName, key));

        caffeineCache.invalidate(key);
    }

clear

和 evit 一样。只不过它删除所有的缓存「这里是也是使用@CacheEvict() 注解,不过里面需要加上:allEntries = true」

这里需要注意 redis.keys 命令一般生产环境是禁用的,所以我们需要使用 scan 替换下。

    @Override
    public void clear() {
        // 先清除redis中缓存数据,然后清除caffeine中的缓存,避免短时间内如果先清除caffeine缓存后其他请求会再从redis里加载到caffeine中
        //这里使用 scan 遍历(生产环境一般会禁用 keys 函数)
        Collection<String> keys = redisUtil.scan(this.cacheName.concat(":*"));
        redisUtil.deleteObject(keys);

        push(new CacheMessage(this.cacheName, null));
        caffeineCache.invalidateAll();
    }

get

使用注解时不走这个方法,实际走父类的 get 方法

且这个方法,父类要求实现此接口的时候需要自己保证同步

    @Override
    public <T> T get(Object key, Callable<T> valueLoader) {
        Object value = lookup(key);
        if (value != null) {
            return (T) value;
        }
        //key 在 redis 和缓存中均不存在,执行原方法,并更新到缓存
        /**
         * 这里加锁是为了并发访问同一个 key 时,valueLoader 只调用一次 (Cache 接口的解释)
         */
        ReentrantLock lock = null;
        try {
            lock = keyLockMap.get(key);
            if (lock == null) {
                logger.debug("create lock for key : {}", key);
                keyLockMap.putIfAbsent(key.toString(), new ReentrantLock());
                lock = keyLockMap.get(key.toString());
            }
            lock.lock();
            value = lookup(key);
            if (value != null) {
                return (T) value;
            }
            //执行原方法获得value
            value = valueLoader.call();
            // toStoreValue 是父类提供的方法
            Object storeValue = toStoreValue(value);
            put(key, storeValue);
            return (T) value;
        } catch (Exception e) {
            throw new ValueRetrievalException(key, valueLoader, e.getCause());
        } finally {
            lock.unlock();
        }
    }

实现 CacheManager

缓存管理器的实现,当 debug 的时候就会发现,每次注解起作用都会先走这个类里面的方法,后面才会走 Cache 里的方法。这当然也是显而易见,因为他是 Cache 的管理器,先从管理器里面拿到 Cache 才能进行具体的操作。

public class RedisCaffeineCacheManager implements CacheManager {

    private final Logger logger = LoggerFactory.getLogger(RedisCaffeineCacheManager.class);

    private static Map<String, Cache> cacheMap = new ConcurrentHashMap<String, Cache>();

    private RedisCaffeineProperties redisCaffeineProperties;

    private RedisTemplate<Object, Object> redisTemplate;
    /**
     * RedisCaffeienCacheConfig 是一个 Bean ,项目启动的时候会加载,里面的 cacheManager() 方法也是一个 Bean 也会加载
     */
    public RedisCaffeineCacheManager(RedisTemplate<Object, Object> stringKeyRedisTemplate,
                                     RedisCaffeineProperties redisCaffeineProperties) {
        this.redisCaffeineProperties = redisCaffeineProperties;
        this.redisTemplate = stringKeyRedisTemplate;
    }
xxxx
}

getCache

  1. 使用一个 ConcurrentHashMap 保存已经创建好的 Cache
  2. 没有的话,就调用 Cache 创建一个
    @Override
    public Cache getCache(String name) {
        Cache cache = cacheMap.get(name);
        if (Objects.nonNull(cache)) {
            return cache;
        }
        try {
            cache = new RedisCaffeineCache(name, redisTemplate, createCaffeineCache(name), redisCaffeineProperties);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        Cache oldCache = cacheMap.putIfAbsent(name, cache);
        return oldCache == null ? cache : oldCache;
    }

    /**
     * 创建一个 Cache
     */
    public com.github.benmanes.caffeine.cache.Cache createCaffeineCache(String cacheName) throws Exception {

        if (StringUtils.isEmpty(cacheName)) {
            throw new Exception("cacheName 为空");
        }

        Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder();

        List<CaffeineEntity> caffeineEntityList = redisCaffeineProperties.getCaffeineEntityList();
        CaffeineEntity caffeineEntity = null;
        for (int i = 0; i < caffeineEntityList.size(); i++) {
            if (cacheName.equals(caffeineEntityList.get(i).getCacheName())) {
                caffeineEntity = caffeineEntityList.get(i);
                break;
            }
        }
        if (caffeineEntity == null) {
            throw new Exception("cacheName 与配置文件不一致");
        }

        caffeineBuilder.initialCapacity(caffeineEntity.getInitSize());
        caffeineBuilder.maximumSize(caffeineEntity.getMaxSize());
        caffeineBuilder.expireAfterAccess(caffeineEntity.getExpireAfterAccess(), TimeUnit.MINUTES);
        caffeineBuilder.expireAfterWrite(caffeineEntity.getExpireAfterWrite(), TimeUnit.MINUTES);
        caffeineBuilder.recordStats();
        return caffeineBuilder.build();
    }

分布式

因为有本地缓存,不像 Redis 那样,天然满足分布式。当有变动的时候,我们还需要通知其他节点做相应的变动。

RedisMessageListenerContainer

  1. 里面重写了 RedisTemplate,否则保存的数据不可视化,很难受。
  2. RedisMessageListenerContainer 注册了监听容器,并实现了监听到变动后的逻辑
    @Bean
    @ConditionalOnBean(RedisTemplate.class)
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisTemplate<Object, Object> stringRedisTemplate,
                                                                       RedisCaffeineCacheManager redisCaffeineCacheManager) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(stringRedisTemplate.getConnectionFactory());
        CacheMessageListener cacheMessageListener = new CacheMessageListener(stringRedisTemplate, redisCaffeineCacheManager);
        redisMessageListenerContainer.addMessageListener(cacheMessageListener, new ChannelTopic(redisCaffeineProperties.getTopic()));
        return redisMessageListenerContainer;

    }



    /**
     * 利用 redis 发布订阅通知其他节点清除本地缓存
     *
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        CacheMessage cacheMessage = (CacheMessage) redisTemplate.getValueSerializer().deserialize(message.getBody());
        logger.info("收到redis清除缓存消息, 开始清除本地缓存, the cacheName is {}, the key is {}", cacheMessage.getCacheName(), cacheMessage.getKey());
//
        redisCaffeineCacheManager.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey());

    }

测试

三个注解的用法

Cacheable

@Cacheable(cacheNames = "cacheA", key = "#id")

先走 lookup 方法,没有值,再走具体的业务逻辑,再走 put 方法

CachePut

@CachePut(cacheNames = "cacheB", key = "#name")

变动时调用,直接走 put 方法

CacheEvict

@CacheEvict(cacheNames = "cacheA", key = "#id")

走 evit 方法,删除当前 cache 中指定 key 的缓存

@CacheEvict(cacheNames = "cacheB", key = "#id", allEntries = true)

走 clear 方法,删除当前 cache 的所有缓存

测试接口

简单搞个 Controller 测试下各个注解搭配使用是否正常。

package com.demo.rediscaffeine.test;

import com.common.rediscaffeine.RedisCaffeineCacheManager;
import com.demo.rediscaffeine.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.Callable;

/**
 * @description:
 * @author:nihongyu
 * @date: 2023/4/19
 */
@RestController
@RequestMapping("test")
public class TestController {

    @Autowired
    private TestMapper testMapper;


    @GetMapping("get/{id}")
    @Cacheable(cacheNames = "cacheA", key = "#id")
    public Object get(@PathVariable("id") Long id) {
        return testMapper.getById(id, "123");
    }

    @GetMapping("getb/{id}")
    @Cacheable(cacheNames = "cacheB", key = "#id")
    public Object getB(@PathVariable("id") Long id) {
        return testMapper.getById(id, "123");
    }

    @GetMapping("update/{name}")
    @CachePut(cacheNames = "cacheB", key = "#name")
    public void updateOrder(@PathVariable("name") String name) {
        testMapper.updateById(name);
    }

    @GetMapping("delete/{id}")
    @CacheEvict(cacheNames = "cacheA", key = "#id")
    public void deleteOrder(@PathVariable("id") Long id) {
        testMapper.deleteById(id);
    }

    @GetMapping("deleteb/{id}")
    @CacheEvict(cacheNames = "cacheB", key = "#id", allEntries = true)
    public void clearOrder(@PathVariable("id") Long id) {
        testMapper.deleteById(id);
    }


    @GetMapping("query/{id}")
    public Object getOrderById2(@PathVariable("id") Long id) {
        RedisCaffeineCacheManager cacheManager = SpringContextUtil.getBean(RedisCaffeineCacheManager.class);
        Cache cache = cacheManager.getCache("cacheA");
        Object order = cache.get(id, (Callable<Object>) () -> {
          return  testMapper.getOrderById2(id);
        });
        return order;
    }

}

总结

  • 继承 AbstractValueAdaptingCache 类实现本地缓存的具体操作
  • 实现 CacheManager 管理 Cache
  • 使用 Redis 的发布订阅功能,实现简单的各节点通信,项目中因为对数据丢失不敏感,且项目中不会存在更新动作,所以使用 Redis 即可。

具体 starter 源码以及使用方法参看:gitee.com/ncharming/r…