likes
comments
collection
share

大文件上传的vue3+koa2实现

作者站长头像
站长
· 阅读数 15

大文件上传实现


需求分析

对于大文件上传的需求,我们要实现以下功能

  • 文件切割,分片上传
  • 实现文件秒传
  • 实现上传进度条

项目架构

本项目前端使用vite+vue3+element-plus,后台使用koa2框架。

vite创建vue3项目

// 创建项目
yarn create vite big-upload --template 
vue cd big-upload-ui 
// 安装需要的库
yarn add element-plus
yarn add axios

koa脚手架创建后台项目

// koa2脚手架
npm install koa-generator -g
// 脚手架创建项目
koa2 big-upload-server
cd big-upload-server
yarn
// 安装对应的库
yarn add koa-body fs-extra

大文件上传

前端思路

文件切片

文件选择使用input的file类型,获取文件对象。使用文件对象从Blob对象上继承的的slice方法对文件进行分片,返回一个Blob对象。然后将剪切的片段 添加到数组中完成对整个文件的分片。

文件唯一标识

而对每个文件的单一判断,我们一般都是来计算文件的hash值作为它的唯一标识。

这里我们引入spark-md5来计算文件的hash,但是对于整个文件的hash计算是非常耗时的。 这里我借鉴了js 如何快速计算出文件hash值 - 掘金 (juejin.cn)他的抽样计算的方法。

大文件上传的vue3+koa2实现

因为我们现在只需要文件的hash作为唯一标识,不需要得到精确的hash值,故采用此方法,这样可以大大节省计算hash所需要的时间。

定义上传的数据结构

现在文件的唯一标识解决了,我们就可以来准备进行上传了。

我们将切割好的片段依次上传,当我们所有的切片上传完成后,我们再发送一个合并请求,将所有切片进行合并变成完整的文件。

我们先来定义上传切片的数据结构:

切片(chunk)对象:
    + chunk: 文件对应的切片
    + size:切片大小
    + fileHash: 总文件hash
    + index: 切片下标
    + chunkHash;切片的hash (采用"总文件hash" + "切片下标") 来表示

代码实现

// App.vue
<template>
  <h1>大文件上传</h1>
  <input type="file" @change="handleFileChange" />
  <el-button @click="handleUpload">上传</el-button>
</template>

<script setup>
import { ElMessage } from "element-plus";
import SparkMD5 from "spark-md5";
import { computed, ref, watch } from "vue";
import { uploadChunks, mergeChunks } from "./api/upload";

const SIZE = 3 * 1024 * 1024; // 定义切片的大小
const File = ref(null);
const chunkList = ref([]);
const hash = ref("");

// 获取文件
function handleFileChange(e) {
  resetData();
  const [file] = e.target.files;
  if (!file) {
    File.value = null;
    return;
  }
  File.value = file;
}

// 点击按钮上传文件
async function handleUpload() {
  const file = File.value;
  if (!file) {
    ElMessage("请选择一个文件吧");
    return;
  }
  resetData();

  // 文件分片
  const fileChunkList = createFileChunk(file);
  hash.value = await calculateHash(fileChunkList);

  chunkList.value = fileChunkList.map(({ file }, index) => {
    return {
      chunk: file,
      size: file.size,
      chunkHash: `${hash.value} - ${index}`,
      fileHash: hash.value,
      index
    };
  });

  UploadChunks(chunkList);
}

// 文件分片
function createFileChunk(file, size = SIZE) {
  const fileChunkList = [];
  let cur = 0;
  while (cur < file.size) {
    fileChunkList.push({ file: file.slice(cur, cur + size) });
    cur += size;
  }
  return fileChunkList;
}

// 计算hash
function calculateHash(fileChunkList) {
  return new Promise((resolve, reject) => {
    const spark = new SparkMD5.ArrayBuffer();
    const reader = new FileReader();
    const file = File.value;
    const size = file.size;
    let offset = 2 * 1024 * 1024;
    let chunks = [file.slice(0, offset)];

    let cur = offset;
    while (cur < size) {
      // 最后一块全部加进来
      if (cur + offset >= size) {
        chunks.push(file.slice(cur, cur + offset));
      } else {
        // 中间的 前中后去两个字节
        const mid = cur + offset / 2;
        const end = cur + offset;
        chunks.push(file.slice(cur, cur + 2));
        chunks.push(file.slice(mid, mid + 2));
        chunks.push(file.slice(end - 2, end));
      }
      // 前取两个字节
      cur += offset;
    }

    reader.readAsArrayBuffer(new Blob(chunks));
    reader.onload = (e) => {
      spark.append(e.target.result);
      hashPercentage.value = 100;
      resolve(spark.end());
    };
  });
}

// 上传文件切片
function UploadChunks(chunkList = []) {
  requestList = chunkList.map((item) => {
    return uploadChunks(item)
  });
  
  await Prolise.all(requestList)
  mergeChunks()
}

// 通知服务器合并切片
function MergeChunks() {
  mergeChunks(
    JSON.stringify({
      fileName: File.value.name,
      fileSize: File.value.size,
      size: SIZE,
      hash: hash.value,
    })
  );
}
</script>
// 请求封装
// axios配置这里就省略了(设置了默认的请求头和响应拦截)
import request from "../utils/request";

export const uploadChunks = (data, onProgress, signal) => {
  return request({
    url: "/upload/chunk",
    method: "POST",
    data,
    // 文件的上传配置请求头为form-data形式
    headers: {
      "Content-type": "multipart/form-data;charset=UTF-8",
    }
  });
};

export const mergeChunks = (data) => {
  return request({
    url: "/upload/merge",
    method: "post",
    data,
  });
};

后端思路

首先我们先定义一下我们的文件存储目录结构

    + target
	+ fileHash-chunks
		+ chunkHash
	file

我们文件存储的目标目录是target,将文件切片统一放在fileHash-chunks文件夹中(因为同文件夹下不允许同名,我们存放切片的目录后面加个chunks区分)。需要合并的时候读取fileHash-chunks中的切片再进行合并。

upload路由

const router = require("koa-router")();
const path = require("path");
const fse = require("fs-extra");
const util = require("../utils/utils");
const { koaBody } = require("koa-body");

router.prefix("/upload");

// 大文件存储目录
const UPLOAD_DIR = path.resolve(__dirname, "..", "target");

// 提取文件后缀名
const extractExt = (filename) =>
  filename.slice(filename.lastIndexOf("."), filename.length);

/**
 * 针对 path 创建 readStream 并写入 writeStream,写入完成之后删除文件
 * @param {String} path
 * @param {String} writeStream
 */
const pipeStream = (path, writeStream) =>
  new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.on("end", () => {
      fse.unlinkSync(path);
      resolve();
    });
    readStream.pipe(writeStream);
  });

/**
 * 读取所有的 chunk 合并到 filePath 中
 * @param {String} filePath 文件存储路径
 * @param {String} chunkDir chunk存储文件夹名称
 * @param {String} size 每一个chunk的大小
 */
async function mergeFileChunk(filePath, chunkDir, size) {
  // 获取chunk列表
  const chunkPaths = await fse.readdir(chunkDir);
  // 根据切片下标进行排序  否则直接读取目录的获得的顺序可能会错乱
  chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);
  await Promise.all(
    chunkPaths.map((chunkPath, index) =>
      pipeStream(
        path.resolve(chunkDir, chunkPath),
        // 指定位置创建可写流
        fse.createWriteStream(filePath, {
          start: index * size,
          end: (index + 1) * size,
        })
      )
    )
  );
  fse.rmdirSync(chunkDir); // 合并后删除保存切片的目录
}

// 上传chunk
router.post("/chunk", koaBody({ multipart: true }), async (ctx, next) => {
  const { chunkHash, fileHash } = ctx.request.body;
  const { chunk } = ctx.request.files;
  const chunkDir = path.resolve(UPLOAD_DIR, `${fileHash}-chunks`);
  // // 切片目录不存在,创建切片目录
  if (!fse.existsSync(chunkDir)) {
    await fse.mkdirs(chunkDir);
  }
  await fse.move(chunk.filepath, `${chunkDir}/${chunkHash}`);
  ctx.body = util.success({ code: 200, data: "", msg: "上传成功" });
});

// chunk合并
router.post("/merge", async (ctx, next) => {
  const { fileName, fileSize, size, hash } = ctx.request.body;
  // console.log('data ====>', ctx.request.body);
  const ext = extractExt(fileName);
  const filePath = path.resolve(UPLOAD_DIR, `${hash}${ext}`);
  const chunkDir = path.resolve(UPLOAD_DIR, `${hash}-chunks`);
  await mergeFileChunk(filePath, chunkDir, size);
  ctx.body = util.success({ code: 200, data: "", msg: "合并成功" });
});

module.exports = router;

到这,最基础的大文件上传就解决了。


上传进度条

这里我们要监听两个进度的上传,单个切片的上传进度和总上传进度

单个切片上传

axios中对原生的XMLHttpRequest的上传进度监听进行了封装,我们只用在接口配置中配置onUploadProgress,传入想要的callback即可。

// App.vue
// 上传进度监听函数
function onProgress(item) {
  return (e) => {
    item.percentage = parseInt(String((e.loaded / e.total) * 100));
  };
}

// api/upload.js
export const uploadChunks = (data, onProgress) => {
  return request({
    ...
    onUploadProgress: onProgress,
  });
};

总文件上传进度

总文件上传进度只需要设置一个计算属性,将每个切片的上传进度 * 切片大小除以总文件大小即可。

// 计算总文件上传进度
const uploadPercentage = computed(() => {
  if (!File.value || !chunkList.value.length) return 0;
  const loaded = chunkList.value
    .map((item) => item.size * item.percentage)
    .reduce((acc, cur) => acc + cur);
  return parseInt((loaded / File.value.size).toFixed(2));
});

文件秒传

文件秒传比较简单,就是判断这个文件是否已经在服务器中了。有就直接返回,没有才开始上传,所以我们实现一个接口(verify)来判断。而判断方式也很简单,我们之前不是计算过文件的hash嘛,我们直接用这个hash去target目录下查找就行了。

// 验证文件是否存在
router.post("/verify", async (ctx, next) => {
  const { fileName, fileHash } = ctx.request.body;
  let shouldUpload = true;
  let msg = "文件不存在,需要上传";
  const ext = extractExt(fileName);
  const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
  if (fse.existsSync(filePath)) {
    shouldUpload = false;
    msg = "文件存在,不需要上传";
  }
  ctx.body = util.success({
    code: 200,
    data: { shouldUpload, msg },
  });
});

断点续传

断点续传就是当文件上传失败后,我们重新上传文件之后只上传失败的片段,不上传成功的文件。

暂停上传

实现一个按钮,点击中止现在的请求。 axios中已经给出了取消请求的方式 取消请求 | Axios Docs (axios-http.com)

这里我们采用AbortController的方式来终止请求,创建一个请求控制器,在请求中加上signal: controller.signal这么一个配置,需要终止请求时调用controller.abort()来终止请求。

但是这里有个坑,在AbortController中signal是个只读属性

大文件上传的vue3+koa2实现

一旦我们调用abort()signal就会被修改为false,我们想继续请求signal必须为true,这样就会导致我们无法继续发送请求。

这里我的解决方法是我们创建一个变量将AbortController存起来,需要继续请求时我们直接创建一个新的AbortController传入请求的signal

// 请求控制器
const controller = ref(null);

// 暂停上传
function handlePause() {
  // console.log("中断上传", controller);
  requestList.value.forEach((element) => {
    controller.value.abort();
  });
  requestList.value = [];
  isPaused.value = true;
}

// api/uoload.js
export const uploadChunks = (data, onProgress, signal) => {
  return request({
    signal
  });
};

恢复上传

恢复上传其实就是重新发起上传请求,但是这里我们需要去判断哪些切片是已经上传过的,我们就需要对秒传接口进行更改,去读取对应的文件切片文件夹,读取已经上传过的切片,将这些切片存到一个数组中进行返回,然后前端获取到这个数组,对请求数组进行筛选,去除已经上传过的切片

而针对已经上传过的切片,我们还要来设置它的上传进度,直接将它的上传进度设置为100。

优化

切片过多导致并发http请求过多

我们对于请求的处理是直接设置一个requestList来存放所有的请求,并且用Promise.all来处理,当我们切片过多时,一次性创建的请求就会很庞大,浏览器就会十分卡顿。

于是我们就需要来控制并发数量,我们一次只能发送max个请求,当这些请求完成后再发送max个请求。一直循环,直到所有请求发送完毕。

切片上传失败问题

需求:

  • 第一次发送失败弹出提示信息
  • 第一次发送失败之后我们再进行3次的重传
  • 3次重传失败需要有提示

我们定一个状态:

const Status = { wait: 1, error: 2, done: 3, fail: 4 };

每一个请求都对应这下面的4种状态中的一个,一开始所有的请求都是 wait等待状态,发生错误时候变成error状态,3次重传都失败了之后变成fail状态,请求成功变成done完成状态

这里我们将控制并发和失败处理放在一起

// 控制请求发送以及上传错误处理
function sendRequest(form, max = 4) {
  return new Promise((resolve, reject) => {
    const len = form.length;
    let counter = 0; // 发送成功的请求数
    const retryArr = [];

    form.forEach((item) => (item.status = Status.wait));

    const start = async () => {
      let Err = false;
      while (counter < len && !isPaused.value && !Err) {
        // 创建请求列表
        let requestArr = [];

        // 并发控制请求
        for (let i = 0; i < max; i++) {
          let idx = form.findIndex(
            (item) => item.status == Status.wait || item.status == Status.error
          );
          if (idx == -1) {
            Err = true;
            return;
          }
          form[idx].status = Status.done;
          // console.log("开始", idx);

          let { index } = form[idx];

          requestArr.push(
            uploadChunks(
              form[idx],
              onProgress(chunkList.value[index]),
              controller.value.signal
            )
              .then(() => {
                form[idx].status = Status.done;
                counter++;
                if (counter === len) {
                  resolve();
                }
              })
              .catch((err) => {
                // console.log("err-----》", err);
                form[idx].status = Status.error;
                if (typeof retryArr[index] !== "number") {
                  if (!isPaused.value) {
                    ElMessage.info(`第 ${index} 个片段上传失败,系统准备重试`);
                    retryArr[index] = 0;
                  }
                }

                // 次数累加
                retryArr[index]++;
                if (retryArr[index] > 3) {
                  ElMessage.error(
                    `第 ${index} 个片段重试多次无效,系统准备放弃上传`
                  );
                  form[idx].status = Status.fail;
                  form[idx].percentage = 0;
                  // 终止当前所有请求
                  Err = true;
                  requestArr.forEach((element) => {
                    controller.value.abort();
                  });
                  requestArr = [];
                }
              })
          );

          await Promise.all(requestArr);
        }
      }
    };
    start();
  });
}

这里我们创建retryArr来存放重试次数,我们通过每个请求的状态来获取是否要进行请求,当状态为done或者fail即已经请求完成的,所以我们只需要状态为wait和error的请求,我们依次取出max个,然后进行发送。

chunk定期清理

当上传失败后如果没有再继续上传,这些chunk就会一直存在服务器上,日积月累就会给服务器的内存带来很大负担。所以我们就需要定期清理这些chunk。

需求:

  • 定期检测
  • 清除空文件夹
  • 清理超时chunk

这里引入node-schedule来定期执行任务

const fse = require("fs-extra");
const path = require("path");
const schedule = require("node-schedule");

const UPLOAD_DIR = path.resolve(__dirname, "..", "target");

// 空目录删除
function remove(file, stats) {
  const now = new Date().getTime();
  // console.log('stats', stats);
  console.log("file", file);
  const offset = now - stats.ctimeMs;
  if (offset > 2 * 60 * 1000) {
    fse.unlinkSync(file);
    console.log(file, "文件过期,删除");
  }
}

async function scan(dir, callback) {
  const files = await fse.readdirSync(dir);
  files.forEach(async (filename) => {
    const fileDir = path.resolve(dir, filename);
    const stats = await fse.statSync(fileDir);
    // 删除文件
    if (stats.isDirectory()) {
      scan(fileDir, remove);
      // 删除空文件夹
      if (fse.readdirSync(fileDir).length == 0) {
        fse.rmdirSync(fileDir);
      }
      return;
    }
    if(callback) {
      callback(fileDir, stats)
    }
  });
}

// scheduleJob参数
// * * * * * *
// ┬ ┬ ┬ ┬ ┬ ┬
// │ │ │ │ │ │
// │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
// │ │ │ │ └───── month (1 - 12)
// │ │ │ └────────── day of month (1 - 31)
// │ │ └─────────────── hour (0 - 23)
// │ └──────────────────── minute (0 - 59)
// └───────────────────────── second (0 - 59, OPTIONAL)
let start = function () {
  // 每5秒
  schedule.scheduleJob("*/5 * * * * *", function () {
    console.log("定时清理chunks开始");
    scan(UPLOAD_DIR);
  });
};

start();

总结

到这里大文件上传功能算是基本实现了

基本代码

总体思路是参考了我学长的文章前端超大大文件上传实现以及优化 - 掘金 (juejin.cn)按照我自己的理解进行了实现。