
Nationwide Air Quality Scraping in Practice


Preface

Hi, long time no see. I've been buried in my research topic lately, so I've gone from a daily poster to a weekly one. Embarrassing, I know. The good news is that I'm close to a milestone, which means my graduation thesis is settled, and it's a proper research project rather than yet another "so-and-so system design and development" document.

OK, let's first look at what a scraping run produces.

After scraping, the data is saved into Excel files; the date folder is a path you configure yourself.

(screenshot: the generated Excel files in the date folder)

The data inside looks like this:

(screenshot: rows of air-quality data in one of the Excel sheets)

The overall project structure looks like this:

(screenshot: the project structure)

The Pojo Entity Class

How to put this... it's a habit left over from Java. But since we'll be saving the data later, defining a class like this makes the storage step much cleaner.


import datetime


class AIRPojo(object):
    # A Java-style POJO holding one city's air-quality record.
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0  # gates the setters: each field is written once, in a fixed order
    # Evaluated once at import time, so every record from a run shares this timestamp
    createTime = str(datetime.datetime.now().strftime("%Y-%m-%d-%X"))

    def get_AQI(self):
        if self.__AQI:
            return self.__AQI

    def get_PM25(self):
        if self.__PM25:
            return self.__PM25

    def get_CO(self):
        if self.__CO:
            return self.__CO

    def get_SO2(self):
        if self.__SO2:
            return self.__SO2

    def get_PM10(self):
        if self.__PM10:
            return self.__PM10

    def get_O3(self):
        if self.__O3:
            return self.__O3

    def get_NO2(self):
        if self.__NO2:
            return self.__NO2

    def get_City(self):
        if self.__City:
            return self.__City

    def get_Province(self):
        if self.__Province:
            return self.__Province

    # Each setter only fires at its slot in the sequence, so a record must be
    # filled in the order: AQI, PM2.5, CO, SO2, PM10, O3, NO2, City, Province.
    def set_AQI(self, AQI):
        if self.__COUNT == 0:
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if self.__COUNT == 1:
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if self.__COUNT == 2:
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if self.__COUNT == 3:
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if self.__COUNT == 4:
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if self.__COUNT == 5:
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if self.__COUNT == 6:
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if self.__COUNT == 7:
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if self.__COUNT == 8:
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        if self.__COUNT >= 9:  # all nine fields have been set
            return ("AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO
                    + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3
                    + "-NO2:" + self.__NO2 + "-city:" + self.__City + "-Province:" + self.__Province)
        else:
            return "Record not fully populated; cannot print the full result"


if __name__ == '__main__':
    air = AIRPojo()
    print(air)

There's nothing special about this class; it just encapsulates the data.
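
A quick way to see the write-once, in-order behavior, as a minimal sketch using only the class above:

air = AIRPojo()
air.set_PM25("30")   # ignored: the counter is still at slot 0 (AQI)
air.set_AQI("57")    # slot 0: accepted
air.set_PM25("30")   # slot 1: accepted this time
print(air.get_AQI(), air.get_PM25())   # -> 57 30
print(air)   # still reports the record as not fully populated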

Settings

Next come the settings. There isn't much to this part; it just pulls a few options out into one place so they can be adjusted together.



# Log level: ALL, INFO, or NONE
LOG_LEVEL="ALL"

# Where the air-quality Excel files are saved
AIRQualitySavePath="./date"

# Progress counters for the save step
AIRQUALITYSAVECURRENT = 0
AIRQUALITYTOTAL = 0

That said, not many options need configuring in this project.
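
One subtlety worth knowing, since the crawler and saver pull these values in via a star import: rebinding a star-imported global only changes the importing module's copy, never the value inside Settings itself. A minimal illustration of that Python behavior:

from Spider.Setting.Settings import *   # copies the current values into this module

def bump_progress():
    global AIRQUALITYSAVECURRENT   # rebinds THIS module's name...
    AIRQUALITYSAVECURRENT += 1     # ...Settings.AIRQUALITYSAVECURRENT itself is unchanged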

Crawler Implementation

The crawler itself is a bit more involved. First up are the asynchronous requests.

Crawl Flow

The crawl flow here is actually quite simple: request the index page, parse out each province and its city links, then fire one async request per city and parse each response as it completes.

(diagram: the crawl flow)
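
In outline form, matching the code that follows:

# 1. get_provinces()            GET the index page, build {province: [(city_url, city_name), ...]}
# 2. fireCity(...)              one async GET per city page, bounded by a semaphore
# 3. parse_city_quality(task)   done-callback: XPath out AQI/PM2.5/... into an AIRPojo
# 4. QualitySave.SaveQuality()  write the record into that province's Excel model
# 5. check()                    re-fire anything that landed in the timeout queue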

Crawl Requests

Once that flow is clear, the request code follows naturally:


def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link,city_name in zip(Province_citys_link,Province_citys_name):
            temp.append((self.baseUrl+city_link,city_name))

        province_name = Province.xpath("./dt/b/text()")[0]
        self.all_provinces[province_name] = temp
        save_model = QualitySaveModel(province_name,len(temp))

        self.save_all_models[province_name] = save_model
    if(LOG_LEVEL=="ALL" or LOG_LEVEL=="INFO"):
        print("Initialization complete; all provinces fetched")

# A done-callback (an internal "aspect") hooked onto each task to save the parsed data
def parse_city_quality(self,task):
    if(task.result()):
        data,province,city,url= task.result()
        html = self.parse.HTML(data)
        airPojo= AIRPojo()

        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES+=1 # count one completed record: this city's air quality has been fetched
        if(LOG_LEVEL=="ALL"):
            print("Completed task", self.TIMES, airPojo, url)

        # persist to file
        QualitySave.SaveQuality(airPojo,self.save_all_models)
    else:
        pass
async def fireCity(self,province,url,city,semaphore):
    # Four parameters: the province being crawled, the city page URL, the city name,
    # and the semaphore capping the connection pool size

    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url,timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)

                    return (data,province,city,url)
        except Exception as e:

            self.timeout_task.append((province,url,city,semaphore))

This covers the first two steps: request the main page, parse each province's cities, then create the async requests and let them handle the per-city pages.
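
If the XPath looks opaque, here is a toy version of the page structure those queries assume; this snippet is my reconstruction from the queries themselves, not a copy of the live site:

from lxml import etree

snippet = """
<div id="content"><div>nav</div><div>
  <dl><dt>legend</dt></dl>
  <dl><dt><b>Beijing</b></dt>
      <dd><a href="/aqi/beijing.html">Beijing</a></dd></dl>
</div></div>
"""
html = etree.HTML(snippet)
for dl in html.xpath('//*[@id="content"]/div[2]/dl[position()>1]'):
    print(dl.xpath("./dt/b/text()")[0], dl.xpath("./dd/a/@href"))
# -> Beijing ['/aqi/beijing.html']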

Timeout Handling

One thing worth calling out, though, is the timeout handling.


def check(self):
    while(self.timeout_task):
        if(LOG_LEVEL=="ALL"):
            print("正在处理超时连接",len(self.timeout_task))
        tasks = []
        while(self.timeout_task):
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))

The key piece is this: whenever a request times out, its parameters are pushed onto the timeout queue in fireCity's except block, and check() keeps draining that queue, re-creating the async tasks, until it is empty.
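
One caveat: check() loops until the timeout queue is empty, so if the site stays unreachable it will retry forever. If you want to bound it, a sketch like this, my own tweak rather than part of the original, caps the retries:

def check(self, max_rounds=3):   # hypothetical retry cap
    rounds = 0
    while self.timeout_task and rounds < max_rounds:
        rounds += 1
        tasks = []
        while self.timeout_task:
            province, url, city, semaphore = self.timeout_task.pop(0)
            task = asyncio.ensure_future(self.fireCity(province, url, city, semaphore))
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))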

Thread Handling

The reason for this part: I originally designed the crawler to sit behind a GUI component, so it may end up running inside a UI worker thread rather than the main thread, and asyncio needs slightly different handling there.

First, a check:

def __now_thread(self)->bool:
    # Is the current thread the main thread?
    current_name = threading.current_thread().name
    if(current_name=="MainThread"):
        return True
    return False

Then branch on the result:

if (self.__now_thread()):
    # main thread: reuse the default event loop
    self.loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(self.PoolSize)
else:
    # non-main thread: there is no default loop here, so create and register one
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    semaphore = asyncio.Semaphore(self.PoolSize)
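
With that handled, the crawler can be kicked off from a worker thread, for example behind a GUI button. A minimal sketch, where the import path is whatever your project layout makes it:

import threading
from Spider.AIR_Quality import AIR_Quality   # hypothetical module path

def start_crawl():
    AIR_Quality().run()   # gets its own event loop inside this non-main thread

threading.Thread(target=start_crawl, daemon=True).start()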

Full Code

import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Utils.QualitySave import QualitySave
class AIR_Quality(object):
    def __init__(self):
        self.all_provinces={}
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500 # cap on concurrent connections
        self.tasks = []  # task queue
        self.timeout_task = [] # timeout queue
        self.loop = None
        self.TasKLength = 0

        self.save_all_models={}

    def __now_thread(self)->bool:
        # Is the current thread the main thread?
        current_name = threading.current_thread().name
        if(current_name=="MainThread"):
            return True
        return False


    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link,city_name in zip(Province_citys_link,Province_citys_name):
                temp.append((self.baseUrl+city_link,city_name))

            province_name = Province.xpath("./dt/b/text()")[0]
            self.all_provinces[province_name] = temp
            save_model = QualitySaveModel(province_name,len(temp))

            self.save_all_models[province_name] = save_model
        if(LOG_LEVEL=="ALL" or LOG_LEVEL=="INFO"):
            print("Initialization complete; all provinces fetched")

    # A done-callback (an internal "aspect") hooked onto each task to save the parsed data
    def parse_city_quality(self,task):
        if(task.result()):
            data,province,city,url= task.result()
            html = self.parse.HTML(data)
            airPojo= AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES+=1 # count one completed record: this city's air quality has been fetched
            if(LOG_LEVEL=="ALL"):
                print("Completed task", self.TIMES, airPojo, url)

            # persist to file
            QualitySave.SaveQuality(airPojo,self.save_all_models)
        else:
            pass
    async def fireCity(self,province,url,city,semaphore):
        # Four parameters: the province being crawled, the city page URL, the city name,
        # and the semaphore capping the connection pool size

        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url,timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)

                        return (data,province,city,url)
            except Exception as e:

                self.timeout_task.append((province,url,city,semaphore))

    def check(self):
        while(self.timeout_task):
            if(LOG_LEVEL=="ALL"):
                print("正在处理超时连接",len(self.timeout_task))
            tasks = []
            while(self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        global AIRQUALITYTOTAL  # rebinds this module's star-imported copy, not the one in Settings
        start = time.time()

        if(not self.all_provinces):
            self.get_provinces()

        semaphore = None

        if (self.__now_thread()):
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)

        # build the async task queue
        for province in self.all_provinces.keys():

            citys_info= self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks,city = city_info
                url = ""
                for url_mark in url_marks.split(): # strip any whitespace embedded in the scraped link
                    url+=url_mark
                c = self.fireCity(province,url,city,semaphore)

                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        AIRQUALITYTOTAL = self.TasKLength

        self.loop.run_until_complete(asyncio.wait(self.tasks))

        self.check()
        if(LOG_LEVEL=="ALL" or LOG_LEVEL=="NONE"):
            print("耗时:", time.time() - start, "秒")
            print("任务总量:",self.TasKLength)
            print("执行完毕量",self.TIMES,"剩余超时任务:",len(self.timeout_task))

        self.loop.close()
if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()

Storage and Parsing

Next up is this part.

There isn't much to it. The parsing already happens inside the crawler, and the result gets wrapped into a pojo object.


All we need to do here is persist that object.

import datetime
import os

from Spider.Setting.Settings import *
import xlwt


class QualitySaveModel(object):


    def __init__(self,province:str,total:int):
        # Holds one province's Excel workbook for air-quality records.
        # Name of the saved sheet/file (":" is not allowed in sheet or Windows file names)
        self.name = str(datetime.datetime.now().strftime("%Y-%m-%d-%X")).replace(":",".")+province
        # the workbook
        self.save_boke = xlwt.Workbook(encoding='utf-8', style_compression=0)
        # the sheet
        self.sheet_boke = self.save_boke.add_sheet(self.name, cell_overwrite_ok=True)
        # total number of cities expected for this province
        self.total = total
        # number of data rows written so far
        self.current_row = 0

        self.cols = ["Province","City","时间","AQI","PM10","PM2.5","CO","NO2","SO2","O3"]
        for i in range(len(self.cols)):
            # write the header row
            self.sheet_boke.write(0, i, self.cols[i])

    def save(self):
        # Only flush to disk once every expected city row has been written;
        # if a city permanently fails, this threshold is never reached.
        if(self.current_row>=self.total):
            path_root = AIRQualitySavePath
            if(not os.path.exists(path_root)):
                os.makedirs(path_root)
            path = path_root+"/"+self.name+".xls"
            self.save_boke.save(path)
            if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
                print(path)

    def __str__(self):
        return "This is an Excel storage object"

Then we just call it:

from Spider.pojo.AIRPojo import AIRPojo
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Setting.Settings import *
class QualitySave(object):

    @staticmethod
    def SaveQuality(data,savemodels):
        global AIRQUALITYSAVECURRENT

        # look up the Excel model for this record's province
        savemodel = savemodels.get(data.get_Province())

        savemodel.current_row+=1
        savemodel.sheet_boke.write(savemodel.current_row,0,data.get_Province())
        savemodel.sheet_boke.write(savemodel.current_row,1,data.get_City())
        savemodel.sheet_boke.write(savemodel.current_row, 2, data.createTime)
        savemodel.sheet_boke.write(savemodel.current_row, 3, data.get_AQI())
        savemodel.sheet_boke.write(savemodel.current_row, 4, data.get_PM10())
        savemodel.sheet_boke.write(savemodel.current_row, 5, data.get_PM25())
        savemodel.sheet_boke.write(savemodel.current_row, 6, data.get_CO())
        savemodel.sheet_boke.write(savemodel.current_row, 7, data.get_NO2())
        savemodel.sheet_boke.write(savemodel.current_row, 8, data.get_SO2())
        savemodel.sheet_boke.write(savemodel.current_row, 9, data.get_O3())

        if(LOG_LEVEL=="ALL"):
            print(data.get_City(), "written to its province sheet")

        savemodel.save()  # flushes to disk once the province is complete

        AIRQUALITYSAVECURRENT += 1

You've already seen the call site back in the crawler section.
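
If you want to spot-check a saved file from Python instead of opening Excel, xlrd still reads legacy .xls files; the file name below is only an example of the timestamped names this code generates:

import xlrd

book = xlrd.open_workbook("./date/2022-03-18-10.30.00北京.xls")   # example name, yours will differ
sheet = book.sheet_by_index(0)
print(sheet.row_values(0))   # header: Province, City, 时间, AQI, PM10, PM2.5, CO, NO2, SO2, O3
print(sheet.row_values(1))   # first data row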

Finally, just let the crawler run, and that's it.

To be honest, I originally thought about structuring this like a proper spider framework, with clean extraction, encapsulation, and architecture, but in the end it felt like too much hassle. Writing Python as if it were Java is exhausting enough already.