likes
comments
collection
share

Van♂Python | 某星球的简单爬取

作者站长头像
站长
· 阅读数 27

这是我参与11月更文挑战的第1天,活动详情查看:2021最后一次更文挑战

0x1、引言

Van♂Python | 某星球的简单爬取

无独有偶,跟上节《Van ♂ Python | 某站点课程的简单爬取》一样,爬取的原因都是付费服务即将过期 (根本原因还是贫穷T﹏T)

Van♂Python | 某星球的简单爬取

技术方案同样是: 点点点 + 抓包,本节顺带试试想用很久的自动化神器 → Pyppeteer

严正声明

本文仅用于记录爬虫技术研究学习,不会提供直接爬取脚本,所爬数据已删且未传播。请勿用于非法用途,如有其它非法用途造成损失,于本文无关。


0x2、Pyppeteer速成

1、与Puppeteer的渊源?

Puppeteer 是Google官方出品的通过DevTools协议口控制 Headless Chrome 的 NodeJS 库,通过其提供的API可直接控制Chrome模拟大部分用户操作,来进行UI Test、爬虫访问页面采集数据。Pyppeter 可以理解为 puppeteer 的python版本。

2、与Selenium相比?

与Selenium库相比,Pyppeteer无需繁琐的环境配置,在首次运行时会检测是否按照Chromium,未安装程序会帮我们自动安装和配置。而且Pyppeteer基于Python的新特性async实现(Python 3.5以上),故它的一些执行也支持异步操作,相比之下效率提高了不少。

3、API文档

4、Puppeteer架构图

Van♂Python | 某星球的简单爬取

简述 (了解就行,不用记)

  • Puppeteer:通过 DevTools协议 与浏览器进行通信;
  • Browser:可持有浏览器上下文;
  • BrowserContext:定义了一个浏览器会话,并可拥有多个页面;
  • Page:至少有一个框架,主框架;
  • Frame:至少有一个执行上下文,默认的执行上下文(框架的JavaScript)被执行;
  • Worker:具有单一执行上下文,切便于与WebWorkers进行交互;

5、Pyppeteer安装

  • Step 1pip安装pyppeteer
pip install pyppeteer
  • Step 2安装chromium
import asyncio
from pyppeteer import launch


async def screen_shot():
    browser = await launch()
    page = await browser.newPage()
    await page.goto('https://juejin.cn/')
    await page.screenshot({'path': 'juejin.jpg'})
    await browser.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(screen_shot())

Van♂Python | 某星球的简单爬取

当然,谷歌源,下载起来不一定顺畅,有时可能卡住不动,可利用淘宝源,下载压缩包解压,然后在 launch() 时指定 executablePath,方法如下:

# ① 获取原下载地址
from pyppeteer import chromium_downloader

# 根据系统版本替换:win32,win64,linux,mac
print(chromium_downloader.downloadURLs.get("win64"))

# 运行输出示例:
# https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip

# ② storage.googleapis.com替换为淘宝源npm.taobao.org/mirrors,如:
https://npm.taobao.org/mirrors/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip

# 也可以进站点自行选择:
https://npm.taobao.org/mirrors/chromium-browser-snapshots

# 3、launch时指定userDataDir
await launch({'headless': headless,'args': launch_args, 'executablePath': './userData')

代码流程解析

async # 声明一个异步操作
await # 声明一个耗时操作

# 创建异步池并执行screen_shot()函数
asyncio.get_event_loop().run_until_complete(screen_shot()) 

# 创建浏览器对象,可传入字典类型的参数
browser = await launch() 

# 创建一个页面对象,页面操作在此对象上执行
page = await browser.newPage() 

await page.goto('https://juejin.cn/') # 页面跳转
await page.screenshot({'path': 'juejin.jpg'}) # 截图保存
await browser.close() # 关闭浏览器对象

关于API就了解这些,更多的可自行查阅官方文档,或者善用搜索引擎,接着分析下爬取流程。


0x3、数据爬取

① 访问登录页

访问登录页:未登录 → 会显示下述登录面板,已登录 → 自动跳转至主页。

Van♂Python | 某星球的简单爬取

爬取流程

  • 请求登录页,判断是否有登录二维码结点,有的话休眠10s等待扫码登录;
  • 没有二维码,说明进入主页;
  • 上述两步都进入分组获取;

② 分组获取

左侧面板可以看到:创建/管理的星球加入的星球

Van♂Python | 某星球的简单爬取

F12看下结点:

Van♂Python | 某星球的简单爬取

爬取流程

  • 通过selector选择器定位到两处结点,获取所有的星球名及链接,输出供用户选择想爬取的星球;
  • 附选择器示例:div.created-group > div:nth-child(2) > a

③ 内容爬取

一开始只是想爬下 精华 分类的,后面发现有的星球可能是像这样没数据:

Van♂Python | 某星球的简单爬取

索性就直接爬全部,内容列表做的分页,滚动到底部再加载更多,Ajax无疑了,在不尝试破解接口规则的情况下,最简单获取数据的方式莫过于:模拟滚动 + 解析节点 的形式了。

但在这个场景,解析节点的效率太低了,标签+图文+链接的搭配样式太多了,需要写很多解析规则,而采用 拦截特定请求的方式 就更无脑和高效一些了。

接着看下这个ajax请求的特点,如组成url,等下过滤请求用到,打开Network选项卡,清空,选中XHR,网页滚动到底部,看下加载的请求:

Van♂Python | 某星球的简单爬取

打开看下,确定是所需数据,拦截到这样的请求,把数据保存到本地,最后再统一进行批处理。

④ 确定滚动何时停止的两种思路

一直向下滚动,需要确定数据何时爬完,停止滚动,说下笔者的两个思路:

  • 方法一死循环 + asyncio.sleep() + pyppeteer查找底部结点

就是死循环,一直去检查底部结点是否可见,如此站点滑动到底部:

<div _ngcontent-isv-c98="" class="no-more">没有更多了</div>
  • 方法二js定时器 + 滑动距离与高度判断

就是开启开启一个定时器,记录滚动距离与当前页面高度,比较前者>=后者时,就可能滑动到底部。

对,是可能,因为存在列表没load的情况,所以可以加入一个重试次数,当重试次数达到阈值时才算完成。

Tips:笔者使用的方法二,以为对js语法不了解,不知道怎么暂停和启动一个计时器,所以把重试阈值设置得很大,也算间接实现休眠。

⑤ 初始化浏览器

流程摸清楚了,接着开始写代码实现爬取,先初始化浏览器:

import asyncio
import os
import time
from pyppeteer import launch

import cp_utils

# 启动配置参数
launch_args = [
    "--no-sandbox",  # 非沙盒模式
    "--disable-infobars",  # 隐藏信息栏
    # 设置UA
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/83.0.4103.97 Safari/537.36 ",
    "--log-level=3",  # 日志等级
]

# 启动浏览器
async def init_browser(headless=False):
    return await launch({'headless': headless,
                         'args': launch_args,
                         'userDataDir': './userData',
                         'dumpio': True,
                         'ignoreHTTPSErrors ': True})

⑥ 新建页面

接着通过browser.newPage()来新建一个浏览器页面,除了常规设置外,还添加防WebDrivder检测~

# 新建页面
async def init_page(browser):
    page = await browser.newPage()
    await page.setViewport({'width': 1960, 'height': 1080}) # 设置页面宽高
    await page.setJavaScriptEnabled(True)
    await prevent_web_driver_check(page)
    return page


# 防WebDriver检测
async def prevent_web_driver_check(page):
    if page is not None:
        # 隐藏webDriver特征
        await page.evaluateOnNewDocument("""() => {
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined })}
        """)
        # 某些站点会为了检测浏览器而调用js修改结果
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'lang uages', { get: () => ['en-US', 'en'] }); }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')

⑦ 登陆与拉取星球列表

利用page.waitForSelector()设置超时,检测是否有登陆二维码结点,执行后续逻辑:

# 登录
async def login(page, timeout=60):
    await page.goto(login_url, options={'timeout': int(timeout * 1000)})
    try:
        await page.waitForSelector('img.qrcode', {'visible': 'visible', 'timeout': 3000})
        # 等待扫码登录
        print("检测到未登录,等待扫码登录...")
        await asyncio.sleep(10)
        await fetch_group(page)
    except errors.TimeoutError:
        print("检测到处于登录状态,直接拉取列表...")
        await fetch_group(page)

       
# 提取群组
async def fetch_group(page):
    global choose_group_id, choose_group_name
    # 获取所有分组
    group_list = []
    created_groups = await page.JJ('div.created-group > div:nth-child(2) > a')
    joined_groups = await page.JJ('div.joined-group > div:nth-child(2) > a')
    for item in created_groups + joined_groups:
        group_name = await page.evaluate('item => item.textContent', item)
        group_url = await (await item.getProperty('href')).jsonValue()
        group_list.append([group_name.strip(), group_url])
    print("检测到如下星球列表如下:")
    for index, group in enumerate(group_list):
        print(index, '、', group)
    choose_group_index = input("请输入待爬取群组编号 (注:下标从0开始)")
    choose_group = group_list[int(choose_group_index)]
    choose_group_id = choose_group[1].split('/')[-1]
    choose_group_name = choose_group[0]
    await fetch_data(page, choose_group[1])

运行结果如下

Van♂Python | 某星球的简单爬取

⑧ 拦截请求、响应和数据保存

# 拦截请求
async def intercept_request(req):
    # 禁止获取图片、多媒体资源和发起websocket请求
    if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
        await req.abort()
    else:
        await req.continue_()


# 拦截响应
async def intercept_response(resp):
    resp_type = resp.request.resourceType
    if resp_type in ['xhr'] and 'https://xxx/v2/groups/{}/topics?scope=all&count=20'.format(
            choose_group_id) in resp.url:
        content = await resp.text()
        if len(content) > 0:
            temp_dir = os.path.join(content_save_dir, choose_group_name)
            cp_utils.is_dir_existed(temp_dir)
            content = await resp.text()
            print(resp.url + ' → ' + content)
            json_save_path = os.path.join(temp_dir, str(int(time.time() * 1000)) + '.json')
            cp_utils.write_str_data(content, json_save_path)
            print("保存文件:", json_save_path)
    return resp

⑨ 无限滚动

# 拉取内容
async def fetch_data(page, url, timeout=60):
    # 拦截请求抓取
    await page.setRequestInterception(True)
    page.on('request', lambda req: asyncio.ensure_future(intercept_request(req)))
    page.on('response', lambda resp: asyncio.ensure_future(intercept_response(resp)))
    print("开始爬取:", choose_group_name)
    await page.goto(url, options={'timeout': int(timeout * 1000)})
    # 休眠3秒等待加载
    await asyncio.sleep(3)
    # 一直向下滚动
    await page.evaluate('''async () => {
        await new Promise(((resolve, reject) => {
            // 每次滑动距离
            const distance = 100;

            // 当前高度
            var totalHeight = 0;

            // 最大重试次数和当前重试次数
            var maxTries = 20000;
            var curTries = 0;

            var timer = setInterval(() => {
                var scrollHeight = document.body.scrollHeight;
                window.scrollBy(0, distance)
                totalHeight += distance
                console.log(totalHeight + "-" + scrollHeight)
                if (totalHeight >= scrollHeight) {
                    if(curTries > maxTries) {
                        clearInterval(timer)
                        resolve();
                    } else {
                        curTries += 1;
                        totalHeight -= distance
                    }
                } else {
                    curTries = 0;
                }
            }, 100)
        }));
    }''')
    print("星球【{}】数据爬取完毕...".format(choose_group_name))
    # 取消拦截
    await page.setRequestInterception(False)

最后调用下:

if __name__ == '__main__':
    cur_browser = asyncio.get_event_loop().run_until_complete(init_browser())
    cur_page = asyncio.get_event_loop().run_until_complete(init_page(cur_browser))
    asyncio.get_event_loop().run_until_complete(login(cur_page))

运行后可以看到控制台输出对应的爬取信息:

Van♂Python | 某星球的简单爬取

也可以看到爬取到本地的json文件:

Van♂Python | 某星球的简单爬取

呦西,数据都保存到本地了,接着到数据处理环节~


0x4、数据处理

① 关键数据提取

随手打开几个爬取的json样本,很好看出json中的关键部分:

Van♂Python | 某星球的简单爬取

定义出提取实体类:

class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files

接着就是遍历文件,json.loads转成dict,按需拿字段,非常简单:

import cp_utils
import json
import os

zsxq_save_dir = os.path.join(os.getcwd(), "zsxq")
result_json_path = os.path.join(os.getcwd(), "zsxq_result.json")
talk_list = []
talk_dict = {'data': None}


# 数据实体
class Talk:
    def __init__(self, name=None, text=None, images=None, files=None):
        self.name = name
        self.text = text
        self.images = images
        self.files = files

    def to_json_str(self):
        return json.dumps({'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files},
                          ensure_ascii=False)

    def to_dict(self):
        return {'name': self.name, 'text': self.text, 'images': self.images, 'files': self.files}


# 提取json文件内容
def extract_json_file(file_path):
    global talk_list
    content = cp_utils.read_content_from_file(file_path)
    content_dict = json.loads(content)
    topics = content_dict['resp_data'].get('topics')
    print("解析文件:{}".format(file_path))
    if topics is not None and len(topics) > 0:
        for topic in topics:
            talk_entity = Talk()
            talk = topic.get('talk')
            if talk is not None:
                # 依次获取名称、文本、图片、文件
                owner = talk.get('owner')
                if owner is not None:
                    owner_name = owner.get("name")
                    if owner is not None:
                        talk_entity.name = owner_name
                text = talk.get('text')
                if text is not None:
                    talk_entity.text = text
                images = talk.get('images')
                if images is not None and len(images) > 0:
                    image_urls = []
                    for image in images:
                        original = image.get('original')
                        if original is not None:
                            image_urls.append(original.get('url'))
                    talk_entity.images = image_urls
                files = talk.get('files')
                if files is not None and len(files) > 0:
                    file_list = []
                    for file in files:
                        file_id = file.get('file_id')
                        file_name = file.get('name')
                        file_list.append({file_id: file_name})
                    talk_entity.files = file_list
            talk_list.append(talk_entity.to_dict())
    else:
        print("数据为空,跳过文件...")


if __name__ == '__main__':
    dir_list = cp_utils.fetch_all_file(zsxq_save_dir)
    print("可操作目录:\n")
    for index, path in enumerate(dir_list):
        print("{}、{}".format(index, path))
    choose_index = input("\n请输入要处理的目录序号 => ")
    choose_path = dir_list[int(choose_index)]
    print("当前选中目录:{}".format(choose_path))
    json_file_list = cp_utils.filter_file_type(choose_path, '.json')
    for json_file in json_file_list[:10]:
        extract_json_file(json_file)
    talk_dict['data'] = talk_list
    talk_json = json.dumps(talk_dict, ensure_ascii=False, indent=2)
    cp_utils.write_str_data(talk_json, result_json_path)
    print("文件写入完毕:{}".format(result_json_path))

遍历10个文件试试效果:

Van♂Python | 某星球的简单爬取

打开json文件看看:

Van♂Python | 某星球的简单爬取

② 将json转成Markdown

json肯定不是和便于阅读的,可以生成一波Markdown,拼接字符串而已,这里的主要难点是:

text的解析,有标签、普通文本、外部链接、表情...

Van♂Python | 某星球的简单爬取

可以通过 re.sub() + 反向引用 替换一波标签和外部链接,写个测试代码试试水:

    # 替换正则
    hash_tag_pattern = re.compile(r'(<e type="hashtag" .*? title=")(.*?)(".*?/>)')
    web_pattern = re.compile(r'(<e type="web" href=")(.*?)(" title=")(.*?)(" cache=.*?/>)')
   
    # 测试用例
    xml_str = """
    <e type="hashtag" hid="51288155841824" title="%23%E6%8A%80%E5%B7%A7%23"/>
<e type="hashtag" hid="28518452544211" title="%23%E6%95%88%E7%8E%87%E5%B7%A5%E5%85%B7%23"/> 今天推荐一个命令行辅助工具:fig,一图胜千言,直接看图

打开官网即可安装:
<e type="web" href="https%3A%2F%2Ffig.io%2Fwelcome" title="Fig+%7C+Welcome+to+Fig" cache=""/>
    """
    temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", xml_str), 'utf-8')
    temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
    temp_result = temp_result.strip().replace("\n", "")
    print(temp_result)

看下解析结果

Van♂Python | 某星球的简单爬取

Good,接着补全图片及文件相关:

# 转换成md文件
def json_to_md(file_path):
    content = cp_utils.read_content_from_file(file_path)
    data_list = json.loads(content)['data']
    md_content = ''
    for data in data_list:
        name = data['name']
        if name is not None:
            md_content += name + "\n"
        text = data['text']
        if text is not None:
            temp_result = unquote(hash_tag_pattern.sub(r"\g<2>", text), 'utf-8').replace("#", "`")
            temp_result = unquote(web_pattern.sub(r"[\g<4>](\g<2>)", temp_result), 'utf-8')
            md_content += temp_result.strip()
        images = data['images']
        if images is not None:
            md_content += '\n'
            for image_url in images:
                img_file_name = str(int(time.time() * 1000)) + ".jpg"
                img_save_path = os.path.join(image_save_dir, str(int(time.time() * 1000)) + ".jpg")
                cp_utils.download_pic(img_save_path, image_url)
                relative_path = 'images/{}'.format(img_file_name)
                md_content += '![]({})'.format(relative_path)
        files = data['files']
        if files is not None:
            md_content += '\n文件:'
            for file in files:
                file_id = file.get('file_id')
                file_name = file.get('name')
                md_content += "《{}》".format(file_name)
        md_content += '\n\n---\n\n'
    cp_utils.write_str_data(md_content, result_md_path)

细心的你可能发现了,代码中把图片给download下来了,并没有采用远程图片的方式,原因是站点图片资源url,没有图片后缀名,Markdown语法识别不了,导致预览时图片显示不出来。生成后的md文件:

Van♂Python | 某星球的简单爬取

46257个字符,PyCharm打开预览直接卡死,MarkdwonPad2也难逃一劫,滚一下卡一下,还是有必要分成几个md文件存~


0x5、小结

本文借着爬某星球的契机,把Pyppeteer的用法过了一波,爬虫技巧 Level Up↑,另外,文件类这里只存了一个id,真实下载地址还得调用另外的接口获取,而且还得处于登录态,感兴趣的同学可自行尝试一波。

Van♂Python | 某星球的简单爬取

好的,就说这么多,有问题欢迎评论区指出,感谢~

Van♂Python | 某星球的简单爬取


参考文献