网络知识 娱乐 关于Flow的原理解析

关于Flow的原理解析

Flow是基于协程实现的异步数据流,所以在学习flow原理之前需要掌握协程相关的知识。

话不多说,直接开摆,不是,直接开始。

首先最简单的创建flow我们都知道flow{ emit(vale) },这里使用lambda直接将block传入flow里面了, 我们都知道flow是冷流,只有当我们collect收集flow的时候,这里面的block才会执行,这么说没人反对吧, 但是这是怎么实现的,其实就是巧妙的应用了协程的挂起函数。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)nn// Named anonymous objectnprivate class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {n override suspend fun collectSafely(collector: FlowCollector<T>) {n collector.block()n }n}n

可以看到我们flow{}返回的是一个SafeFlow()对象,代码块block继承的是挂起的一个接口FlowCollector,这里是不是想起来launch源码里面的一个参数block: suspend CoroutineScope.() -> Unit,没错就是一样的!

public interface FlowCollector<in T> {nn /**n * Collects the value emitted by the upstream.n * This method is not thread-safe and should not be invoked concurrently.n */n public suspend fun emit(value: T)n}n

这个FlowCollector很重要,官方给的释义是:FlowCollector用作流的中间或终端收集器,表示接受流发出的值的实体。 此接口通常不应直接实现,而应在实现自定义运算符时用作流生成器中的接收器。此接口的实现不是线程安全的。这个接口只有一个emit方法,这个方法我们再熟悉不过了,就是我们往流里面塞数据用的方法,可以看到emit是一个挂起函数。(这里可以类比一下liveData的协程实现方式)

再看下SafeFlow里面重写了一个collectSafely方法,具体实现看下SafeFlow继承的AbstractFlow类

@FlowPreviewnpublic abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {nn @InternalCoroutinesApin public final override suspend fun collect(collector: FlowCollector<T>) {n val safeCollector = SafeCollector(collector, coroutineContext)n try {n collectSafely(safeCollector)n } finally {n safeCollector.releaseIntercepted()n }n }nn /**n * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.n *n * A valid implementation of this method has the following constraints:n * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.n * The emission should happen in the context of the [collect] call.n * Please refer to the top-level [Flow] documentation for more details.n * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are notn * thread-safe by default.n * To automatically serialize emissions [channelFlow] builder can be used instead of [flow]n *n * @throws IllegalStateException if any of the invariants are violated.n */n public abstract suspend fun collectSafely(collector: FlowCollector<T>)n}n

AbstractFlow里面就能看到collect的实现方法,可以看到AbstractFlow继承的就是Flow基类,Flow基类中没什么实现方法,只有一个collect的抽象方法,所以我们着重看下AbstractFlow里面的collect是怎么实现异步执行emit的。

public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)nnpublic suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =n collect(object : FlowCollector<T> {n override suspend fun emit(value: T) = action(value)n })n

无论使用collect()还是collect{}调用的都是Flow的collect抽象方法,collect真正实现还是在AbstractFlow里面,这里只是将collect的block封装成了FlowCollector(重写emit方法用于执行block)

封装的FlowCollector和CoroutineContext继续被封装为SafeCollector类型,并将该实例传入collectSafely方法用于在SafeFlow中实现block的执行。

@Suppress("UNCHECKED_CAST")nprivate val emitFun =n FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>nn@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")ninternal actual class SafeCollector<T> actual constructor(n @JvmField internal actual val collector: FlowCollector<T>,n @JvmField internal actual val collectContext: CoroutineContextn) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {nn......nn /**n *这是状态机重用的巧妙实现。首先,它检查它是否未同时使用(我们明确禁止),然后只缓存一个完成实例,n *避免在每个发射上进行额外分配,使其在其热路径上有效地无垃圾。n */n override suspend fun emit(value: T) {n return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->n try {n emit(uCont, value)n } catch (e: Throwable) {n // 保存已引发emit异常(甚至检查上下文)的事实n lastEmissionContext = DownstreamExceptionElement(e)n throw en }n }n }nn private fun emit(uCont: Continuation<Unit>, value: T): Any? {n val currentContext = uCont.contextn currentContext.ensureActive()n // 检查flow是否在同一个上下文中n val previousContext = lastEmissionContextn if (previousContext !== currentContext) {n checkContext(currentContext, previousContext, value)n }n completion = uContn return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)n }nn//检查contextnprivate fun checkContext(n currentContext: CoroutineContext,n previousContext: CoroutineContext?,n value: Tn) {n if (previousContext is DownstreamExceptionElement) {n exceptionTransparencyViolated(previousContext, value)n }n checkContext(currentContext)n lastEmissionContext = currentContextn}nn

通过上面的分析可知,未调用collect之前是不会调用collectSafely方法不会收集我们塞入emit中的数据,只有调用collect方法才会调用到,进而以扩展函数的形式调用到flow的block,block里面就是我们在emit里面写入的数据,所以作为接收器的SafeCollector调用了emit方法。

FlowCollector的block在SafeCollector中以value的形式进行传递,最后返回一个emitFun变量,该变量的类型就是FlowCollector的emit的block类型。

看到这个emitFun的参数会发现怎么多了一个,因为emit是挂起函数默认有一个参数的传递就是Continuation,这一块kotlin替我们做了,但是对于java来说必须要传递Continuation进去。通过这种方式传入一样的Continuation,保证了Continuation的统一。

说了这么多总结一下:

利用扩展函数的性质,调用到flow的block进而调用了SafeCollector的emit,而这里的emit会调用到传进来的FlowCollector的emit,而传进来的emit函数被重写调用block,所以就会调用到collect的block。因为只有调用collect之后进而调用到safeFlow的collect函数,进而才会调用到collectSafely函数去执行flow的代码。所以不调用collect的话,flow的代码构建块是不会执行的,最多返回一个safeFlow的对象而已。

作者:Lu_Hsiang
链接:https://juejin.cn/post/7120112404393885733