likes
comments
collection
share

BeautifulSoup库与parsel库进行电影网站数据的解析

作者站长头像
站长
· 阅读数 12

@TOC

简单概述

本系列可能是一个比较长的系列,主要是对《Python3网络爬虫开发实战》前七章的一个内容总结并且熟悉使用一下相关的框架与技术。

任务目标

爬取电影数据网站ssr1.scrape.center/, 此网站无反爬,数据通过服务端渲染,需要爬取的部分为列表页里面的电影数据详情。

任务目标解析

  1. 爬取ssr1.scrape.center/, 网站的列表页面,通过列表页面的内容获取到需要的URL
  2. 爬取ssr1.scrape.center/detail/{id}… 网站内的数据详情,需要获取的部分有:
    1. 电影标题
    2. 电影图片的url
    3. 电影上映时间
    4. 电影分类
    5. 电影评分
    6. 剧情简介
  3. 将内容存放到需要的数据库中

技术选型与爬取

如何爬取

这一部分并不是本次探讨的一个重点,对于爬取请求在之前已经使用了requests库与urllib库进行了相关的实现,如果需要参考可以参看前面的两篇文章,里面有相关的内容介绍。

如何解析

BeautifulSoup库

BeautifulSoup库是Python当中进行网页解析常用的一个库,这个库的功能十分完善,并且可以通过链式原则直接获取一些所需要的元素与属性,它的内部可以选择4种不同的解析方式,并且其可以通过css选择器进行直接的选取,可以说相当的好用了。

解析列表页

对于列表页的解析只需要查找到对应类名的元素后将其进行返回,而BeautifulSoup库正好有一个方便的方法,直接使用find_all()方法查找类名就可以获取到我们所需要的所有url。

# 对网页数据进行解析
def parse_index(html):
    soup = BeautifulSoup(html, 'lxml')
    # 获取url
    items = soup.find_all(class_='name')
    
    # 如果没有值
    if not items:
        return []
    
    # 存在值
    for item in items:
        # 对链接进行拼接
        detail_url = urljoin(BASE_URL, item['href'])
        # 输出生成的链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url

解析详情页

详情页部分所需要获取的内容就比较多一些,除去直接利用正则表达式容易提取的上映时间外,其他所要获取的内容均可以利用BeautifulSoup库获取到,但在获取元素后获取元素的子孙节点的操作中就有些麻烦了,可能需要进行嵌套的相关操作进行获取。

# 对详情网页进行解析
def parse_detail(html):
    soup = BeautifulSoup(html, 'lxml')

    # 标题
    name = soup.find(name="h2").string if soup.find(name="h2") else None
    # 图片
    cover = soup.find(class_='cover')['src'] if soup.find(class_='cover') \
            else None
    # 分类
    categories = []
    # 获取分类
    categories_ele = soup.find(class_='categories')
    # 如果存在
    if categories_ele != -1:
        # 查找内部节点
        for category in categories_ele:
            if category.find('span') != -1:
                category = category.find('span').string
                categories.append(category)
    # 简介
    drama = soup.select('div.drama p')[0].string.strip() if soup.select('div.drama p') \
            else None
    # 上映时间
    published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
    published = re.search(published_at, html).group(1) if re.search(published_at, html) \
                else None
    # 评分
    score = soup.find(class_='score').string.strip() if soup.find(class_='score') \
            else None
    
    return {
        'name': name,
        'cover': cover,
        'published': published,
        'categories': categories,
        'score': score,
        'drama': drama
    }

parsel库

parsel库的优势之处在于它可以用css选择器的方式选择所需要的元素,也可以通过使用xpath来选择所需要的元素,甚至其还可以直接使用正则表达式进行选择,那么有了这个利器,我们可以在进行选择时,哪个较为方便就利用哪一个进行选择,减轻某些元素在特定情况下不好获取的问题。

解析列表页

由于parsel库可以使用xpath规则,css选择器,正则表达式的方式,这里就可以按自己的熟悉程度进行选择,完成自己需要的部分。

# 获取列表页的数据
def parse_index(html):
    selector = Selector(html)
    items = selector.css('a.name::attr(href)').getall()
    
    if not items:
        return []

    for item in items:
        # 对链接进行拼接
        detail_url = urljoin(BASE_URL, item)
        # 输出生成的链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url

解析详情页

parsel库的三种模式的优势在此时就显现的淋漓尽致,当css选择器不太好进行操作时,我们就可以试一试xpath,如果xpath不好操作,那么此时我们也可以试一试正则表达式(当然这里的正则表达式我还是沿用的之前写过的正则表达式。

# 获取详情页的数据
def parse_detail(html):
    selector = Selector(text=html)

    # 标题
    name = selector.xpath('//h2[@class="m-b-sm"]/text()').get() if selector.xpath('//h2[@class="m-b-sm"]/text()').get() else None
    # 图片
    cover = selector.css('img.cover::attr(src)').get() if selector.css('img.cover::attr(src)').get() else None
    # 分类
    categories = selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
                if selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
                else None
    # 简介
    drama = selector.xpath('//div[@class="drama"]/p/text()').get().strip() \
            if selector.xpath('//div[@class="drama"]/p/text()').get() \
            else None
    # 出版日期
    published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
    published = re.search(published_at, html).group(1) if re.search(published_at, html) \
                else None
    # 分数
    score = selector.xpath('//p[contains(@class, "score")]/text()').get().strip() \
            if selector.xpath('//p[contains(@class, "score")]/text()').get() \
            else None
    
    return {
        'name': name,
        'cover': cover,
        'categories': categories,
        'drama': drama,
        'published': published,
        'score': score
    }

如何存储

这里两次抓取的内容我也分别使用了不同的方式进行存储:csv格式存储,以及使用非关系型数据库mongoDB进行存储。

csv格式存储

由于我们返回时采用的是字典格式,那么存储写入csv格式时,我们需要选择以字典的方式进行写入的模式即DictWriter对象,这里需要注意的是,传入的同时我们需要将字典的key也一并传入,否则会出现错误,同时由于需要一个key的标题,因此需要在主函数当中进行一次写入。

# 对获取的内容进行存储
# 本次存储使用csv
def save_data(data_path ,data):
    # 初始化字典写入对象
    with open(data_path, 'a+', encoding='utf-8') as csvfile:
        fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']  
        # 初始化一个字典写入对象
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writerow(data)

MongoDB存储

mongoDB数据库是一个非关系型的数据库,通过此数据库可以有效的实现对一些非关系型数据内容的存储,同时,由于python有一个pymongo库可以方便的对数据内容进行存储,所以对于此部分的整体实现是比较容易的。

# 进行数据内容的存储
# 使用MongoDB进行数据存储
def save_data(data):
    collection.update_one(
        {'name': data.get('name')},
        {'$set': data},
        upsert=True
    )

源代码

BeautifulSoup库抓取

import re
import requests
import logging
import csv

from os import makedirs
from os.path import exists
from bs4 import BeautifulSoup
from urllib.parse import urljoin


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'

exists(RESULT_DIR) or makedirs(RESULT_DIR)


# 对网页进行爬取
def scrape_page(url):
    logging.info('scraping url: %s ...', url)
    try:
        # 进行请求
        response = requests.get(url=url)
        # 请求结果判断
        if response.status_code == 200:
            # 输出文本内容
            return response.text
        # 错误情况输出
        logging.error('get invaild status code %s while scraping %s',
                      response.status_code, url)
    except requests.RequestException:
        # 输出错误情况
        logging.error('error occured while scraping %s', url, exc_info=True)

def scrape_index(page):
    # 索引页面
    index_url = f'{BASE_URL}/page/{page}'
    # 进行抓取
    return scrape_page(index_url)

def scrape_detail(url):
    # 抓取页面内容
    return scrape_page(url)

# 对网页数据进行解析
def parse_index(html):
    soup = BeautifulSoup(html, 'lxml')
    # 获取url
    items = soup.find_all(class_='name')
    
    # 如果没有值
    if not items:
        return []
    
    # 存在值
    for item in items:
        # 对链接进行拼接
        detail_url = urljoin(BASE_URL, item['href'])
        # 输出生成的链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url

# 对详情网页进行解析
def parse_detail(html):
    soup = BeautifulSoup(html, 'lxml')

    # 标题
    name = soup.find(name="h2").string if soup.find(name="h2") else None
    # 图片
    cover = soup.find(class_='cover')['src'] if soup.find(class_='cover') \
            else None
    # 分类
    categories = []
    # 获取分类
    categories_ele = soup.find(class_='categories')
    # 如果存在
    if categories_ele != -1:
        # 查找内部节点
        for category in categories_ele:
            if category.find('span') != -1:
                category = category.find('span').string
                categories.append(category)
    # 简介
    drama = soup.select('div.drama p')[0].string.strip() if soup.select('div.drama p') \
            else None
    # 上映时间
    published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
    published = re.search(published_at, html).group(1) if re.search(published_at, html) \
                else None
    # 评分
    score = soup.find(class_='score').string.strip() if soup.find(class_='score') \
            else None
    
    return {
        'name': name,
        'cover': cover,
        'published': published,
        'categories': categories,
        'score': score,
        'drama': drama
    }

# 对获取的内容进行存储
# 本次存储使用csv
def save_data(data_path ,data):
    # 初始化字典写入对象
    with open(data_path, 'a+', encoding='utf-8') as csvfile:
        fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']  
        # 初始化一个字典写入对象
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writerow(data)

# 主函数
def main():
    # 数据存放地址
    data_path = '{0}/movies.csv'.format(RESULT_DIR)
     # 初始化一个写入对象
    with open(data_path, 'w', encoding='utf-8') as csvfile:
        fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']  
        # 初始化一个字典写入对象
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
    
    for page_index in range(1, TOTAL_PAGE + 1):
        # 获取列表页
        page_html = scrape_index(page=page_index)
        # 获取详情页的urls
        detail_urls = parse_index(page_html)        
        for detail_url in detail_urls:
            # 抓取详情页内容
            detail_html = scrape_detail(detail_url)
            # 获取数据
            data = parse_detail(detail_html)
            # 获取信息
            logging.info('get detail data %s', data)
            save_data(data=data, data_path=data_path)
            

if __name__ == '__main__':
    main()

parsel库抓取

import re
import logging
import pymongo

from parsel import Selector
from urllib.request import urlopen
from urllib.parse import urljoin


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

# MONGO
# 链接地址
MONGO_CLIENT_STRING = 'mongodb://localhost:27017'
# 链接数据库
MONGO_DB = 'spiders'
# 链接集合
MONGO_COLLECTION = 'movies'

# 数据库连接与配置
client = pymongo.MongoClient(MONGO_CLIENT_STRING)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]

# 进行页面的抓取
# 构建一个抓取页面的函数
def scrape_page(url):
    logging.info('scraping %s ...', url)
    # 异常捕获操作
    try:
        # 进行请求
        response = urlopen(url=url)
        # 判断请求码
        if response.status == 200:
            # 输出网页内容
            return response.read().decode('utf-8')
        # 错误请求
        logging.error('get invaild status code %s while scraping %s',
                      response.status, url)
    except:
        # 错误请求
        logging.error('error occured while scraping %s', url, exc_info=True)

# 获取列表页网页
def scarpe_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

# 获取详情页网页
def scrape_detail(url):
    return scrape_page(url=url)

# 进行页面的解析

# 获取列表页的数据
def parse_index(html):
    selector = Selector(html)
    items = selector.css('a.name::attr(href)').getall()
    
    if not items:
        return []

    for item in items:
        # 对链接进行拼接
        detail_url = urljoin(BASE_URL, item)
        # 输出生成的链接
        logging.info('get detail url %s', detail_url)
        # 生成器函数
        yield detail_url


# 获取详情页的数据
def parse_detail(html):
    selector = Selector(text=html)

    # 标题
    name = selector.xpath('//h2[@class="m-b-sm"]/text()').get() if selector.xpath('//h2[@class="m-b-sm"]/text()').get() else None
    # 图片
    cover = selector.css('img.cover::attr(src)').get() if selector.css('img.cover::attr(src)').get() else None
    # 分类
    categories = selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
                if selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
                else None
    # 简介
    drama = selector.xpath('//div[@class="drama"]/p/text()').get().strip() \
            if selector.xpath('//div[@class="drama"]/p/text()').get() \
            else None
    # 出版日期
    published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
    published = re.search(published_at, html).group(1) if re.search(published_at, html) \
                else None
    # 分数
    score = selector.xpath('//p[contains(@class, "score")]/text()').get().strip() \
            if selector.xpath('//p[contains(@class, "score")]/text()').get() \
            else None
    
    return {
        'name': name,
        'cover': cover,
        'categories': categories,
        'drama': drama,
        'published': published,
        'score': score
    }

# 进行数据内容的存储
# 使用MongoDB进行数据存储
def save_data(data):
    collection.update_one(
        {'name': data.get('name')},
        {'$set': data},
        upsert=True
    )

# 主函数
def main():
    for page in range(1, TOTAL_PAGE + 1):
        # 抓取详情页
        index_html = scarpe_index(page)
        # 获取urls
        detail_urls = parse_index(index_html)
        # 抓取每个详情页里面的内容
        for detail_url in detail_urls:
            # 获取详情页内容
            detail_html = scrape_detail(detail_url)
            # 解析详情页内容获取数据
            data = parse_detail(detail_html)
            # 将最终的数据输出
            logging.info('get detail data %s', data)
            # 保存数据
            save_data(data=data)

if __name__ == '__main__':
    main()

版权信息

本文由PorterZhang整理或写作完成 本人的Github: PorterZhang2021 本人的博客地址:PorterZhang