likes
comments
collection
share

分布式缓存的简单实践

作者站长头像
站长
· 阅读数 70

技术选型

分布式缓存方案默认采用的是主流的缓存框架:Redis,即将缓存数据存储在另一台Redis服务器上。 系统在使用缓存时,依赖的是缓存的接口,而非具体的实现;

分布式缓存在更新时不允许并发更新,防止缓存击穿,因此我们在Redis的基础上,采用了基于Redisson分布式锁,在更新分布式缓存前必须先获得锁。

缓存生命周期

更新机制

  • 被动刷新:基于Redis的数据驱逐策略,包括LRU和TTL等;
  • 主动刷新业务数据驱动的数据更新。当业务侧有数据变更时,将会主动刷新分布式缓存。比如当秒杀品下线时,会发出相应的领域事件,而在领域事件的处理中就会刷新缓存。

分布式缓存在刷新的过程中,并不会主动刷新所有服务器上的本地缓存,本地缓存将遵循单机的刷新策略。这意味着,本地缓存可能会有秒级或毫秒级的滞后,对于数据一致性非绝对敏感的场景,这种短时间的延迟下的脏数据是可以接受的,它只是会对用户侧的展示有所影响,而不会影响到服务端的数据状态。

分布式锁

基于Redis实现分布式锁

  1. 利用set nx ex获取锁,并设置过期时间,保存线程标识
  2. 释放锁时先判断线程标识是否与自己一致,一致则删除锁
/**
 * 基于redis的分布式锁
 */
public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";

    private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";

    /**
     * 锁
     *
     * @param time 锁的过期时间
     * @return
     */
    @Override
    public boolean tryLock(long time) {
        long id = Thread.currentThread().getId();
        // 值用 uuid + 线程id拼接
        String value = ID_PREFIX + id;
        // 自动拆箱有空指针问题
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value, time, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(aBoolean);
    }

    /**
     * 解锁
     */
    @Override
    public void unLock() {
        String valueInRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        long id = Thread.currentThread().getId();
        // 值用 uuid + 线程id拼接
        String value = ID_PREFIX + id;
        // 两者相同才释放锁,要先做判断再进行释放
        if (value.equals(valueInRedis)) {
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

基于Redison实现分布式锁

导包

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.3</version>
</dependency>

配置文件

@Configuration
    public class RedissonConfig {

        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            config.useSingleServer()
                .setAddress("redis://localhost:6379")
                .setPassword("ezreal")
                .setDatabase(0);

            return Redisson.create(config);
        }
    }

代码实现

@Component
public class RedissonLockService implements DistributedLockFactoryService {
    private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class);

    @Resource
    private RedissonClient redissonClient;
    
    @Override
    public DistributedLock getDistributedLock(String key) {
        RLock rLock = redissonClient.getLock(key);

        return new DistributedLock() {

            @Override
            public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
                boolean isLockSuccess = rLock.tryLock(waitTime, leaseTime, unit);
                logger.info("{} get lock result:{}", key, isLockSuccess);
                return isLockSuccess;
            }

            @Override
            public void lock(long leaseTime, TimeUnit unit) {
                rLock.lock(leaseTime, unit);
            }

            @Override
            public void unlock() {
                if (isLocked() && isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }

            @Override
            public boolean isLocked() {
                return rLock.isLocked();
            }

            @Override
            public boolean isHeldByThread(long threadId) {
                return rLock.isHeldByThread(threadId);
            }

            @Override
            public boolean isHeldByCurrentThread() {
                return rLock.isHeldByCurrentThread();
            }
        };
    }
}

实现原理

  1. 可重入:利用hash结构记录线程id和重入次数
  2. 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  3. 超时续约::利用watchDog,每隔一段时间 (releaseTime/3),重置超时时间

分布式缓存的简单实践

分布式缓存

基本逻辑

分布式缓存的简单实践

每次获取缓存的时候,先从本地缓存中获取,再从分布式缓存中获取。

若分布式缓存中不存在对应的值,则需要获取分布式锁,然后对分布式缓存数据进行更新

  1. 若获取锁成功,则查询数据库,获取最新的数据放入缓存;

若数据库中的数据也为空,则也要存储入数据库,防止缓存穿透

  1. 若获取锁失败,则直接返回,不要等待重新获取锁,客户端对这次请求进行静默处理;

实现代码

缓存的接口

实现部分Redis数据结构的存储接口

public interface DistributedCacheService {
    void put(String key, String value);

    void put(String key, Object value);

    void put(String key, Object value, long timeout, TimeUnit unit);

    void put(String key, Object value, long expireTime);

    <T> T getObject(String key, Class<T> targetClass);

    String getString(String key);

    <T> List<T> getList(String key, Class<T> targetClass);

    Boolean delete(String key);

    Boolean hasKey(String key);
}

基本逻辑代码

private SeckillGoodCache updateDistributedSeckillGood(Long itemId) {
        logger.info("更新远程缓存|{}", itemId);

        DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(UPDATE_ITEMS_CACHE_LOCK_KEY + itemId);
        try {
            boolean tryLock = distributedLock.tryLock(1, 5, TimeUnit.SECONDS);

            // 如果没有获得到锁,就返回重试
            if (!tryLock) {
                return new SeckillGoodCache().tryLater();
            }

            // 再次检查
            SeckillGoodCache distributedSeckillCache = distributedCacheService.getObject(buildItemCacheKey(itemId), SeckillGoodCache.class);
            if (distributedSeckillCache != null) {
                return distributedSeckillCache;
            }

            // 查询数据库
            SeckillGood seckillGood = seckillGoodMapper.selectById(itemId);
            SeckillGoodCache seckillGoodCache = new SeckillGoodCache();
            if (seckillGood == null) {
                // 数据不存在 也要返回 也要存缓存 防止缓存穿透
                seckillGoodCache.notExist();
            } else {
                seckillGoodCache.with(seckillGood).setVersion(System.currentTimeMillis());
            }
            logger.info("itemCache|远程缓存已更新|{}", itemId);
            distributedCacheService.put(buildItemCacheKey(itemId), JSON.toJSONString(seckillGoodCache));
            return seckillGoodCache;
        } catch (InterruptedException e) {

            logger.error("itemCache|远程缓存更新失败|{}", itemId);
            return new SeckillGoodCache().tryLater();
        } finally {

            distributedLock.unlock();
        }
    }
转载自:https://juejin.cn/post/7185204730270842940
评论
请登录