likes
comments
collection
share

一文快速实战Kotlin协程与Flow

作者站长头像
站长
· 阅读数 16

前言

不知道大家有没有跟我一样的感受:即使自己用心在网上学过协程和Flow了,但过了一段时间就又忘掉了。这大部分的原因其实是因为我们缺少实战。我平时工作里根本就接触不到协程和Flow,自己又不敢硬往上写,万一出问题了咋整?所以一直就处于理论学习阶段,导致我学了就跟没学一样。

今天就带大家一起来解决这个问题,通过几个简单的Demo和实战,巩固我们Kotlin协程和Flow的知识体系,从而能更有信心地用到实际项目中去。

PS:这篇文章就不写理论知识了,笔者也能力有限,有兴趣的同学可以在文末的文章中进行学习。

协程战前动员

协程实战之前,有几点需要注意的细节需要提前说一下,这些也是平时开发中比较容易遇到的。

协程的取消

调用cancel方法并不保证能取消协程,取消协程的前提是代码块在执行过程中对协程的状态进行了校验。常见的挂起函数如withContextdelayyield都有做校验。但如果协程内部没有挂起函数,或者没有对取消状态做校验的话,即使调用了cancel也是会继续执行的。若这个场景下也要做到即时取消,需要在内部对CoroutineScopeisActive属性进行check。

@Test
fun testCancel() {
    runBlocking {
        val cancelJob = launch(Dispatchers.Default) {
            repeat(1000) {
                // 代码1
                if (!isActive) {
                    return@repeat
                }
                println("协程执行中...$it")
            }
        }

        delay(1)
        println("Cancel!")
        cancelJob.cancel()
        println("Done!")

    }
}

如上在代码1处,如果不加isActive属性的判断,即使cancel了,循环内的日志仍然是会继续打印的。

协程的异常处理

协程代码块中仍然是使用try-catch进行异常捕获。CoroutineExceptionHandler作为CoroutineContext的基类,是用于捕获未捕获的异常,类似于Java.Thread.DefaultUncaughtExceptionHandler

默认情况下,协程中未捕获的异常是会传递到其同级与父级协程的。若不想有这种传递,可以用SupervisorScope或者SupervisorJob启动协程,此时子协程若发生异常不会扩散。但SupervisorJob改变的只是异常的传递方式,而不具备捕获异常的能力,因此需要用CoroutineExceptionHandler去捕获异常,否则子协程的异常仍然会造成应用的崩溃。

CoroutineScope(context).launch {
    val supervisorJob = SupervisorJob()
    with(CoroutineScope(context + supervisorJob)) {
        // 即使是SupervisorJob,子协程如果发生异常,仍然会崩溃,需要用try catch或者CoroutineExceptionHandler捕获
        val firstChild = launch (CoroutineExceptionHandler{_, exception -> {}} ) {
            throw RuntimeException()
        }
        val secondChild = launch {
            firstChild.join()
            try {
                delay(Long.MAX_VALUE)
            } catch (e: Exception) {
                print("cancelled because supervisor job is cancelled")
            }
        }
        firstChild.join()
        // supervisorJob取消后,所有子协程也会取消
        supervisorJob.cancel()
        secondChild.join()
    }
}

如果没用SupervisorScope或者SupervisorJob改变异常的传递方式,那么子协程的异常是会委托给父协程去处理的。所以此时即使在子协程的构造方法中声明了CoroutineExceptionHandler,也捕获不到异常,这个时候就需要在根协程构造的时候或者Scope初始化的时候去声明CoroutineExceptionHandler

CoroutineScope(context + CoroutineExceptionHandler{_, exception -> {}}).launch {
    with(CoroutineScope(context {
        // 子协程发生异常,传递到父协程,父协程捕获
        val firstChild = launch {
            throw RuntimeException()
        }
        val secondChild = launch {
            firstChild.join()
            try {
                delay(Long.MAX_VALUE)
            } catch (e: Exception) {
                print("cancelled because supervisor job is cancelled")
            }
        }
        firstChild.join()
        // supervisorJob取消后,所有子协程也会取消
        supervisorJob.cancel()
        secondChild.join()
    }
}

另外,想要改变哪个协程的异常传递方式,就将SupervisorJob声明在哪个协程。如果SupervisorJob声明在父协程,子协程没声明,那么子协程的CoroutineContext是会覆盖父类的,异常仍会到传递到父类。

CoroutineScope(context + CoroutineExceptionHandler{_, exception -> {}} + SupervisorJob()).launch {
    with(CoroutineScope(context {
        // 子协程发生异常,传递到父协程,父协程捕获
        val firstChild = launch {
            throw RuntimeException()
        }
        val secondChild = launch {
            firstChild.join()
            try {
                delay(Long.MAX_VALUE)
            } catch (e: Exception) {
                print("cancelled because supervisor job is cancelled")
            }
        }
        firstChild.join()
        // supervisorJob取消后,所有子协程也会取消
        supervisorJob.cancel()
        secondChild.join()
    }
}

如上,子协程发生的异常,仍然会传递到父协程,父协程的SupervisorJob并不会传递给子线程。

协程的资源同步

传统的synchronized关键字或者Kotlin提供的Mutex互斥锁可以解决资源同步的问题。

CoroutineScope(context).launch {
    var sum = 0
    launch {
        synchronized(this@launch) {
            sum++
        }
    }

    launch {
        synchronized(this@launch) {
            sum++
        }
    }
}

synchronized的lock要用外层的对象,而不是this,否则lock就不是互斥的了。而Mutex用法上也很简单,性能上也差不了多少。

CoroutineScope(context).launch {
    var sum = 0
    val mutex = Mutex()
    launch {
        mutex.lock()
        sum++
        mutex.unlock()
    }

    launch {
        mutex.lock()
        sum++
        mutex.unlock()
    }
}

协程实战

下面就是协程的实战。理论上,因为协程是一个异步框架,所以哪里需要开线程,哪里就能用协程!而并发或串行依赖任务更需要协程!但是如果我们要将协程运用到实际项目中,我们不可能非常随意地先用线程的地方直接就替换为协程,那么哪些地方可以适合让我们“练练手”呢?这里介绍四处:LifecycleOwner、ViewModel、数据层、LiveData。

LifecycleOwner使用协程

Activity或者Fragment中,可以使用lifecycleScope.launch启动协程,这种方式启动的协程会随着LifecycleOwner的销毁而销毁,避免内存泄漏。但如果架构符合UI与数据分离的话,一般很少会在UI层使用到协程(除非使用Flow,后面会看到)。

class UserActivity {

    lifecycleScope.launch {
    }

    lifecycle.coroutineScope.launch {

    }
}

这两种启动协程的写法是一样的。另外还有launchWhenXXX方法可以让我们指定在某个生命周期启动协程:

class UserActivity {

    lifecycleScope.launchWhenResumed {
    }
}

launchWhenResumed就是在onResume时启动协程,不在onResume生命周期后就会将协程挂起。比如APP切换后台再切回来,协程就会先挂起再恢复。

ViewModel使用协程

viewModelScope.launch启动的协程生命周期跟随ViewModel。一般在调用下层Repository接口时需要启动一个协程,从而能调用Repository层的挂起函数。

class UserViewModel {

    fun getUser(userName: String) {
        viewModelScope.launch(Dispatchers.Default) {
            userRepository.getUser(userName)
        }
    }
}

数据层使用协程

数据层用withContext切换到IO或Default线程池,进行网络数据的请求或者内存、持久层的数据读写。需要开多个协程并行执行任务时,可以LiveData监听结果,ViewModel持有这个LiveData,UI层监听。如果需要多个有依赖关系的协程串行执行,就用async+await方法。

class UserRepository {
    private val _userMutableLiveData = MutableLiveData<User>()
    val userLiveData: LiveData<User>
        get() = _userMutableLiveData

    suspend fun getUser(username: String) =
        withContext(Dispatchers.Default) {
            launch(SupervisorJob()) {
                val response = userService.getUser(username)
                if (response.isSuccessful && response.body() != null) {
                    _userMutableLiveData.value = response.body()
                }
            }
            launch(SupervisorJob()) {
                UserDB.getUser(username)?.apply {
                    _userMutableLiveData.value = this
                }
            }
        }
}

这里并行去本地和网络读取User的数据,读到之后可以用LiveData将数据发出去。如果没有并行任务的要求就比较简单了,挂起函数可以直接返回结构体:

class UserRepository{

    suspend fun getUserHome(username: String): Home? =
        withContext(Dispatchers.Default) {
            val getUserDeferred = async(SupervisorJob()) {
                userService.getUser(username)
            }
            val response = getUserDeferred.await()
            if (response.isSuccessful && response.body() != null) {
                userService.getUserHome(response.body()!!).body()
            } else {
                null
            }
        }
}

上面代码用async和await函数实现串行任务。

LiveData使用协程

上面ViewModel层从Repository层拿到Model后,还是需要声明LiveData暴露给UI层才可以完成数据的完整传递。 比如这样:

class UserViewModel {
    
    val userLiveData = userRepository.userLiveData
   
    fun getUser(userName: String) {
        viewModelScope.launch(Dispatchers.Default) {
            userRepository.getUser(userName)
        }
    }
}
class UserActivity {
    ...
    viewModel.userLiveData.observe(this) {

    }
}

但LiveData提供了liveData函数,可以用liveData{}直接写协程代码块,在ViewModel中直接作为函数结果,拿到的数据通过emit发送出去。

class UserViewModel {

    fun getUserHome(userName: String) = liveData {
        viewModelScope.launch(Dispatchers.Default) {
            emit(userRepository.getUserHome(userName))
        }
    }
}

这样有一个好处就是ViewModel层不用再特意声明LiveData变量给UI层了。UI层原先需要调用ViewModel的一个方法并且对暴露的LiveData变量做监听,用liveData{}的话UI层只需要调用这一个方法就同时完成了请求与监听。

class UserActivity {
    ...
    viewModel.getUserHome("abc").observe(this) {

    }
}

小结

LifecycleOwner、ViewModel、数据层、LiveData处使用到协程的机会是比较多的,使用过程中再多注意一下协程的取消,不要内存泄漏;注意协程的异常处理;如果多个协程之间涉及到资源同步,就用synchronized或者Mutex解决。

Flow战前动员

在写这篇文章之前,我一直有一个疑问:单在异步线程处理上,协程已经足够优秀,配合LiveData处理数据的渲染,足以应付大多数的需求。那为什么又需要Flow呢?在什么地方使用Flow才是比较合适的呢?

针对第一个问题,为什么需要Flow?flow既能提供异步线程框架,又能处理数据,相当于是协程+liveData的结合体。并且flow可以随协程取消,并且处理更复杂的数据流,也可以设置数据重发量并解决背压,这是LiveData做不到的。就是在写法上可能会有坑,操作符相对LiveData比较复杂,处理起来也比较麻烦,比如collect末端操作符要注意不能影响到主线程。

那既然Flow这么强大,是不是无脑写Flow,LiveData可以直接淘汰了呢?这个就见仁见智了,不同团队不同项目做出的选择肯定也是不一样的。在我理解下来,因为我们大部分的业务场景涉及到的数据流是比较简单的,而且不需要做什么复杂的线程切换,那我们就直接用LiveData,非常简单。如果数据流比较复杂,需要做线程切换,又或者要变换数据,就用Flow。如果在这基础上你还需要Flow重发数据,那就选择SharedFlow。如果你只需要重发最新的数据,也可以选择StateFlow,但需要注意StateFlow不会发送重复的数据。

所以针对这个原则,我们就可以解决第二个问题,在什么地方使用Flow比较合适,或者说比较容易上手呢?那就是Repository层。因为通常我们需要在Repository层获取网络数据或者获取本地、内存的数据,有时候不同数据源的数据是需要进行结合或者变换的,所以这里用到Flow的可能性是比较大的。Repository对数据进行处理后,ViewModel拿到的其实就是一个完整可用的数据结构了,ViewModel就可以简单地用LiveData完成与UI层的数据传递。

如果你一定要在UI层进行Flow的监听,那就需要在UI层起一个协程。这里需要注意的是,这里直接launch协程的话会不够安全,因为APP在后台仍会接收Flow数据的更新,容易引发崩溃。那么我们可以用launchedOnXXX或者repeatOnLifecycle来将flow与生命周期关联起来。

class UserActivity {

    lifecycleScope.launchWhenResumed {
        viewModel.userFlow.collect {

        }
    }
    lifecycleScope.launch(Dispatchers.Default) {
        repeatOnLifecycle(Lifecycle.State.RESUMED) {
            viewModel.userFlow.collect {

            }
        }
    }
}

launchedOnXXX在不符合生命周期的情况下会暂停flow。repeatOnLifecycle在每次触发时,会关掉之前的flow重新启动。

Flow实战

那么下面给大家看一下我是怎么写Repository->ViewModel->UI层的数据流转的。

class UserRepository {

    /**
     * Flow处理Repository层数据
     */
    suspend fun getObservableUserHome(username: String): Flow<Home?> {
        // 本地数据
        return UserDB.getUserHome(username)
            // 数据转换
            .map { Home("a") }
            // 网络数据
            .flatMapConcat {
                flow {
                    emit(userService.getUserHome(User("", "")).body())
                }
            }
    }

    /**
     * Flow并行协程
     */
    suspend fun getObservableUser(username: String): Flow<User?> {
        return withContext(Dispatchers.Default) {
            flowOf(
                // 本地数据
                UserDB.getObservableUser(username),
                // 网络数据
                userService.getObservableUser(username).map { it.body() }
            ).flattenConcat()
        }
    }
}

getObservableUserHome方法获取了本地数据之后,再经过一次数据转换,再去获取网络数据。这是典型的串行任务,如果没有串行任务就更简单了,去掉flatMapConcat就可以了。

getObservableUser则是并行任务,获取本地数据和网络数据的两个flow是并发执行的。这里需要注意flattenConcat()操作符只能先接收前一个flow的emit,再接收后一个flow的emit

然后我们在ViewModel层调用Repository层的方法:

class UserViewModel {

    suspend fun getObservableUser(userName: String): LiveData<User?> {
        return userRepository.getObservableUser(userName).asLiveData()
    }

    suspend fun getObservableUserHome(userName: String): LiveData<Home?> {
        return userRepository.getObservableUserHome(userName).asLiveData()
    }
}

这里我们用asLiveData()方法直接将flow转换成了LiveData,非常方便,看源码也可以看到里面是包了一层collect

@JvmOverloads
public fun <T> Flow<T>.asLiveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
    collect {
        emit(it)
    }
}

如果有不一样的监听逻辑,比如collectLast,也可以自己写。

然后我们就在UI层调用ViewModel方法就可以了:

class UserActivity {

    lifecycleScope.launchWhenCreated {
        viewModel.getObservableUser("").observe(this@CoroutineActivity) {

        }
        viewModel.getObservableUserHome("").observe(this@CoroutineActivity) {

        }
    }
}

小结

那Flow的实战就到这里,我主要把Flow用在了Repository层处理数据,ViewModel和UI层使用LiveData流转数据。当然,因为我自己也是边学习边总结,其中难免会有不对的地方,也希望大佬们多给点建议让我学习一番~

文章不足之处,还望大家多多海涵,多多指点,先行谢过!

参考文章