likes
comments
collection
share

你也可以手敲一个高速下载器(六)下载进度条

作者站长头像
站长
· 阅读数 21

你也可以手敲一个高速下载器(六)下载进度条

前言

在上节我们已经初步的实现的下载的功能,但是还差很多细节没有完善,这节我们就给我们的程序加一个进度条,下面是完成后的运行结果: 你也可以手敲一个高速下载器(六)下载进度条

进度条选型

这里选择的进度条库是rich 中的进度条,该库支持多个进度条同时输出,同时在支持异步和并发方法也很好的,也有很多大型项目去使用。

日志更改

在我们上一节运行的时候,会在终端输出很多详细的日志,但是这些详细的日志会和进度条产生冲突,在显示上面会很不直观,故而我们需要对目前的日志打印做一些修改。

多日志系统

在之前,我们是把所有的日志统一输出在终端上面的,这里我们就一个简单的日志分级:

  • info 级别以上的日志,依旧显示在终端上面
  • debug 级别以上的日志,输出在日志文件中

并对现有代码做出如下修改:

  • main文件中的get_content方法,首行日志打印级别由info修改为debug
  • __init__中对日志进行初始化
  • start方法中开始下载的日志由success级别改为info级别

代码如下:


# __init__
logger.remove()
logger.add(f"{save_path}.log", level="DEBUG")
logger.add(sys.stderr, level="INFO")

# start
logger.info(f"开始下载资源,总大小:{self.file_size}")

# get_content
logger.debug(f"开始下载:bytes={start}-{end}")

日志文件的存储

目前的临时方案是和文件保存的同一目录在,并直接在路径后面加.log后缀名,但后面会和下面文件的保存路径一样,优化为可配置的

请求类修改

之前我们完成了一个通用的请求类,这个请求类一般情况下也是可以满足我们的需求的,但是由于我们的主要业务是下载,而且还需要进度条的形式显示出来,所以我们要让我们的通用请求类支持流式的响应

请求类_request 方法修改

最核心的部分是把发送请求的代码由response = await self.session.reques(...),更换成request = self.session.build_request(...)response = await self.session.send(request, stream=stream)的形式,虽然看起来不一样,但其实在实现里面reques(...)方法的内部也是调用的send(...)方法,我们只不过换成了稍微底层了一点而已,下面是修改后的代码:

    async def _request(self, stream=False) -> Response:
        """
        使用httpx发起请求
        :return: 返回响应
        """
        # 打印一个DEBUG级别的日志
        logger.debug(f"[{self.method}] {self.url}")

        # 锁定信号量
        if self.sem:
            await self.sem.acquire()

        # 进行请求,并获取响应
        request = self.session.build_request(
            self.method,
            self.url,
            json=self.json,
        )
        response = await self.session.send(request, stream=stream)

        # 释放信号量
        if self.sem:
            self.sem.release()

        # 如果状态码不在允许范围当中,则抛出异常
        if response.status_code not in self.allow_codes:
            raise RequestStateException(response.status_code)

        return response

代码和之前的区别不大,只是加了个stream的可选参数而已

请求类 request 方法修改

由于增加了参数,所以在请求入口的函数里面也同样要添加这个参数,在调用的时候再把这个参数传递过去。因为我们下面传递的函数的签名,所以采取的方案是使用偏函数的方式传递默认参数,示例函数如下:

    async def request(self, stream=False) -> Response:
        """
        使用httpx发起请求 带重试机制
        :return: 返回响应
        """
          if self.retrys_count < 1:
            return await self._request(stream)

        ......

        resp = await r.wraps(functools.partial(self._request, stream))()
        return resp

进度条

创建进度条

此处创建进度条的方法,借鉴了rich官方示例,由于官方示例也是下载文件的,所以此处可以拿来借鉴参考即可,示例如下:

from rich.progress import (
    BarColumn,
    DownloadColumn,
    Progress,
    Console,
    TextColumn,
    TimeRemainingColumn,
    TransferSpeedColumn,
)

progress = Progress(
    TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
    BarColumn(bar_width=None),
    "[progress.percentage]{task.percentage:>3.1f}%",
    "•",
    DownloadColumn(),
    "•",
    TransferSpeedColumn(),
    "•",
    TimeRemainingColumn(),
    console=Console(record=True)
)

总进度条及开始停止进度条

我们的进度条方案是一个总的下载进度条和若干个子任务的下载进度条,所以我们在先在初始化函数里面保存一个总进度条的任务 ID,并在start_download方法中设备任务 ID,同时在这个方法也会启动和停止进度条。当我们观看示例文件的时候,会发现官方示例是使用的上下文管理器来管理的进度条启停,但我们这个使用上下文管理器就很明显的不适合,所以和前面一个看看具体调用的什么方法,直接调用即可,示例代码如下:

# 开启多任务
tasks = itertools.starmap(self.get_content, args)
progress.start()
self.task_id = progress.add_task('', filename="总进度", total=self.content_length)
result = await asyncio.gather(*tasks)
progress.stop()

根据代码我们可以看出,在启动并发任务的前面开启的进度条并在进度条上面添加了总进度的任务,并设置了该条的总长度为内容的总长度,然后在并发任务结束后进行了进度条的停止操作

任务进度条

子任务的进度条写在了方法get_content中,此方法改动的地方由两处,一个由直接获取数据,修改为流的方法迭代获取数据,然后获取数据的过程中使用进度条显示出来,示例代码如下:

    async def get_content(self, index, start=None, end=None):
        """
        获取内容
        :param index: 索引
        :param start: 开始范围
        :param end: 结束范围
        :return: 返回内容的数据
        """
        logger.debug(f"开始下载:bytes={start}-{end}")

        if end:
            headers = dict(self.headers, Range=f"bytes={start}-{end}")
        else:
            headers = self.headers.copy()

        end = end or self.content_length
        task_id = progress.add_task("", filename=f"任务: {index}", total=end - start)

        req = Request("GET", self.url, sem=self._sem, headers=headers)
        try:
            content = b''
            resp = await req.request(stream=True)
            async for data in resp.aiter_bytes():
                content += data
                progress.update(task_id, advance=len(data))
                progress.update(self.task_id, advance=len(data))

        except RequestException:
            return self.network_error_exit()
        finally:
            await req.close()

        return {'index': index, 'content': content}

我们之前是直接获取的content,这里我们修改成立迭代获取内容,并在每次迭代时更新子任务进度条和总任务进度条,并在content中追加最新数据

代码仓库:

本节的代码以上传至 Github,请自行下载及观看:第六节代码

结语

本节为我们的程序添加了进度条的显示,但是依旧还有很多需要完善的地方,我们下载继续,敬请期待!!!

转载自:https://juejin.cn/post/7133616706492039182
评论
请登录