likes
comments
collection
share

大文件切片上传、秒传、断点续传代码实现

作者站长头像
站长
· 阅读数 28

实现流程:

大文件切片上传、秒传、断点续传代码实现

一个完整的大文件,被切成一小块一小块,然后上传这些小块至服务端;最后在服务端将隶属于同一个文件的所有切片按照正确的顺序合并起来,就能显示完整的文件

几个问题:

  1. 如何切片?

    核心是 Blob 对象的 slice 方法。

大文件切片上传、秒传、断点续传代码实现

大文件切片上传、秒传、断点续传代码实现

  1. 一个文件切片之后怎么知道这些切片是这个文件的?或者说文件被切完片了在服务端怎么保存?(主要是针对多个文件上传的情况,文件一有多个切片、文件二有多个切片,怎么分清楚这些切片隶属于谁)

    利用文件的 hash 值作为该文件所有切片的文件夹。

大文件切片上传、秒传、断点续传代码实现 比如文件一的 hash 值为 18bbdc5f4b5d1fc7f62045e16a37dcb6 ,则以它为文件一所有切片的文件夹的名称,之后该文件的所有切片都放置该文件夹下

  1. 为什么是 hash 值?

    先思考一个问题,在向服务器上传文件时,怎么去区分不同的文件呢?如果根据文件名去区分的话可以吗?

    答案是不可以,因为文件名我们可以是随便修改的,所以不能根据文件名去区分。但是每一份文件的文件内容都不一样,我们可以根据文件的内容去区分,具体怎么做呢?

    答案是文件的 hash 值。文件 hash 值是依据文件内容产生的唯一值,且文件内容变化对应的 hash 值也会变化。

    因为我们需要找到文件的唯一标识作为文件夹名称,这样才能区分所上传的不同的文件

  2. 一个文件被切成一片一片的,之后怎么完整的显示整个文件的内容?也就是合并时如何按照正确的顺序合并?

    这里就要讲到一个切片命名时的技巧了。前端在进行文件的切片之后,对每一个切片的命名可以带上索引值。后续服务端在进行切片合并时,就可以根据索引值将所有切片正确拼接。

  3. 什么是秒传?真的是一秒上传所有吗?

    显然不是。秒传的实现其实是判断服务端是否已经又该文件的信息,有,则通知无需上传,这就是所谓的秒传

  4. 什么是断点续传?

    断点续传其实就是:对于一个大文件而言,它的切片可能有很多个(取决于每个切片的大小和源文件的大小)但这个文件上传一半,已经上传了部分切片了,那么下一次再次选择上传这个文件时,就可以不用上传已上传的切片。这就是断点续传

切片上传:

前端实现:

前置准备:

import sparkMD5 from 'spark-md5'

// 文件名
const fileName = ref('')
// 每块切片的大小(1KB)
const CHUNK_SIZE = 1024 * 8
// 文件上传
const uploadFile = (e) => {
	// 拿到文件信息
	const file = e.target.files[0]
	fileName.value = file.name
	
	// 对文件进行切片
	const fileChunkList = createFileChunks(file)
	
	// 计算文件 hash 值
	calculateHash(fileChunkList)
	
	// 上传切片
	uploadChunks(fileChunkList, fileChunkList)
}

/**
 * 文件切片处理
 * @param file 文件对象( name: 文件名; size: 文件大小; ... )
 */
const createFileChunks = (file) => {
	// 文件切片数组
  const fileChunkList = []
  // 当前已经切片的大小
  let cur = 0
  while (cur < file.size) {
    fileChunkList.push({
      file: file.slice(cur, cur + CHUNK_SIZE),
    })
    cur += CHUNK_SIZE
  }
  return fileChunkList
}

/**
 * 计算文件的Hash值
 * @param fileChunks 文件的切片数组
 */
const calculateHash = async (fileChunks) => {
  return new Promise((resolve) => {
    const spark = new sparkMD5.ArrayBuffer()
    const chunks = []

		// 待计算 hash 的切片
    fileChunks.forEach((chunk, index) => {
      if (index === 0 || index === fileChunks.length - 1) {
        // 1. 第一个和最后一个切片的内容全部参与计算
        chunks.push(chunk.file)
      } else {
        // 2. 中间剩余的切片我们分别在前面、后面和中间取2个字节参与计算
        // 前面的2字节
        chunks.push(chunk.file.slice(0, 2))
        // 中间的2字节
        chunks.push(chunk.file.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2))
        // 后面的2字节
        chunks.push(chunk.file.slice(CHUNK_SIZE - 2, CHUNK_SIZE))
      }
    })

    // console.log('chunks存放的信息', chunks)

    // 借用浏览器的FileReader() API
    const reader = new FileReader()
    reader.readAsArrayBuffer(new Blob(chunks))
    reader.onload = (e) => {
      console.log('FileReader解析文件信息', e)
      // 计算Hash
      spark.append(e.target.result)
      // end()方法输出计算结果
      resolve(spark.end())
    }
  })
}

// ----------------------------

<input type="file" @change="uploadFile" />
  • 文件 hash 值计算问题:

    计算文件 hash 值时,使用的是 spark-md5 这个三方包。同时,有必要计算一整个文件的 hash 值吗?显然是没必要的。因为当一个文件真的很大的时候,计算一个完整的文件的 hash 值也很耗时间。所以这里采取的方案是:

    1. 第一个和最后一个切片的内容全部参与计算
    2. 中间剩余的切片分别在前面、后面和中间取 2 个字节参与计算

这样既能保证所有的切片都进行计算,还能保证计算时间 有关FileReader的问题,请移步传送门

上传所有切片:

一次性上传所有切片可以吗?不可以。eg:文件总大小是 1G,每个切片大小是 1MB,就意味着同时上传 1024 个切片。浏览器肯定处理不了,原因是切片文件过多,浏览器一次性创建了太多的请求。这是没有必要的,拿 chrome 浏览器来说,默认的并发数量只有 6,过多的请求并不会提升上传速度,反而是给浏览器带来了巨大的负担。因此,我们有必要限制前端请求个数。可以用一个请求池(也就是请求队列)

上传文件时一般还要用到 formData对象,需要将我们要传递的文件还有额外信息放到这个 FormData对象里面。比如我们会存放文件哈希值(用于创建对应的切片文件夹)、每个切片的名称(用于后续切片的合并)、每个切片的文件对象…

/**
 * 上传切片
 * @param fileChunks 切片数组 [{ file: Blob }, { file: Blob }]
 * @param fileHash 整个文件的hash值
 */
const uploadChunks = async (fileChunks, fileHash) => {
  
  const data = fileChunks
    .map(({ file }, index) => ({
      fileHash: fileHash,
      index,
      chunkHash: `${fileHash}-${index}`,   // 每个切片的名称,大文件哈希值-索引(方便后续合并)
      chunk: file,
      size: file.size,
    }))

  const formDatas = data.map(({ chunk, chunkHash }) => {
    const formData = new FormData()
    // 切片文件对象
    formData.append('chunk', chunk)
    // 切片文件名称
    formData.append('chunkHash', chunkHash)
    // 大文件的文件名
    formData.append('fileName', fileName.value)
    // 大文件hash
    formData.append('fileHash', fileHash)
    return formData
  })

  let index = 0
  const max = 6 // 并发请求数量
  const taskPool = [] // 请求队列

	// 请求池
  while (index < formDatas.length) {
	  // 此处是做了代理,所以端口号写的是5173
    const task = fetch('<http://localhost:5173/adminapi/upload>', {
      method: 'POST',
      body: formDatas[index],
    })

    task.then(() => {
      taskPool.splice(taskPool.findIndex((item) => item === task))
    })
    taskPool.push(task)
    if (taskPool.length === max) {
      // 当请求队列中的请求数达到最大并行请求数的时候,得等之前的请求完成再循环下一个
      await Promise.race(taskPool)
    }
    index++
  }

  await Promise.all(taskPool)
}

后端实现:

后端要解析上传的文件信息,此处使用的是multiparty

// 引入fs模块(用于存放对应的文件切片)
var fs = require('fs')

// -----------------------切片上传---------------------
// 处理文件上传
var multiparty = require('multiparty')
// 所有上传的文件存放到uploads目录下
const UPLOAD_DIR = path.resolve(__dirname, 'uploads')

// 处理上传的分片
app.post('/adminapi/upload', async (req, res) => {
  const form = new multiparty.Form()
	// 解析文件信息
  form.parse(req, async function (err, fields, files) {
    if (err) {
      res.status(401).json({
        ok: false,
        msg: '上传失败',
      })
    }

    const chunkHash = fields['chunkHash'][0] // 当前切片的名称(也就是 哈希值-索引值)
    const fileName = fields['fileName'][0]   // 大文件的名称
    const fileHash = fields['fileHash'][0]   // 整个文件的hash值

    // 存储切片的临时文件夹(以文件的hash为文件夹放在uploads文件夹下,该文件的所有切片都放在对应的hash文件夹下)
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash)

    // 切片目录不存在,则创建切片目录
    if (!fs.existsSync(chunkDir)) {
      // 异步创建文件夹
      await fs.promises.mkdir(chunkDir, { recursive: true }, (error) => {
        if (error) {
          console.log(error)
        } else {
          console.log('目录创建成功')
        }
      })
    }
    
		// 文件的源路径
    const oldPath = files.chunk[0].path
    // 文件待上传的路径
    const newPath = path.resolve(chunkDir, chunkHash)
    // 把文件切片移动到我们的切片文件夹中
    try {
      await fs.promises.access(oldPath) // 检查源文件是否存在
      console.log('源文件存在:', oldPath)
      await fs.promises.access(chunkDir) // 检查目标目录是否存在
      console.log('目标目录存在:', chunkDir)

      // 把文件切片移动到我们的切片文件夹中(可能由于项目路径存在中文,导致文件移动失败)
      // await fs.promises.rename(oldPath, newPath)
      // console.log('文件移动成功:', newPath)

      // 复制文件
      await fs.promises.copyFile(oldPath, newPath)
      console.log('文件复制成功:', newPath)

      // 删除原文件
      await fs.promises.unlink(oldPath)
      console.log('原文件删除成功:', oldPath)
    } catch (error) {
      console.log('移动文件失败:', error)
    }

    // // 上传成功
    res.status(200).json({
      ok: true,
      msg: 'received file chunk',
    })
    console.log('大文件切片上传成功')
  })
})

踩坑记录:这里有关目录的相关操作都使用了 await 异步等待,所以在进行相关操作时应该是调用 fs 模块的 promises 下的相关方法

切片合并:

大文件的所有切片上传之后,需要将所有切片进行合并,得到一个完整的文件内容。这里我们需要前端调用合并接口,进行文件的合并

大文件切片上传、秒传、断点续传代码实现

前端实现:

/**
 * 文件切片的合并
 */
const mergeFile = () => {
  // 发送合并请求
  fetch('<http://127.0.0.1:5173/adminapi/merge>', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      size: CHUNK_SIZE,
      fileHash: fileHash.value,
      fileName: fileName.value,
    }),
  })
    .then((response) => response.json())
    .then(() => {
      alert('上传成功')
    })
}

// ----------------------------------------------------------
 <!-- 文件切片合并 -->
 <button @click="mergeFile">合并</button>

前端需要传递过去文件的 hash 值,这样服务端才能知道是要合并哪个文件夹下的所有切片(因为是按照 hash 值进行文件夹的命名的)传过去文件名主要是要提取文件的后缀

后端实现:

// 提取文件后缀名
const extractExt = (filename) => {
  return filename.slice(filename.lastIndexOf('.'), filename.length)
}

/**
 * 读的内容写到writeStream中
 */
const pipeStream = (path, writeStream) => {
  return new Promise((resolve, reject) => {
    // 创建可读流
    const readStream = fs.createReadStream(path)
    readStream.on('end', async () => {
      // 删除源文件
      fs.unlinkSync(path)
      resolve()
    })
    // 将可读流连接到可写流
    readStream.pipe(writeStream)
  })
}

/**
 * 合并文件夹中的切片,生成一个完整的文件
 */
async function mergeFileChunk(filePath, fileHash, size) {
  // 找到该文件的切片存放的文件夹的路径
  const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
  // 读取目录下的所有子目录,也就是读取该文件的所有切片
  const chunkPaths = await fs.promises.readdir(chunkDir)
  // 根据切片下标进行排序
  // 否则直接读取目录的获得的顺序可能会错乱
  chunkPaths.sort((a, b) => {
    return a.split('-')[1] - b.split('-')[1]
  })

  const list = chunkPaths.map((chunkPath, index) => {
    return pipeStream(
      path.resolve(chunkDir, chunkPath),
      fs.createWriteStream(filePath, {
        start: index * size,
        end: (index + 1) * size,
      })
    )
  })

  await Promise.all(list)
  // 文件合并后删除保存切片的目录
  fs.rmdirSync(chunkDir)
}

/**
 * 合并文件
 */
app.post('/adminapi/merge', express.json(), async (req, res) => {
  console.log('请求体信息', req.body)
  const { fileHash, fileName, size } = req.body
  // 找到待合并文件的切片文件夹的位置
  const filePath = path.resolve(
    UPLOAD_DIR,
    `${fileHash}${extractExt(fileName)}`
  )
  // 如果已合并的大文件已经存在,则直接返回
  if (fs.existsSync(filePath)) {
    res.status(200).json({
      ok: true,
      msg: '合并成功',
    })
    console.log('大文件已合并')
    return
  }
  // 如果已合并的大文件不存在,则找到它的切片的文件夹
  const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
  // 切片目录不存在,则无法合并切片,报异常
  if (!fs.existsSync(chunkDir)) {
    res.status(200).json({
      ok: false,
      msg: '合并失败,请重新上传',
    })
    console.log('切片目录不存在')
    return
  }
  await mergeFileChunk(filePath, fileHash, size)
  res.status(200).json({
    ok: true,
    msg: '合并成功',
  })
})

踩坑记录:Express 是不能从 req.body 中读取传递过来的 JSON 数据的,所以需要使用一下中间价(前面前端传来的数据 body 是 JSON.stringify)

前面我们强调,切片的命名要 hash值-索引值,它的作用在这里就可以显现了。在 mergeFileChunk 中,我们就需要根据这个索引值先对所有的切片进行排序,然后再进行读写流操作


做到这里,正常的话我们其实就可以进行正常的文件切片上传和合并了

上传一个大一点的文件:

大文件切片上传、秒传、断点续传代码实现

大文件切片上传、秒传、断点续传代码实现

大文件切片上传、秒传、断点续传代码实现


但是,仍然有一些细节没有处理。比如:如果上传相同的文件的情况;而且如果中间网络断了,我们就得重新上传所有的分片,这些情况在大文件上传中也都需要考虑到。也就是接下来要介绍的文件秒传、断点续传

秒传:

秒传很简单,就是在真正的上传切片之前,判断一下这个文件是不是已经上传过就可以了。用什么来判断呢?还是 hash 值。前端在上传切片之前,调用相关接口,传递待上传文件的 hash 给后端,后端根据这个 hash 值判断这个文件是否已经存在。(因为前面讲过,我们用文件 hash 值作为文件名称)

前端实现:

/**
 * 文件上传
 */
const uploadFile = (e) => {
	// 拿到文件信息
	const file = e.target.files[0]
	fileName.value = file.name
	
	// 对文件进行切片
	const fileChunkList = createFileChunks(file)
	
	// 计算文件 hash 值
	calculateHash(fileChunkList)
	
	// 上传切片之前先判断
	const {data} = await verifyUpload()
	if(data.shouldUpload){
		uploadChunks(fileChunkList, fileChunkList)
	}else{
		alert('秒传成功')
	}
}

/**
 * 切片上传之前,判断该文件是否已经上传
 */
const verifyUpload = async () => {
  return fetch('<http://localhost:5173/adminapi/verify>', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      fileName: fileName.value,
      fileHash: fileHash.value,
    }),
  })
    .then((response) => response.json())
    .then((data) => {
      return data // data中包含对应的表示服务器上有没有该文件的查询结果
    })
}

data.shouldUpload/adminapi/verify 接口给的,也就是该文件是否已存在。当然可以使用其他字段,只要前后协调好即可

后端实现:

// --------------------------文件秒传--------------------------------
app.post('/adminapi/verify', express.json(), async (req, res) => {
  const { fileName, fileHash } = req.body
  console.log('秒传传递的参数:', fileName, fileHash)
  // 提取待上传文件的路径
  const filePath = path.resolve(
    UPLOAD_DIR,
    `${fileHash}${extractExt(fileName)}`
  )
  console.log('待上传文件解析之后的路径:', filePath)

  // 对待上传文件是否存在进行处理
  if (fs.existsSync(filePath)) {
    // 文件存在服务器中,不需要再上传了
    res.status(200).json({
      ok: true,
      data: {
        shouldUpload: false,
        msg: '文件已存在',
      },
    })
  } else {
    // 文件不在服务器中,就需要上传
    res.status(200).json({
      ok: true,
      data: {
        shouldUpload: true
      },
    })
  }
})

同样,由于前端传来的是JSON ,所以需要先使用一下中间件

断点续传:

断点续传其实就是一个文件在上传过程中,如果由于意外中断,那么下一次再上传时,已上传的切片就可以不用重复上传

和秒传有区别:秒传是不用重复上传文件,断点续传是不用重复上传切片

所以断点续传其实还是在/adminapi/verify 这里动手脚。后端如果判断出该文件已上传,则返回前端信息说无需再上传。但如果该文件还没有上传,后端除了通知前端要上传该文件外,还要给前端该文件已经上传的切片。大致流程:

大文件切片上传、秒传、断点续传代码实现

前端实现:

/**
 * 文件上传
 */
const uploadFile = (e) => {
	// 拿到文件信息
	const file = e.target.files[0]
	fileName.value = file.name
	
	// 对文件进行切片
	const fileChunkList = createFileChunks(file)
	
	// 计算文件 hash 值
	calculateHash(fileChunkList)
	
	// 上传切片之前先判断
	// 后端除了给前端是否要上传的信息,还会给该文件已上传的切片数组
	const {data} = await verifyUpload()
	if(data.shouldUpload){
		uploadChunks(fileChunkList, fileChunkList, data.uploadedList)
	}else{
		alert('秒传成功')
	}
}

/**
 * 上传切片
 * @param fileChunks 切片数组 [{ file: Blob }, { file: Blob }]
 * @param fileHash 整个文件的hash值
 * @param uploadedList 文件已上传的切片数组
 */
const uploadChunks = async (fileChunks, fileHash, uploadedList) => {
  // 先对文件的切片数组做一层过滤(找出所有切片中所有未上传的切片)
  const data = fileChunks
    .filter((chunk, index) => {
      return !uploadedList.includes(`${fileHash}-${index}`)
    })
    .map(({ file }, index) => ({
      fileHash: fileHash,
      index,
      chunkHash: `${fileHash}-${index}`,
      chunk: file,
      size: file.size,
    }))

  const formDatas = data.map(({ chunk, chunkHash }) => {
    const formData = new FormData()
    // 切片文件
    formData.append('chunk', chunk)
    // 切片文件hash
    formData.append('chunkHash', chunkHash)
    // 大文件的文件名
    formData.append('fileName', fileName.value)
    // 大文件hash
    formData.append('fileHash', fileHash)
    return formData
  })
	
	......
}

后端实现:

/**
 * 读取一个文件下的所有切片
 * @param {*} fileHash 当前文件的hash值
 * @returns 该文件的切片数组
 */
const createUploadedList = async (fileHash) => {
  const res = fs.existsSync(path.resolve(UPLOAD_DIR, fileHash))
    ? await fs.promises.readdir(path.resolve(UPLOAD_DIR, fileHash))
    : []
    
  return res
}

app.post('/adminapi/verify', express.json(), async (req, res) => {
  const { fileName, fileHash } = req.body
  console.log('秒传传递的参数:', fileName, fileHash)
  // 提取待上传文件的路径(虽然我们是以文件hash值作为uploads下的子文件夹,没有后缀,但是仍然可以调用一下这个解析后缀名的方法)
  const filePath = path.resolve(
    UPLOAD_DIR,
    `${fileHash}${extractExt(fileName)}`
  )
  console.log('待上传文件解析之后的路径:', filePath)

  // 对待上传文件是否存在进行处理
  if (fs.existsSync(filePath)) {
    // 文件存在服务器中,不需要再上传了
    res.status(200).json({
      ok: true,
      data: {
        shouldUpload: false,
        msg: '文件已存在',
      },
    })
  } else {
    const uploadedList =await createUploadedList(fileHash)
    // 文件不在服务器中,就需要上传(需要上传的话同时返回该文件已经有的切片数组,方便前端上传之前做过滤,防止已上传的切片重复上传)
    res.status(200).json({
      ok: true,
      data: {
        shouldUpload: true,
        uploadedList: uploadedList
      },
    })
  }
})

踩坑记录:如果说某个文件真的需要上传的话,我们会传过去一个uploadedList ,而在获取uploadedList 是异步操作,应该先await一下它

后言:

以上就是文章全部内容,有错误欢迎大家交流指正!!!


最后更新时间:2024-07-29

转载自:https://juejin.cn/post/7396932646591053836
评论
请登录