换一种思维使用缓存
最近在公司的开发任务中接到一个优化的任务,主要是将一个查询次数比较多,但是数据量不大的一个表的查询由DB查询改为走缓存查询,当领导将这个任务交给我时,瞬间就想到了redis,然后就联想到了缓存击穿、缓存穿透、缓存雪崩的一系列问题。然后自认为可以提交一份满意的答卷,一个晚上就写了一个使用redis缓存的方案,里面包含了解决缓存常见问题的思路。
第二天一早,就拿着方案找上老板,然后巴拉巴拉说了一堆,然后老板说,你这个怎么解决缓存穿透等问题的,然后我就拿着从网上找的一堆关于三个问题的解决方案,什么分布式锁、热点数据永不过期、过期时间随机、布隆过滤器.... 然后说了一堆,老板直接来了句:我现在想要,一百个请求进来,只有一个请求去查数据库,然后其他请求原地等待,然后这个请求拿到数据后,设置缓存,其他99个请求就直接走缓存,然后我看你方案里说使用分布式锁,即使是使用double check的方式,还是会难以避免其他请求去竞争这个分布式锁的资源。
伪代码如下:
get(code) {
// 走缓存
if (redis.get(code) != null) {
return redis.get(code);
} else {
// 缓存没有 加锁走数据库
Lock lock = redis.createLock(code);
lock.lock;
try {
// double check 走缓存
if (redis.get(code) != null) {
return redis.get(code);
} else {
// 查询数据库 设置缓存
result = db.find(code);
redis.set(result, time);
return result;
}
} catch (Eeception e) {
...
} finally {
lock.unlock();
}
}
}
当时就很气,心想着你这不是为难我胖虎吗,
然后我又拿着方案回去改。想着,不就是让其他请求不去竞争分布式锁资源吗?然后我就想着,直接使用分布式锁的tryLock,抢到锁资源的去查db,抢不到就sleep一段时间,回调get方法不久行了吗,然后又一版方案出来了。
伪代码如下:
get(code) {
// 走缓存
if (redis.get(code) != null) {
return redis.get(code);
} else {
// 缓存没有 加锁走数据库
Lock lock = redis.createLock(code);
// 竞争锁资源
if (lock.tryLock()) {
try {
// 查询数据库 设置缓存
result = db.find(code);
redis.set(result, time);
return result; } finally {
redis.unlock();
}
} else {
Thread.sleep(500);
get(code);
}
}
}
心想着,这样子不就ok了吗,
方案一出,我就去找老板交差了,然后老板看了一下,说:你这个方案是可以用,但是睡眠也不好,如果并发特别高,线程池一下子就会被打满.... ,然后被他喷了一脸口水之后,他又说,这种情况可以考虑一下本地缓存,这张表的数据量并不是很大,但是用到的地方非常多,查询次数非常多(配置表),放redis不太好。然后我脱口而出,那不是线上每一台机器都会有自己的一个本地缓存。然后老板又说,是的,因为本来量就不大,每一台机器都有一个本地缓存,开销也不大。并且你要使用本地缓存之后,也要完成这种,只有一个请求打到数据库,其他的等数据库查出来走缓存的效果,不能去竞争锁资源....
带着老板的要求,一脸蒙蔽的回到自己的工位,想了半天,没有一点思路,想用Synchronized锁,但是同时还是避免不了争夺锁资源的情况。
可能,老板看出了我的一脸茫然,然后他就跟我说,可以去看一看AQS的几种同步工具,作为校招生的我,当时又是一脸茫然,偷偷回去恶补知识,好像老板还特意提了一句读写锁,当时我就恨不得为老板出生入死,连特么做梦都在加班。
然后回去又疯狂研究AQS的几种同步工具,尤其是读写锁,一看读写锁的特性,读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。这样一想,卧槽,感觉发现了新大陆,这不就完美解决了一个请求去请求数据库,其他被堵塞在原地了吗。
但是写出demo之后发现,emmm,所有的读锁被阻塞之后还是会去竞争写锁的资源,并且,读锁还无法针对更细粒度的对象加锁,直接把不同的code也给阻塞了,当时我就感觉整个人就不好了。
本地缓存加读写锁伪代码:
public void readWriteMathod(String key){
readWriteLock.readLock().lock();//读锁,只对写的线程互斥
Object value = null;
try {
// 尝试从本地缓存中获取数据
value = map.get(key);
if (value != null) {
System.out.println(Thread.currentThread().getName() + "拿到了值,值为" + value);
return;
}
if (value == null) {
readWriteLock.readLock().unlock();
//发现目标值为null,释放掉读锁
readWriteLock.writeLock().lock();
//发现目标值为null,需要取值操作,上写锁
try {
value = map.get(key);
// 很严谨这一步。再次取目标值
if (value == null) { //很严谨这一步。再次判断目标值,防止写锁释放后,后面获得写锁的线程再次进行取值操作
// 模拟DB操作
result = db.find(code);
// 本地缓存 map.put(key, value);
System.out.println(Thread.currentThread().getName() + "设置了值,值为" + value);
}
readWriteLock.readLock().lock();//再次对读进行锁住,以防止写的操作,造成数据错乱
} finally {
/** 先加读锁再释放写锁读作用:
* 防止在100行出多个线程获得写锁进行写的操作,所以在写锁还没有释放前要上读锁
*/
readWriteLock.writeLock().unlock();
}
}
} finally {
readWriteLock.readLock().unlock();
}
}
然后在心累的同时,又发现了一个更有意思的东西,ReentrantLock的Condition,简介:
condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
顿时灵感一来,又一个demo出现。
condition伪代码:
private static Lock lock = new ReentrantLock();
private static Condition cacheCondition = lock.newCondition();
get(code) {
// 从缓存中查
if (map.get(code) != null) {
return map.get(code);
} else {
//缓存中没有
if (lock.tryLock()) {
try {
value = db.find(code);
map.put(code, value);
if (condition != null) {
condition.signalAll();
}
return value;
} finally {
lock.unlock();
}
} else {
try {
lock.lock();
if (map.get(key) == null)
{
condition.await();
get(code);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
demo写完之后,仔细去回看代码,发现这个方案解决了由一个请求去查db,其他的等待这个请求查到数据后唤醒它们,乍一看,非常符合我老板的要求,但是,没一会我又发现了,这个锁的粒度同样很大,直接将所有的请求都给阻塞了。这个方案又out了。
正当我焦头烂额的时候,另一个大佬问我有什么问题吗,然后我就给他说了一遍老板的需求和目前的一个情况。然后他给我说,你可以去研究一下ConcurrentHashMap的putIfAbsent方法,顺便给我写了个小demo。
伪代码:
void get(code) {
if (map.get(code) != null) {
return map.get(code);
}
Future f = futureMap.get(code);
if (f == null) {
MyFuture myFuture = new MyFuture(code);
FutureTask curTask = new FutureTask<>(myFuture);
// 如果已存在,返回已存在的,如果不存在,返回null,并且将当前的值插入到map FutureTask pre = map.putIfAbsent(code, curTask); if (pre == null) {
// 执行当前任务,查数据库
new Thread(curTask).start();
value = curTask.get();
// 设置缓存
map.set(code, value);
// 移除当前任务
futureMap.remove(code);
return value;
} else {
// 返回已插入任务的返回值,最多等500ms,无值抛出超时异常,有则返回
return pre.get(500, TimeUnit.MILLISECONDS);
}
}
// 获取f任务的返回值,最多等500ms
return f.get(500, TimeUnit.MILLISECONDS);
}
MyFutureTask implements Callable<String> {
// 构造方法
MyFuture(code) {
code = code;
}
@Override
public String call() throws Exception {
value = db.find(code);
return value;
}
}
完美利用ConcurrentHashMap的putIfAbsent的特性,同时只允许一个线程进行插入操作,并且第一次插入返回null,后面插入返回第一次插入的值。源码如下
具体的源码我这个小菜鸟就不解释了,大佬跟我说,他的上一家电商公司,用这种方案,经过了8000QPS的考验,对我现在这个场景的缓存来说(接近400QPS),完全足够了,然后,我就只能在大佬的帮助下,交出了这一次的方案。终究还是一只小菜鸟。在网上找到的巨多的方案,都只是非常普通的加分布式锁..... 心累,在这把这个方案分享一下,在分享一下踩过的坑。
不过虽然,花了将近4天研究这些东西,但是对我一个刚从大学毕业的小菜鸟来说,收获真的蛮大的。在附上一句,打工人,打工魂,打工都是人上人!
转载自:https://juejin.cn/post/6891905723308572680