likes
comments
collection
share

「最后一次,彻底搞懂kotlin Flow」(三) | 冷暖自知:Flow 与 SharedFlow 的冷和热

作者站长头像
站长
· 阅读数 18

「最后一次,彻底搞懂kotlin Flow」(三) | 冷暖自知:Flow 与 SharedFlow 的冷和热

引言

Cold | Hot

  coldhot 的概念并非一个通用的计算机术语,据笔者所知,这个概念最初来自于 ReactiveX,可以理解为我们熟悉的 Rxjava。但笔者并没有从 ReactiveX 官网上找到对于 cold 和 hot 的直接定义,我们来看看 ReactiveX 中的一段对于 cold 与 hot 的描述:

A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

  翻译:“热” Observable 可能在创建后立即开始发射数据,因此任何后来订阅该 Observable 的观察者可能从中间某个位置开始观察序列。另一方面,“冷” Observable 则等待观察者订阅它后才开始发射数据,因此这样的观察者可以保证从一开始就看到整个序列。

  可以看出来,流的冷热主要体现为创建流之后流开始发射数据的时机,冷流是在 subscribe 时开始发射,热流可以在没有观察者的时候就发射数据了。在知道了冷热流的含义后我们来看看 Flow 中的 cold 与 hot。

Cold

  在 Flow五项全能篇 中我们提到了 Flow 默认是 cold 的,我们来看看这个例子来加深一下对 cold 的理解:

// ColdHotFlow1.kt
fun main() = runBlocking {
    val flow = flow {
        repeat(5) {
            // 1. print emit
            println("emit $it")
            emit(it)
            delay(1000)
        }
    }

    // 2. print before collect and delay
    printlnWithTime()
    delay(1000)

    // 3. print collect
    flow.collect { printlnWithTime(it) }
}

// log
1247
emit 0
2268: 0
emit 1
3270: 1
emit 2
4275: 2
emit 3
5277: 3
emit 4
6283: 4

分析上面的 log,注释 2 处 首先打印,说明前面的 flow builder 并未开始 emit 数据。1s 之后,flow 开始 collect,依次打印了注释 1 处的 emit 和注释 3 处的 collect。这就是 cold 流,是受 collector 控制的流。

Hot

  说到 hot,你可能已经想到了 SharedFlow,那么我们就来看看 Channel(🤪):

Channel

// ColdHotFlow2.kt
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2)

    launch {
        repeat(4) {
            channel.send(it)
            printlnWithTime("send $it")
            delay(1000)
        }
    }

    delay(4000)
    printlnWithTime()

    launch {
        repeat(4) {
            val receive = channel.receive()
            printlnWithTime("receive $receive")
        }
    }

    Unit
}

// log
0688: send 0
1693: send 1
4691
4700: receive 0
4700: receive 1
4700: receive 2
4700: send 2
5707: send 3
5707: receive 3

  可以看到,跟前面 flow 不一样的是,log 首先输出了两个 send,着对应于我们构造 channel 时的参数 capacity = 2,然后不再继续发送,在 launch 后的 delay 4s 之后,开始 launch 接收,这里马上接受到了前面 send 的两个值,然后连着接受了第三个值 2,最后在等待了发送 2 之后的 1s 后,开始发送和接收最后一个参数 3。由此,我们可以推断:

  1. channel 在开始接收之前就已经发送了
  2. channel 在发送完后可能会等待接收,这取决于 capacity 参数的设置和收发的情况
  3. channel 在接受完后会继续发送,直至发送完成

channel 并不是可观察的流,没有 collect 或者 subscribe 之类的方法,而是需要一个一个主动去接收。但我们依然可以一窥关于 hot 的特性,在未接收前就可以开始发送

为什么要区分冷流热流呢?因为热流是对于真实世界中无法控制数据产生时机的事物的抽象,比如:用户界面事件,传感器,实时网络数据(聊天,音视频流)等。我们无法控制这些可持续产生的数据何时产生,这时用像 standard flow builder 这样的自己把握数据源产生的冷流明显就不再合适了。Channel 让我们一窥了 hot 的特性,下面我们就来看看真的热流。

SharedFlow

// ColdHotFlow3.kt
fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    launch {
        repeat(4) {
            sharedFlow.emit(it)
            printlnWithTime("emit $it")
            delay(1000)
        }
    }

    delay(4000)
    printlnWithTime()

    launch {
        sharedFlow.collect {
            printlnWithTime("collect $it")
        }
    }

    Unit
}

// log
1721292956242: emit 0
1721292957249: emit 1
1721292958251: emit 2
1721292959252: emit 3
1721292960245
1721292960256: collect 2
1721292960256: collect 3

  可以看出我们的 sharedFlow 的默认行为跟 channel 又有一些不一样了,虽然 sharedFlow 同样可以在没有接收者的情况下开始发送,但,看起来他不仅没有等,而且在发送的次数超过 capacity 之后不会停下来等待接收者,只是在接收者姗姗来迟之后把最后的两个数据给了接收者,看起来 shareFlow 这个热流有点冷淡(cold)。但这其实是一般热流的默认行为,channel 并非流,而是 BlockingQueue 的非阻塞版本(所以可以叫 SuspendQueue?),主要是用于在多个 Coroutine 之间通信。

  我之所以说这不是 sharedFlow 的默认行为是因为 sharedFlow 也可以通过设置参数达成 channel 这样的效果,我们先来看看 MutableSharedFlow 的 builder 方法:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

  MutableSharedFlow builder 方法有三个参数

  1. replay :用于定义在 flow 开始 collect 时,最多可以收到几个在 collect 之前已经发送的数据
  2. extraBufferCapacity: 用于定义除了 replay 的数量外,还可以有多少个可以缓冲量,replay + extraBufferCapacity = channel 中的 capacity。但这个 extraBufferCapacity 有一些实现的前提,后面回讲到
  3. onBufferOverflow: 用于定义缓冲区溢出时的行为,有 SUSPENDDROP_OLDESTDROP_LATEST 几种值可选,默认为 SUSPEND,下面介绍一下 SUSPEND

  接下来我们就用上面的参数来部分模拟一下 chennel 例子中的行为:

// ColdHotFlow4.kt
fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)

    // 1. collect
    launch {
        sharedFlow.collect {
            delay(1000)
            printlnWithTime("collect $it")
        }
    }

    // 2. emit
    launch {
        repeat(4) {
            sharedFlow.emit(it)
            printlnWithTime("emit $it")
        }
    }

    Unit
}

// log
2921: emit 0
2922: emit 1
2925: emit 2
3925: collect 0
3925: emit 3
4931: collect 1
5936: collect 2
6942: collect 3

  这里我们首先需要先 collect,再 emit,否则 suspend 不会起作用。

  第二是我们把 delay 放到了 collect 中,用以形成 emit 的时间优势,这样就能体现出 emit 被 suspend。从 log 中可以看出来我们连续 emit 了 0 ,1,然后在 emit 2 的时候被 suspend 了,直到 collect 0 结束,emit 2 才完成。然后马上尝试 emit 3,被 suspend,然后依次 collect 直到结束。这就是 sharedFlow 的一些特性,我们再来看看另外一个常用的 hot flow:StateFlow。

StateFlow

// ColdHotFlow5.kt
fun main() = runBlocking {
    val stateFlow = MutableStateFlow(-1)

    launch {
        stateFlow.collect {
            delay(1000)
            printlnWithTime("collect $it")
        }
    }

    launch {
        repeat(3){
            stateFlow.emit(it)
            printlnWithTime("emit $it")
        }
    }

    Unit
}

  构造 MutableStateFlow 需要传入一个初始值。如同 StateFlow 的名字一样,这个 Flow 一定有一个状态,所以需要传入一个初始状态,即使传入 null 值,也需要手动传入。并且也只需要传入这一个值,没有capacity,没有 extraBufferCapacity。猜一猜打印的 log 是什么样的。如果不熟悉 StateFlow,log 应该会跟你想象的不一样:

0212: emit 0
0212: emit 1
0212: emit 2
1215: collect -1
2216: collect 2

  上面我们首先开始 collect,因为 delay 1s,所以会先输出下面的 emit,然后作为初始 state 传入的 -1 在 1s 后会被 collect 输出,然后被 collect 的是 2,中间的 0,1 哪里去了?

  这里就需要介绍一下 stateFlow 的特性了:即使已经 collect,也不会保证 collector 收到所有的中间 state,因为 stateFlow 默认 collector 只对最新的 state 感兴趣,如果 collector 来不及消费中间状态的值,那么这些中间状态就会被丢弃。所以我们最终在最初的 -1 之后只会 collect 到 2。

  SharedFlow 根据其参数的不同,可以支持各种不同的行为,StateFlow 可以看作是一种特殊的 SharedFlow,想象如果用 SharedFlow 应该怎样实现 StateFlow?下面我们就来看看:

// ColdHotFlow6.kt
fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    sharedFlow.emit(-1)

    launch {
        sharedFlow.collect {
            delay(1000)
            printlnWithTime("collect $it")
        }
    }

    launch {
        repeat(3){
            sharedFlow.emit(it)
            printlnWithTime("emit $it")
        }
    }

    Unit
}

// log
2308: emit 0
2308: emit 1
2308: emit 2
3310: collect -1
4316: collect 2

  我们在上面的示例中实现了对 StateFlow 的模拟。所以: StateFlow 的行为就像是 replay = 1 ,并且 DROP_OLDEST 的 SharedFlow。在了解了热流后,下面我们来讨论一个在热流中可能引起的问题:backpressure,背压问题。

背压

  如果我们把上游的 flow 和下游的 collector 分别看作 back 和 front 的话,那么我们可以说背压是上游生产者的压力。如何理解 backpressure 呢,我们来看看下面这个例子:

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>()

    // 1. collect
    launch {
        sharedFlow.collect {
            delay(1000)
            printlnWithTime("collect $it")
        }
    }

    // 2. emit
    CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher()).launch {
        while (true) {
            launch {
                sharedFlow.emit(UUID.randomUUID().toString())
                println("emit")
            }
        }
    }

    Unit
}

// log
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "pool-1-thread-1"

  我们在注释 2 处开始不停的 launch Coroutine 来 emit 数据,这些 coroutine 会被 suspend 住并被加入到队列中,直到下游 flow collect 一个数据,然后这些 Coroutine 才会被恢复。但是下游的 flow collect 数据却很慢,这就给上游数据产生造成了压力生产者生产和消费者消费速率的不匹配(上快下慢),这就造成了背压。问题产生了,那么如何解决背压呢?

Buffer

  想象如果一段时间内有持续的暴雨,那么就可能会造成河水上涨,河水流速增快,但只是通过河流排洪的话始终有一个极限,严重时甚至造成河流溃坝,形成水灾。所以我们修建水库,用来蓄洪,这就是 capacity 和 extraBufferCapacity 的作用,他们共同构成了 Buffer。一旦有再次有暴雨,水库就能发挥蓄洪的作用。然而受到物理环境的限制,水库不可能无限大,并且水库的质量有好有坏,施工维护差的水库可能在达到设计标准前就得偷偷泄洪了。上面我们用暴雨形成的积水,水库,河道来比喻了 Flow,Buffer,Collector。下面我们来看看在 Flow 中,buffer 达到极限时的几种做法。

BufferOverflow

BufferOverflow 有三个值,下面我们来看看 Flow 中如何处理 buffer overflow 的:

  1. SUSPEND: 挂起发射数据,在暴雨的场景下,这显然不适用
  2. DROP_OLDEST:丢弃最老的数据,这意味着新的数据是更有意义的
  3. DROP_LATEST:丢弃最新的数据,与上面正好相反

上面就是 Flow 中几种处理 buffer overflow 的方式,具体用什么方式取决于业务的类型。我们知道了热流和冷流的特性,那么热流和冷流可以互相转化吗?当然可以的!

冷热交替

Cold -> Hot

// ColdHotFlow7.kt
fun main() = runBlocking {
    val coldFlow = flow {
        repeat(3) {
            delay(1000)
            emit(it)
        }
    }
    collectColdFlow(coldFlow)

    val hotFlow = coldFlow.shareIn(scope = this, started = SharingStarted.Eagerly)
    collectHotFlow(hotFlow)
}


private suspend fun collectColdFlow(coldFlow: Flow<Int>) = coroutineScope {
    repeat(2) { collectorIndex ->
        launch { coldFlow.collect { printlnWithTime("collect${collectorIndex + 1} cold $it") } }
        delay(2000)
    }
}


private suspend fun collectHotFlow(hotFlow: SharedFlow<Int>) = coroutineScope {
    repeat(2) { collectorIndex ->
        launch { hotFlow.collect { printlnWithTime("collect${collectorIndex + 1} hot $it") } }
        delay(2000)
    }
}

  上面的代码我们分别 collect 了冷流和热流各两次,对同一个流两次 collect 之间间隔了 2s,看看 log 输出:

2255: collect1 cold 0
3256: collect1 cold 1
4243: collect2 cold 0
4259: collect1 cold 2
5244: collect2 cold 1
6249: collect2 cold 2

7267: collect1 hot 0
8267: collect1 hot 1
9267: collect1 hot 2
9268: collect2 hot 2

  可以看出来虽然两个 cold flow 被 collect 了两次且中间间隔了 2s,但 collector2 依然可以把 flow 中的数据完全收集到,cold flow 并没有 replay,只是因为 cold flow 可以控制数据源,所以在每次 collect 时都会完整的 emit 所有的数据,不同的 flow 之间不会共享 collect 的进度,所以我们或许可以把 default cold flow 称为 UnSharedFlow

  SharedFlow 则正如其名字一样,两个 collector 共享了 collect 的进度。没有 replay 的 sharedFlow,在 collector2 2s 后开始 collect 数据时,collect1 已经 collect 了 0,1,所以 collector2 只能 collect 到一个 2。

在上面的实例中,我们把 coldFlow 转化为 hotFlow 用到了一个方法:shareIn,这个方法接收三个参数,

  1. scope:决定了 flow 的生命周期
  2. started : 决定 flow 开始和结束的时机
  3. replay: 这个我们熟悉,用于定义在 flow 开始 collect 时,最多可以收到几个在 collect 之前已经发送的数据

这个 replay 看起来有点 sharedFlow 的味道,的确,shareIn 方法返回的就是 SharedFlow,replay 直接对应,其他两个参数由 Flow 本身的属性决定,这里不做展开。需要注意是是 started 参数,其中一个可选的值是 Eagerly:这意味着我们会得到标准的热流,这会让 flow 立即被 collect,另外一个是 Lazily:返回一个 shared cold flow,shared flow 并非一定是 hot 的。StateFlow 也有一个对应的 stateIn 的方法,这里不作介绍。

Hot -> Cold

  官方并没有 hot flow 转 cold flow 的方法,cold flow 把 flow 转换为 SharedFlow 并立即开始 emit 数据,但已经开始 emit 数据的 sharedFlow 如何转换为 cold flow?我们确实无法阻止 hot flow 发射数据,但是我们却可以得到可用的 cold flow:

// ColdHotFlow8.kt
fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)

    // 1. sharedFlow emit
    launch {
        repeat(3) {
            println("emit $it")
            sharedFlow.emit(it)
        }
    }
    delay(1000)
    // 2. generate cold flow
    val coldFlow = sharedFlow.asColdFlow()
    // 3. dependently collect coldFlow 2 times
    repeat(2) { collectorIndex->
        delay(1000)
        launch {
            coldFlow.collect { println("$collectorIndex collect $it") }
        }
    }
}

fun <T> SharedFlow<T>.asColdFlow() = flow {
    collect { emit(it) }
}

// log
emit 0
emit 1
emit 2
0 collect 0
0 collect 1
0 collect 2
1 collect 0
1 collect 1
1 collect 2

  我们在注释 1 处 emit 了 3个数据,然后等待了 1s,之后我们在注释 2 处才生成 cold flow,再在注释 3 处 collect 了两次,两次间隔 1s,即使如此我们仍然每次都 collect 到了完成的数据,就像是 cold flow 一样。

  其实原理非常简单,就在于 asColdFlow 方法和 replay 参数,我们把 replay 参数设置为最大值,这样我们就能 buffer sharedFlow 发射的数据,然后再用 standard flow build 构造一个 cold flow,在 collect 时发送 buffer sharedFlow 的值。注意:这个做法仅供学习,内存如同水库容量的物理限制一样,缓存始终有上限值,达到上限后可能会丢数据,另外大量的缓存数据也会造成内存消耗

总结

我们认识到了 Flow 的冷与热的概念并非凭空产生,而是对于现实世界中不同类型数据的生成机制的抽象。

Cold Flow 的数据在开始 collect 时才会发射,并且默认不同的 collector 之间会相互独立的接收数据。

Hot Flow 的数据在还未有 collector 时就会开始发射,我们可以根据需求使用 SharedFlow 和 StateFlow

Hot Flow 由于自身的性质可能会产生背压的问题,但我们可以通过 buffer 与 BufferOverflow 配合来处理

Hot Flow 和 Cold Flow 也不是完全不同的,他们在一定程度上是可以互相转化的。

我们这三篇已经讲 Flow 演化的历史,Flow 的结构,Flow 的冷热性质涵盖到了。接下来要讲什么,读者可以在评论区讨论互动,笔者会根据互动的情况来安排后面的内容。

示例源码github.com/chdhy/kotli…

练习

  1. 上一次练习查看了 debounce 操作符的源码,思考一下 debounce 在是否可以算作一种对于backpressure 的应对
  2. sample 是否可以应对背压?

点赞👍文章,关注❤️ 笔者,获取其他文章更新

转载自:https://juejin.cn/post/7396361501790142474
评论
请登录