likes
comments
collection
share

Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通

作者站长头像
站长
· 阅读数 27

我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。例如常用的多线程下载工具(Gopeed、Aria2、XDM等等),都是通过多线程下载技术充分利用了网络带宽,以提高下载速度。

那么多线程下载是怎么实现的呢?多个线程发送网络请求,是怎么做到同时下载一个文件呢?事实上,借助HTTP协议中的一些机制就可以实现了!

今天我们就通过使用Go语言为例,从了解HTTP请求相关的一些机制开始,实现一个多线程下载的示例。

1,多线程下载原理

事实上,多线程下载的原理很简单,主要的步骤如下:

  • 获取待下载文件大小
  • 每个线程下载文件的一部分
  • 全部下载完成后,拼接为完整文件

实现这些步骤,就涉及到HTTP协议的下列相关机制。

(1) HEAD请求 - 只获取请求头

我们通常发送HTTP请求大多数是GET或者POST类型,发送请求后我们会立即获取响应体,浏览器则会根据响应体的类型来处理内容,例如返回的是text/html就会作为网页显示,返回image/png就会解码为图片等等,响应体的类型由响应头Content-Type标识。当我们下载文件时,事实上也是发送HTTP请求,只不过服务器返回的响应体就是文件本身了!其类型则是application/octet-stream,浏览器也知道这是个文件需要下载。

当然,文件作为响应体通常比起网页、图片要大得多,在多线程下载时,我们就要先获取文件的大小,而不是立即获取文件本身,这时我们就可以向服务器发起HEAD请求而不是GET请求。

服务器收到HEAD请求后,就只会返回对应的响应头,而不会返回响应体,这样我们就可以在下载文件之前,读取响应头中的Content-Length来先获取待下载文件大小。

(2) Range请求头 - 只获取部分响应体

知道了文件大小,我们就需要让每个线程只下载一部分文件,借助HTTP的Range请求头,就可以实现只让服务端返回响应体内容的一部分,而不是返回完整的响应体。

这里我们先来借助书籍《图解HTTP》中对Range请求头的讲解,来学习一下:

Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通

可见当我们发送一个请求获取内容时,如果指定了如下请求头:

Range: bytes=5001-10000

那么服务端就只会返回响应体的第5001到第10000字节的内容部分,包含第5001和第10000字节,0表示响应体的第一个字节。

这样,在多个线程同时下载文件时,我们在每个线程的请求中使用Range请求头,就可以实现一个线程只下载文件的一部分了!

2,Go代码实现

知道了HTTP的上述几个机制,相信大家就知道如何实现一个简单的多线程下载了!我们可以总结主要步骤如下:

  • 发送HEAD类型请求,通过Content-Length请求头获取待下载文件大小
  • 根据给定的线程数量,结合待下载文件大小,确定每个线程下载的范围部分,也就是每个线程的Range请求头字节范围
  • 启动所有线程,使得每个线程下载它们对应的部分文件,并等待全部线程下载完成
  • 合并每个线程下载的部分为最终文件
  • 清理每个线程下载的文件部分

这里分别设计下列类(结构体),用于存放多线程下载时的传入参数和状态量:

Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通

上述ShardTask类表示一个线程的下载任务,其中会完成一个分片(文件的一部分)的下载请求操作,它有如下作为参数的属性:

  • Url 下载的文件地址
  • Order 分片序号
  • ShardFilePath 这个分片文件的保存路径
  • RangeStartRangeEnd 下载的文件起始范围和结束范围,用于设定Range请求头

此外,还有作为下载状态的属性:

  • DownloadSize 下载任务进行时,这个线程已下载的文件部分大小
  • TaskDone 这个线程的下载任务是否完成

该类的成员方法如下:

  • DoShardGet 执行分片下载任务,在其中会根据RangeStartRangeEnd设定对应的HTTP请求头,发送请求并下载对应的文件部分

然后就是ParallelGetTask类,表示一整个多线程下载任务,其中包含了一个多线程下载任务的参数和状态量,并且实现了多线程下载的每个步骤,它有如下作为参数的属性:

  • Url 文件的下载链接
  • FilePath 文件下载完成后的保存位置
  • Concurrent 下载并发数,即同时下载的分片数量
  • TempFolder 临时分片文件的保存文件夹

此外还有作为状态的属性:

  • TotalSize 待下载文件的总大小
  • ShardTaskList 存储所有分片任务对象指针的列表

该类中的方法主要是分片下载的一些步骤如下:

  • getLength 发送HEAD请求获取Content-Length以获取文件大小,获取后将其设定到TotalSize属性
  • allocateTask 根据给定的线程数和获取到的文件大小,计算每个线程下载的文件内容范围,并创建对应的ShardTask结构体放入ShardTaskList
  • downloadShard 为每一个ShardTask对象创建一个线程(Goroutine)并在新的线程中调用ShardTask对象的下载分片方法,以启动所有线程的下载任务,并通过sync.WaitGroup来等待全部线程完成
  • mergeFile 下载完成后,合并每个分片为最终文件
  • cleanShard 合并完成后,清理下载的每个分片文件
  • printTotalProcess 这是一个附加的辅助方法,用于实时输出下载进度
  • Run 启动整个多线程下载任务,该函数是暴露的公开函数,其中对上述每个步骤函数进行了组织,按顺序调用执行

下面,我们来看一下它们的代码实现。

(1) ShardTask - 一个线程的下载任务

package model

import (
	"bufio"
	"fmt"
	"github.com/fatih/color"
	"io"
	"net/http"
	"os"
	"sync"
)

// ShardTask 单个分片下载任务的任务参数和状态量
type ShardTask struct {
	// 下载链接
	Url string
	// 分片序号,从1开始
	Order int
	// 这个分片文件的路径
	ShardFilePath string
	// 分片的起始范围(字节,包含)
	RangeStart int64
	// 分片的结束范围(字节,包含)
	RangeEnd int64
	// 已下载的部分(字节)
	DownloadSize int64
	// 该任务是否完成
	TaskDone bool
}

// NewShardTask 构造函数
func NewShardTask(url string, order int, shardFilePath string, rangeStart int64, rangeEnd int64) *ShardTask {
	return &ShardTask{
		// 设定任务参数
		Url:           url,
		Order:         order,
		ShardFilePath: shardFilePath,
		RangeStart:    rangeStart,
		RangeEnd:      rangeEnd,
		// 初始化状态量
		DownloadSize: 0,
		TaskDone:     false,
	}
}

// DoShardGet 开始下载这个分片(该方法在goroutine中执行)
func (task *ShardTask) DoShardGet(waitGroup *sync.WaitGroup) {
	// 创建文件
	file, e := os.OpenFile(task.ShardFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
	if e != nil {
		color.Red("任务%d创建文件失败!", task.Order)
		color.HiRed("%s", e)
		return
	}
	// 准备请求
	request, e := http.NewRequest("GET", task.Url, nil)
	if e != nil {
		color.Red("任务%d创建请求出错!", task.Order)
		color.HiRed("%s", e)
		return
	}
	// 设定请求头
	request.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", task.RangeStart, task.RangeEnd))
	// 发送请求
	client := http.DefaultClient
	response, e := client.Do(request)
	if e != nil {
		color.Red("任务%d发送下载请求出错!", task.Order)
		color.HiRed("%s", e)
		return
	}
	// 读取请求体
	body := response.Body
	// 读取缓冲区
	buffer := make([]byte, 8092)
	// 准备写入文件
	writer := bufio.NewWriter(file)
	for {
		// 读取一次内容至缓冲区
		readSize, readError := body.Read(buffer)
		if readError != nil {
			// 如果读取完毕则退出循环
			if readError == io.EOF {
				break
			} else {
				color.Red("任务%d读取响应错误!", task.Order)
				color.HiRed("%s", readError)
				return
			}
		}
		// 把缓冲区内容追加至文件
		_, writeError := writer.Write(buffer[0:readSize])
		if writeError != nil {
			color.Red("任务%d写入文件时出现错误!", task.Order)
			color.HiRed("%s", writeError)
			return
		}
		_ = writer.Flush()
		// 记录下载进度
		task.DownloadSize += int64(readSize)
	}
	// 关闭全部资源
	_ = body.Close()
	_ = file.Close()
	// 标记任务完成
	task.TaskDone = true
	// 使线程组中计数器-1
	waitGroup.Done()
}

构造函数NewShardTask负责完成ShardTask的参数传入和状态量初始化,而DoShardGet方法实现了下载一个文件分片的完整步骤,从创建文件准备写入,到设定请求头,发出请求,最后读取响应体保存到文件。

(2) ParallelGetTask - 一整个多线程下载任务

package model

import (
	"bufio"
	"fmt"
	"gitee.com/swsk33/shard-download-demo/util"
	"github.com/fatih/color"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"
)

// ParallelGetTask 多线程下载任务类,存放一个多线程下载任务的参数和状态量
type ParallelGetTask struct {
	// 文件的下载链接
	Url string
	// 文件的最终保存位置
	FilePath string
	// 下载并发数
	Concurrent int
	// 下载的分片临时文件保存文件夹
	TempFolder string
	// 下载文件的总大小
	TotalSize int64
	// 全部的下载分片任务参数列表
	ShardTaskList []*ShardTask
}

// NewParallelGetTask 构造函数
func NewParallelGetTask(url string, filePath string, concurrent int, tempFolder string) *ParallelGetTask {
	return &ParallelGetTask{
		// 参数赋值
		Url:        url,
		FilePath:   filePath,
		Concurrent: concurrent,
		TempFolder: tempFolder,
		// 初始化状态量
		TotalSize:     0,
		ShardTaskList: make([]*ShardTask, 0),
	}
}

// 发送HEAD请求获取待下载文件的大小
func (task *ParallelGetTask) getLength() error {
	// 发送请求
	response, e := http.Head(task.Url)
	if e != nil {
		color.Red("发送HEAD请求出错!")
		return e
	}
	// 读取并设定长度
	task.TotalSize = response.ContentLength
	return nil
}

// 根据待下载文件的大小和设定的并发数,创建每个分片任务对象
func (task *ParallelGetTask) allocateTask() {
	// 如果并发数大于总大小,则进行调整
	if int64(task.Concurrent) > task.TotalSize {
		task.Concurrent = int(task.TotalSize)
	}
	// 开始计算每个分片的下载范围
	eachSize := task.TotalSize / int64(task.Concurrent)
	// 创建任务对象
	for i := 0; i < task.Concurrent; i++ {
		task.ShardTaskList = append(task.ShardTaskList, NewShardTask(task.Url, i+1, filepath.Join(task.TempFolder, strconv.Itoa(i+1)), int64(i)*eachSize, int64(i+1)*eachSize-1))
	}
	// 处理末尾部分
	if task.TotalSize%int64(task.Concurrent) != 0 {
		task.ShardTaskList[task.Concurrent-1].RangeEnd = task.TotalSize - 1
	}
}

// 根据任务列表进行多线程分片下载操作
func (task *ParallelGetTask) downloadShard() {
	// 创建线程组
	waitGroup := &sync.WaitGroup{}
	// 开始执行全部分片下载线程
	for _, task := range task.ShardTaskList {
		go task.DoShardGet(waitGroup)
		waitGroup.Add(1)
	}
	// 等待全部下载完成
	waitGroup.Wait()
}

// 下载完成后,合并分片文件
func (task *ParallelGetTask) mergeFile() error {
	// 创建目的文件
	targetFile, e := os.OpenFile(task.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0755)
	if e != nil {
		color.Red("创建目标文件出错!")
		return e
	}
	// 创建写入器
	writer := bufio.NewWriter(targetFile)
	// 准备读取每个分片文件
	for _, shard := range task.ShardTaskList {
		shardFile, e := os.OpenFile(shard.ShardFilePath, os.O_RDONLY, 0755)
		if e != nil {
			color.Red("读取分片文件出错!")
			return e
		}
		reader := bufio.NewReader(shardFile)
		readBuffer := make([]byte, 1024*1024)
		for {
			// 读取每个分片文件,一次读取1KB
			readSize, readError := reader.Read(readBuffer)
			// 处理结束或错误
			if readError != nil {
				if readError == io.EOF {
					break
				} else {
					color.Red("读取分片文件出错!")
					return readError
				}
			}
			// 写入到最终合并的文件
			_, writeError := writer.Write(readBuffer[0:readSize])
			if writeError != nil {
				color.Red("写入合并文件出错!")
				return writeError
			}
			_ = writer.Flush()
		}
		// 关闭分片文件资源
		_ = shardFile.Close()
	}
	// 关闭目的文件资源
	_ = targetFile.Close()
	return nil
}

// 删除分片临时文件
func (task *ParallelGetTask) cleanShard() error {
	for _, shard := range task.ShardTaskList {
		e := os.Remove(shard.ShardFilePath)
		if e != nil {
			color.Red("删除分片临时文件%s出错!", shard.ShardFilePath)
			return e
		}
	}
	return nil
}

// 在一个新线程中,实时输出每个分片的下载进度和总进度
func (task *ParallelGetTask) printTotalProcess() {
	go func() {
		// 上一次统计时的已下载大小,用于计算速度
		var lastDownloadSize int64 = 0
		for {
			// 如果全部任务完成则结束输出,并统计并发数
			allDone := true
			// 当前并发数
			currentTaskCount := 0
			for _, shardTask := range task.ShardTaskList {
				if !shardTask.TaskDone {
					allDone = false
					currentTaskCount += 1
				}
			}
			if allDone {
				break
			}
			// 统计所有分片已下载大小之和
			var totalDownloadSize int64 = 0
			for _, shardTask := range task.ShardTaskList {
				totalDownloadSize += shardTask.DownloadSize
			}
			// 计算速度
			currentDownload := totalDownloadSize - lastDownloadSize
			lastDownloadSize = totalDownloadSize
			speedString := util.ComputeSpeed(currentDownload, 300)
			// 输出到控制台
			fmt.Printf("\r当前并发数:%3d 速度:%s 总进度:%3.2f%%", currentTaskCount, speedString, float32(totalDownloadSize)/float32(task.TotalSize)*100)
			// 等待300ms
			time.Sleep(300 * time.Millisecond)
		}
	}()
}

// Run 开始执行整个分片多线程下载任务
func (task *ParallelGetTask) Run() error {
	// 获取文件大小
	e := task.getLength()
	if e != nil {
		color.Red("%s", e)
		return e
	}
	color.HiYellow("已获取到下载文件大小:%d字节", task.TotalSize)
	// 分配任务
	task.allocateTask()
	color.HiYellow("已完成分片任务分配,共计%d个任务", len(task.ShardTaskList))
	// 开启进度输出
	task.printTotalProcess()
	// 开始下载分片
	task.downloadShard()
	color.HiYellow("\n所有分片已下载完成!")
	// 开始合并文件
	e = task.mergeFile()
	if e != nil {
		color.Red("%s", e)
		return e
	}
	color.HiYellow("合并分片完成!")
	// 清理临时分片文件
	e = task.cleanShard()
	if e != nil {
		color.Red("%s", e)
		return e
	}
	color.HiYellow("清理分片临时文件完成!")
	color.Green("分片下载任务完成!")
	return nil
}

上述printTotalProcess函数中,util.ComputeSpeed函数用于计算下载速度并自动转换为可读单位,代码如下:

package util

import (
	"fmt"
	"math"
)

// 关于单位的实用工具函数

// ComputeSpeed 计算网络速度
// size 一段时间内下载的数据大小,单位字节
// timeElapsed 经过的时间长度,单位毫秒
// 返回计算得到的网速,会自动换算单位
func ComputeSpeed(size int64, timeElapsed int) string {
	bytePerSecond := size / int64(timeElapsed) * 1000
	if 0 <= bytePerSecond && bytePerSecond <= 1024 {
		return fmt.Sprintf("%4d Byte/s", bytePerSecond)
	}
	if bytePerSecond > 1024 && bytePerSecond <= int64(math.Pow(1024, 2)) {
		return fmt.Sprintf("%6.2f KB/s", float64(bytePerSecond)/1024)
	}
	if bytePerSecond > 1024*1024 && bytePerSecond <= int64(math.Pow(1024, 3)) {
		return fmt.Sprintf("%6.2f MB/s", float64(bytePerSecond)/math.Pow(1024, 2))
	}
	return fmt.Sprintf("%6.2f GB/s", float64(bytePerSecond)/math.Pow(1024, 3))
}

可见通过构造函数NewParallelGetTask完成参数传递和状态量设定后,其它每个私有函数都对应我们多线程下载中的一个步骤,最后由公开函数Run统筹组织起所有的步骤,完成整个多线程下载任务。

3,实现效果

现在我们在main函数中创建一个ParallelGetTask对象,设定好参数后调用其Run方法即可开始多线程下载文件的任务:

package main

import (
	"gitee.com/swsk33/shard-download-demo/model"
)

func main() {
	// 创建任务
	task := model.NewParallelGetTask(
		"https://download.bell-sw.com/java/21.0.4+9/bellsoft-jdk21.0.4+9-windows-amd64.msi",
		"C:\\Users\\swsk33\\Downloads\\Liberica JDK 21.msi",
		64,
		"C:\\Users\\swsk33\\Downloads\\temp",
	)
	// 执行任务
	_ = task.Run()
}

效果如下:

Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通

最终成功完成文件下载:

Go实现多线程分片下载文件我们在下载大文件时,通常会使用多线程下载的方式来加快下载速度。一些常用的多线程下载工具,都是通

可见借助HTTP请求的一些机制,我们就可以实现一个多线程下载功能了!当然这里的程序还有许多可以完善的地方,例如失败重试、点断续传等等。

代码仓库地址:传送门

转载自:https://juejin.cn/post/7403547816916090906
评论
请登录