Web Worker 线程池前言 Web Workers 提供了一种在主线程之外运行脚本的方法,从而避免阻塞用户界面并提
前言
Web Workers 提供了一种在主线程之外运行脚本的方法,从而避免阻塞用户界面并提升应用性能。然而,直接使用单个 Web Worker 处理多个任务可能效率不高,尤其是在需要处理大量并发任务时。为了解决这个问题,引入 Web Worker 线程池(Worker Thread Pool)
Web Worker
Web Worker 是一种在浏览器中运行 JavaScript 的机制,它允许脚本在后台线程中执行,从而不会阻塞主线程(通常指UI线程,另一个是合成线程),Web Worker 适用于执行计算密集型任务,如大规模数据处理、复杂算法计算等。
主要特点:
- 并行处理:多个 Web Worker 可以同时运行,充分利用多核 CPU。
- 隔离性:Worker 与主线程有独立的执行环境,主要通过消息传递进行通信。
- 安全性:Worker 运行在沙箱环境中,不能直接访问主线程的 DOM 或全局变量。
示例: 创建和使用一个简单的 Web Worker
// worker.js
self.onmessage = function (event) {
const { task, id } = event.data;
try {
// 假设 task 是一个数学函数
const result = performTask(task);
self.postMessage({ id, result });
} catch (error) {
self.postMessage({ id, error: error.message });
}
};
function performTask(task) {
// 计算斐波那契数
const { n } = task;
if (n <= 1) return 1;
return performTask({ n: n - 1 }) + performTask({ n: n - 2 });
}
// main.js
const worker = new Worker('worker.js');
worker.onmessage = function(event) {
console.log('计算结果:', event.data);
};
worker.postMessage(10); // 发送任务到 Worker
Worker Thread Pool 线程池
什么是线程池
线程池(Thread Pool)是一种设计模式,用于管理一组可复用的线程,以高效地处理大量的并发任务。通过预先创建一定数量的线程,线程池可以避免频繁创建和销毁线程带来的性能开销。
主要优势:
- 资源管理:控制并发线程的数量,避免过多线程导致资源耗尽。
- 性能提升:减少线程创建和销毁的开销,提升任务处理效率。
- 任务调度:有效地分配任务到空闲线程,提高任务吞吐量。
为什么需要线程池
在前端应用中,尤其是现代单页应用(SPA),可能需要处理大量的后台计算任务或数据处理需求。
如果每个任务都创建一个新的 Web Worker,将导致:
- 资源消耗大:每个 Web Worker 都需要独立的内存和计算资源。
- 性能下降:频繁创建和销毁 Worker 会带来显著的性能开销。
- 管理复杂:难以控制 Worker 的数量,可能导致资源争用或应用卡顿。
通过引入线程池,可以:
- 预先创建一定数量的 Worker,重复利用这些 Worker 处理多个任务。
- 控制同时运行的 Worker 数量,避免资源过载。
- 提高任务处理的整体效率和响应速度。
实现线程池
一个 Web Worker 线程池通常包含以下几个核心组件:
- Worker 队列:存放待执行的任务。
- Worker 实例:一定数量的 Worker 实例,在池中循环利用。
- 任务分配:将任务分配给空闲的 Worker。
- 回调机制:处理 Worker 任务完成后的结果。
线程池的核心组件
-
任务队列:一个先进先出的队列,存储待执行的任务。
-
Worker 创建与管理:
- 预创建一定数量的 Worker 实例。
- 每个 Worker 在执行任务时被标记为“忙碌”,任务完成后标记为“空闲”。
-
任务分配机制:
- 当有新任务到来时,检查是否有空闲的 Worker。
- 如果有,立即分配任务;否则,将任务加入队列,等待空闲 Worker。
示例:
// worker-pool.js
class WorkerPool {
/**
* workerScript, worker脚本路径
* 线程池大小,默认为浏览器支持的并发数
*/
constructor(workerScript, poolSize = navigator.hardwareConcurrency || 4) {
this.poolSize = poolSize; // 线程池大小
this.workerScript = workerScript; // 文件地址
this.workers = []; // worker实例数组,在池中循环利用
this.idleWorkers = []; // 空闲 workers
this.taskQueue = []; // 任务队列
this._init();
}
_init() {
for (let i = 0; i < this.poolSize; i++) {
const worker = new Worker(this.workerScript);
worker.onmessage = event => {
const { id, result, error } = event.data;
const pendingWorker = this.workers.find(w => w.id === id);
if (pendingWorker && pendingWorker.currentTask) {
if (error) {
pendingWorker.currentTask.reject(error);
} else {
pendingWorker.currentTask.resolve(result);
}
}
worker.isBusy = false;
this.idleWorkers.push(worker); // 用完之后,再往池子里面放回worker
this._runNext(); // 继续执行任务
};
worker.isBusy = false; // 是否忙碌
worker.id = i; // 唯一标识,根据任务的唯一标识(id),将结果返回给对应的 Promise
this.workers.push(worker);
this.idleWorkers.push(worker);
}
}
_runNext() {
// 如果有空闲的 Worker,立即分配任务
if (this.taskQueue.length === 0 || this.idleWorkers.length === 0) {
return;
}
// 接到任务且有空闲,则从队列首部取一个worker
const { task, resolve, reject } = this.taskQueue.shift();
const worker = this.idleWorkers.pop(); // 从空闲workers中取一个
worker.isBusy = true; // 设置当前worker标记为忙碌
worker.currentTask = { resolve, reject };
worker.postMessage({ id: worker.id, task }); // 发送task给worker处理
}
// 提交任务
runTask(task) {
return new Promise((resolve, reject) => {
this.taskQueue.push({ task, resolve, reject });
this._runNext();
});
}
terminate() {
this.workers.forEach(worker => worker.terminate());
this.workers = [];
this.idleWorkers = [];
this.taskQueue = [];
}
}
export default WorkerPool;
// worker.js
self.onmessage = function (event) {
const { task, id } = event.data;
try {
// 假设 task 是一个数学函数
const result = performTask(task);
self.postMessage({ id, result });
} catch (error) {
self.postMessage({ id, error: error.message });
}
};
function performTask(task) {
// 示例:计算斐波那契数
const { n } = task;
if (n <= 1) return 1;
return performTask({ n: n - 1 }) + performTask({ n: n - 2 });
}
// main.js
import WorkerPool from './worker-pool.js';
const pool = new WorkerPool('worker.js', 2); // 创建一个包含4个Worker的线程池
// 提交任务
// pool
// .runTask({ n: 10 })
// .then(result => {
// console.log('斐波那契 10:', result);
// // 释放线程池
// pool.terminate();
// })
// .catch(error => {
// console.error('任务错误:', error);
// });
// 提交多个任务
for (let i = 5; i <= 15; i++) {
pool
.runTask({ n: i })
.then(result => {
console.log(`斐波那契 ${i}:`, result);
})
.catch(error => {
console.error('任务错误:', error);
});
}
// 在不需要线程池时终止它
// pool.terminate();
Web Worker 线程池库
workerpool
- 支持浏览器和 Node.js 环境
import workerpool from 'workerpool';
// 创建一个线程池
const pool = workerpool.pool('worker.js', { minWorkers: 2, maxWorkers: 4 });
// 提交任务
pool.exec('performTask', [{ n: 10 }])
.then(result => {
console.log('斐波那契 10:', result);
})
.catch(err => {
console.error(err);
});
// 关闭线程池
pool.terminate();
threads.js
- 提供了高级的 Web Worker 管理功能,包括线程池、通讯优化等
- 支持类型安全和模块化开发
- 适合大型应用和复杂场景
import { spawn, Thread, Worker } from 'threads';
async function run() {
const worker = await spawn(new Worker('./worker'));
const result = await worker.performTask({n:10});
console.log('斐波那契 10:', result);
await Thread.terminate(worker);
}
run();
注意事项
安全性
- Worker 脚本必须遵守同源策略,不能访问主线程的 DOM 或全局变量
数据结构简洁
- 在主线程与 Worker 之间传递大量数据会带来序列化和反序列化的开销,影响性能
- 尽量减少数据传输量,或使用 Transferable Objects 优化数据传输
线程安全
- 确保任务之间独立执行,避免多个 Worker 修改同一数据导致竞态条
转载自:https://juejin.cn/post/7424151738649935910