likes
comments
collection
share

造轮子-大文件并行上传、并发数控制、断点续传

作者站长头像
站长
· 阅读数 12

本文实现的大文件并行上传主要使用技术:前端vue2axios,后端使用express作为服务器。这种技术实现更接近实际开发需求。毕竟如果有这个需求一定是某个项目的一部分,而作为项目,大概率应该整个项目都用axios而不是原生XMlHttp。另外开始学习贵在搞清楚原理,所以降低技术难度,要用自己擅长的技术栈,而后可以改造为不同技术实现。

大文件并行上传原理:

  1. 并行肯定要一次发几个上传请求,所以要对大文件切片,这是基本前提
  2. 要进行并发控制,要提前将所有请求生成,以待处理
  3. 分片合并,分片是为了上传,合并是为了还原,所以需要标记切片以及切片的顺序
  4. 断点续传,断点是中断正在进行的axios请求,之后从断开处开始,所以要知道断开处情况,要知道上传了什么,将上传后的过滤后重新发起上传

造轮子-大文件并行上传、并发数控制、断点续传

基本实现

上传input

<template>
  <div>
    <input type="file" ref="fileUploadInput" @change="uploadFileChange" />
    <el-button @click="uploadChunks" size="small">分片上传</el-button>
  </div>
</template>
<script>
export default {
  name: "BigFIleUpload",
  data() {
    return {
        file: null // 上传文件
      }
  },
  methods: {
    // 上传后判断类型
    uploadFileChange(event) {
      const [file] = event.target.files;
      this.fileData.file = file;
      const { ext } = this.getFileNameAndExt();
      if (!this.checkFileType(ext)) {
        this.$refs.fileUploadInput.value = null;
        return this.$message({
          type: "warning",
          message: "文件上传错误",
        });
      }
    },
    // 获取文件名和扩展名
    getFileNameAndExt() {
      const { name } = this.fileData.file;
      return {
        name: name,
        ext: name.slice(name.lastIndexOf(".") + 1),
      };
    },
    // 允许类型
    checkFileType(ext) {
      return ["mp4", "docx", "doc", "xlsx"].includes(ext);
    },
    // 上传文件:切片、切片请求、并发控制、合并切片
    async uploadChunks() {

    },
  },
};
</script>

分片处理

根据实现原理,首先要对文件进行分片。分片利用的是Blobslice方法。分片方法以10M为单元。这块还暗示一个逻辑,当文件小于10M时,可以考虑直接上传,而不是分片上传。这个逻辑感兴趣的可以实现出来。

    const SIZE = 10 * 1024 * 1024;

    ...
    // file大文件,size切片的大小
    createChunk(file, size = SIZE) {
      const chunkList = [];
      let cur = 0;
      while (cur < file.size) {
        chunkList.push({
          file: file.slice(cur, cur + size), //使用slice()进行切片
        });
        cur += size;
      }
      return chunkList;
    },
    ...

文件hash

浏览器在进行缓存时会采用e-tag的标记,e-tag就是根据资源内容是否发生改变进行变化的。参考这一点,文件上传也采用这种方法,根据文件是否改变决定是否发起上传,而不是根据文件的扩展名,毕竟扩展名是可以手动修改的。

参考其他文章,这里直接也使用spark-md5的方案。计算文件hash采用web-worker方案。

  data() {
    return {
      fileData: {
        file: null, // 保存文件
        fileHash: "", // 文件hash
        worker: null, // 多线程worker
      }
    };
  },
  ...
   // 生成文件 hash(web-worker)
    calculateHash(fileChunkList) {
      return new Promise((resolve) => {
        this.fileData.worker = new Worker("/js/hash.js");
        this.fileData.worker.postMessage({ fileChunkList });
        this.fileData.worker.onmessage = (e) => {
          const { percentage, hash } = e.data;
          this.hashPercentage = percentage;
          if (hash) {
            resolve(hash);
          }
        };
      });
    },
    ...
    
    hash.js
    // 导入脚本
// import script for encrypted computing
self.importScripts("/js/spark-md5.min.js");

// 生成文件 hash
// create file hash
self.onmessage = e => {
  const { fileChunkList } = e.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let percentage = 0;
  let count = 0;
  const loadNext = index => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(fileChunkList[index].file);
    reader.onload = e => {
      count++;
      spark.append(e.target.result);
      if (count === fileChunkList.length) {
        self.postMessage({
          percentage: 100,
          hash: spark.end()
        });
        self.close();
      } else {
        percentage += 100 / fileChunkList.length;
        self.postMessage({
          percentage
        });
        loadNext(count);
      }
    };
  };
  loadNext(0);
};

创建上传数据以及并发请求

考虑前端要显示单个文件的上传进度。将上传数据uploadChunkData保存到vue组件的data数据中。

分片标记使用chunkHash。显示进度使用percentage,切片大小size

使用axios的配置方法onUploadProgress修改uploadChunkData对应数据更新上传进度。

<template>
  <div>
    ...
    <el-form label-width="100px" label-position="top">
      <el-form-item label="上传总进度:">
        <el-progress :percentage="uploadPercentage"></el-progress>
      </el-form-item>
    </el-form>
    <el-table :data="uploadChunkData" style="width: 100%">
      <el-table-column prop="hash" label="chunk hash" width="180">
      </el-table-column>
      <el-table-column prop="size" label="size(KB)" width="180">
        <template slot-scope="{row}">
          {{ row.size }}
        </template>
      </el-table-column>
      <el-table-column min-width="180" prop="percentage" label="percentage">
        <template slot-scope="{ row }">
          <el-progress :percentage="row.percentage"></el-progress>
        </template>
      </el-table-column>
    </el-table>
  </div>
</template>
<script>
import axios from "axios";
export default {
  name: "BigFIleUpload",
  components: {},
  data() {
    return {
      uploadChunkData: [], // 保存并发上传数据 也保存进度显示数据
      fileData: {
        file: null, // 保存文件
        fileHash: "", // 文件hash
        worker: null, // 多线程worker
      }
    };
  },
  computed: {
     // 显示上传总体进度
    uploadPercentage() {
      if (!this.fileData.file || !this.uploadChunkData.length) return 0;
      const loaded = this.uploadChunkData
        .map((item) => item.size * item.percentage)
        .reduce((acc, cur) => acc + cur);
      return parseInt((loaded / this.fileData.file.size).toFixed(2));
    }
  },
  methods: {
    ...
    // 上传文件:切片、切片请求
      async uploadChunks() {
      if(!this.fileData.file) return
      // 创建切片
      const chunkList = this.createChunk(this.fileData.file);
      this.fileData.fileHash = await this.calculateHash(chunkList);

      // 创建上传切片数据
      this.uploadChunkData = chunkList
        .map(({ file }, index) => ({
          fileHash: this.fileData.fileHash,
          chunk: file, // 切片文件
          chunkHash:  `${this.fileData.fileHash}-${index}`, // 切片hash
          percentage: 0,
          size: file.size,
          index,
        }));

      this.startShouldFileUpload();
    },
   startShouldFileUpload() {
      // 创建切片请求
      const chunksRequests = this.createChunksRequest();

      // 并发以及并发控制
      this.requestWithLimit(chunksRequests, () => this.mergeRequest());
    },
    // 创建并发请求
    createChunksRequest(fileName) {
      return this.uploadChunkData
        .map(({ chunk, hash, index }) => {
          const formData = new FormData();
          formData.append("chunk", chunk);
          formData.append("chunkName", hash);
          formData.append("fileName", fileName);
          return { formData, index };
        })
        .map(({ formData, index }) =>()=>{
          return new Promise((resolve, reject) => {
            axios({
              method: "post",
              url: "http://localhost:3000/file/upload",
              data: formData,
              onUploadProgress: (progressEvent) => {
                let complete = parseInt(
                  (progressEvent.loaded / progressEvent.total) * 100
                );
                this.uploadChunkData[index].percentage = complete;
              },
            })
              .then((res) => {
                resolve(res);
              })
              .catch((err) => {
                reject(err);
              });
          });
        });
    },
  },
};
</script>

后端使用multiparty接收并处理切片。上传过程需要判断切片是否上传,存在则不必移动,不存在需要从临时缓存移动到指定目录,以便后续操作。需要判断保存切片的文件夹是否存在,不存在创建文件夹。

const express = require('express');
const app = express();

const multiparty = require("multiparty");
const path = require('path'); // 处理路径相关,不处理文件
const fse = require("fs-extra"); // 处理文件相关

// 跨域设置
const cors = require('cors')
app.use(cors())

const UPLOAD_DIR = path.resolve(__dirname, ".", "target");
// 提取后缀名
// get file extension
const extractExt = fileName => fileName.slice(fileName.lastIndexOf("."), fileName.length);

// 创建临时文件夹用于临时存储 chunk
// 添加 chunkDir 前缀与文件名做区分
// create a directory for temporary storage of chunks
// add the 'chunkDir' prefix to distinguish it from the chunk name
const getChunkDir = fileHash =>
  path.resolve(UPLOAD_DIR, `chunkDir_${fileHash}`);


app.post('/file/upload', function (req, res, next) {
  const multipart = new multiparty.Form();
  multipart.parse(req, async function (err, fields, files) {
      if(err){
        res.status = 500
        res.end("异常错误");
        return
      }
      const [chunk] = files.chunk;
      const [chunkHash] = fields.chunkHash; // 切片hash
      const [fileHash] = fields.fileHash;
      const [fileName] = fields.fileName;
      // 获取文件路径
      const filePath = path.resolve(
        UPLOAD_DIR,
        `${fileHash}${extractExt(fileName)}`
      );
      // path.resolve 将相对路径解析为绝对路径 切片名改为fileHash
      const chunkDir = getChunkDir(fileHash);
      const chunkPath = path.resolve(chunkDir, chunkHash);

      // 文件存在直接返回
      // return if file is exists
      if (fse.existsSync(filePath)) {
        res.end("file exist");
        return;
      }

      // 切片存在直接返回
      // return if chunk is exists
      if (fse.existsSync(chunkPath)) {
        res.end("chunk exist");
        return;
      }

      // 切片目录不存在,创建切片目录
      // if chunk directory is not exist, create it
      if (!fse.existsSync(chunkDir)) {
        await fse.mkdirs(chunkDir);
      }

    try {
      // fs-extra 的 move 方法用于移动文件或目录。
      await fse.move(chunk.path, path.resolve(chunkDir, chunkHash));
    } catch (err) {
      console.log(err)
    }
    res.end("received file chunk");
  });
});

并发控制请求

后端一般会限制并发数的,并且谷歌浏览器(Chrome)对于同一域名下的并发请求数量限制是6个。所以限制并发数是有必要的。这块逻辑:要保存请求数,同时发出并发数请求,之后任意一个请求结束,发起一个新的请求,同时减少记录数。一直到请求数为0,请求队列为0,执行回调函数。


<script>
const LIMIT = 6;
export default {
  name: "BigFIleUpload",
  methods: {
    ...
    requestWithLimit(prmiseQueue, callback = null) {
      // 请求数量记录,默认为 0
      let count = 0;

      // 递归调用,请求接口数据
      const run = () => {
        // 接口每调用一次,记录数加 1
        count++;
        const p = prmiseQueue.shift();
        p().then((res) => {
          // 接口调用完成,记录数减 1
          count--;
          if (!prmiseQueue.length && !count) {
            // 这里可以对所有接口返回的数据做处理,以便输出
            callback && callback();
          }
          // prmiseQueue 长度不为 0 且记录小于限制的数量时递归调用
          if (prmiseQueue.length && count < LIMIT) {
            run();
          }
        }).catch((err)=>{
          
        })
      };

      // 根据 limit 并发调用
      for (let i = 0; i < Math.min(prmiseQueue.length, LIMIT); i++) {
        run();
      }
    },
  },
};
</script>

请求合并分片

当切片全部上传,请求后端合并切片。

<script>
import axios from "axios";
const SIZE = 10 * 1024 * 1024;
export default {
  name: "BigFIleUpload",
  ...
  methods: {
    mergeRequest(fileName) {
      axios({
        method: "post",
        url: "http://localhost:3000/file/merge",
        headers: {
          "content-type": "application/json",
        },
        data: JSON.stringify({
          size: SIZE, // 开始位置
          fileName: fileName,
        }),
      });
    },
  },
};
</script>

合并切片之前需要对切片进行排序。排序之后就是找到所有切片路径进行合并。使用fs的读写流。

...

const UPLOAD_DIR = path.resolve(__dirname, ".", "target");
// 提取后缀名
// get file extension
const extractExt = fileName => fileName.slice(fileName.lastIndexOf("."), fileName.length);

// 创建临时文件夹用于临时存储 chunk
// 添加 chunkDir 前缀与文件名做区分
// create a directory for temporary storage of chunks
// add the 'chunkDir' prefix to distinguish it from the chunk name
const getChunkDir = fileHash =>
  path.resolve(UPLOAD_DIR, `chunkDir_${fileHash}`);

const resolvePost = req =>
  new Promise(resolve => {
    let chunk = "";
    req.on("data", data => {
      chunk += data;
    });
    req.on("end", () => {
      resolve(JSON.parse(chunk));
    });
  });

// 写入文件流
const pipeStream = (path, writeStream) =>
  new Promise(resolve => {
    const readStream = fse.createReadStream(path);
    readStream.on("end", () => {
      fse.unlinkSync(path);
      resolve();
    });
    readStream.pipe(writeStream);
  });


// 合并切片
const mergeFileChunk = async (filePath, fileHash, size) => {
  // get chuunk path
  // 获取切片路径
  const chunkDir = getChunkDir(fileHash);
  // read all chunk path
  // 读取所有chunk路径
  const chunkPaths = await fse.readdir(chunkDir);
  // 根据切片下标进行排序
  // chunkPaths.sort((a, b) => parseInt(a.split('-')[1]) - parseInt(b.split('-')[1]));
  chunkPaths.sort((a, b) => parseInt(a.split('-').pop()) - parseInt(b.split('-').pop()));
  // 并发写入文件
  await Promise.all(
    chunkPaths.map((chunkPath, index) =>
      pipeStream(
        path.resolve(chunkDir, chunkPath),
        // 根据 size 在指定位置创建可写流
        fse.createWriteStream(filePath, {
          start: index * size,
        })
      )
    )
  );
  // 合并后删除保存切片的目录
  fse.rmdirSync(chunkDir);
};


app.post('/file/merge', async function (req, res, next) {
  const data = await resolvePost(req);
  const { fileHash, fileName, size } = data;
  const ext = extractExt(fileName);
  const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
  await mergeFileChunk(filePath, fileHash, size);
  res.end(
    JSON.stringify({
      code: 0,
      message: "file merged success"
    })
  );
});

app.listen(3000, function () {
  console.log('hello world')
})

优化计划:看了一些文章有了一些启发,当上传文件1g时候,真正考验性能的时刻到了。有没有再优化的空间,有,比如切片采用多线程。

断点续传实现

断点续传是对上面内容做过滤处理。

造轮子-大文件并行上传、并发数控制、断点续传

实现断点

axios配置项有一个中断的方法,可以将其保存到数组中,当中断时,执行中断操作即可。具体可参考:www.axios-http.cn/docs/req_co…

   data() {
    // save request list
    this.chunkRequestList = []
    ...
  },
 ...
 resetUploadData() {
      // 取消请求
      this.chunkRequestList.forEach(({cancel}) => cancel());
      this.chunkRequestList = [];
      if (this.fileData.worker) {
        this.fileData.worker.onmessage = null;
      }
  },
 createChunksRequest() {
      return this.uploadChunkData
        .filter(({ chunkHash }) => !this.uploadedList.includes(chunkHash))
        .map(({ chunk, hash, index }) => {
          const formData = new FormData();
          formData.append("chunk", chunk);
          formData.append("chunkHash", chunkHash);
          formData.append("fileName", this.getFileNameAndExt().fileName);
          formData.append("fileHash", this.fileData.fileHash);

          return { formData, index };
        })
        .map(({ formData, index }) => ()=>{
          return new Promise((resolve, reject) => {
            axios({
              method: "post",
              url: "http://localhost:3000/file/upload",
              data: formData,
              onUploadProgress: (progressEvent) => {
                let complete = parseInt(
                  (progressEvent.loaded / progressEvent.total) * 100
                );
                this.uploadChunkData[index].percentage = complete;
              },
              // 用于取消请求
              cancelToken: new CancelToken((cancel)=> {
                this.chunkRequestList.push({
                  cancelIndex: index,
                  cancel
                });
              }),
            })
              .then((res) => {
                // 去除请求
                if (this.chunkRequestList) {
                  const curIndex = this.chunkRequestList.findIndex(({cancelIndex})=> cancelIndex === index);
                  this.chunkRequestList.splice(curIndex, 1);
                }
                resolve(res);
              })
              .catch((err) => {
                reject(err);
              });
          });
        });
    },
    ...

恢复上传

通过接口获取已上传切片,之后过滤掉这部分切片对应的请求,只上传没有上传的切片和发起这部分请求。

需要保存已经上传的切片。获取已上传切片后过滤掉这部分请求数据。

 data() {
    // save uploaded file list
    this.uploadedList = []
    return {
      ...
    };
  },
  ...
    async uploadChunks() {
      ...
    
      // 创建上传切片数据
      this.uploadChunkData = chunkList
        .map(({ file }, index) => ({
          fileHash: this.fileData.fileHash,
          chunk: file, // 切片文件
          chunkHash:  `${this.fileData.fileHash}-${index}`,
          // 已上传为100%
          percentage: this.uploadedList.includes(`${this.fileData.fileHash}-${index}`) ? 100 : 0, 
          size: file.size,
          index,
        }));

    },
    createChunksRequest() {
      return this.uploadChunkData
        // 过滤请求
        .filter(({ chunkHash }) => !this.uploadedList.includes(chunkHash))
        .map(({ chunk, chunkHash, index }) => {
          const formData = new FormData();
          formData.append("chunk", chunk);
          formData.append("chunkHash", chunkHash);
          formData.append("fileName", this.getFileNameAndExt().fileName);
          formData.append("fileHash", this.fileData.fileHash);

          return { formData, index };
        })
        .map(({ formData, index }) => ()=>{
           return new Promise((resolve, reject) => {
            axios({
              method: "post",
              url: "http://localhost:3000/file/upload",
              data: formData,
              onUploadProgress: (progressEvent) => {
                let complete = parseInt(
                  (progressEvent.loaded / progressEvent.total) * 100
                );
                this.uploadChunkData[index].percentage = complete;
              },
              // 用于取消请求
              cancelToken: new CancelToken((cancel)=> {
                this.chunkRequestList.push({
                  cancelIndex: index,
                  cancel
                });
              }),
            })
              .then((res) => {
                // 去除请求
                if (this.chunkRequestList) {
                  const curIndex = this.chunkRequestList.findIndex(({cancelIndex})=> cancelIndex === index);
                  this.chunkRequestList.splice(curIndex, 1);
                }
                resolve(res);
              })
              .catch((err) => {
                reject(err);
              });
          });
        
        });
    },

增加验证接口,上传前调用查询已上传切片,使用readdir方法读取已经上传的切片。另外,在开始上传时,也可以校验上传文件是否已经存在,已经存在则不必再次上传。



...
  // 返回已上传的所有切片名
// return chunk names which is uploaded
const createUploadedList = async fileHash =>
fse.existsSync(getChunkDir(fileHash))
  ? await fse.readdir(getChunkDir(fileHash))
  : [];


app.post('/file/verify', async function (req, res, next) {
  const data = await resolvePost(req);
  const { fileHash, fileName } = data;
  const ext = extractExt(fileName);
  const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
  if (fse.existsSync(filePath)) {
    res.end(
      JSON.stringify({
        shouldUpload: false
      })
    );
  } else {
    res.end(
      JSON.stringify({
        shouldUpload: true,
        uploadedList: await createUploadedList(fileHash)
      })
    );
  }
});

app.listen(3000, function () {
  console.log('hello world')
})

目前的这种断点续传,是针对切片是否上传完毕这个点的。是否有更好的方案呢?比如切片层级,记录上传了多少,之后从切片断点处发起请求而不是从一开始。可以探索一下。

完毕。

仓库地址

完整代码:github

参考文章