likes
comments
collection
share

深入理解 Flutter 中的 Stream (二)

作者站长头像
站长
· 阅读数 37
  1. 使用 scheduleMicrotask 暂停异步流时,流会在打印第一次输出之后才会暂停
  2. StreamBuilder 会出现 Stream 数据丢失的情况

那么今天就来拨云见日,深入剖析 Stream,通过源码层面的实现去回答上面的问题。

StreamController 家族一览

要剖析 Stream,先对它的源头 StreamController 入手,下面这张图是它的主要子类关系图。

深入理解 Flutter 中的 Stream (二)

EventSink是事件输出的接口,我们经常使用的add就是来源于此。

_EventDispatch是事件输出的接口。

_StreamControllerBase是结合了StreamController EventSink _EventDispatch等接口的组合类,其中_EventDispatch定义了事件如何输出,是区分各种Stream功能的关键一环,后面会重点展开。

// 输入事件的接口
abstract interface class EventSink<T> implements Sink<T> {
  void add(T event);
  void addError(Object error, [StackTrace? stackTrace]);
  void close();
}

// 事件分发接口类     
abstract class _EventDispatch<T> {
  void _sendData(T data);
  void _sendError(Object error, StackTrace stackTrace);
  void _sendDone();
}

// _StreamControllerBase 是结合了几个接口的抽象接口,
// 这里主要关心 _EventDispatch
abstract class _StreamControllerBase<T>
    implements
        StreamController<T>,
        _StreamControllerLifecycle<T>,
        _EventSink<T>,
        _EventDispatch<T> {}

_StreamControllerBase底下主要分成两个分支,一个分支是_StreamController领导的非广播流实现;另一个是_BroadcastStreamController所属的广播流分支。

非广播流下又可分为同步流异步流,对应的实现分别是_SyncStreamController_AsyncSteramCOntroller。两者的实现区别体现在_SyncStreamControllerDispatch_AsyncStreamControllerDispatch上,而这两个Dispatch就是实现了_EventDispatch接口的Mixin,也就是规定了这条流要怎么输出事件。

废话不多说,直接看源码,从非广播的基类_StreamController入手。

_StreamController 的实现

二进制位状态

_StreamController 里定义了很多标志位,它们是基于二进制位运算来更新当前状态的。二进制的每一位都能用来代表一种状态,如下,以_STATE_SUBSCRIBED来讲,它的二进制位展开就是00000001,从右往左算第1位就能用来标志流否被订阅,这里1代表当前流被订阅,0则相反。

static const int _STATE_INITIAL = 0;

static const int _STATE_SUBSCRIBED = 1; // 流被订阅
static const int _STATE_CANCELED = 2;  // 订阅取消了
static const int _STATE_SUBSCRIPTION_MASK = 3;  // 流被订阅/订阅取消

static const int _STATE_CLOSED = 4; // 流关闭了。之后不再接收事件的输入
static const int _STATE_ADDSTREAM = 8;  // 流的输入源是 Stream

// 状态标志
int _state = _STATE_INITIAL;

// 是否监听
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;

初始状态是_STATE_INITIAL,当我需要标记时,使用二进制运算:_state |= _STATE_SUBSCRIBED,展开就是 00000000 | 00000001 = 00000001,最后一位结果是1;而当我需要取消标记时,使用二进制运算:_state &= ~_STATE_SUBSCRIBED,展开就是00000001 & 00000000,最后一位结果是0

如果要判断当前位的状态,就去判断最后一位是否是 1,使用二进制与运算:_state & _STATE_SUBSCRIBED

二进制位状态遍布在Stream的各处实现中,其实这种处理在其它语言中也经常会看到,巧妙得利用了二进制的原理:一个位代表一种状态。

事件输入: EventSink 的实现

前面提到了EventSink作为流的输入,定义了三种输入接口add addEerror closeaddaddError方法是类似的处理,所以这里只分析addclose方法。

add

// 不在 _STATE_CLOSED &&  _STATE_ADDSTREAM 状态时,能通过 EventSink 添加事件
bool get _mayAddEvent => (_state < _STATE_CLOSED);
// 流是否被订阅
bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
// 初始状态
bool get _isInitialState =>
    (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;

void add(T value) {
  if (!_mayAddEvent) throw _badEventState();
  _add(value);
}

void _add(T value) {
  if (hasListener) {
    _sendData(value);  // _sendData 是 _EventDispatch 中定义的接口方法
  } else if (_isInitialState) {
    _ensurePendingEvents().add(_DelayedData<T>(value));
  }
}

_mayAddEvent为 true 时,add 方法才会往下执行,也就是说当_state_STATE_CLOSED或者_STATE_ADDSTREAM时,都会抛出错误。搜索这个_STATE_ADDSTREAM的引用你会发现,只有在addStream方法中,_state才会被打上_STATE_ADDSTREAM的标记:_state |= _STATE_ADDSTREAM,所以STATE_ADDSTREAM代表的流的另外一种输入方式。

_isInitialState代表的是流的初始状态,此时还没有被订阅,当事件输入时,会先缓存到_ensurePendingEvent()生成的_PendingEvents<T>()对象中。

总结一下

  1. Stream 有两种输入源:EventSink中定义的接口和addStream,两种输入源是互斥的。
  2. 当 Stream 没有被订阅时,事件的输入会缓存到一个列表里

close

// 返回的是一个 Future
Future<void> _ensureDoneFuture() =>
    _doneFuture ??= _isCanceled ? Future._nullFuture : _Future<void>();
    
Future close() {
  if (isClosed) {
    return _ensureDoneFuture();
  }
  if (!_mayAddEvent) throw _badEventState();
  _closeUnchecked();
  return _ensureDoneFuture();
}

// 会发出一个 Done 事件
void _closeUnchecked() {
  _state |= _STATE_CLOSED;
  if (hasListener) {
    _sendDone();
  } else if (_isInitialState) {
    _ensurePendingEvents().add(const _DelayedDone());
  }
}

close 方法返回的是一个 Future,这个 Future 会在 _recordCancel 方法中执行 Complete,当StreamSubscription结束时,_recordCancel方法会被调用。StreamSubscription结束有几种情况,一种是接收到了Done事件,就是这个close方法中输入的;一种是StreamSubscription构造方法中cancelOnError为 true 时,接收到了error事件,也就是通过addError方法输入的;最后一种是StreamSubscription直接调用自身的cancel方法。

  1. close后也会产生一个事件,事件名字是_DelayedDone
  2. close后 Stream 不是马上关闭的,要等到事件处理完成后才会完成。

事件输出: _EventDispatch 的实现

前面提到了事件如何输入,现在讲下事件的输出。_EventDispatch中也有三个方法_sendData _senError _sendDone,这三个输出方法分别对应前面提到的三个输入。_StreamController并没有实现这三个方法,而是交给了它的子类。同步流和异步流的差异在于继承了不同的_EventDispatch

class _SyncStreamController<T> = _StreamController<T>
    with _SyncStreamControllerDispatch<T>;
    
class _AsyncStreamController<T> = _StreamController<T>
    with _AsyncStreamControllerDispatch<T>;

接下来看一下不同的_EventDispatch的实现

mixin _SyncStreamControllerDispatch<T>
    implements _StreamController<T>, SynchronousStreamController<T> {
  void _sendData(T data) {
    _subscription._add(data);
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addError(error, stackTrace);
  }

  void _sendDone() {
    _subscription._close();
  }
}

mixin _AsyncStreamControllerDispatch<T> implements _StreamController<T> {
  void _sendData(T data) {
    _subscription._addPending(_DelayedData<T>(data));
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addPending(_DelayedError(error, stackTrace));
  }

  void _sendDone() {
    _subscription._addPending(const _DelayedDone());
  }
}

可以看到,异步流会把所有的事件统一通过_addPending去处理,从名字上去看这应该是一个缓存队列。顺着这条路,我们来分析一下_subscription的来源以及它是如何消费事件的:_add _addError _close _addPending的区别。

_subscription 的创建

我们知道,StreamSubscription是通过Stream.listen返回的,定位一下调用过程,它的完整的路径是 Stream.listen -> Stream._createSubscription -> StreamController._subscribe。最终的_subscribe的实现是

StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
    void onDone()?, bool cancelOnError) {
  if (!_isInitialState) {
    throw StateError("Stream has already been listened to.");
  }
  // 创建 subscription
  _ControllerSubscription<T> subscription = _ControllerSubscription<T>(
      this, onData, onError, onDone, cancelOnError);

  _PendingEvents<T>? pendingEvents = _pendingEvents;
  _state |= _STATE_SUBSCRIBED;
  // 通过 _varData 将 subscription 存起来
  if (_isAddingStream) {
    _StreamControllerAddStreamState<T> addState = _varData as dynamic;
    addState.varData = subscription;
    addState.resume();
  } else {
    _varData = subscription;
  }
  subscription._setPendingEvents(pendingEvents); // 将缓存事件传给 Subscription
  subscription._guardCallback(() {               // 触发 onListen
    _runGuarded(onListen);
  });

  return subscription;
}

这里主要做几件事

  1. 创建 subscription,并将它的引用保存到 _varData 。 这里有一点需要知道的是 _varData 有可能是 _StreamControllerAddStreamState 或者 StreamSubscription 的类型,当 _isAddingStream时,subscription 会被保存到_StreamControllerAddStreamState.varData里。
  2. 将提前到的事件 pendingEvent 传给 subscription 。
  3. 触发 onListen 。

通过前面的流程,获取subscription的方式就比较容易理解了。

_ControllerSubscription<T> get _subscription {
  assert(hasListener);
  Object? varData = _varData;
  if (_isAddingStream) {
    _StreamControllerAddStreamState<Object?> streamState = varData as dynamic;
    varData = streamState.varData;
  }
  return varData as dynamic;
}

_ControllerSubscription 如何消费事件

前面的分析,我们能得出subscription_ControllerSubscription类型的。现在我们就可以通过它的_add _addError _close _addPending 来分析它的输出,它的实现在父类_BufferingStreamSubscription 中。前面三个方法的实现类似,我们只取_add来分析

// _canFire 表示是不是有事件正在处理中
bool get _canFire => _state < _STATE_IN_CALLBACK;

void _add(T data) {
  assert(!_isClosed);
  if (_isCanceled) return;
  if (_canFire) {
    // 如果没有事件在处理,则进行
    _sendData(data);
  } else {
    _addPending(new _DelayedData<T>(data));
  }
}

// 发送事件
void _sendData(T data) {
  ...
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  // 事件处理,调用 _onData
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}

// 存放到队列中
void _addPending(_DelayedEvent event) {
  var pending = _pending ??= _PendingEvents<T>();
  pending.add(event);
  if (!_hasPending) {
    _state |= _STATE_HAS_PENDING;
    if (!_isPaused) {
      pending.schedule(this);
    }
  }
}

这里有两点值得关注

  1. 事件最终会通过 _onData 发送出去,这个 _onData 是我们创建 Subscription 时传进来的回调方法
  2. Subscription 里面也会有一个队列_pendingEvent,存放着还未消费的事件,这个队列最终会通过 schedule(this)方法一个个输出。

schedule

关于schedule方法,我们来看一下它的处理。首先,它会对当前队列状态做一下判断,接着它会通过scheduleMicrotask开启事件的处理。但要知道,这个scheduleMicrotask开启的是一个异步事件 micro eventhandleNext就是事件处理的核心,handleNext会使到队列的下一个事件,回调到Subscription_sendData等方法。

void schedule(_EventDispatch<T> dispatch) {
  // 状态判断,避免重复
  if (isScheduled) return;
  assert(!isEmpty);
  if (_eventScheduled) {
    assert(_state == stateCanceled);
    _state = stateScheduled;
    return;
  }
  // 开启一个 microTask 事件,准备处理事件
  scheduleMicrotask(() {
    int oldState = _state;
    _state = stateUnscheduled;
    if (oldState == stateCanceled) return;
    handleNext(dispatch);
  });
  _state = stateScheduled;
}

至此,我已经讲完了非广播流下的同步异步流的处理,它们从事件输入到输出的整个过程。现在我们可以回过头来看一下,在文章开头提到的第 1 个问题

使用 scheduleMicrotask 暂停异步流时,流为什么会在打印第一次输出之后才会暂停?

void main() async {
  
  // 异步:sync 为 false
  var controller = StreamController<int>(sync: false);
  var subscription = controller.stream.listen((event) {
    print('output: $event');
  });
  
  controller.add(1);
  controller.add(2);
  controller.add(3);

  scheduleMicrotask(() {
    print('暂停');
    subscription.pause();

    Future.delayed(const Duration(seconds: 3), () {
      print('3秒后 -> 恢复');
      subscription.resume();
    });
  });
}

// will print
// output: 1
// 暂停
// 3秒后 -> 恢复
// output: 2
// output: 3

首先,异步流会在_AsyncStreamControllerDispatch通过_subscription._addPending输入,先放到队列里,最终会通过_pendingEvent.schedule方法输出。在这里,add(1)过来之后,schedule方法就会开始执行了,但由于它里面的scheduleMicrotask是异步的,所以handleNext还未执行。接着,同步的add(2) add(3)方法过来了。

然后走到下面的scheduleMicrotask准备暂停流,这个也是异步的任务。需要注意的是这个异步任务排在了add(1)触发的scheule -> scheduleMicrotask的后面。所以,handleNext先执行,输出output: 1handleNext处理完当前事件之后,会走队列下一个事件的输出,这个事件的输出也是通过schedule方法进行的,也就是后面的事件也是通过schedule里的scheduleMicrotask的异步处理里面。

但由于先后顺序问题,先走到了subscriptionn.paused()里,流先被暂停了,后面的事件就被暂停了。等到 3 秒后,事件恢复,才继续事件的消费。

_BroadcastStreamController 的实现

广播流的处理跟非广播流异曲同工。非广播流只存在一个_subscription_BroadcastStreamController则保存一个链表结构的_BroadcastSubscription,它里面保存着两个对象_firstSubscription _lastSubscription

// Double-linked list of active listeners.
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;

对于同步流_SyncBroadcastStreamController来说,事件输出时通过_forEachListener 遍历操作 _sendData_forEachListener里面有一个_isFiring的判断,不允许还没有输出结束前再触发输出。

if (_isFiring) {
  throw new StateError(
      "Cannot fire new event. Controller is already firing an event");
}

为了解决这种情况,在_SyncBroadcastStreamController下创建了创建了一个子类_AsBroadcastStreamController,是一种特殊的同步流,当处于_isFiring状态时,事件过来的时候,会将事件通过异步的方式处理,也就是保存到pendingEvent里面。

void add(T data) {
  if (!isClosed && _isFiring) {
    _addPendingEvent(new _DelayedData<T>(data));
    return;
  }
  super.add(data);
  _flushPending();
}

异步流_AsyncBroadcastStreamController则是跟非广播下的异步流的处理方式一样,事件都会先缓存到_pendingEvent里面。

总的来说,非广播流广播流下的同步同步的处理是类似的。 只要掌握同步异步的本质就行。

总结

本来还打算继续分析 Stream 的变换处理 _ForwardingStream 以及 StreamBuilder的实现,但写着写着文章已到了 12000+ 的字数。这两者的实现里面没有太难理解的部分,既然如此,就留给大家自己去分析和思考了,有问题可以随时留言讨论,或者点个关注+发私信,我看到一定会回复的~

转载自:https://juejin.cn/post/7268539503903211560
评论
请登录