likes
comments
collection
share

利用selenium自动化库实现电影网站的数据获取

作者站长头像
站长
· 阅读数 8

简单概述

本系列可能是一个比较长的系列,主要是对《Python3网络爬虫开发实战》前七章的一个内容总结并且熟悉使用一下相关的框架与技术。

任务目标

爬取电影数据网站ssr1.scrape.center/, 此网站无反爬,数据通过服务端渲染,需要爬取的部分为列表页里面的电影数据详情。

任务目标解析

  1. 爬取ssr1.scrape.center/网站的列表页面, 通过列表页面的内容获取到需要的URL
  2. 爬取ssr1.scrape.center/detail/{id}… 网站内的数据详情,需要获取的部分有:
    1. 电影标题
    2. 电影图片的url
    3. 电影上映时间
    4. 电影分类
    5. 电影评分
    6. 剧情简介
  3. 将内容存放到需要的数据库中

技术选型与爬取

在进行过多次爬取同一个网站内容后,使用其他的库其实更多的还是一种对库的熟悉与了解,因此这里对于下面的内容通过贴代码进行简单解释的方式来介绍一下,因为在静态网页的数据中很多内容并不涉及到这些库的优势特性。

如何爬取

抓取主函数

# 网页内容的抓取
def scrape_page(browser, url):
    logging.info('scraping %s ...', url)
    # 异常捕获操作
    try:
        browser.get(url)
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'div#app')))
    except:
        logging.error('error occured while scraping %s', url, exc_info=True)

使用自动化库访问网页时,需要一个浏览器对象和浏览器需要访问的url,因此通过传入两个参数来实现我们对网页内容的访问,下方的wait字段则是用于等待页面请求的完全响应,由于这个网页由vue.js进行搭建,所以我们整体上只需要等待(By.CSS_SELECTOR, 'div#app')的部分加载完成即可。

列表页内容抓取

# 进行列表页的抓取
def scrape_index(browser, page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(browser, index_url)

逻辑同之前的几个相似,这里分析出页面的请求逻辑后就可以完成对我们所需要的内容进行访问请求。

详情页内容抓取

# 进行详情页的抓取
def scrape_detail(browser, url):
    return scrape_page(browser, url)

如何解析

由于自动化库自带解析的功能,因此我们直接使用自动化库自带的功能进行解析即可。

解析索引页

# 解析索引页
def parse_index(browser):
    # 获取所有超链接
    a_tags = browser.find_elements(By.CSS_SELECTOR, 'a.name')
    
    if not a_tags:
        return []
    
    for a_tag in a_tags:
        detail_url = a_tag.get_attribute('href')
        # 输出链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url

自动化库的浏览器对象是直接对当前页面选项卡进行操作的,我们当然可以通过browser.page_source的方式获取资源后调用解析器,但同样的我们可以通过直接利用find_element()方法和find_elements()方法来实现对需要的元素进行获取。

解析详情页

# 解析详情页
def parse_detail(browser):
    
    # 获取标题
    name = None
    name_tag = browser.find_element(By.CSS_SELECTOR, 'h2.m-b-sm')
    if name_tag:
        name = name_tag.text
    
    # 获取图片
    cover = None
    cover_tag = browser.find_element(By.CSS_SELECTOR, 'img.cover')
    if cover_tag:
        cover = cover_tag.get_attribute('src')
    
    # 获取分类
    categories = []
    category_tags = browser.find_elements(By.CSS_SELECTOR, 'div.categories > button > span')
    if category_tags:
        categories = [category.text for category in category_tags]
    
    # 获取简介
    drama = None
    drama_tag = browser.find_element(By.CSS_SELECTOR, 'div.drama > p')
    if drama_tag:
        drama = drama_tag.text

    # 获取分数
    score = None
    score_tag = browser.find_element(By.CSS_SELECTOR, 'p.score')
    if score_tag:
        score = score_tag.text

    return {
        # 标题
        'name': name,
        # 图片
        'cover': cover,
        # 分数
        'score': score,
        # 分类
        'categories': categories,
        # 剧情简介
        'drama': drama
    }

详情页的解析也同样可以使用相关的选择器操作来完成,并且效果不比之前使用解析库的时候效果差,可以说做到了一个库完成了我们所需要的所有操作。

如何存储

# 网页内容的存储
def save_data(data_path, data):
    # 利用csv存储
    with open(data_path, 'a+', encoding='utf-8') as csv_file:
        field_name = ['name', 'cover', 'score', 'categories', 'drama']
        csv_writer = csv.DictWriter(csv_file,fieldnames=field_name)
        csv_writer.writerow(data)

这里的网页存储使用的csv的方式,通过字典的方式直接进行写入。

源代码

import csv
import logging

from os import makedirs
from os.path import exists
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量值
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'

# 文件夹存在或者创建文件夹
exists(RESULT_DIR) or makedirs(RESULT_DIR)

# 网页内容的抓取
def scrape_page(browser, url):
    logging.info('scraping %s ...', url)
    # 异常捕获操作
    try:
        browser.get(url)
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'div#app')))
    except:
        logging.error('error occured while scraping %s', url, exc_info=True)

# 进行列表页的抓取
def scrape_index(browser, page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(browser, index_url)

# 进行详情页的抓取
def scrape_detail(browser, url):
    return scrape_page(browser, url)

# 进行网页内容的获取
# 解析索引页
def parse_index(browser):
    # 获取所有超链接
    a_tags = browser.find_elements(By.CSS_SELECTOR, 'a.name')
    
    if not a_tags:
        return []
    
    for a_tag in a_tags:
        detail_url = a_tag.get_attribute('href')
        # 输出链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url

# 解析详情页
def parse_detail(browser):
    
    # 获取标题
    name = None
    name_tag = browser.find_element(By.CSS_SELECTOR, 'h2.m-b-sm')
    if name_tag:
        name = name_tag.text
    
    # 获取图片
    cover = None
    cover_tag = browser.find_element(By.CSS_SELECTOR, 'img.cover')
    if cover_tag:
        cover = cover_tag.get_attribute('src')
    
    # 获取分类
    categories = []
    category_tags = browser.find_elements(By.CSS_SELECTOR, 'div.categories > button > span')
    if category_tags:
        categories = [category.text for category in category_tags]
    
    # 获取简介
    drama = None
    drama_tag = browser.find_element(By.CSS_SELECTOR, 'div.drama > p')
    if drama_tag:
        drama = drama_tag.text

    # 获取分数
    score = None
    score_tag = browser.find_element(By.CSS_SELECTOR, 'p.score')
    if score_tag:
        score = score_tag.text

    return {
        # 标题
        'name': name,
        # 图片
        'cover': cover,
        # 分数
        'score': score,
        # 分类
        'categories': categories,
        # 剧情简介
        'drama': drama
    }

# 网页内容的存储
def save_data(data_path, data):
    # 利用csv存储
    with open(data_path, 'a+', encoding='utf-8') as csv_file:
        field_name = ['name', 'cover', 'score', 'categories', 'drama']
        csv_writer = csv.DictWriter(csv_file,fieldnames=field_name)
        csv_writer.writerow(data)

# 输出获取的网页内容
def main():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    browser = webdriver.Chrome(options=options)
    data_path = '{0}/movies.csv'.format(RESULT_DIR)
    with open(data_path, 'w', encoding='utf-8') as csv_file:
        field_name = ['name', 'cover', 'score', 'categories', 'drama']
        csv_writer = csv.DictWriter(csv_file, fieldnames=field_name)
        csv_writer.writeheader()
    try:
        for page in range(1, TOTAL_PAGE + 1):
            # 进入列表页
            scrape_index(browser, page)
            # 解析索引页
            detail_urls = list(parse_index(browser))

            for detail_url in detail_urls:
                scrape_detail(browser, detail_url)
                data = parse_detail(browser)
                logging.info('get data %s', data)
                save_data(data_path, data)
    finally:
        browser.close()


if __name__ == '__main__':
    main()

版权信息

本文由PorterZhang整理或写作完成 本人的Github: PorterZhang2021 本人的博客地址:PorterZhang