likes
comments
collection
share

基于nodejs的minio的sdk,自建服务中转文件的断点续传方案

作者站长头像
站长
· 阅读数 34

背景

  • 公司业务问题,就不过多阐述,总的需求就是需要一个断点续传的功能
  • 公司的文件存储是使用的amazon s3,这个是通用的文件存储,类似阿里云的oss,一个文件可以当成是一个对象,可以给文件对象做一些操作,amazon s3是一个服务,可以创建一个一个桶用于存储文件,对于某个桶,有三种级别的权限:1、公有读,公有写;2、公有读,私有写;3、私有读,私有写
  • 为什么这个功能需要前端来做?因为后端只提供数据接口,不提供web服务,web服务都是前端自己开发和解决
  • 如果你不了解amazon s3或者阿里云的oss,那可能文章看的会有困难

简单文件上传

amazon的s3,文件直接上传的话有这几种方案,分为:前端直接上传和使用sdk上传

一、前端直接上传,分公有写和私有写

​ 公有写:可以直接拼接url:http://\<address\>/\<bucketname\>/\<objectname\>,在前端直接将文件发起put请求这个url即可,其中address是服务提供的域名,bucketname是我们的桶名称,objectname是对象名(即文件名)

​ 私有写:必须要用到minio的sdk,使用sdk提供的presignedPutObject方法生成一个上传地址,前端拿到链接后,也是在前端直接发起put请求即可,要注意,sdk只能运行在服务端,也就是必须要有自己的服务

二、使用sdk上传,sdk只能运行在服务端,流程基本上是,前端上传文件到服务端,服务端使用sdk将文件上传到S3的文件服务上

这两种方案,除了公有写的情况,其他的都需要有自建的服务端,而且从安全的角度上来说,设置公有写是不合理的

断点续传方案设计

要做这个断点续传的功能,是必须要有自己的服务的,和大多数人一样,做这个断点续传的功能的时候,我也是去网上搜别人的做法,然后找到合适的方法后,结合我们的项目,我设计了这样一个方案:

  1. 用户选择文件后,前端使用file.slice对文件进行分片
  2. 计算文件hash,也就是文件的md5,只要文件内容不变,文件的hash是不会变的,计算hash是一个同步任务,文件太大的话会把浏览器卡住,我使用的是spark-md5+浏览器的requestIdleCallback的api来解决的,可以根据自己项目的情况使用webworker也是可以的
  3. 使用hash查询是否已有对应的文件,这个可以自己有自己的数据库,存储hash对应的文件链接,如果是用hash作为文件名的话,也可以调用minio的sdk的statObject方法知道这个文件是否已上传,如果已经有了文件信息,就可以不用上传文件,直接拿到链接即可,这个就是秒传的功能
  4. 如果未上传,则读取服务器本地,以这个hash命名的文件夹下已经上传了哪些分片,将已上传的分片返回给前端,前端选择性的上传未上传的分片即可,这个就是断点续传的功能
  5. 前端上传分片的时候,会将文件的hash、第几个分片等这些信息作为参数给到服务端,服务端拿到文件后,会以hash命名文件夹,以第几个分片命名该分片,将分片存储在服务端
  6. 所有分片上传完成后,在服务端对文件进行合并,再调用minio的putObject方法将文件上传到S3文件服务器,上传完成后,将服务器本地的文件删除

整个断点续传功能是完成了,不过这个方案有个不完美的地方,就是分片文件要在服务端存储和合并,合并后才上传到文件服务上,据我所知,标准的S3本身是有断点续传和合并文件的功能的,可不可以直接上传分片的时候,将分片上传到S3文件服务器上,分片都上传完成后,直接在S3文件服务器合并文件?答案在后面,不过当时确实没有找到可以用的方案,唯一找到最接近的方案是百度的智能云提供的api:https://cloud.baidu.com/doc/B... 但是minio的sdk不提供uploadPart方法,这个方法也行不通,所以只能先作罢

问题

上面的方案有一个致命的问题没有考虑到,就是线上是有多台机器的,这些分片会上传到不同的机器,合并文件的时候是没法合并的,导致上面设计的方案都不能用了,发现这个问题还是因为在解决一个合并文件的时候发现有分片未找到的问题的时候才考虑到的,所以要重新考虑新的方案,着重点还是上传的分片怎么直接上传到S3文件服务器上,在S3文件服务器上合并

为了解决这个方案,我看了minio的源码,看了putObject的源码后,了解到putObject的核心流程如下:

  1. 使用block-stream2将文件进行分块
  2. 使用objectName调用findUploadId查询uploadId,如果没有uploadId,会调用initiateNewMultipartUpload初始化一个uploadId,通过自己的一些测试可以得到一些信息:

    2.1 每次调用initiateNewMultipartUpload返回的uploadId都不一样

    2.2 findUploadId会返回最新的uploadId

    2.3 通过查找别的信息得知uploadId有7天的有效期

  3. 调用listParts获得已上传的part
  4. 组合参数,调用makeRequest上传分片
  5. 调用completeMultipartUpload完成分片上传,这个方法会将所有分片合并,并返回合并后的文件etag

新方案

通过看putObject方法的源码,我们把我们现有的方案做一些修改

  1. 用户选择文件后,对文件进行分片
  2. 计算文件hash,和之前一致,以文件hash作为新的文件名,或者加上固定的前缀,必须以固定的规则命名,同一个文件最好不要名字不一致,因为minio的服务端名字是唯一的key
  3. 用文件名检索是否已存在文件,主要是调用的minio的statObject方法,如果未存在,以文件名获取uploadId,再用uploadId获取已上传的分片(此处区别于之前,因为分片文件不存在服务器本地,所以分片信息要存入数据库,其实还可以调用sdk的listParts方法获取到已上传的分片,但是调用listParts返回的信息没有带上期望得到的partNumber参数,可能是公司搭建的S3服务的原因,所以分片只能入库
  4. 前端拿到已上传信息后,和之前处理一致,如果已存在文件,则不上传,否则计算需要上传分片然后上传
  5. 自行开发一个uploadPart的方法,服务端接收到分片后,拿到分片文件的ArrayBuffer,拿到uploadId,拼装参数,调用sdk的makeRequest方法将分片上传到S3文件服务器服务器,上传完成后删除文件分片,将上传的分片信息入库
  6. 前端接受到所有分片都上传完成后,调用合并文件接口,服务端合并文件,调用sdk的completeMultipartUpload方法,会将在S3文件服务器服务器上的分片都合并

到此新的方案就完成了,下面贴上一些代码

前端:

文件分片:

function createChunks(file, size = SINGLECHUNKSIZE) {
    let cur = 0,
        index = 1;
    const chunks = [];
    while (cur < file.size) {
        chunks.push({
            start: cur, // 文件开始位置的字节
            file: file.slice(cur, cur + size), // 分片文件
            hash: "", // 文件hash
            progress: 0, // 上传进度
            uploaded: false, // 是否已上传
            index: index, // 第几个分片
        });
        index++;
        cur += size;
    }

    return chunks;
}

计算文件hash:

const md5ByRequestIdle = (chunks, { onProgress }) => {
    return new Promise((resolve) => {
        const spark = new SparkMD5.ArrayBuffer();
        let count = 0;
        const workLoop = async () => {
            if (count < chunks.length) {
                const reader = new FileReader();
                reader.onload = e => {
                    const add = (deadline) => {
                        if (deadline.timeRemaining() > 1) {
                            spark.append(e.target.result);
                            count++;
                            const progress = parseInt((count / chunks.length) * 100) / 100;
                            if (count < chunks.length) {
                                onProgress && onProgress(progress);
                                window.requestIdleCallback(workLoop);
                            } else {
                                onProgress && onProgress(1);
                                resolve(spark.end());
                            }
                        } else {
                            window.requestIdleCallback(add);
                        }
                    }
                    window.requestIdleCallback(add)
                }
                reader.readAsArrayBuffer(chunks[count].file);
            } else {
                resolve(spark.end());
            }
        }
        window.requestIdleCallback(workLoop);
    });
}

服务端:

上传文件分片:

async uploadPart(file, index, filename, uploadId?) {
    const part = Number(index);
    if (!uploadId) {
      uploadId = await this.getUploadIdByFilename(filename)
    }
    const curList = await this.ctx.model.etagCenter.findBy({
      filename,
      part,
      uploadId,
    })
    if (curList.length > 0) {
      return true
    }
    const client = new Client({
      endPoint: this.config.S3文件服务器v3.endPoint,
      accessKey: this.config.S3文件服务器v3.accessKey,
      secretKey: this.config.S3文件服务器v3.secretKey,
    })
    const chunk = await fse.readFile(file.filepath)
    const query = querystring.stringify({
      partNumber: part,
      uploadId,
    })
    const options = {
      method: 'PUT',
      query,
      headers: {
        'Content-Length': chunk.length,
        'Content-Type': mime.lookup(filename),
      },
      bucketName: this.config.S3文件服务器v3.bucketName,
      objectName: filename,
    }
    const etag = await new Promise((resolve, reject) => {
      client.makeRequest(options, chunk, [200], '', true, function(
        err,
        response,
      ) {
        if (err) return reject(err) // In order to aggregate the parts together, we need to collect the etags.

        let etag = response.headers.etag
        if (etag) {
          etag = etag.replace(/^"/, '').replace(/"$/, '')
        }
        fse.unlink(file.filepath)
        resolve(etag)
      })
    })
    const insertResult = await this.ctx.model.etagCenter.add({
      filename,
      etag,
      part,
      uploadId,
    })
    return insertResult.insertedCount > 0
}

方案和代码还有很多需要改进的地方,也还在思考

后续优化

现在分片文件需要先上传到我们自己的服务,然后才上传到S3文件服务器,中间会耗费一些时间和效率,后续期望能直接在前端将分片文件直接上传到S3文件服务器上,当然会在保证安全的前提下,不过目前没有时间考虑这个,如果行得通的话后续的优化会朝这个方向来