记录一次 Bug 分析过程 关于「RxJavaCallAdapterFactory」
我为什么要记录这次 bug 分析过程?因为此次分析过程 3 ~ 4 小时,过程比较曲折,但是最后的原因又很简单,让我觉得浪费了不少时间,如果当时换个思路就能更快的发现问题。同时也由于“一些原因”,让我对 RxJava3CallAdapterFactory 略有微辞。
故障现象
前提:工程里有 RxJava2 和 RxJava3 两个版本,为了统一决定全工程使用 RxJava3 版本。
代码变更内容很简单,将 RxJava2 的依赖全部替换成 RxJava3 版本(也包括 Retrofit、Room 的 adapter 依赖)。
基础功能测试完毕,就去跑单元测试,然后单元测试结果失败了,失败的地方是网络请求方法,异常如下:
单元测试方法如下(类似代码演示):
@Test
fun login() {
WanAndroidService.create()
.login("1", "22")
.test()
.assertValueCount(1)
}
interface WanAndroidService {
@FormUrlEncoded
@POST("user/login")
fun login(
@Field("username") username: String,
@Field("password") password: String
): Single<BaseRezhsponse<User>>
companion object {
private const val BASE_URL = "https://www.wanandroid.com/"
fun create(): WanAndroidService {
val client = OkHttpClient.Builder()
.build()
return Retrofit.Builder()
.baseUrl(BASE_URL)
.client(client)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.build()
.create(WanAndroidService::class.java)
}
}
}
分析原因
这两个类中的修改仅仅是将 RxJava2 版本更新为 RxJava3,RxJava3CallAdapterFactory 变更为 RxJava3CallAdapterFactory 。在问题出现后,我先将分支切换到 RxJava2 版本,发现测试是通过的,那问题可以确定就是在本次提交。难道是 RxJava3 有什么机制变更?
分析异常日志
Value counts differ; Expected: 1, Actual: 0 (latch = 1, values = 0, errors = 0, completions = 0)
预期数量为 1,实际为 0,所以抛出异常。这里有个 latch = 1
,成功、失败、完成 数量都为 0,说明这个 Single 订阅并没有执行完测试就结束了。来看看源码。
Single#test
方法会创建一个 TestObserver
并且直接订阅。
public final TestObserver<T> test() {
TestObserver<T> ts = new TestObserver<T>();
subscribe(ts);
return ts;
}
public final U assertValueCount(int count) {
int s = values.size();
if (s != count) {
throw fail("Value counts differ; Expected: " + count + ", Actual: " + s);
}
return (U)this;
}
protected final AssertionError fail(String message) {
StringBuilder b = new StringBuilder(64 + message.length());
b.append(message);
b.append(" (")
// 这里的 down 就是 CountDownLatch
// 在 TestObserver 的 onComplete 和 onError 方法都会执行
// down.countDown,方法较长就不列出来了,大家可以自行查看
.append("latch = ").append(done.getCount()).append(", ")
.append("values = ").append(values.size()).append(", ")
.append("errors = ").append(errors.size()).append(", ")
.append("completions = ").append(completions)
.append(')')
;
AssertionError ae = new AssertionError(b.toString());
if (!errors.isEmpty()) {
if (errors.size() == 1) {
ae.initCause(errors.get(0));
} else {
CompositeException ce = new CompositeException(errors);
ae.initCause(ce);
}
}
return ae;
}
从以上几段代码能够看到,如果 RxJava 正常执行,latch 应该为 0,那为什么这边会提前结束呢?说明 RxJava 中切换了线程,而主线程提前结束了,所以 latch = 1
。
关于
CountDownLatch
的机制,如果想详细了解的可以通过其他途径了解,我这边就简单说下。CountDownLatch
是一个线程同步工具,其内部维护了一个计数器count
,当count
不为 0 时,所有调用CountDownLatch#await()
方法的线程都会被阻塞,当count
为 0 时,唤醒所有被阻塞线程。
为什么 RxJava 会切换线程
从代码能看到,这里并没有 subscribeOn
代码,理论上就应该是在主线程执行啊?那我们先确认下,切换的是哪个线程,执行如下代码
fun main() {
WanAndroidService.create()
.login("1", "22")
.subscribe({
println("success:$it ${Thread.currentThread().name}")
}) {
println("error:$it ${Thread.currentThread().name}")
}
}
输出结果:
success:BaseResponse(data=null, errorCode=-1, errorMsg=账号密码不匹配!) OkHttp https://www.wanandroid.com/...
OkHttp 的线程,那我能不能指定执行线程呢,试试看。
fun main() {
WanAndroidService.create()
.login("1", "22")
.subscribeOn(Schedulers.io())
.subscribe({
println("success:$it ${Thread.currentThread().name}")
}) {
println("error:$it ${Thread.currentThread().name}")
}
Thread.sleep(4000)
}
输出结果还是一样:
success:BaseResponse(data=null, errorCode=-1, errorMsg=账号密码不匹配!) OkHttp https://www.wanandroid.com/...
我不禁对我此前的 RxJava 经验陷入了怀疑,竟然指定线程不生效?RxJava3 跟 RxJava2 有差异?基本没这个可能,作为全球受众极广的技术框架,不可能有这种差异还没人讨论。这咋办呢?只能先看看 OkHttp 源码,看看在什么地方创建线程吧。
OkHttp 源码
OkHttp#Call 执行网络请求的方法有两个,一个同步一个异步
fun execute(): Response
fun enqueue(responseCallback: Callback)
enqueue
方法调用时不需要切线程,因为其内部执行网络请求操作时会切换到子线程,在调用 Callback
回调时再切换到主线程,而 execute
方法不会切线程,是同步方法。所以切换线程的操作肯定是在这个方法内,我们来看看这个方法,这个方法在 Call
的子类 RealCall
中。
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
这个方法将 responseCallback
包装到 AsyncCall
中,再将其入队到 client.dispatcher
,而 AsyncCall
是一个 Runnable
,所以应该能猜到,client.dispatcher
就是一个线程调度器。
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {//循环遍历 executableCalls,executableCalls里正是`AsyncCall`集合
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)//asynCall 加到 executableCalls
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService) //真正执行请求操作的地方
}
return isRunning
}
这里主要看 promoteAndExecute
方法,这个方法会遍历可执行的 asnycCall
, asyncCall.executeOn(executorService)
这段代码的意思是在指定的线程池中执行 asyncCall
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
看到了,看到了,threadName("OkHttp ${redactedUrl()}")
这不就是刚刚看到的线程名么?那思路就是没错,确实是执行了Call#enque
才会导致切换了新线程。但是在 RxJava2
中执行,我记得是会抛出不能在主线程执行网络操作
的异常呀。那就只有一个可能了,RxJava3
准确的说是 RxJava3CallAdapterFactory
一定跟 RxJava2CallAdapterFactory
有差异,对比看一下。
这就是破案了呀,构造参数isAsync
在 RxJava2 为 fasle, 在 RxJava3 中为 true,然后这个参数在 RxJavaCallAdapter
中用来创建不同的 Observer
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable =
isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
......
}
创建了 EnqueObservable 和 ExecuteObservable,相应去执行 Call#enque
Call#execute
两个方法。到这边其实就定位到问题根本原因了,就是 RxJava2CallAdapterFactory
和 RxJava3CallAdapterFactory
的 create 方法不一样,前者是同步,后者是异步。
思考
为什么两个版本要做这个调整?
public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
public static RxJava2CallAdapterFactory createAsync() {
return new RxJava2CallAdapterFactory(null, true);
}
public static RxJava3CallAdapterFactory create() {
return new RxJava3CallAdapterFactory(null, true);
}
public static RxJava3CallAdapterFactory createSynchronous() {
return new RxJava3CallAdapterFactory(null, false);
}
可以看到旧版本 create
创建的是同步请求,而新版本变成了异步。我大概能猜到作者的意图,作者应该是觉得:网络请求必须在子线程,create
方法又是最常用的方法,如果是同步请求,那需要开发者去指定订阅 Scheduler
,这个工作有点重复。如果是异步请求,则开发者不管指定不指定 Scheduler
最终都会在 OkHttp
的线程执行,虽然由同步变成了异步,不过在大多数情况开发者是无感知的,不会影响历史代码,而如果开发者看到了 create
的变更,则下次执行 Retrofit
请求可能就不会去指定 Scheduler
了。也许是这样吧。
确实,上述单元测试没过的代码,在业务代码中执行是没有问题的。但是当一段代码在大多数时候执行没有问题,然后在某一刻出现莫名其妙的异常时,往往网络上的相关资料是比较少的,在定位问题时就会走一些弯路。Square
在全世界的 Android 开发者中有着举足轻重的地位,这个变更他们团队应该也是深思熟虑的决定。只不过我个人变成了遇到问题的少数人,当定位到问题后还是有些许震惊,希望能帮到有类似疑惑的人把。
总结
在以后的工作中,遇到类似问题,真不能着急自我否定,还是着手于当前的变更内容,分析差异。如果这个问题,我早点看到变更类的create
方法,可能半小时就搞定了。
转载自:https://juejin.cn/post/7202920716320423996