likes
comments
collection
share

异步爬取实战之爬取全国天气质量

作者站长头像
站长
· 阅读数 13

前言

今天要做的呢是爬取这个全国的一个空气质量,别问为什么,问就是有项目需要。

需求分析

www.tianqihoubao.com/aqi/ 爬取这个网站的全部城市的信息。 异步爬取实战之爬取全国天气质量 点击进去之后,是这样的页面 异步爬取实战之爬取全国天气质量 我们要做的就是把这些个框起来的信息全部保存起来。

提取分析

现在明确了需求,那么接下来就是如何提取到这些信息。 首先我们需要分析首页获取全国城市。

获取全国城市

点击元素定位很快就能找到这玩意 异步爬取实战之爬取全国天气质量 于是我们此时可以这样编写我们的xpath表达式,把这些信息提取出来 首先我们先设计一下我们的首页的信息存储格式。

这里的话我们定义一个字典 存储的数据是这样的。

{"河北":[(xxx市,url),(),()],“江西”:[(),()]}

因为我们后面还要保存到excel表格里面,需要做一个分类,不过这一块,我们后面再说,不敢先写了,玩意需求变了我就完蛋了。

所以我们这边提取省份的名字

Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

提取对应的城市名字和链接

Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")

提取质量信息

那么接下来就到了如何提取空气的质量问题了。 随便点击近一个链接去看看 www.tianqihoubao.com/aqi/shijiaz… 异步爬取实战之爬取全国天气质量 同样的我们先定位一下 异步爬取实战之爬取全国天气质量 很快我们就定位到了信息,并且我们展开可以发现其他的信息都藏在了div标签下面 异步爬取实战之爬取全国天气质量 那么接下来我们依然只需要提取就行了。

AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

编码

那么接下来就是编码了。 这里的话由于是一个小项目嘛,所以我这里的项目结构是这样的 异步爬取实战之爬取全国天气质量 这里的Spider只是项目的一个module

信息存储

由于为了方便信息的解析和存储,这里我定义了一个pojo实体类,这样的话可以更好地存储我们的数据。

class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2=None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province=None
    __COUNT = 0


    def get_AQI(self):

        if (self.__AQI):
            return self.__AQI

    def get_PM25(self):
        if (self.__PM25):
            return self.__PM25

    def get_CO(self):
        if (self.__CO):
            return self.__CO

    def get_SO2(self):
        if(self.__SO2):
            return self.__SO2

    def get_PM10(self):
        if (self.__PM10):
            return self.__PM10

    def get_O3(self):
        if (self.__O3):
            return self.__O3

    def get_NO2(self):
        if (self.__NO2):
            return self.__NO2

    def get_City(self):
        if(self.__City):
            return self.__City

    def get_Province(self):
        if(self.__Province):
            return self.__Province

    def set_AQI(self, AQI):
        if(self.__COUNT==0):
            self.__AQI = AQI
            self.__COUNT+=1
    def set_PM25(self, PM25):
        if(self.__COUNT==1):
            self.__PM25 = PM25
            self.__COUNT+=1
    def set_CO(self, CO):
        if(self.__COUNT==2):
            self.__CO = CO
            self.__COUNT+=1
    def set_SO2(self,SO2):
        if(self.__COUNT==3):
            self.__SO2=SO2
            self.__COUNT+=1
    def set_PM10(self, PM10):
        if(self.__COUNT==4):
            self.__PM10 = PM10
            self.__COUNT+=1
    def set_O3(self, O3):
        if(self.__COUNT==5):
            self.__O3 = O3
            self.__COUNT+=1
    def set_NO2(self, NO2):
        if(self.__COUNT==6):
            self.__NO2 = NO2
            self.__COUNT+=1
    def set_City(self,City):
        if(self.__COUNT==7):
            self.__City = City
            self.__COUNT+=1

    def set_Province(self,Province):
        if(self.__COUNT==8):
            self.__Province = Province
            self.__COUNT+=1

    def __str__(self):
        if(self.__COUNT>=8):
            return "AQI:"+self.__AQI+"-PM2.5:"+self.__PM25+"-CO:"+self.__CO+"-SO2:"+self.__SO2+"-PM10:"+self.__PM10+"-O3:"+self.__O3+"-NO2:"+self.__NO2+"-city:"+self.__City+"-Province"+self.__Province
        else:
            return "数据未保存完毕,无法输出全部结果"

if __name__ == '__main__':
    air = AIRPojo()
    print(air)


这一点还是java用起来舒服,但是没办法谁让我看上人家的协程了呢。

日志输出

这个毕竟是一个项目,所以还是有一些设置文件的,当然现在我们就只有一个 异步爬取实战之爬取全国天气质量 当然由于这里的数据源是确定好了的,所以就没有必要写在配置里面了,因为如果你改了,我这也没法解析呀。

爬虫编写

首页城市爬取

首先我们要爬取全部信息的话,那么显然我们是需要那个先对首页进行解析的,这样才做全站爬取呀。

这里的话,由于只是爬取首页,所以这里的话我没有先写异步,而是直接写的requests,来做,顺便看一下这个网站访问的速度怎么样。毕竟我们这里不是调取api,而且对一个网站爬取,这样的话必然是会慢一点的。

这里的话就很简单了

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link,city_name in zip(Province_citys_link,Province_citys_name):
                temp.append((self.baseUrl+city_link,city_name))

            self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp


这样一来就把我想要的信息都保存了。同时也发现,这个网站爬取访问是确实慢,大概2秒左右才能出一个结果,那么这样一来279个任务是必然需要使用到协程,异步的,开线程,用线程池,资源消耗高后面还有个数据读写安全的问题,加个锁那直接完犊子,单核直接给你拉满,性能不存在的。

异步爬取

那么这个时候就需要异步了。 这里也是分两个部分嘛,一个是回调函数,一个是异步请求。

 async def fireCity(self,province,url,city,semaphore):
        #传递四个参数,一个是当前爬取的省份,一个是需要爬取的url接口,city最后是那个池的限制大小

        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url,timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)

                        return (data,province,city,url)
            except Exception as e:

                self.timeout_task.append((province,url,city,semaphore))
    def parse_city_quality(self,task):
        if(task.result()):
            data,province,city,url= task.result()
            html = self.parse.HTML(data)
            airPojo= AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES+=1 #这里完成一个记录,说明此时的这个城市的天气质量被我们获取到了
            if(LOG_LEVEL=="ALL"):
                print("当前完成任务",self.TIMES,airPojo,url)
        else:
            pass

超时问题

前面我们说了这个访问是很慢的,那么这样一来必然会超时,那么这个时候就需要重新分配任务了。但是这个 异步爬取实战之爬取全国天气质量 它不支持动态添加任务,不然的话我们完全可以把超时的任务重新放在里面。 所以为了解决这个问题,我又搞了一个超时队列。 异步爬取实战之爬取全国天气质量 超时的时候,把这个放在这个队列里面。当我们的第一次任务执行玩后 异步爬取实战之爬取全国天气质量 查看这个有没有超时的,如果有 .


    def check(self):
        while(self.timeout_task):
            if(LOG_LEVEL=="ALL"):
                print("正在处理超时连接",len(self.timeout_task))
            tasks = []
            while(self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

再次构建任务。直到没有为止,然后这个超时任务队列也是我直接使用list实现的。

完整爬虫代码

import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *

class AIR_Quality(object):
    def __init__(self):
        self.all_provinces={}
        self.headers = {
        "Referer":"http://www.tianqihoubao.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
    }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500 #设置系统上限500
        self.tasks = []  # 任务队列
        self.timeout_task = [] #超时队列
        self.loop = None
        self.TasKLength = 0
    def __now_thread(self)->bool:
        #判断当前的线程是否为主线程
        current_name = threading.current_thread().getName()
        if(current_name=="MainThread"):
            return True
        return False


    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link,city_name in zip(Province_citys_link,Province_citys_name):
                temp.append((self.baseUrl+city_link,city_name))

            self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp


    #这里做一个内部切面来做数据保存工作
    def parse_city_quality(self,task):
        if(task.result()):
            data,province,city,url= task.result()
            html = self.parse.HTML(data)
            airPojo= AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES+=1 #这里完成一个记录,说明此时的这个城市的天气质量被我们获取到了
            if(LOG_LEVEL=="ALL"):
                print("当前完成任务",self.TIMES,airPojo,url)
        else:
            pass
    async def fireCity(self,province,url,city,semaphore):
        #传递四个参数,一个是当前爬取的省份,一个是需要爬取的url接口,city最后是那个池的限制大小

        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url,timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)

                        return (data,province,city,url)
            except Exception as e:

                self.timeout_task.append((province,url,city,semaphore))

    def check(self):
        while(self.timeout_task):
            if(LOG_LEVEL=="ALL"):
                print("正在处理超时连接",len(self.timeout_task))
            tasks = []
            while(self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):

        start = time.time()

        if(not self.all_provinces):
            self.get_provinces()

        semaphore = None

        if (self.__now_thread()):
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)

        #创建异步队列
        for province in self.all_provinces.keys():

            citys_info= self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks,city = city_info
                url = ""
                for url_mark in url_marks.split():
                    url+=url_mark
                c = self.fireCity(province,url,city,semaphore)

                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)

        self.loop.run_until_complete(asyncio.wait(self.tasks))

        self.check()
        if(LOG_LEVEL=="ALL" or LOG_LEVEL=="NONE"):
            print("耗时:", time.time() - start, "秒")
            print("任务总量:",self.TasKLength)
            print("执行完毕量",self.TIMES,"剩余超时任务:",len(self.timeout_task))
if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()



测试

异步爬取实战之爬取全国天气质量 异步爬取实战之爬取全国天气质量 异步爬取实战之爬取全国天气质量 数据ok,那么就先这样吧,算法蓝桥杯还没怎么刷呢,靠北啦~