likes
comments
collection
share

Async*和yield的应用:制作一个简单的异步任务管理器

作者站长头像
站长
· 阅读数 6

前言

在开发过程中,我们经常会遇到这样的需求:有一批任务,每个任务的处理时长均不相同。当开始处理这批任务时,希望顺序执行,并展示总体进度和当前处理的任务。 如下图:

Async*和yield的应用:制作一个简单的异步任务管理器

设计和实现

通过上面的需求描述,我们不难发现此模块需要两个类来实现:异步任务(AsyncTask)和任务管理器(AsyncTaskManager)。其类图及实现如下:

AsyncTask

AsyncTask
+ValueNotifier progress
+get describe()
+dispose()
+startTask()

根据类图,我们可以设计如下抽象类:

abstract class AsyncTask {

  ///当前任务执行进度 [0-1.0]
  final ValueNotifier<double> progress = ValueNotifier(0.0);

  ///任务描述
  String get describe;

  ///释放任务
  void dispose();

  ///开始任务
  Future<bool> startTask();

}

AsyncTaskManager

AsyncTaskManager
+bool strict
-ValueNotifier progress
-ValueNotifier taskDescribe
-double taskWeight
-int totalToastLen
-Queue taskQueue
-Completer signal
+registerTask(AsyncTask task)
+dispose()
+getTaskByType() : T?
+launchTask() : Future
-watchTask(AsyncTask t)
-fetchTask() : Stream

这其中,我们将管理器内部的执行分为两部分:取任务_fetchTask()和执行任务launchTask()。由于二者不存在顺序关系,为了实现“顺序执行”这个需求,我们首先通过async*yield两个关键字,将任务的获取转换为Stream,之后增加一个signal(Completer)当作信号,每次取出一个任务时,便等待信号的变化,任务处理完成后,发送信号以获取下一个任务。实现如下:

class AsyncTaskManager {

  AsyncTaskManager({this.strict = false});

  ///严格模式
  /// * true : 一个任务错误,中断整个任务队列
  final bool strict;

  ///执行进度
  final _progress = 0.0.vn;
  ValueNotifier<double> get progress => _progress;

  ///任务描述
  final _taskDescribe = ''.vn;
  ValueNotifier<String> get taskDescribe => _taskDescribe;

  ///每个任务的进度权重
  /// * 影响进度的迭代值
  double _taskWeight = 1.0;
  
  ///初始任务总数量
  int _totalTaskLen = 0;

  Completer<bool>? _signal;

  ///待处理的任务队列
  final _taskQueue = Queue<AsyncTask>();

  ///注册任务
  void registerTask(AsyncTask task) {
    _taskQueue.add(task);
    _taskWeight = 1 / _taskQueue.length;
  }

  ///根据类型返回指定任务
  T? getTaskByType<T extends AsyncTask>() {
    try{
      final t = _taskQueue.firstWhere((element) => element is T);
      return t as T;
    } catch(e) {
      debugPrint(e.toString());
      return null;
    }
  }


  ///释放全部任务
  void dispose() {
    while(_taskQueue.isNotEmpty) {
      final t = _taskQueue.removeFirst();
      t.dispose();
    }
  }


  ///启动任务
  ///返回 任务链是否执行成功
  Future<bool> launchTask() async {
    _totalTaskLen = _taskQueue.length;
    final reporter = Completer<bool>();
    _fetchTask().listen((t) async {
      _watchTask(t);
      final success = await t.startTask();
      if(strict && !success) {
        _signal?.complete(false);
        reporter.complete(false);
        return;
      }
      _signal?.complete(true);
      _signal = null;
      if(_taskQueue.isEmpty) reporter.complete(true);
    });
    return reporter.future;
  }

  void _watchTask(AsyncTask t) {
    void updateTask() {
      final p = t.progress.value / _totalTaskLen
                + (_totalTaskLen - _taskQueue.length - 1)
                * _taskWeight;
      _progress.value = p;
      if(t.progress.value == 1) {
        t.progress.removeListener(updateTask);
      }
    }
    t.progress.addListener(updateTask);
  }

  Stream<AsyncTask> _fetchTask() async* {
    while(_taskQueue.isNotEmpty) {
      final task = _taskQueue.removeFirst();
      _taskDescribe.value = task.describe;
      _signal = Completer();
      yield task;
      final proceed = await _signal!.future;
      //信号异常,终止取出
      if(!proceed) return;
    }
  }


}

///---------便捷性拓展--------
extension NumExt on num{

  ValueNotifier<double> get vn => ValueNotifier(toDouble());

}

extension StrExt on String{

  ValueNotifier<String> get vn => ValueNotifier(this);

}

至此,一个异步任务管理器便完成了,如果有不足的地方欢迎指出交流,谢谢。

其他文章