基于sf-async处理复杂依赖关系任务
背景
在云服务、电商等业务场景中,微服务是一种很流行的架构风格,每一个微服务都是一个独立的单元,微服务只关注自己具体的业务,由独立的团队负责。每个微服务只提供自己的数据,而终端页面的内容展示是需要完整数据,所以需要有一个node中间层服务来负责调用各个微服务,获取各种数据进行整合拼凑,将聚合后的数据返回给终端进行展示。
调用各个微服务来聚合数据并不是一件多难的事情,实际上它是一个脏活累活,不同微服务在公司里一般归属不同的部门,意味着node中间层的人要对接不同部门的人,不同部门的规范不一样,工作地点不一样,比如接口响应格式每个服务返回的不一致等等,比如同一个业务字段命名不同,在A部门叫虚拟机,在B部门叫云主机,其实它们表达的是同一个东西......这些差异都需要我们在中间层代码里做各种兼容处理进行抹平,代码维护成本++。
微服务之间的依赖关系是错综复杂的,有并行关系、串行关系,串并行夹杂的关系,有运行时根据某个字段才能确定调哪个的依赖关系。
针对node中间层上述这些问题,需要有一些技术方案来解决。
async.auto
async.auto
是npm包async
提供的一个处理复杂依赖关系的异步任务的方法,它的用法示例如下:
const async = require('async');
async.auto({
getId: cb => {},
getBase: cb => {},
getClub: cb => {},
// 表示getStock依赖getBase和getId的执行结果
getStock: ['getBase', 'getId', ({ getBase, getId }, cb) => {}],
});
async.auto的问题
在项目中深入使用async.auto
之后,发现其本身存在一些问题,会影响我们的开发。
扩展性差,集成其它能力需要额外重写
假设我们现在要实现:统计传入async.auto
中的每个Task的执行耗时。
需要开发人员对async.auto
方法进行重写,每次给async.auto
扩展一个功能,都要开发人员想法子重写async.auto
,扩展性不好导致实现成本++。
cb放在条件语句里容易执行多次
如下面的代码,当客户端传query参数duck时,cb就会执行两次,async.auto
抛出Callback was already called的运行时错误。虽然这是开发人员的代码逻辑疏忽,但是可以认为是async.auto
这种cb写法设计就容易让开发写出这样的错误代码
async.auto({
getData: cb => {
if (req.query.cat) {
cb(null, 'cat')
return
}
if (req.query.duck) {
cb(null, 'duck')
// return // 忘记写return导致cb执行了两次
}
cb(null, 'dog')
},
});
写法有点乱
async.auto
是一个存在很久的东西,其实它是同时支持async () => {}
和(cb) => {}
两种写法的,在交替使用的时候,开发思维上容易出现async.auto({ getData: async function (cb) {} })
的写法,导致报错cb is not a function,这是async.auto
的历史包袱,历史包袱丢不掉,开发容易踩坑。
当Task存在依赖和不存在依赖时,async.auto
的写法不统一,存在理解成本:
async.auto({
// 没有依赖时对象的value是函数,
// 函数的参数第一个就是cb
getData1: cb => {},
// 有依赖时对象的value是数组,数组的最后一个元素才是函数,
// 函数的参数第二个才是cb
getData2: ['getA', ({ getA }, cb) => {}],
getData3: ['getA', 'getB', ({ getA, getB }, cb) => {}],
});
Task之间的依赖关系不清晰
依赖关系分别写在async.auto
的每个Task中,当你的Task比较多时,Task的代码块比较长时,哪个Task被哪个Task依赖就很难一眼看出来,如果存在代码运行时才能确定依赖关系的Task,只能在每个Task中通过条件语句判断:
async.auto({
getData1: cb => {
if (!deps.getData1) {
// 不需要执行该Task,手动返回
return cb(null, {})
}
// do something
},
getData2: ['getA', ({ getA }, cb) => {
if (!deps.getData2) {
// 不需要执行该Task,手动返回
return cb(null, {})
}
// do something
}],
getData3: ['getA', 'getB', ({ getA, getB }, cb) => {
if (!deps.getData3) {
// 不需要执行该Task,手动返回
return cb(null, {})
}
// do something
}],
});
sf-async
async.auto
的api设计在项目实际使用中着实让笔者难受,相信其他人也有类似的感受,为此笔者设计出了一个async.auto
的替代方案,帮助开发者处理复杂依赖关系的异步任务。
如何使用
sf-async
提供了一个最基础的类SfAsync
,开发者需要继承SfAsync
实现自己的TaskAsync,例子如下:
const SfAsync = require('sf-async')
class AtomTaskAsync extends SfAsync {
get deps () {
return {
getData: ['getId']
}
}
async getId () {
return '1'
}
async getData (req, res, { deps }) {
const { getId } = deps // 获取依赖的返回结果
return { id: getId, name: 'xx' }
}
}
const demoAsync = new DemoAsync(req, res, params)
const { getData } = await demoAsync.run(['getData']) // 只需要手动传入getData,会根据deps自动调用getId
console.log(getData) // 输出结果:{ id: '1', name: 'xx' }
Task之间的依赖关系都写在类的get deps
上,哪个Task依赖了哪个Task,通过deps
一目了然,用get deps
,不用deps
,可以提高灵活性,做到在运行时确定依赖关系,如下:
class DemoAsync extends SfAsync {
get deps () {
if (this.req.query.cat) {
return {
getData: ['getKey']
}
}
return {
getData: ['getId']
}
}
}
扩展性
假如想实现统计每个异步任务的执行耗时,只需要在SfAsync这个基础类上面改改,就能实现:
module.exports = class SfAsync {
#hrtime (field) {
if (this.#timeMap.has(field)) {
return
}
this.#timeMap.set(field, {})
const start = Date.now()
return () => {
const end = Date.now()
this.#timeMap.set(field, {
start,
duration: end - start,
})
}
}
}
// 内部调用异步任务时参考这样处理下
// const endFn = this.#hrtime(field)
// await p
// endFn?.()
源码
module.exports = class SfAsync {
req = null
res = null
params = null
#timeMap = new Map()
#promiseMap = new Map() // 保存每个Task的promise
#result = {} // 保存每个Task的返回结果
get deps () {
return {}
}
constructor (req, res, params) {
this.req = req
this.res = res
this.params = params || {}
}
// 统计执行耗时
#hrtime (field) {
if (this.#timeMap.has(field)) {
return
}
this.#timeMap.set(field, {})
const start = Date.now()
return () => {
const end = Date.now()
this.#timeMap.set(field, {
start,
duration: end - start,
})
}
}
async #execPromise (field) {
if (this.#result[field]) {
return
}
if (this.#promiseMap.has(field)) {
// 如果是执行过的异步任务,就从this.#promiseMap中获取promise结果
this.#result[field] = await this.#promiseMap.get(field)
return
}
const endFn = this.#hrtime(field)
const p = this.#serial(field)
this.#promiseMap.set(field, p)
// 把当前任务的返回结果保存在this.#result中
this.#result[field] = await p
endFn?.()
}
#validDeps () {
// 对Task做一些校验,防止开发者意外传入不存在的Task
Object.keys(this.deps).forEach(field => {
if (!this[field]) {
throw new Error(`SfAsync: Invalid field in deps: ${field}`)
}
this.deps[field].forEach(depField => {
if (!this[depField]) {
throw new Error(`SfAsync: Invalid field in deps[${field}]: ${depField}`)
}
})
})
}
#validate (fields) {
this.#validDeps()
;(fields || []).forEach(field => {
if (!this[field]) {
throw new Error(`SfAsync: Invalid field in classMethods: ${field}`)
}
})
}
async #serial (field) {
// field是Task的标识,depFields表示当前Task的依赖
const depFields = this.deps[field] || []
await this.#parallel(depFields) // 并行执行这些依赖
// 从this.#result中获取这些依赖的返回结果deps
const deps = depFields.reduce((total, curr) => {
total[curr] = this.#result[curr]
return total
}, {})
// 传入依赖结果deps,执行当前Task
return await this[field](this.req, this.res, { deps, ...this.params })
}
// 并行执行
async #parallel (fields) {
const arr = fields.map(field => this.#execPromise(field))
await Promise.allSettled(arr).catch(error => {
// todo: log error
})
}
async run (fields = []) {
const endFn = this.#hrtime('_totalTime')
this.#validate(fields)
await this.#parallel(fields)
endFn?.()
return this.#result
}
}
转载自:https://juejin.cn/post/7389935779332702219