likes
comments
collection
share

WebWorker 手动实现分片上传七牛云存储

作者站长头像
站长
· 阅读数 11

分片上传是个很常见的功能,而七牛官方的qiniu-js很多功能用不到且有些冗余,所以可以自己实现个简易版分片上传,放在一个简易队列中,且使用WebWorker也能很好的避免页面卡顿等问题。

创建分片

首先是创建分片函数,如果有上传后校验数据的需求可以加MD5,并非必须。

  • createChunk.ts
import SparkMD5 from 'spark-md5';

export interface ChunkData {
  start: number;
  end: number;
  index: number;
  md5: string;
  data: Blob;
}

export async function createChunk(file: File, index: number, chunkSize: number): Promise<ChunkData> {
  return new Promise((resolve) => {
    const spark = new SparkMD5.ArrayBuffer();
    const start = index * chunkSize;
    const end = start + chunkSize;
    const fileReader = new FileReader();
    fileReader.onload = (e) => {
      const data = new Blob([e.target.result]);
      spark.append(e.target.result);
      const md5 = spark.end();
      resolve({
        start,
        end,
        index,
        md5,
        data,
      });
    };
    fileReader.readAsArrayBuffer(file.slice(start, end));
  });
}

实现 WebWorker

Worker实现比较简单, 解构发送过来的数据并调用createChunk即可,每个Worker都可能会分到 1 个以上的任务,直接Promise.all

  • worker.js
import { createChunk } from './createChunk';

onmessage = async (e) => {
  const { file, chunkSize, startIndex, endIndex } = e.data;
  const proms = [];
  for (let i = startIndex; i < endIndex; i++) {
    proms.push(createChunk(file, i, chunkSize));
  }
  const chunks = await Promise.all(proms);
  postMessage(chunks);
};

实现分片

最后是主要的分片函数,分配任务和回调,分片大小设置为5MB,Worker数量为当前电脑的线程数量。

注意:如果你打包的js文件是放在另一个域名下的,可以通过vite的inline方式引入,或者是借助Blob中importScripts来引入以避免跨域问题,这里以vite为例。

  • splitFile.ts
import { ChunkData } from '@/media/components/upload/createChunk';
import MyWorker from './worker?worker&inline';

const chunkSize = 1024 * 1024 * 5;
const threadCount = navigator.hardwareConcurrency || 4;

/**
 * 分片上传
 * 传递一个回调函数,每当worker完成时就上传
 * 全部分片完成后返回result数组,如果有部分上传失败则可按需检验并重新上传
 *
 * @param file
 * @param callback
 */
export async function splitFile(file: File, callback: (data: ChunkData) => void): Promise<ChunkData[]> {
  return new Promise((resolve) => {
    const result: ChunkData[] = [];
    // 共有几个分片/任务
    const chunkCount = Math.ceil(file.size / chunkSize);
    // 每个线程分到的任务数量
    const workerChunkCount = Math.ceil(chunkCount / threadCount);
    let finishCount = 0;
    for (let i = 0; i < threadCount; i++) {
      // 有跨域问题的通过 vite 的 inline 引入,否则可以直接 URL 引入
      // new Worker(new URL('./worker.js', import.meta.url), { type: 'module' })
      const worker = new MyWorker();
      // 每个线程执行的任务范围
      const startIndex = i * workerChunkCount;
      const endIndex = startIndex + workerChunkCount > chunkCount ? chunkCount : startIndex + workerChunkCount;
      worker.postMessage({
        file,
        chunkSize,
        startIndex,
        endIndex,
      });
      worker.onmessage = (e) => {
        for (let i = startIndex; i < endIndex; i++) {
          result[i] = e.data[i - startIndex];
          // 直接上传分片
          callback(e.data[i - startIndex]);
        }
        worker.terminate();
        finishCount++;
        // 全部完成
        if (finishCount >= threadCount) {
          resolve(result);
        }
      };
    }
  });
}

应用

分片任务已经完成了,接下来就是常规的业务逻辑,以上传视频文件为例,其中bucketNamefilekey替换为你自己的,上传token也是从自行服务端获取。

import axios from 'axios';
import { splitFile } from './splitFile';

const http = axios.create();

export default {
  data() {
    return {
      uploadDomain: 'https://upload.qiniup.com',
      percent: 0,
      videoDuration: 0,
      bucketName: 'asdf-1', // 七牛云对象存储名
      filekey: 'upload/2023-09-26/test.mp4', // 七牛云的路径
      fileChunks: [],
      uploadResults: [],
    }
  },
  methods: {
    async onUpload(file: File) {
      const token = await this.getUploadToken();
      this.videoDuration = await this.getVideoDuration();
      const { uploadId } = await this.getUploadId(this.bucketName, this.filekey, token);
      // 单个上传任务
      const upload = async (res) => {
        const result = await this.doUpload(this.bucketName, this.filekey, uploadId, res.index + 1, token, res.data);
        this.uploadResults.push({
          ...result,
          partNumber: res.index + 1,
        });
        await this.checkComplete(bucketName, uploadId, token);
      }
      // 简易队列,其中一个出错也不会影响下一个任务
      const runTask = (() => {
        let pending = Promise.resolve();
        const run = async (res) => {
          await pending.catch((e) => e).finally(() => upload(res));
        }
        return (res) => (pending = run(res))
      })();
      // 注意此处没有处理错误,可以自行扩展
      this.fileChunks = await splitFile(file, (res) => {
        // console.log('uploading', res);
        // 此处可校验md5等操作
        runTask(res).then();
      });
    },
    async checkComplete(bucketName: string, uploadId: string, token: string, file: File) {
      // 上传进度
      this.percent = Math.ceil(this.uploadResults.length / this.fileChunks.length * 100);
      // 上传完成后的操作,可校验上传是否正确的等
      if (this.fileChunks.length === this.uploadResults.length) {
        await this.completeUpload(bucketName, this.filekey, uploadId, token);
      }
    },
    getUploadToken() {
      return new Promise<string>((resolve) => {
        resolve('up token') // 返回你的上传Token
      });
    },
    getVideoDuration() {
      return new Promise<number>((resolve) => {
        const fileUrl = URL.createObjectURL(this.file);
        const video = document.createElement('video');
        video.setAttribute('preload', 'auto');
        video.src = fileUrl;
        video.onloadeddata = () => {
          resolve(Math.floor(video.duration));
        };
      });
    },
    async getUploadId(bucketName: string, filename: string, token: string) {
      const result = await http({
        method: 'POST',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads`,
        headers: {
          'Content-Type': 'application/json',
          Authorization: `UpToken ${token}`,
        },
      });
      console.log('getUploadId', result.data);
      return result.data; // {uploadId: '', expireAt: 1695894984}
    },
    async doUpload(bucketName: string, filename: string, uploadId: string, index: number, token: string, data: Blob) {
      const result = await http({
        method: 'PUT',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads/${uploadId}/${index}`,
        headers: {
          'Content-Type': 'application/octet-stream',
          Authorization: `UpToken ${token}`,
        },
        data: data,
      });
      console.log('doUpload', index, result.data);
      return result.data; // {etag: '', md5: ''}
    },
    async completeUpload(bucketName: string, filename: string, uploadId: string, token: string, mimeType: string) {
      // 七牛文档要求按照partNumber排序
      this.uploadResults.sort((a, b) => (a.partNumber > b.partNumber ? 1 : -1));
      const result = await http({
        method: 'POST',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads/${uploadId}`,
        headers: {
          'Content-Type': 'application/json',
          Authorization: `UpToken ${token}`,
        },
        data: {
          fname: filename,
          mimeType: mimeType, // 'video/mp4'
          parts: this.uploadResults, // [{etag: '', partNumber: 1}]
        },
      });
      console.log('completeUpload', result.data);
      return result.data; // {hash: '', key: ''}
    },
  },
}

至此,简易的文件分片上传已完成,当然还有暂停和取消上传等操作可以实现,不过已能满足大部分场景,而且很多时候上传出错的更倾向于刷新网页重头来。 如有问题欢迎指出!