大文件上传的vue3+koa2实现
大文件上传实现
需求分析
对于大文件上传的需求,我们要实现以下功能
- 文件切割,分片上传
- 实现文件秒传
- 实现上传进度条
项目架构
本项目前端使用vite+vue3+element-plus,后台使用koa2框架。
vite创建vue3项目
// 创建项目
yarn create vite big-upload --template
vue cd big-upload-ui
// 安装需要的库
yarn add element-plus
yarn add axios
koa脚手架创建后台项目
// koa2脚手架
npm install koa-generator -g
// 脚手架创建项目
koa2 big-upload-server
cd big-upload-server
yarn
// 安装对应的库
yarn add koa-body fs-extra
大文件上传
前端思路
文件切片
文件选择使用input的file类型,获取文件对象。使用文件对象从Blob对象上继承的的slice方法对文件进行分片,返回一个Blob对象。然后将剪切的片段 添加到数组中完成对整个文件的分片。
文件唯一标识
而对每个文件的单一判断,我们一般都是来计算文件的hash值作为它的唯一标识。
这里我们引入spark-md5来计算文件的hash,但是对于整个文件的hash计算是非常耗时的。 这里我借鉴了js 如何快速计算出文件hash值 - 掘金 (juejin.cn)他的抽样计算的方法。
因为我们现在只需要文件的hash作为唯一标识,不需要得到精确的hash值,故采用此方法,这样可以大大节省计算hash所需要的时间。
定义上传的数据结构
现在文件的唯一标识解决了,我们就可以来准备进行上传了。
我们将切割好的片段依次上传,当我们所有的切片上传完成后,我们再发送一个合并请求,将所有切片进行合并变成完整的文件。
我们先来定义上传切片的数据结构:
切片(chunk)对象:
+ chunk: 文件对应的切片
+ size:切片大小
+ fileHash: 总文件hash
+ index: 切片下标
+ chunkHash;切片的hash (采用"总文件hash" + "切片下标") 来表示
代码实现
// App.vue
<template>
<h1>大文件上传</h1>
<input type="file" @change="handleFileChange" />
<el-button @click="handleUpload">上传</el-button>
</template>
<script setup>
import { ElMessage } from "element-plus";
import SparkMD5 from "spark-md5";
import { computed, ref, watch } from "vue";
import { uploadChunks, mergeChunks } from "./api/upload";
const SIZE = 3 * 1024 * 1024; // 定义切片的大小
const File = ref(null);
const chunkList = ref([]);
const hash = ref("");
// 获取文件
function handleFileChange(e) {
resetData();
const [file] = e.target.files;
if (!file) {
File.value = null;
return;
}
File.value = file;
}
// 点击按钮上传文件
async function handleUpload() {
const file = File.value;
if (!file) {
ElMessage("请选择一个文件吧");
return;
}
resetData();
// 文件分片
const fileChunkList = createFileChunk(file);
hash.value = await calculateHash(fileChunkList);
chunkList.value = fileChunkList.map(({ file }, index) => {
return {
chunk: file,
size: file.size,
chunkHash: `${hash.value} - ${index}`,
fileHash: hash.value,
index
};
});
UploadChunks(chunkList);
}
// 文件分片
function createFileChunk(file, size = SIZE) {
const fileChunkList = [];
let cur = 0;
while (cur < file.size) {
fileChunkList.push({ file: file.slice(cur, cur + size) });
cur += size;
}
return fileChunkList;
}
// 计算hash
function calculateHash(fileChunkList) {
return new Promise((resolve, reject) => {
const spark = new SparkMD5.ArrayBuffer();
const reader = new FileReader();
const file = File.value;
const size = file.size;
let offset = 2 * 1024 * 1024;
let chunks = [file.slice(0, offset)];
let cur = offset;
while (cur < size) {
// 最后一块全部加进来
if (cur + offset >= size) {
chunks.push(file.slice(cur, cur + offset));
} else {
// 中间的 前中后去两个字节
const mid = cur + offset / 2;
const end = cur + offset;
chunks.push(file.slice(cur, cur + 2));
chunks.push(file.slice(mid, mid + 2));
chunks.push(file.slice(end - 2, end));
}
// 前取两个字节
cur += offset;
}
reader.readAsArrayBuffer(new Blob(chunks));
reader.onload = (e) => {
spark.append(e.target.result);
hashPercentage.value = 100;
resolve(spark.end());
};
});
}
// 上传文件切片
function UploadChunks(chunkList = []) {
requestList = chunkList.map((item) => {
return uploadChunks(item)
});
await Prolise.all(requestList)
mergeChunks()
}
// 通知服务器合并切片
function MergeChunks() {
mergeChunks(
JSON.stringify({
fileName: File.value.name,
fileSize: File.value.size,
size: SIZE,
hash: hash.value,
})
);
}
</script>
// 请求封装
// axios配置这里就省略了(设置了默认的请求头和响应拦截)
import request from "../utils/request";
export const uploadChunks = (data, onProgress, signal) => {
return request({
url: "/upload/chunk",
method: "POST",
data,
// 文件的上传配置请求头为form-data形式
headers: {
"Content-type": "multipart/form-data;charset=UTF-8",
}
});
};
export const mergeChunks = (data) => {
return request({
url: "/upload/merge",
method: "post",
data,
});
};
后端思路
首先我们先定义一下我们的文件存储目录结构
+ target
+ fileHash-chunks
+ chunkHash
file
我们文件存储的目标目录是target,将文件切片统一放在fileHash-chunks文件夹中(因为同文件夹下不允许同名,我们存放切片的目录后面加个chunks区分)。需要合并的时候读取fileHash-chunks中的切片再进行合并。
upload路由
const router = require("koa-router")();
const path = require("path");
const fse = require("fs-extra");
const util = require("../utils/utils");
const { koaBody } = require("koa-body");
router.prefix("/upload");
// 大文件存储目录
const UPLOAD_DIR = path.resolve(__dirname, "..", "target");
// 提取文件后缀名
const extractExt = (filename) =>
filename.slice(filename.lastIndexOf("."), filename.length);
/**
* 针对 path 创建 readStream 并写入 writeStream,写入完成之后删除文件
* @param {String} path
* @param {String} writeStream
*/
const pipeStream = (path, writeStream) =>
new Promise((resolve) => {
const readStream = fse.createReadStream(path);
readStream.on("end", () => {
fse.unlinkSync(path);
resolve();
});
readStream.pipe(writeStream);
});
/**
* 读取所有的 chunk 合并到 filePath 中
* @param {String} filePath 文件存储路径
* @param {String} chunkDir chunk存储文件夹名称
* @param {String} size 每一个chunk的大小
*/
async function mergeFileChunk(filePath, chunkDir, size) {
// 获取chunk列表
const chunkPaths = await fse.readdir(chunkDir);
// 根据切片下标进行排序 否则直接读取目录的获得的顺序可能会错乱
chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);
await Promise.all(
chunkPaths.map((chunkPath, index) =>
pipeStream(
path.resolve(chunkDir, chunkPath),
// 指定位置创建可写流
fse.createWriteStream(filePath, {
start: index * size,
end: (index + 1) * size,
})
)
)
);
fse.rmdirSync(chunkDir); // 合并后删除保存切片的目录
}
// 上传chunk
router.post("/chunk", koaBody({ multipart: true }), async (ctx, next) => {
const { chunkHash, fileHash } = ctx.request.body;
const { chunk } = ctx.request.files;
const chunkDir = path.resolve(UPLOAD_DIR, `${fileHash}-chunks`);
// // 切片目录不存在,创建切片目录
if (!fse.existsSync(chunkDir)) {
await fse.mkdirs(chunkDir);
}
await fse.move(chunk.filepath, `${chunkDir}/${chunkHash}`);
ctx.body = util.success({ code: 200, data: "", msg: "上传成功" });
});
// chunk合并
router.post("/merge", async (ctx, next) => {
const { fileName, fileSize, size, hash } = ctx.request.body;
// console.log('data ====>', ctx.request.body);
const ext = extractExt(fileName);
const filePath = path.resolve(UPLOAD_DIR, `${hash}${ext}`);
const chunkDir = path.resolve(UPLOAD_DIR, `${hash}-chunks`);
await mergeFileChunk(filePath, chunkDir, size);
ctx.body = util.success({ code: 200, data: "", msg: "合并成功" });
});
module.exports = router;
到这,最基础的大文件上传就解决了。
上传进度条
这里我们要监听两个进度的上传,单个切片的上传进度和总上传进度
单个切片上传
axios中对原生的XMLHttpRequest的上传进度监听进行了封装,我们只用在接口配置中配置onUploadProgress,传入想要的callback即可。
// App.vue
// 上传进度监听函数
function onProgress(item) {
return (e) => {
item.percentage = parseInt(String((e.loaded / e.total) * 100));
};
}
// api/upload.js
export const uploadChunks = (data, onProgress) => {
return request({
...
onUploadProgress: onProgress,
});
};
总文件上传进度
总文件上传进度只需要设置一个计算属性,将每个切片的上传进度 * 切片大小除以总文件大小即可。
// 计算总文件上传进度
const uploadPercentage = computed(() => {
if (!File.value || !chunkList.value.length) return 0;
const loaded = chunkList.value
.map((item) => item.size * item.percentage)
.reduce((acc, cur) => acc + cur);
return parseInt((loaded / File.value.size).toFixed(2));
});
文件秒传
文件秒传比较简单,就是判断这个文件是否已经在服务器中了。有就直接返回,没有才开始上传,所以我们实现一个接口(verify)来判断。而判断方式也很简单,我们之前不是计算过文件的hash嘛,我们直接用这个hash去target目录下查找就行了。
// 验证文件是否存在
router.post("/verify", async (ctx, next) => {
const { fileName, fileHash } = ctx.request.body;
let shouldUpload = true;
let msg = "文件不存在,需要上传";
const ext = extractExt(fileName);
const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
if (fse.existsSync(filePath)) {
shouldUpload = false;
msg = "文件存在,不需要上传";
}
ctx.body = util.success({
code: 200,
data: { shouldUpload, msg },
});
});
断点续传
断点续传就是当文件上传失败后,我们重新上传文件之后只上传失败的片段,不上传成功的文件。
暂停上传
实现一个按钮,点击中止现在的请求。 axios中已经给出了取消请求的方式 取消请求 | Axios Docs (axios-http.com)
这里我们采用AbortController的方式来终止请求,创建一个请求控制器,在请求中加上signal: controller.signal
这么一个配置,需要终止请求时调用controller.abort()
来终止请求。
但是这里有个坑,在AbortController中signal是个只读属性
一旦我们调用abort()
signal就会被修改为false,我们想继续请求signal必须为true,这样就会导致我们无法继续发送请求。
这里我的解决方法是我们创建一个变量将AbortController存起来,需要继续请求时我们直接创建一个新的AbortController传入请求的signal。
// 请求控制器
const controller = ref(null);
// 暂停上传
function handlePause() {
// console.log("中断上传", controller);
requestList.value.forEach((element) => {
controller.value.abort();
});
requestList.value = [];
isPaused.value = true;
}
// api/uoload.js
export const uploadChunks = (data, onProgress, signal) => {
return request({
signal
});
};
恢复上传
恢复上传其实就是重新发起上传请求,但是这里我们需要去判断哪些切片是已经上传过的,我们就需要对秒传接口进行更改,去读取对应的文件切片文件夹,读取已经上传过的切片,将这些切片存到一个数组中进行返回,然后前端获取到这个数组,对请求数组进行筛选,去除已经上传过的切片。
而针对已经上传过的切片,我们还要来设置它的上传进度,直接将它的上传进度设置为100。
优化
切片过多导致并发http请求过多
我们对于请求的处理是直接设置一个requestList来存放所有的请求,并且用Promise.all来处理,当我们切片过多时,一次性创建的请求就会很庞大,浏览器就会十分卡顿。
于是我们就需要来控制并发数量,我们一次只能发送max个请求,当这些请求完成后再发送max个请求。一直循环,直到所有请求发送完毕。
切片上传失败问题
需求:
- 第一次发送失败弹出提示信息
- 第一次发送失败之后我们再进行3次的重传
- 3次重传失败需要有提示
我们定一个状态:
const Status = { wait: 1, error: 2, done: 3, fail: 4 };
每一个请求都对应这下面的4种状态中的一个,一开始所有的请求都是 wait等待状态,发生错误时候变成error状态,3次重传都失败了之后变成fail状态,请求成功变成done完成状态
这里我们将控制并发和失败处理放在一起
// 控制请求发送以及上传错误处理
function sendRequest(form, max = 4) {
return new Promise((resolve, reject) => {
const len = form.length;
let counter = 0; // 发送成功的请求数
const retryArr = [];
form.forEach((item) => (item.status = Status.wait));
const start = async () => {
let Err = false;
while (counter < len && !isPaused.value && !Err) {
// 创建请求列表
let requestArr = [];
// 并发控制请求
for (let i = 0; i < max; i++) {
let idx = form.findIndex(
(item) => item.status == Status.wait || item.status == Status.error
);
if (idx == -1) {
Err = true;
return;
}
form[idx].status = Status.done;
// console.log("开始", idx);
let { index } = form[idx];
requestArr.push(
uploadChunks(
form[idx],
onProgress(chunkList.value[index]),
controller.value.signal
)
.then(() => {
form[idx].status = Status.done;
counter++;
if (counter === len) {
resolve();
}
})
.catch((err) => {
// console.log("err-----》", err);
form[idx].status = Status.error;
if (typeof retryArr[index] !== "number") {
if (!isPaused.value) {
ElMessage.info(`第 ${index} 个片段上传失败,系统准备重试`);
retryArr[index] = 0;
}
}
// 次数累加
retryArr[index]++;
if (retryArr[index] > 3) {
ElMessage.error(
`第 ${index} 个片段重试多次无效,系统准备放弃上传`
);
form[idx].status = Status.fail;
form[idx].percentage = 0;
// 终止当前所有请求
Err = true;
requestArr.forEach((element) => {
controller.value.abort();
});
requestArr = [];
}
})
);
await Promise.all(requestArr);
}
}
};
start();
});
}
这里我们创建retryArr来存放重试次数,我们通过每个请求的状态来获取是否要进行请求,当状态为done或者fail即已经请求完成的,所以我们只需要状态为wait和error的请求,我们依次取出max个,然后进行发送。
chunk定期清理
当上传失败后如果没有再继续上传,这些chunk就会一直存在服务器上,日积月累就会给服务器的内存带来很大负担。所以我们就需要定期清理这些chunk。
需求:
- 定期检测
- 清除空文件夹
- 清理超时chunk
这里引入node-schedule来定期执行任务
const fse = require("fs-extra");
const path = require("path");
const schedule = require("node-schedule");
const UPLOAD_DIR = path.resolve(__dirname, "..", "target");
// 空目录删除
function remove(file, stats) {
const now = new Date().getTime();
// console.log('stats', stats);
console.log("file", file);
const offset = now - stats.ctimeMs;
if (offset > 2 * 60 * 1000) {
fse.unlinkSync(file);
console.log(file, "文件过期,删除");
}
}
async function scan(dir, callback) {
const files = await fse.readdirSync(dir);
files.forEach(async (filename) => {
const fileDir = path.resolve(dir, filename);
const stats = await fse.statSync(fileDir);
// 删除文件
if (stats.isDirectory()) {
scan(fileDir, remove);
// 删除空文件夹
if (fse.readdirSync(fileDir).length == 0) {
fse.rmdirSync(fileDir);
}
return;
}
if(callback) {
callback(fileDir, stats)
}
});
}
// scheduleJob参数
// * * * * * *
// ┬ ┬ ┬ ┬ ┬ ┬
// │ │ │ │ │ │
// │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
// │ │ │ │ └───── month (1 - 12)
// │ │ │ └────────── day of month (1 - 31)
// │ │ └─────────────── hour (0 - 23)
// │ └──────────────────── minute (0 - 59)
// └───────────────────────── second (0 - 59, OPTIONAL)
let start = function () {
// 每5秒
schedule.scheduleJob("*/5 * * * * *", function () {
console.log("定时清理chunks开始");
scan(UPLOAD_DIR);
});
};
start();
总结
到这里大文件上传功能算是基本实现了
总体思路是参考了我学长的文章前端超大大文件上传实现以及优化 - 掘金 (juejin.cn)按照我自己的理解进行了实现。
转载自:https://juejin.cn/post/7224764099187474490