Project Reactor源码阅读-Scheduler
Reacor在进行线程切换(subscribeOn/publishOn
)以及并行计算(runOn
)都会使用Scheduler
。
通过前面的源码分析,我们知道对于每次切换,实际上都会先基于选择的Scheduler
,创建一个对应的Worker
,真正异步执行是通过调用Worker#schedule()
实现的。
本文主要分析各种Scheduler
实现原理。
立即调度-ImmediateScheduler
通过Schedulers.immediate()
创建,在调用者线程立即执行任务,底层就是直接调用run()
方法执行。
单线程池调度-SingleScheduler
通过Schedulers.single()
创建,底层是一个单线程池,将任务交给改单线程池执行。
通过
Schedulers.newSingle("test")
形式,会为创建名称为test的单线程池。
弹性调度-ElasticScheduler
通过Schedulers.elatic()
创建,底层实现比较复杂,我们详细分析一下。
代码示例
@Test
public void testElastic() {
Scheduler scheduler = Schedulers.elastic();
for (int i = 0; i < 10; i++) {
scheduler.schedule(new Task(i, 1000));
}
sleep(10000);
}
直接调用scheduler.schedule()
提交任务,并发执行。
@Test
public void testElasticWorker() {
Scheduler.Worker worker = Schedulers.elastic().createWorker();
for (int i = 0; i < 10; i++) {
worker.schedule(new Task(i, 1000));
}
sleep(10000);
}
调用同一个worker
提交任务,异步执行,但是实际上只有1个线程在工作。
Schedulers#elatic()
涉及到到的3个静态常量如下:
static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>();
static final String ELASTIC = "elastic";
static final Supplier<Scheduler> ELASTIC_SUPPLIER =
() -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);
Schedulers#cache()
这里的目的是缓存Scheduler
,重点是调用supplier.get()
,实际执行 newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true)
;
Schedulers#newElastic()
实际上会调用Factory#newElatic()
来创建Scheduler
。
Factory#newElastic()
Factory
里面创建了ElasticScheduler
对象。后续一般都调用createWorker()
方法。
我们可以自定义实现Factory
接口,实现全局覆盖Schedulers
行为。
ElasticScheduler#createWorker()
先调用pick()
方法,然后创建ElasticWorker
对象。
ElasticScheduler#pick()
- 优先从
cache
中拿数据,如果存在,直接返回缓存对象。 - 否则创建
CacheService
。
cache
实际上就是对CacheService
进行缓存,因为该对象底层包含线程池,资源消耗高,应该尽可能复用。
在调用
dispose()
时,会将当前cacheService
加入到cache
中。
CachedService
重点是调用了ElasticScheduler#get()
获取线程池。
ElasticScheduler#get()
每次都创建了一个单线程池!
当我们使用Worker异步执行任务时,实际上是将任务交给对应的单线程池执行。接下来看看为什么直接调用ElasticScheduler#schedule()
就能多线程并发执行任务?
ElasticScheduler#schedule()
每次都会重新选择CachedService
来执行任务,因此能多线程并发执行任务。
有点类似线程池缓存,只不过一个是针对线程,另一个是针对单线程池。
有边界弹性调度-BoundedElasticScheduler
使用ElaticScheduler
有OOM
风险:
- 可能创建非常多
CacheService
对象,即可能创建非常多的单线程池。 - 可能提交非常多任务。
针对这两个问题,Reactor
推荐使用BoundedElasticScheduler
,该Scheduler
加上了边界限制。
具体来说是增加了线程容量限制和任务队列容量限制,默认为10倍核心线程数和100000, 均可通过配置进行修改。
BoundedElasticScheduler
只是在原有ElasticScheduler
加上边界限制,底层仍然是单线程池。
并行调度-ParallelScheduler
在启动时,会按照一定数量(默认cpu数量)创建单线程池,创建Worker
时会选择一个单线程池。能实现并行调度,是因为会创建多个Worker
。
为何底层都是单线程池
不管是弹性调度还是并行调度,Reactor
底层都是基于单线程池来实现的。原因在于对于一个流水线上面的操作,理应都是串行处理的。如果使用多个线程的线程池,会存在资源浪费。考虑以下这个例子:
@Test
public void test() {
delayPublishFlux(10, 1, 100)
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(100)))
.doOnNext(x -> sleep(1000))
.subscribe(i -> logInt(i, "消费"));
sleep(100000);
}
在分析publishOn()
源码中,我们知道此时上游的数据是交由Worker
异步添加到队列的,此时实际会有多个线程执行。但是下游消费能力有限,根本没必要用多个线程来做。
委派调度-DelegateServiceScheduler
将任务委派给指定的ExecutorService
执行。
转载自:https://juejin.cn/post/7144292235880693791