likes
comments
collection
share

请让你的工人(Worker)更好地为你工作

作者站长头像
站长
· 阅读数 10

前言

众所周知,js是单线程的,而且不适合CPU密集型操作;但是我们在日常开发过程中或多或少会进行一些这样的操作,例如海量数据的深拷贝、TensorFlow中的数据模型训练、大循环、文件的读取......我们的页面会痴痴地等待CPU将这些计算任务执行完,浏览器将没有空闲时间处理用户交互,导致用户卡顿;可以用worker把这些任务放到worker线程进行计算,然后将结果返回给主线程,这样就解决了燃眉之急,那么具体该怎么做呢?下面来看一个大文件上传的案例:

大文件上传

大文件上传的过程中需要对大于5M(我们这里取一个固定值模拟一下)的文件进行分片,分片过程中我们需要读取文件内容然后得到文件MD5戳,这个过程非常消耗CPU;可以想象我们使用百度网盘的时候,经常下载一两个G的电影资源,每一个电影资源可能就要分成几百片,这样很有可能造成卡顿,我们的解决方式就是把这个过程放到worker线程中去做:

self.importScripts("../spark-md5.min.js");

async function calculateHash(file) {
  return new Promise((resolve, reject) => {
    const spark = new self.SparkMD5.ArrayBuffer()
    const reader = new FileReader();
    // 文件大小
    const size = file.size;
    let offset = 5 * 1024 * 1024;

    let chunks = [file.slice(0, offset)];
    let cur = offset;

    while (cur < size) {
      // 如果是最后一片,那么直接加进来
      chunks.push(file.slice(cur, cur + offset));
      cur += offset;
    }
    // 拼接
    reader.readAsArrayBuffer(new Blob(chunks));
    reader.onload = (e) => {
      spark.append(e.target.result);
      resolve([chunks, spark.end()]);
    };
  });
}

self.addEventListener('message',async({data})=>{
  const result = await calculateHash(data);
  self.postMessage(result);
});

worker中没有window,它的全局对象是self,并且和主线程通信需要不断的监听message事件以及调用postMessage方法,这些过程在每一次通信代码都是固定不变的,按照DRY的原则,我们应该把它们去除掉封装起来,这里我们就投机取巧直接用threads来管理我们的worker

用threads简化代码

threads的使命是让我们的worker像函数调用一样简单;下面我们就根据它的官方文档来改造我们刚才分片的一个流程:

// master.js
import { spawn, Thread, Worker } from "threads";

const worker = await spawn(new Worker("./worker.js"));
const [chunks,hash] = await worker.calculateHash();
console.log("文件分片完成",chunks,hash);

await Thread.terminate(worker);

// worker.js
import { expose } from "threads/worker"

expose({
  async calculateHash(file){
      return new Promise((resolve, reject) => {
        const spark = new self.SparkMD5.ArrayBuffer()
        const reader = new FileReader();
        // 文件大小
        const size = file.size;
        let offset = 5 * 1024 * 1024;

        let chunks = [file.slice(0, offset)];
        let cur = offset;

        while (cur < size) {
          // 如果是最后一片,那么直接加进来
          chunks.push(file.slice(cur, cur + offset));
          cur += offset;
        }
        // 拼接
        reader.readAsArrayBuffer(new Blob(chunks));
        reader.onload = (e) => {
          spark.append(e.target.result);
          resolve([chunks, spark.end()]);
        };
      });
   }
})

每个worker所做的工作就是将文件分片和计算文件指纹,如果我们不加限制的话,worker会达到最大数量,一般就是CPU核数,我们可以使用一个线程池来优化我们的代码

线程池优化

线程池也就是将所有的worker统一管理起来,当需要进行计算时将worker拿出来执行任务,当计算完成之后放到线程池中等待下一次任务的到来;这种情况下会有一定数量的worker常驻,但是比起频繁添加或者销毁worker的损耗小得多

// master
import { async } from "regenerator-runtime";
import { spawn, Pool, Worker } from "threads";

// 最大worker数量
const WORKER_COUNT = 6;

(async function () {
  const pool = Pool(() => spawn(new Worker("./worker.js")), { size: WORKER_COUNT });

  document.getElementById("file").addEventListener("change", async (e) => {
    const [chunks, hash] = await pool.queue((worker) => worker.calculateHash(e.target.files[0]));
    console.log("文件分片完成", chunks, hash);
  });

  window.addEventListener("beforeunload", async () => {
    await pool.completed();
    await pool.terminate();
  });
})();

// worker
import sha256 from "js-sha256"
import { expose } from "threads/worker"

expose({
  async calculateHash(file) {
    return new Promise((resolve, reject) => {
      const spark = new self.SparkMD5.ArrayBuffer()
      const reader = new FileReader();
      // 文件大小
      const size = file.size;
      let offset = 5 * 1024 * 1024;
  
      let chunks = [file.slice(0, offset)];
      let cur = offset;
  
      while (cur < size) {
        // 如果是最后一片,那么直接加进来
        chunks.push(file.slice(cur, cur + offset));
        cur += offset;
      }
      // 拼接
      reader.readAsArrayBuffer(new Blob(chunks));
      reader.onload = (e) => {
        spark.append(e.target.result);
        resolve([chunks, spark.end()]);
      };
    });
  },
})

从线程池中我们可以总结出一个规律:频繁操作的内容我们可以用一个池来存储它,我把它称之为”池优化“。

池优化

上面提到了线程池,但是其他涉及到池优化的场景还有很多,例如对象池、线程池等等,它们的应用场景都是类似的,都是为了减少频繁操作带来的性能损耗;

对象池

下面是一段伪代码,理解对象池的基本使用即可:

class DOMPool {
  constructor(capcity) {
    this.capcity = capcity;
    this.data = [];
  }

  createElement() {
    // 如果池中有直接复用
    if (this.data.length > 0) {
      return this.data.pop();
    }
    // 否则创建一个元素
    const div = document.createElement("div");
    document.body.appendChild(div);
    return div;
  }

  recycle(elements) {
    this.data.push(...elements);
  }
}

// 假设是之前的数据
let domArr = [document.createElement("div")];
const domPool = new DOMPool(Infinity);

function handleResponse() {
  domPool.recycle(domArr); // 回收等待复用
  domArr.lenght = 0; // 清空

  const response = []; // 接口数据
  response.forEach((item, index) => {
    const element = domPool.createElement();
    element.innerText = `列表第${index}项的数据为:${item}`;
    domArr.push(element);
  });
}

事件池

例如:我们有时候需要创建自定义事件,第一步是创建一个事件对象:

let myEvent = new Event("myClick", {"bubbles":true, "cancelable":false});
  window.addEventListener("myClick",()=>{
    console.log("recevied myClick")
  });
  window.dispatchEvent(myEvent)

当然如果需要传参数的话需要使用CustomEvent:

  const EVENT_NAMES = {
    myClick: "myClick",
  };
  let myEvent = new CustomEvent(EVENT_NAMES.myClick, {
    bubbles: true,
    cancelable: false,
    detail: {
      code: 0,
      success: true,
    },
  });
  window.addEventListener(EVENT_NAMES.myClick, (e) => {
    console.log("CustomEvent:myClick", e.detail);
  });
  window.dispatchEvent(myEvent);

这种情况下每定义一个自定义事件都需要创建一个事件对象;想象我们正在开发一个超大项目,内部逻辑非常复杂,我们可能需要定义很多事件对象;如果这些事件对象分散在各个页面,显然不太好维护;所以我们会把它们统一到一个事件对象的文件中去;

  const EVENT_NAMES = {
    myClick: "myClick",
    myChange: "myChange",
    myMouse: "myMouse",
  };
  export const myClickEvent = new CustomEvent(EVENT_NAMES.myClick, {
    bubbles: true,
    cancelable: false,
    detail: {
      code: 0,
      success: true,
    },
  });
  export const myChangeEvent = new CustomEvent(EVENT_NAMES.myChange, {});
  export const myMouseEvent = new CustomEvent(EVENT_NAMES.myMouse, {});

然后呢,这个文件会装满了各种各样的事件,看着非常不舒服;然后就想优化一下它,怎么优化呢?因为每个页面其实事件只会有有限个,而离开页面之后事件对象会被销毁,那么我们是否可以不销毁事件对象但是复用这个对象呢?当然是可以的。事件池干了以下两件事:

  1. 在绑定事件之前从池中拿出一个事件对象
  2. 销毁事件绑定时将事件对象的属性清除然后回收到事件池中等待复用

所以以后遇到频繁操作的业务场景,我们就可以想到是不是可以用”池“来优化一下。

后记

记得《穷爸爸富爸爸》里面有一句经典的话:”要让钱为你而工作而不是你为钱而工作“,我们的Worker就是为我们而工作的,我们想要更多的钱(更好的性能)那就需要去提高它们的效率,尽可能让他们在有限的时间里做更多的事情。