Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通
我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。例如常用的多线程下载工具(Gopeed、Aria2、XDM等等),都是通过多线程下载技术充分利用了网络带宽,以提高下载速度。
那么多线程下载是怎么实现的呢?多个线程发送网络请求,是怎么做到同时下载一个文件呢?事实上,借助HTTP协议中的一些机制就可以实现了!
今天我们就通过使用Go语言为例,从了解HTTP请求相关的一些机制开始,实现一个多线程下载的示例。
1,多线程下载原理
事实上,多线程下载的原理很简单,主要的步骤如下:
- 获取待下载文件大小
- 每个线程下载文件的一部分
- 全部下载完成后,拼接为完整文件
实现这些步骤,就涉及到HTTP协议的下列相关机制。
(1) HEAD
请求 - 只获取请求头
我们通常发送HTTP请求大多数是GET
或者POST
类型,发送请求后我们会立即获取响应体,浏览器则会根据响应体的类型来处理内容,例如返回的是text/html
就会作为网页显示,返回image/png
就会解码为图片等等,响应体的类型由响应头Content-Type
标识。当我们下载文件时,事实上也是发送HTTP请求,只不过服务器返回的响应体就是文件本身了!其类型则是application/octet-stream
,浏览器也知道这是个文件需要下载。
当然,文件作为响应体通常比起网页、图片要大得多,在多线程下载时,我们就要先获取文件的大小,而不是立即获取文件本身,这时我们就可以向服务器发起HEAD
请求而不是GET
请求。
服务器收到HEAD
请求后,就只会返回对应的响应头,而不会返回响应体,这样我们就可以在下载文件之前,读取响应头中的Content-Length
来先获取待下载文件大小。
(2) Range
请求头 - 只获取部分响应体
知道了文件大小,我们就需要让每个线程只下载一部分文件,借助HTTP的Range
请求头,就可以实现只让服务端返回响应体内容的一部分,而不是返回完整的响应体。
这里我们先来借助书籍《图解HTTP》中对Range
请求头的讲解,来学习一下:
可见当我们发送一个请求获取内容时,如果指定了如下请求头:
Range: bytes=5001-10000
那么服务端就只会返回响应体的第5001
到第10000
字节的内容部分,包含第5001
和第10000
字节,0
表示响应体的第一个字节。
这样,在多个线程同时下载文件时,我们在每个线程的请求中使用Range
请求头,就可以实现一个线程只下载文件的一部分了!
2,Go代码实现
知道了HTTP的上述几个机制,相信大家就知道如何实现一个简单的多线程下载了!我们可以总结主要步骤如下:
- 发送
HEAD
类型请求,通过Content-Length
请求头获取待下载文件大小 - 根据给定的线程数量,结合待下载文件大小,确定每个线程下载的范围部分,也就是每个线程的
Range
请求头字节范围 - 启动所有线程,使得每个线程下载它们对应的部分文件,并等待全部线程下载完成
- 合并每个线程下载的部分为最终文件
- 清理每个线程下载的文件部分
这里分别设计下列类(结构体),用于存放多线程下载时的传入参数和状态量:
上述ShardTask
类表示一个线程的下载任务,其中会完成一个分片(文件的一部分)的下载请求操作,它有如下作为参数的属性:
Url
下载的文件地址Order
分片序号ShardFilePath
这个分片文件的保存路径RangeStart
和RangeEnd
下载的文件起始范围和结束范围,用于设定Range
请求头
此外,还有作为下载状态的属性:
DownloadSize
下载任务进行时,这个线程已下载的文件部分大小TaskDone
这个线程的下载任务是否完成
该类的成员方法如下:
DoShardGet
执行分片下载任务,在其中会根据RangeStart
和RangeEnd
设定对应的HTTP请求头,发送请求并下载对应的文件部分
然后就是ParallelGetTask
类,表示一整个多线程下载任务,其中包含了一个多线程下载任务的参数和状态量,并且实现了多线程下载的每个步骤,它有如下作为参数的属性:
Url
文件的下载链接FilePath
文件下载完成后的保存位置Concurrent
下载并发数,即同时下载的分片数量TempFolder
临时分片文件的保存文件夹
此外还有作为状态的属性:
TotalSize
待下载文件的总大小ShardTaskList
存储所有分片任务对象指针的列表
该类中的方法主要是分片下载的一些步骤如下:
getLength
发送HEAD
请求获取Content-Length
以获取文件大小,获取后将其设定到TotalSize
属性allocateTask
根据给定的线程数和获取到的文件大小,计算每个线程下载的文件内容范围,并创建对应的ShardTask
结构体放入ShardTaskList
中downloadShard
为每一个ShardTask
对象创建一个线程(Goroutine)并在新的线程中调用ShardTask
对象的下载分片方法,以启动所有线程的下载任务,并通过sync.WaitGroup
来等待全部线程完成mergeFile
下载完成后,合并每个分片为最终文件cleanShard
合并完成后,清理下载的每个分片文件printTotalProcess
这是一个附加的辅助方法,用于实时输出下载进度Run
启动整个多线程下载任务,该函数是暴露的公开函数,其中对上述每个步骤函数进行了组织,按顺序调用执行
下面,我们来看一下它们的代码实现。
(1) ShardTask
- 一个线程的下载任务
package model
import (
"bufio"
"fmt"
"github.com/fatih/color"
"io"
"net/http"
"os"
"sync"
)
// ShardTask 单个分片下载任务的任务参数和状态量
type ShardTask struct {
// 下载链接
Url string
// 分片序号,从1开始
Order int
// 这个分片文件的路径
ShardFilePath string
// 分片的起始范围(字节,包含)
RangeStart int64
// 分片的结束范围(字节,包含)
RangeEnd int64
// 已下载的部分(字节)
DownloadSize int64
// 该任务是否完成
TaskDone bool
}
// NewShardTask 构造函数
func NewShardTask(url string, order int, shardFilePath string, rangeStart int64, rangeEnd int64) *ShardTask {
return &ShardTask{
// 设定任务参数
Url: url,
Order: order,
ShardFilePath: shardFilePath,
RangeStart: rangeStart,
RangeEnd: rangeEnd,
// 初始化状态量
DownloadSize: 0,
TaskDone: false,
}
}
// DoShardGet 开始下载这个分片(该方法在goroutine中执行)
func (task *ShardTask) DoShardGet(waitGroup *sync.WaitGroup) {
// 创建文件
file, e := os.OpenFile(task.ShardFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
if e != nil {
color.Red("任务%d创建文件失败!", task.Order)
color.HiRed("%s", e)
return
}
// 准备请求
request, e := http.NewRequest("GET", task.Url, nil)
if e != nil {
color.Red("任务%d创建请求出错!", task.Order)
color.HiRed("%s", e)
return
}
// 设定请求头
request.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", task.RangeStart, task.RangeEnd))
// 发送请求
client := http.DefaultClient
response, e := client.Do(request)
if e != nil {
color.Red("任务%d发送下载请求出错!", task.Order)
color.HiRed("%s", e)
return
}
// 读取请求体
body := response.Body
// 读取缓冲区
buffer := make([]byte, 8092)
// 准备写入文件
writer := bufio.NewWriter(file)
for {
// 读取一次内容至缓冲区
readSize, readError := body.Read(buffer)
if readError != nil {
// 如果读取完毕则退出循环
if readError == io.EOF {
break
} else {
color.Red("任务%d读取响应错误!", task.Order)
color.HiRed("%s", readError)
return
}
}
// 把缓冲区内容追加至文件
_, writeError := writer.Write(buffer[0:readSize])
if writeError != nil {
color.Red("任务%d写入文件时出现错误!", task.Order)
color.HiRed("%s", writeError)
return
}
_ = writer.Flush()
// 记录下载进度
task.DownloadSize += int64(readSize)
}
// 关闭全部资源
_ = body.Close()
_ = file.Close()
// 标记任务完成
task.TaskDone = true
// 使线程组中计数器-1
waitGroup.Done()
}
构造函数NewShardTask
负责完成ShardTask
的参数传入和状态量初始化,而DoShardGet
方法实现了下载一个文件分片的完整步骤,从创建文件准备写入,到设定请求头,发出请求,最后读取响应体保存到文件。
(2) ParallelGetTask
- 一整个多线程下载任务
package model
import (
"bufio"
"fmt"
"gitee.com/swsk33/shard-download-demo/util"
"github.com/fatih/color"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"
)
// ParallelGetTask 多线程下载任务类,存放一个多线程下载任务的参数和状态量
type ParallelGetTask struct {
// 文件的下载链接
Url string
// 文件的最终保存位置
FilePath string
// 下载并发数
Concurrent int
// 下载的分片临时文件保存文件夹
TempFolder string
// 下载文件的总大小
TotalSize int64
// 全部的下载分片任务参数列表
ShardTaskList []*ShardTask
}
// NewParallelGetTask 构造函数
func NewParallelGetTask(url string, filePath string, concurrent int, tempFolder string) *ParallelGetTask {
return &ParallelGetTask{
// 参数赋值
Url: url,
FilePath: filePath,
Concurrent: concurrent,
TempFolder: tempFolder,
// 初始化状态量
TotalSize: 0,
ShardTaskList: make([]*ShardTask, 0),
}
}
// 发送HEAD请求获取待下载文件的大小
func (task *ParallelGetTask) getLength() error {
// 发送请求
response, e := http.Head(task.Url)
if e != nil {
color.Red("发送HEAD请求出错!")
return e
}
// 读取并设定长度
task.TotalSize = response.ContentLength
return nil
}
// 根据待下载文件的大小和设定的并发数,创建每个分片任务对象
func (task *ParallelGetTask) allocateTask() {
// 如果并发数大于总大小,则进行调整
if int64(task.Concurrent) > task.TotalSize {
task.Concurrent = int(task.TotalSize)
}
// 开始计算每个分片的下载范围
eachSize := task.TotalSize / int64(task.Concurrent)
// 创建任务对象
for i := 0; i < task.Concurrent; i++ {
task.ShardTaskList = append(task.ShardTaskList, NewShardTask(task.Url, i+1, filepath.Join(task.TempFolder, strconv.Itoa(i+1)), int64(i)*eachSize, int64(i+1)*eachSize-1))
}
// 处理末尾部分
if task.TotalSize%int64(task.Concurrent) != 0 {
task.ShardTaskList[task.Concurrent-1].RangeEnd = task.TotalSize - 1
}
}
// 根据任务列表进行多线程分片下载操作
func (task *ParallelGetTask) downloadShard() {
// 创建线程组
waitGroup := &sync.WaitGroup{}
// 开始执行全部分片下载线程
for _, task := range task.ShardTaskList {
go task.DoShardGet(waitGroup)
waitGroup.Add(1)
}
// 等待全部下载完成
waitGroup.Wait()
}
// 下载完成后,合并分片文件
func (task *ParallelGetTask) mergeFile() error {
// 创建目的文件
targetFile, e := os.OpenFile(task.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0755)
if e != nil {
color.Red("创建目标文件出错!")
return e
}
// 创建写入器
writer := bufio.NewWriter(targetFile)
// 准备读取每个分片文件
for _, shard := range task.ShardTaskList {
shardFile, e := os.OpenFile(shard.ShardFilePath, os.O_RDONLY, 0755)
if e != nil {
color.Red("读取分片文件出错!")
return e
}
reader := bufio.NewReader(shardFile)
readBuffer := make([]byte, 1024*1024)
for {
// 读取每个分片文件,一次读取1KB
readSize, readError := reader.Read(readBuffer)
// 处理结束或错误
if readError != nil {
if readError == io.EOF {
break
} else {
color.Red("读取分片文件出错!")
return readError
}
}
// 写入到最终合并的文件
_, writeError := writer.Write(readBuffer[0:readSize])
if writeError != nil {
color.Red("写入合并文件出错!")
return writeError
}
_ = writer.Flush()
}
// 关闭分片文件资源
_ = shardFile.Close()
}
// 关闭目的文件资源
_ = targetFile.Close()
return nil
}
// 删除分片临时文件
func (task *ParallelGetTask) cleanShard() error {
for _, shard := range task.ShardTaskList {
e := os.Remove(shard.ShardFilePath)
if e != nil {
color.Red("删除分片临时文件%s出错!", shard.ShardFilePath)
return e
}
}
return nil
}
// 在一个新线程中,实时输出每个分片的下载进度和总进度
func (task *ParallelGetTask) printTotalProcess() {
go func() {
// 上一次统计时的已下载大小,用于计算速度
var lastDownloadSize int64 = 0
for {
// 如果全部任务完成则结束输出,并统计并发数
allDone := true
// 当前并发数
currentTaskCount := 0
for _, shardTask := range task.ShardTaskList {
if !shardTask.TaskDone {
allDone = false
currentTaskCount += 1
}
}
if allDone {
break
}
// 统计所有分片已下载大小之和
var totalDownloadSize int64 = 0
for _, shardTask := range task.ShardTaskList {
totalDownloadSize += shardTask.DownloadSize
}
// 计算速度
currentDownload := totalDownloadSize - lastDownloadSize
lastDownloadSize = totalDownloadSize
speedString := util.ComputeSpeed(currentDownload, 300)
// 输出到控制台
fmt.Printf("\r当前并发数:%3d 速度:%s 总进度:%3.2f%%", currentTaskCount, speedString, float32(totalDownloadSize)/float32(task.TotalSize)*100)
// 等待300ms
time.Sleep(300 * time.Millisecond)
}
}()
}
// Run 开始执行整个分片多线程下载任务
func (task *ParallelGetTask) Run() error {
// 获取文件大小
e := task.getLength()
if e != nil {
color.Red("%s", e)
return e
}
color.HiYellow("已获取到下载文件大小:%d字节", task.TotalSize)
// 分配任务
task.allocateTask()
color.HiYellow("已完成分片任务分配,共计%d个任务", len(task.ShardTaskList))
// 开启进度输出
task.printTotalProcess()
// 开始下载分片
task.downloadShard()
color.HiYellow("\n所有分片已下载完成!")
// 开始合并文件
e = task.mergeFile()
if e != nil {
color.Red("%s", e)
return e
}
color.HiYellow("合并分片完成!")
// 清理临时分片文件
e = task.cleanShard()
if e != nil {
color.Red("%s", e)
return e
}
color.HiYellow("清理分片临时文件完成!")
color.Green("分片下载任务完成!")
return nil
}
上述printTotalProcess
函数中,util.ComputeSpeed
函数用于计算下载速度并自动转换为可读单位,代码如下:
package util
import (
"fmt"
"math"
)
// 关于单位的实用工具函数
// ComputeSpeed 计算网络速度
// size 一段时间内下载的数据大小,单位字节
// timeElapsed 经过的时间长度,单位毫秒
// 返回计算得到的网速,会自动换算单位
func ComputeSpeed(size int64, timeElapsed int) string {
bytePerSecond := size / int64(timeElapsed) * 1000
if 0 <= bytePerSecond && bytePerSecond <= 1024 {
return fmt.Sprintf("%4d Byte/s", bytePerSecond)
}
if bytePerSecond > 1024 && bytePerSecond <= int64(math.Pow(1024, 2)) {
return fmt.Sprintf("%6.2f KB/s", float64(bytePerSecond)/1024)
}
if bytePerSecond > 1024*1024 && bytePerSecond <= int64(math.Pow(1024, 3)) {
return fmt.Sprintf("%6.2f MB/s", float64(bytePerSecond)/math.Pow(1024, 2))
}
return fmt.Sprintf("%6.2f GB/s", float64(bytePerSecond)/math.Pow(1024, 3))
}
可见通过构造函数NewParallelGetTask
完成参数传递和状态量设定后,其它每个私有函数都对应我们多线程下载中的一个步骤,最后由公开函数Run
统筹组织起所有的步骤,完成整个多线程下载任务。
3,实现效果
现在我们在main
函数中创建一个ParallelGetTask
对象,设定好参数后调用其Run
方法即可开始多线程下载文件的任务:
package main
import (
"gitee.com/swsk33/shard-download-demo/model"
)
func main() {
// 创建任务
task := model.NewParallelGetTask(
"https://download.bell-sw.com/java/21.0.4+9/bellsoft-jdk21.0.4+9-windows-amd64.msi",
"C:\\Users\\swsk33\\Downloads\\Liberica JDK 21.msi",
64,
"C:\\Users\\swsk33\\Downloads\\temp",
)
// 执行任务
_ = task.Run()
}
效果如下:
最终成功完成文件下载:
可见借助HTTP请求的一些机制,我们就可以实现一个多线程下载功能了!当然这里的程序还有许多可以完善的地方,例如失败重试、点断续传等等。
代码仓库地址:传送门
转载自:https://juejin.cn/post/7403547816916090906