likes
comments
collection
share

基于 Redis 的任务队列 Bull -- Nodejs

作者站长头像
站长
· 阅读数 17

队列是一种强大的设计模式,可帮助您应对常见的应用程序扩展和性能挑战。队列可以帮助您解决的一些问题。

示例如下:

平滑处理峰值。可以在任意时间启动资源密集型任务,然后将这些任务添加到队列中,而不是同步执行。让任务进程以受控方式从队列中提取任务。也可以轻松添加新的队列消费者以扩展后端任务处理。 分解可能会阻塞 Node.js 事件循环的单一任务。比如用户请求需要像音频转码这样的 CPU 密集型工作,就可以将此任务委托给其他进程,从而释放面向用户的进程以保持响应。 提供跨各种服务的可靠通信渠道。例如,您可以在一个进程或服务中排队任务(作业),并在另一个进程或服务中使用它们。在任何流程或服务的作业生命周期中完成、错误或其他状态更改时,您都可以收到通知(通过监听状态事件)。当队列生产者或消费者失败时,它们的状态被保留,并且当节点重新启动时任务处理可以自动重新启动。

Bull 是一种流行的、受良好支持的、高性能的基于 Node.js 的队列系统实现。该软件包可以轻松地将 Bull Queues 以友好的方式集成到您的应用程序中。

Bull 使用 Redis 来保存作业数据,在使用 Redis 时,Queue 架构是完全分布式,和平台无关。例如,您可以在一个(或多个)节点(进程)中运行一些 Queue 生产者、消费者,而在其他节点上的运行其他生产者和消费者。

安装依赖

npm install bull --save
# Typescript Definitions
npm install @types/bull --save-dev

快速开始

通过 bull 官方库学习的示例,展示下我配置的的代码,带你快速使用。

// 先引入bull库
const Queue = require('bull');

// 队列执行配置参数可以看 
// https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md
const oueueOptions = {
    redis: {
        port: 6379,
        host: '127.0.0.1', // 连接IP
        password: null, // 没有密码就填null
        db: 16, // 使用区间库
    },
    prefix: 'bull_task', // redis中key前缀
    defaultJobOptions: {
        attempts: 1,
        removeOnComplete: true,
        backoff: false,
        delay: 0
    },
};

// 给队列任务一个名字,后面参数进行队列执行配置
const queueFoo = new Queue('foo', oueueOptions);

// 给队列添加监听器
function fnFooListener() {
    // 移除队列任务上全部监听器
    queueFoo.removeAllListeners();

    // 给队列任务监听器
    queueFoo.addListener('progress', function (job, progress) {
        console.log(`Job ${job.id} is ${progress}% ready!`);
    });

    queueFoo.addListener('completed', function (job, result) {
        console.log(`Job ${job.id} completed! Result: ${JSON.stringify(result)}`);
        job.remove(); // 移除任务记录,防止下次添加同名jobId失败
    });
    queueFoo.addListener('failed', function (job, err) {
        console.log(`Job ${job.id} failed! ${err}`);
        job.remove(); // 移除任务记录,防止下次添加同名jobId失败
    })
}
fnFooListener();

// 普通添加任务
function fnFoo() {
    // 定义执行处理器
    queueFoo.process(function (job, done) {
        console.log('执行queueFoo任务的数据:', job.data)
        // console.log('执行queueFoo任务的参数:', job.opts)
        // console.log('执行queueFoo任务的整体信息:', job.toJSON())

        // 程序出现未知异常错误
        if (job.id.endsWith('2')) {
            // 当任务出现未知异常时,返回错误结果
            throw new Error('unexpected error 出现未知异常');
        }
        if (job.id.endsWith('4')) {
            // 返回错误结果
            done(new Error('error 返回错误结果'));
            return
        }

        // 模拟任务等待,请求回调,数据读取等业务处理
        let i = 0;
        while (i < 2) {
            i++;

            // 改变任务进度
            job.progress(i);

        }

        // 获取任务进度
        console.log('jobId: %s  , queueFoo任务进度: %s', job.id, job.progress());
        // 返回完成结果, done()完成但无返回结果
        done(null, { ikun: i /* etc... */ });
    });


    // 给队列添加任务
    for (let i = 1; i <= 5; i++) {
        // 明确任务ID, 队列中已存在同名的不会添加
        const jobId = "jobFoo" + i;
        // 任务参数
        const data = {
            key: "jobId",
            value: jobId
        }
        queueFoo.add(data, {
            jobId: jobId,
        });
    }
}
fnFoo()

队列配置参数 oueueOptions 是将队列定义在Redis中的db/key和一些执行任务的状态记录限制,可以看下 官方手册 ,我这里只针对 QueueJobEvents 三部分常用函数进行说明。

通过 new Queue('队列名称', oueueOptions) 声明任务队列,一个任务队列允许定义多个不同名称处理器Process,一个任务队列下可以添加多个任务Job。可以将任务指定给定义名称的处理器得到想要的处理结果。

Queue队列可以添加状态监听,监听方式用法与Nodejs一致(EventEmitter)。添加前移除当期队列的全部监听是为了防止重复添加,导致一次触发多次响应。如果只监听状态的一次结果可以用 .once 来获取单次结果。

.process 是给队列定义处理器的函数,函数的callback支持普通回调和异步回调,上面代码是普通回调示例。

.add 是给队列添加任务的函数,允许添加多个不同名称的jobId,不传入jobId属性是默认随机生成jobId,当你传入jobId属性时队列中已存在同名的不会添加,所以每次执行后需要移除任务记录。

// ... 
// fnFoo()

// 接连上部分代码, 96行注释掉,添加下面通过异步方式的代码示例

// 异步添加任务
async function fnFooAsyncJobAdd() {
    const groupStr = Math.ceil(Math.random() * 100) + '-';
    // 批量添加任务项
    for (let i = 1; i <= 5; i++) {
        const id = groupStr + i
        const job = await queueFoo.add({
            key: "jobId",
            value: id
        }, {
            jobId: id, // 显示的标记任务id, 队列中已存在同名的不会添加
            delay: 5000 // 延迟5秒

            // cron 重复任务 5秒一次
            // repeat: { cron: "*/5 * * * * *" }

            // 间隔周期任务 10秒一次
            // repeat: { every: 10 * 1000 }
        });

        console.log('给queueFoo任务的数据:', job.data)
        // console.log('给任务的参数:', job.opts)
        // console.log('任务的整体信息:', job.toJSON())
    }

    console.log("queueFoo队列任务数:", await queueFoo.count())
}

// 异步任务处理器
async function fnFooAsyncJobProcess() {
    // 任务处理执行
    queueFoo.process(async (job) => {
        console.log('执行queueFoo任务的数据:', job.data)
        // console.log('执行queueFoo任务的参数:', job.opts)
        // console.log('执行queueFoo任务的整体信息:', job.toJSON())

        // 程序中途执行错误
        if (job.id.startsWith('3') || job.id.startsWith('6')) {
            // 当任务出现未知异常时,返回错误结果
            throw new Error('unexpected error 出现未知异常');
        }
        if (job.id.startsWith('9')) {
            // 返回错误结果
            return Promise.reject(new Error('error 返回错误结果'));
        }

        // 模拟任务等待,请求回调,数据读取等业务处理
        let i = 0;
        while (i < 2) {
            // 延迟响应
            await new Promise(resolve => setTimeout(() => resolve(i++), 1000));

            // 改变任务进度
            await job.progress(i);
        }

        // 获取任务进度
        const progress = await job.progress();
        console.log('jobId: %s , queueFoo任务进度: %s', job.id, progress);
        // 返回完成结果
        return Promise.resolve({ ikun: i /* etc... */ });
    });
}

async function fnFooAsyncJob() {
    await fnFooAsyncJobProcess();
    await fnFooAsyncJobAdd();
    // await fnFooAsyncJobAdd();
}
fnFooAsyncJob();

看看 Queue

官方手册 Queue 部分

声明处理器

.process 是给队列定义处理器的函数,同名称处理器只能定义一个,重复定义会异常提示 throw new Error('Cannot define the same handler twice ' + name);

使用 .add 时也要对应添加到名称的队列哦,不然会提示 Job 23-4 failed! Error: Missing process handler for job type xxx

// 默认未定义
queue.process(async (job) => {});
// 任意Job处理
queue.process('*', async (job) => {});
// 定义名称queueName的处理器,默认单任务处理
queue.process('queueName', async (job) => {});
// 定义名称queueMore的处理器,指明同时处理任务数量,最大125
queue.process('queueMore', 125, async (job) => {});

添加单个任务

.add 是给队列添加单个任务的函数,允许添加多个不同名称的jobId,不传入jobId属性是默认随机生成jobId,当你传入jobId属性时队列中已存在同名的不会添加,所以每次执行后需要移除任务记录。

// 一般就两种
await queue.add(参数, 任务配置参数);
await queue.add(队列的处理器名称, 参数, 任务配置参数);
// 指定处理器名称,空对象,传入jobId
await queue.add('queueName', {}, {jobId: '66-6'});
// 默认处理器,空对象,指明JobId cron重复任务,根据cron表达式时间执行
await queue.add({}, {jobId: '66-7', repeat: { cron: "0 0 0 * * *" }});
// 默认处理器,空对象,指明JobId 间隔时间重复任务,不能与cron同时设置
await queue.add({}, {jobId: '66-8', repeat: { every: 10 * 1000 }});

当定义重复任务是 关于crom表达式可以使用库 cron-parser 进行格式解析,使用 在线工具 可以生成crom表达式。

向队列中添加多个任务

.addBulk 是给队列一次添加多个任务的函数,参数需要传入数组对象,数值每项的格式要求和.add一致,任务配置参数中 repeat 不支持。

const jobs = [{
    name: 'processName', // 指定名称处理器
    data: {},
    opts: {
        jobId: "6265", // 显示的标记任务id, 队列中已存在同名的不会添加
        delay: 1000, // 延迟5秒
    }
},{
    // 使用默认处理器
    data: {},
    opts: {
        jobId: "6275", // 显示的标记任务id, 队列中已存在同名的不会添加
    }
}];
await queue.addBulk(jobs);

获取队列任务列表-单次jobId

.getJobs 可以获取队列中的单次任务对象,支持根据任务类型和获取指定条数,默认按执行时间降序排序。

// 按状态获取条数进行升序排序
const jobs = await queueFoo.getJobs((['active','waiting','delayed'], 0, 5, true));
// 默认获取全部
const jobs = await queueFoo.getJobs();

for (const job of jobs) {
    console.log(job.id)
}

删除任务 建议获取得到任务后使用 .remove 移除任务,在执行状态中的任务无法移除。

// 获取队列中全部单次任务
const jobs = await queueFoo.getJobs();
for (const job of jobs) {
    const isActive = await job.isActive();
    if(!isActive){
        await job.remove();
    }
}
// 通过jobId获取任务
const job = await queueFoo.getJob('66');
if(job){
    const isActive = await job.isActive();
    if(!isActive){
        await job.remove();
    }
}

获取队列任务列表-重复任务

.getRepeatableJobs 可以获取队列中的重复任务对象,支持获取指定条数,默认按执行时间降序排序。

// 按状态获取条数进行升序排序
const repeatableJobs = await queueFoo.getRepeatableJobs(0, 5, true);
// 默认获取全部
const repeatableJobs = await queueFoo.getRepeatableJobs();

const repeatableJobs = await queueFoo.getRepeatableJobs();
for (const repeatableJob of repeatableJobs) {
    console.log(repeatableJob.id, repeatableJob.key, repeatableJob.cron)
}

删除重复任务 通过获取重复任务列表找到指定jobId的重复任务使用 .removeRepeatableByKey.removeRepeatable 移除重复任务,在执行状态中的任务无法移除。

const repeatableJobs = await queueFoo.getRepeatableJobs();
for (const repeatableJob of repeatableJobs) {
    if (repeatableJob.id == jobId) {
        // 方式一:根据重复任务的key移除
        await queueFoo.removeRepeatableByKey(repeatableJob.key);

        // 方式二:移除指定jobId且定义配置一致的
        await queueFoo.removeRepeatable({
            cron: repeatableJob.cron,
            jobId: repeatableJob.id,
        })
    }
}

删除等待中或延迟中的单次任务

.empty 删除等待中或延迟中的单次任务,不会删除活动、失败、已完成状态的任务。

对重复任务无效

await queueFoo.empty();

清除任务记录

.clean 清除创建超过秒外的任务记录,默认只清除完成状态的记录。

// 清除任务监听
queueFoo.on('cleaned', function (jobs, type) {
  console.log('Cleaned %s %s jobs', jobs.length, type);
});

// 默认清除完成状态任务 超过5秒外所有的记录
await queueFoo.clean(5000);
// 清除失败状态任务 超过5秒外所有的记录
await queueFoo.clean(10000, 'failed');

清空队列数据

.obliterate 清除队列内所有数据,有执行中的任务会提示 throw new Error('Cannot obliterate queue with active jobs');,使用 force 强制清空会导致重复任务无法正常读取执行时间提示警告。

// 清空队列任务,但无法正常移除执行中的任务
await queue.obliterate();
// 强制清空任务队列,存在执行中的任务不推荐操作。
await queue.obliterate({ force: true });
  • .count 获取任务队列中等待和延迟的任务数量函数

  • .getJobCounts 是获取任务队列对应状态任务数量函数

其他 getXxx 和 isXxx 的函数通过名称和说明就能很好的使用。

看看 Job

官方手册 Job 部分

任务进度

.progress 获取任务进度也可以改变任务进度值,常用于流程步骤任务。

// 获取任务进度
const progress = await queue.progress();
console.log(progress)
// 改变任务进度
await job.progress(75);
  • .getState 获取当前任务状态
  • .retry 重试当前任务
  • .update 对传入的参数进行更新
  • .promote 将当前任务从延迟状态转为等待状态
  • .toJSON 对当前任务信息

其他函数通过名称和说明就能很好的使用。

如果有需要补充的留个言。

看看 Events

官方手册 Events 部分

常用事件监听

// 移除队列任务上全部监听器
queueFoo.removeAllListeners();

// 任务开始执行触发
queueFoo.addListener('active', function (job, jobPromise) {
  // A job has started. You can use `jobPromise.cancel()`` to abort it.
})

// 进度改变触发
queueFoo.addListener('progress', function (job, progress) {
  // A job's progress was updated!
})

// 执行完成触发
queueFoo.addListener('completed', function (job, result) {
  // A job successfully completed with a `result`.
  job.remove(); // 移除任务记录,防止下次添加同名jobId失败
})

// 执行失败触发
queueFoo.addListener('failed', function (job, err) {
  // A job failed with reason `err`!
  job.remove(); // 移除任务记录,防止下次添加同名jobId失败
})

相关阅读

转载自:https://juejin.cn/post/7188760319672942647
评论
请登录