likes
comments
collection
share

Web Worker 线程池前言 Web Workers 提供了一种在主线程之外运行脚本的方法,从而避免阻塞用户界面并提

作者站长头像
站长
· 阅读数 15

前言

Web Workers 提供了一种在主线程之外运行脚本的方法,从而避免阻塞用户界面并提升应用性能。然而,直接使用单个 Web Worker 处理多个任务可能效率不高,尤其是在需要处理大量并发任务时。为了解决这个问题,引入 Web Worker 线程池(Worker Thread Pool)

Web Worker

Web Worker 是一种在浏览器中运行 JavaScript 的机制,它允许脚本在后台线程中执行,从而不会阻塞主线程(通常指UI线程,另一个是合成线程),Web Worker 适用于执行计算密集型任务,如大规模数据处理、复杂算法计算等。

主要特点:

  • 并行处理:多个 Web Worker 可以同时运行,充分利用多核 CPU。
  • 隔离性:Worker 与主线程有独立的执行环境,主要通过消息传递进行通信。
  • 安全性:Worker 运行在沙箱环境中,不能直接访问主线程的 DOM 或全局变量。

示例: 创建和使用一个简单的 Web Worker

// worker.js
self.onmessage = function (event) {
    const { task, id } = event.data;
    try {
        // 假设 task 是一个数学函数
        const result = performTask(task);
        self.postMessage({ id, result });
    } catch (error) {
        self.postMessage({ id, error: error.message });
    }
};

function performTask(task) {
    // 计算斐波那契数
    const { n } = task;
    if (n <= 1) return 1;
    return performTask({ n: n - 1 }) + performTask({ n: n - 2 });
}
// main.js
const worker = new Worker('worker.js');

worker.onmessage = function(event) {
  console.log('计算结果:', event.data);
};

worker.postMessage(10); // 发送任务到 Worker

Worker Thread Pool 线程池

什么是线程池

线程池(Thread Pool)是一种设计模式,用于管理一组可复用的线程,以高效地处理大量的并发任务。通过预先创建一定数量的线程,线程池可以避免频繁创建和销毁线程带来的性能开销。

主要优势:

  • 资源管理:控制并发线程的数量,避免过多线程导致资源耗尽。
  • 性能提升:减少线程创建和销毁的开销,提升任务处理效率。
  • 任务调度:有效地分配任务到空闲线程,提高任务吞吐量。

为什么需要线程池

在前端应用中,尤其是现代单页应用(SPA),可能需要处理大量的后台计算任务或数据处理需求。

如果每个任务都创建一个新的 Web Worker,将导致:

  • 资源消耗大:每个 Web Worker 都需要独立的内存和计算资源。
  • 性能下降:频繁创建和销毁 Worker 会带来显著的性能开销。
  • 管理复杂:难以控制 Worker 的数量,可能导致资源争用或应用卡顿。

通过引入线程池,可以:

  • 预先创建一定数量的 Worker,重复利用这些 Worker 处理多个任务。
  • 控制同时运行的 Worker 数量,避免资源过载。
  • 提高任务处理的整体效率和响应速度。

实现线程池

一个 Web Worker 线程池通常包含以下几个核心组件:

  • Worker 队列:存放待执行的任务。
  • Worker 实例:一定数量的 Worker 实例,在池中循环利用。
  • 任务分配:将任务分配给空闲的 Worker。
  • 回调机制:处理 Worker 任务完成后的结果。

线程池的核心组件

  • 任务队列:一个先进先出的队列,存储待执行的任务。

  • Worker 创建与管理:

    • 预创建一定数量的 Worker 实例。
    • 每个 Worker 在执行任务时被标记为“忙碌”,任务完成后标记为“空闲”。
  • 任务分配机制:

    • 当有新任务到来时,检查是否有空闲的 Worker。
    • 如果有,立即分配任务;否则,将任务加入队列,等待空闲 Worker。

示例:

// worker-pool.js
class WorkerPool {
	/**
	 * workerScript, worker脚本路径
	 * 线程池大小,默认为浏览器支持的并发数
	 */
	constructor(workerScript, poolSize = navigator.hardwareConcurrency || 4) {
		this.poolSize = poolSize; // 线程池大小
		this.workerScript = workerScript; // 文件地址
		this.workers = []; // worker实例数组,在池中循环利用
		this.idleWorkers = []; // 空闲 workers
		this.taskQueue = []; // 任务队列

		this._init();
	}

	_init() {
		for (let i = 0; i < this.poolSize; i++) {
			const worker = new Worker(this.workerScript);
			worker.onmessage = event => {
				const { id, result, error } = event.data;
				const pendingWorker = this.workers.find(w => w.id === id);
				if (pendingWorker && pendingWorker.currentTask) {
					if (error) {
						pendingWorker.currentTask.reject(error);
					} else {
						pendingWorker.currentTask.resolve(result);
					}
				}
				worker.isBusy = false;
				this.idleWorkers.push(worker); // 用完之后,再往池子里面放回worker
				this._runNext(); // 继续执行任务
			};
			worker.isBusy = false; // 是否忙碌
			worker.id = i; // 唯一标识,根据任务的唯一标识(id),将结果返回给对应的 Promise
			this.workers.push(worker);
			this.idleWorkers.push(worker);
		}
	}

	_runNext() {
		// 如果有空闲的 Worker,立即分配任务
		if (this.taskQueue.length === 0 || this.idleWorkers.length === 0) {
			return;
		}
		// 接到任务且有空闲,则从队列首部取一个worker
		const { task, resolve, reject } = this.taskQueue.shift();
		const worker = this.idleWorkers.pop(); // 从空闲workers中取一个
		worker.isBusy = true; // 设置当前worker标记为忙碌
		worker.currentTask = { resolve, reject };
		worker.postMessage({ id: worker.id, task }); // 发送task给worker处理
	}
	// 提交任务
	runTask(task) {
		return new Promise((resolve, reject) => {
			this.taskQueue.push({ task, resolve, reject });
			this._runNext();
		});
	}

	terminate() {
		this.workers.forEach(worker => worker.terminate());
		this.workers = [];
		this.idleWorkers = [];
		this.taskQueue = [];
	}
}

export default WorkerPool;
// worker.js
self.onmessage = function (event) {
	const { task, id } = event.data;
	try {
		// 假设 task 是一个数学函数
		const result = performTask(task);
		self.postMessage({ id, result });
	} catch (error) {
		self.postMessage({ id, error: error.message });
	}
};

function performTask(task) {
	// 示例:计算斐波那契数
	const { n } = task;
	if (n <= 1) return 1;
	return performTask({ n: n - 1 }) + performTask({ n: n - 2 });
}
// main.js
import WorkerPool from './worker-pool.js';

const pool = new WorkerPool('worker.js', 2); // 创建一个包含4个Worker的线程池

// 提交任务
// pool
// 	.runTask({ n: 10 })
// 	.then(result => {
// 		console.log('斐波那契 10:', result);
// 		// 释放线程池
// 		pool.terminate();
// 	})
// 	.catch(error => {
// 		console.error('任务错误:', error);
// 	});

// 提交多个任务
for (let i = 5; i <= 15; i++) {
	pool
		.runTask({ n: i })
		.then(result => {
			console.log(`斐波那契 ${i}:`, result);
		})
		.catch(error => {
			console.error('任务错误:', error);
		});
}

// 在不需要线程池时终止它
// pool.terminate();

Web Worker 线程池库

workerpool

  • 支持浏览器和 Node.js 环境
import workerpool from 'workerpool';

// 创建一个线程池
const pool = workerpool.pool('worker.js', { minWorkers: 2, maxWorkers: 4 });

// 提交任务
pool.exec('performTask', [{ n: 10 }])
  .then(result => {
    console.log('斐波那契 10:', result);
  })
  .catch(err => {
    console.error(err);
  });

// 关闭线程池
pool.terminate();

threads.js

  • 提供了高级的 Web Worker 管理功能,包括线程池、通讯优化等
  • 支持类型安全和模块化开发
  • 适合大型应用和复杂场景
import { spawn, Thread, Worker } from 'threads';

async function run() {
  const worker = await spawn(new Worker('./worker'));
  const result = await worker.performTask({n:10});
  console.log('斐波那契 10:', result);
  await Thread.terminate(worker);
}

run();

注意事项

安全性

  • Worker 脚本必须遵守同源策略,不能访问主线程的 DOM 或全局变量

数据结构简洁

  • 在主线程与 Worker 之间传递大量数据会带来序列化和反序列化的开销,影响性能
  • 尽量减少数据传输量,或使用 Transferable Objects 优化数据传输

线程安全

  • 确保任务之间独立执行,避免多个 Worker 修改同一数据导致竞态条
转载自:https://juejin.cn/post/7424151738649935910
评论
请登录