likes
comments
collection
share

基于sf-async处理复杂依赖关系任务

作者站长头像
站长
· 阅读数 34

背景

在云服务、电商等业务场景中,微服务是一种很流行的架构风格,每一个微服务都是一个独立的单元,微服务只关注自己具体的业务,由独立的团队负责。每个微服务只提供自己的数据,而终端页面的内容展示是需要完整数据,所以需要有一个node中间层服务来负责调用各个微服务,获取各种数据进行整合拼凑,将聚合后的数据返回给终端进行展示。

调用各个微服务来聚合数据并不是一件多难的事情,实际上它是一个脏活累活,不同微服务在公司里一般归属不同的部门,意味着node中间层的人要对接不同部门的人,不同部门的规范不一样,工作地点不一样,比如接口响应格式每个服务返回的不一致等等,比如同一个业务字段命名不同,在A部门叫虚拟机,在B部门叫云主机,其实它们表达的是同一个东西......这些差异都需要我们在中间层代码里做各种兼容处理进行抹平,代码维护成本++。

微服务之间的依赖关系是错综复杂的,有并行关系、串行关系,串并行夹杂的关系,有运行时根据某个字段才能确定调哪个的依赖关系。

针对node中间层上述这些问题,需要有一些技术方案来解决。

async.auto

async.auto是npm包async提供的一个处理复杂依赖关系的异步任务的方法,它的用法示例如下:

const async = require('async');

async.auto({
  getId: cb => {},
  getBase: cb => {},
  getClub: cb => {},
  // 表示getStock依赖getBase和getId的执行结果
  getStock: ['getBase', 'getId', ({ getBase, getId }, cb) => {}],
});

async.auto的问题

在项目中深入使用async.auto之后,发现其本身存在一些问题,会影响我们的开发。

扩展性差,集成其它能力需要额外重写

假设我们现在要实现:统计传入async.auto中的每个Task的执行耗时。 需要开发人员对async.auto方法进行重写,每次给async.auto扩展一个功能,都要开发人员想法子重写async.auto,扩展性不好导致实现成本++。

cb放在条件语句里容易执行多次

如下面的代码,当客户端传query参数duck时,cb就会执行两次,async.auto抛出Callback was already called的运行时错误。虽然这是开发人员的代码逻辑疏忽,但是可以认为是async.auto这种cb写法设计就容易让开发写出这样的错误代码

async.auto({
  getData: cb => {
    if (req.query.cat) {
      cb(null, 'cat')
      return
    }
    if (req.query.duck) {
      cb(null, 'duck')
      // return // 忘记写return导致cb执行了两次
    }
    cb(null, 'dog')
  },
});

写法有点乱

async.auto是一个存在很久的东西,其实它是同时支持async () => {}(cb) => {}两种写法的,在交替使用的时候,开发思维上容易出现async.auto({ getData: async function (cb) {} })的写法,导致报错cb is not a function,这是async.auto的历史包袱,历史包袱丢不掉,开发容易踩坑。

当Task存在依赖和不存在依赖时,async.auto的写法不统一,存在理解成本:

async.auto({
  // 没有依赖时对象的value是函数,
  // 函数的参数第一个就是cb
  getData1: cb => {},
  // 有依赖时对象的value是数组,数组的最后一个元素才是函数,
  // 函数的参数第二个才是cb
  getData2: ['getA', ({ getA }, cb) => {}],
  getData3: ['getA', 'getB', ({ getA, getB }, cb) => {}],
});

Task之间的依赖关系不清晰

依赖关系分别写在async.auto的每个Task中,当你的Task比较多时,Task的代码块比较长时,哪个Task被哪个Task依赖就很难一眼看出来,如果存在代码运行时才能确定依赖关系的Task,只能在每个Task中通过条件语句判断:

async.auto({
  getData1: cb => {
    if (!deps.getData1) {
      // 不需要执行该Task,手动返回
      return cb(null, {})
    }
    // do something
  },
  getData2: ['getA', ({ getA }, cb) => {
    if (!deps.getData2) {
      // 不需要执行该Task,手动返回
      return cb(null, {})
    }
    // do something
  }],
  getData3: ['getA', 'getB', ({ getA, getB }, cb) => {
    if (!deps.getData3) {
      // 不需要执行该Task,手动返回
      return cb(null, {})
    }
    // do something
  }],
});

sf-async

async.auto的api设计在项目实际使用中着实让笔者难受,相信其他人也有类似的感受,为此笔者设计出了一个async.auto的替代方案,帮助开发者处理复杂依赖关系的异步任务。

如何使用

sf-async提供了一个最基础的类SfAsync,开发者需要继承SfAsync实现自己的TaskAsync,例子如下:

const SfAsync = require('sf-async')

class AtomTaskAsync extends SfAsync {
  get deps () {
    return {
      getData: ['getId']
    }
  }
  async getId () {
    return '1'
  }
  async getData (req, res, { deps }) {
    const { getId } = deps // 获取依赖的返回结果
    return { id: getId, name: 'xx' }
  }
}

const demoAsync = new DemoAsync(req, res, params)
const { getData } = await demoAsync.run(['getData']) // 只需要手动传入getData,会根据deps自动调用getId

console.log(getData) // 输出结果:{ id: '1', name: 'xx' }

Task之间的依赖关系都写在类的get deps上,哪个Task依赖了哪个Task,通过deps一目了然,用get deps,不用deps,可以提高灵活性,做到在运行时确定依赖关系,如下:

class DemoAsync extends SfAsync {
  get deps () {
    if (this.req.query.cat) {
      return {
        getData: ['getKey']
      }
    }
    return {
      getData: ['getId']
    }
  }
}

扩展性

假如想实现统计每个异步任务的执行耗时,只需要在SfAsync这个基础类上面改改,就能实现:

module.exports = class SfAsync {
  #hrtime (field) {
    if (this.#timeMap.has(field)) {
      return
    }
    this.#timeMap.set(field, {})
    const start = Date.now()

    return () => {
      const end = Date.now()
      this.#timeMap.set(field, {
        start,
        duration: end - start,
      })
    }
  }
}

// 内部调用异步任务时参考这样处理下
// const endFn = this.#hrtime(field)
// await p
// endFn?.()

源码

module.exports = class SfAsync {
  req = null
  res = null
  params = null
  #timeMap = new Map()
  #promiseMap = new Map() // 保存每个Task的promise
  #result = {} // 保存每个Task的返回结果
  get deps () {
    return {}
  }

  constructor (req, res, params) {
    this.req = req
    this.res = res
    this.params = params || {}
  }

  // 统计执行耗时
  #hrtime (field) {
    if (this.#timeMap.has(field)) {
      return
    }
    this.#timeMap.set(field, {})
    const start = Date.now()

    return () => {
      const end = Date.now()
      this.#timeMap.set(field, {
        start,
        duration: end - start,
      })
    }
  }

  async #execPromise (field) {
    if (this.#result[field]) {
      return
    }
    if (this.#promiseMap.has(field)) {
      // 如果是执行过的异步任务,就从this.#promiseMap中获取promise结果
      this.#result[field] = await this.#promiseMap.get(field)
      return
    }
    const endFn = this.#hrtime(field)
    const p = this.#serial(field)
    this.#promiseMap.set(field, p)
    // 把当前任务的返回结果保存在this.#result中
    this.#result[field] = await p
    endFn?.()
  }

  #validDeps () {
    // 对Task做一些校验,防止开发者意外传入不存在的Task
    Object.keys(this.deps).forEach(field => {
      if (!this[field]) {
        throw new Error(`SfAsync: Invalid field in deps: ${field}`)
      }
      this.deps[field].forEach(depField => {
        if (!this[depField]) {
          throw new Error(`SfAsync: Invalid field in deps[${field}]: ${depField}`)
        }
      })
    })
  }

  #validate (fields) {
    this.#validDeps()
    ;(fields || []).forEach(field => {
      if (!this[field]) {
        throw new Error(`SfAsync: Invalid field in classMethods: ${field}`)
      }
    })
  }

  async #serial (field) {
    // field是Task的标识,depFields表示当前Task的依赖
    const depFields = this.deps[field] || []
    await this.#parallel(depFields) // 并行执行这些依赖

    // 从this.#result中获取这些依赖的返回结果deps
    const deps = depFields.reduce((total, curr) => {
      total[curr] = this.#result[curr]
      return total
    }, {})
    
    // 传入依赖结果deps,执行当前Task
    return await this[field](this.req, this.res, { deps, ...this.params })
  }

  // 并行执行
  async #parallel (fields) {
    const arr = fields.map(field => this.#execPromise(field))
    await Promise.allSettled(arr).catch(error => {
      // todo: log error
    })
  }

  async run (fields = []) {
    const endFn = this.#hrtime('_totalTime')
    this.#validate(fields)
    await this.#parallel(fields)
    endFn?.()
    return this.#result
  }
}
转载自:https://juejin.cn/post/7389935779332702219
评论
请登录