Scraping Nationwide Air Quality Data in Practice
Preface
Hi, long time no see. I've been buried in my research topic lately, which turned me from a daily poster into a weekly one. Embarrassing, I know. The good news is that I'm close to a milestone: my graduation thesis is finally settled, and it's a proper research project rather than a design document for some "XX system".
OK, let's first take a look at what the scraper actually produces.
After the crawl, the data is saved into Excel files; the date folder is just the save path you configure yourself.
The data inside looks like this:
And the overall project structure looks like this:
The Pojo Entity Class
How should I put this... it's a habit left over from Java. But since saving is involved, I think defining a class like this makes the later data-storage step much more convenient.
import datetime

class AIRPojo(object):
    # All nine fields are gated by __COUNT, which forces the setters
    # to be called in a fixed order (AQI -> PM2.5 -> CO -> ... -> Province)
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0
    createTime = str(datetime.datetime.now().strftime("%Y-%m-%d-%X"))

    def get_AQI(self):
        if (self.__AQI):
            return self.__AQI

    def get_PM25(self):
        if (self.__PM25):
            return self.__PM25

    def get_CO(self):
        if (self.__CO):
            return self.__CO

    def get_SO2(self):
        if (self.__SO2):
            return self.__SO2

    def get_PM10(self):
        if (self.__PM10):
            return self.__PM10

    def get_O3(self):
        if (self.__O3):
            return self.__O3

    def get_NO2(self):
        if (self.__NO2):
            return self.__NO2

    def get_City(self):
        if (self.__City):
            return self.__City

    def get_Province(self):
        if (self.__Province):
            return self.__Province

    def set_AQI(self, AQI):
        if (self.__COUNT == 0):
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if (self.__COUNT == 1):
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if (self.__COUNT == 2):
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if (self.__COUNT == 3):
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if (self.__COUNT == 4):
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if (self.__COUNT == 5):
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if (self.__COUNT == 6):
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if (self.__COUNT == 7):
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if (self.__COUNT == 8):
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        # A full record only exists once all nine setters have run (__COUNT reaches 9)
        if (self.__COUNT >= 9):
            return ("AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO
                    + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3
                    + "-NO2:" + self.__NO2 + "-city:" + self.__City + "-Province:" + self.__Province)
        else:
            return "Not all fields have been saved yet, so the full record cannot be printed"

if __name__ == '__main__':
    air = AIRPojo()
    print(air)
There's nothing special about this class; it just wraps the data. The one quirk is the __COUNT gate, which forces the setters to be called in a fixed order.
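To see that ordering gate in action, here is a minimal usage sketch (the pollutant values are made-up sample data):

air = AIRPojo()
air.set_AQI("52")       # must come first: the gate only accepts AQI while __COUNT == 0
air.set_PM25("35")
air.set_CO("0.7")
air.set_SO2("9")
air.set_PM10("60")
air.set_O3("120")
air.set_NO2("18")
air.set_City("武汉")
air.set_Province("湖北")
print(air)              # all nine setters ran in order, so __str__ prints the full record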
Settings
Next come our settings. There isn't much to this part; it just pulls a few configuration values into one place so they can be adjusted uniformly.
# Log level: ALL, INFO, NONE
LOG_LEVEL = "ALL"
# Directory where the air-quality spreadsheets are saved
AIRQualitySavePath = "./date"
# Progress counters for the save step
AIRQUALITYSAVECURRENT = 0
AIRQUALITYTOTAL = 0
That said, there aren't many settings that need configuring in this project.
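The other modules consume this file with a star import, so a typical guard looks like this (sketch):

from Spider.Setting.Settings import *

if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
    print("spreadsheets will be written to", AIRQualitySavePath)

One caveat worth knowing: a star import copies the names into the importing module, so rebinding a counter like AIRQUALITYSAVECURRENT there updates that module's copy, not Settings itself.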
Crawler Implementation
The crawler itself is a little more involved. First up are our asynchronous requests.
Crawl Flow
The crawl flow here is actually quite simple: request the index page, parse out each province and its city links, fire an asynchronous request for every city page, then parse and save each response.
Crawl Requests
Once the flow is clear, it's obvious how the crawl requests should work:
def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link, city_name in zip(Province_citys_link, Province_citys_name):
            temp.append((self.baseUrl + city_link, city_name))
        province_name = Province.xpath("./dt/b/text()")[0]
        self.all_provinces[province_name] = temp
        save_model = QualitySaveModel(province_name, len(temp))
        self.save_all_models[province_name] = save_model
    if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
        print("Initialization complete: all provinces fetched")

# An internal aspect-style hook (done-callback) that handles the data-saving work
def parse_city_quality(self, task):
    if (task.result()):
        data, province, city, url = task.result()
        html = self.parse.HTML(data)
        airPojo = AIRPojo()
        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES += 1  # record that this city's air quality has been fetched
        if (LOG_LEVEL == "ALL"):
            print("Completed task", self.TIMES, airPojo, url)
        # persist to the Excel model
        QualitySave.SaveQuality(airPojo, self.save_all_models)
    else:
        pass

async def fireCity(self, province, url, city, semaphore):
    # Four parameters: the province being crawled, the target url,
    # the city name, and the semaphore that caps the pool size
    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)
                    return (data, province, city, url)
        except Exception as e:
            self.timeout_task.append((province, url, city, semaphore))
This covers the first two steps of the flow: request the main page, parse out each province's cities, create the asynchronous requests, and hand them off for processing.
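To make the province-parsing XPath concrete, here is a small self-contained sketch run against a hand-written fragment that mimics the index page's structure (the HTML below is invented for illustration):

from lxml import etree

# Invented fragment: a #content div whose second child div holds one <dl>
# per province, with the first <dl> acting as a header row
html = etree.HTML("""
<div id="content">
  <div>banner</div>
  <div>
    <dl><dt>header</dt></dl>
    <dl>
      <dt><b>湖北</b></dt>
      <dd><a href="/aqi/wuhan.html">武汉</a></dd>
      <dd><a href="/aqi/yichang.html">宜昌</a></dd>
    </dl>
  </div>
</div>""")

for dl in html.xpath('//*[@id="content"]/div[2]/dl[position()>1]'):
    province = dl.xpath("./dt/b/text()")[0]
    pairs = list(zip(dl.xpath("./dd/a/@href"), dl.xpath("./dd/a/text()")))
    print(province, pairs)
# -> 湖北 [('/aqi/wuhan.html', '武汉'), ('/aqi/yichang.html', '宜昌')]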
Timeout Handling
One thing worth calling out, though, is our timeout handling.
def check(self):
    while (self.timeout_task):
        if (LOG_LEVEL == "ALL"):
            print("Retrying timed-out connections:", len(self.timeout_task))
        tasks = []
        while (self.timeout_task):
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))
This is the core of it: whenever a task times out, its parameters are pushed onto the timeout queue, and check() keeps recreating the asynchronous requests until that queue drains.
Thread Handling
The reason for this part: I originally designed the crawler to sit behind a GUI component, so it might end up running inside a UI thread, and that calls for some thread-safety handling.
So we first check which thread we are on:
def __now_thread(self) -> bool:
    # Is the current thread the main thread?
    current_name = threading.current_thread().getName()
    if (current_name == "MainThread"):
        return True
    return False
If it isn't, we build a fresh event loop, like this:
if (self.__now_thread()):
    self.loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(self.PoolSize)
else:
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    semaphore = asyncio.Semaphore(self.PoolSize)
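Why this matters: asyncio.get_event_loop() only hands back a ready-made loop on the main thread; on any other thread a loop has to be created and registered by hand. As a hypothetical sketch of driving the crawler from a worker thread, the way a GUI would:

import threading

def background_crawl():
    aq = AIR_Quality()
    aq.run()  # run() notices it is off the main thread and builds its own loop

worker = threading.Thread(target=background_crawl, name="GuiWorker")
worker.start()
worker.join()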
Full Code
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Utils.QualitySave import QualitySave

class AIR_Quality(object):

    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500  # cap concurrent requests at 500
        self.tasks = []  # task queue
        self.timeout_task = []  # timeout queue
        self.loop = None
        self.TasKLength = 0
        self.save_all_models = {}

    def __now_thread(self) -> bool:
        # Is the current thread the main thread?
        current_name = threading.current_thread().getName()
        if (current_name == "MainThread"):
            return True
        return False

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link, city_name in zip(Province_citys_link, Province_citys_name):
                temp.append((self.baseUrl + city_link, city_name))
            province_name = Province.xpath("./dt/b/text()")[0]
            self.all_provinces[province_name] = temp
            save_model = QualitySaveModel(province_name, len(temp))
            self.save_all_models[province_name] = save_model
        if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
            print("Initialization complete: all provinces fetched")

    # An internal aspect-style hook (done-callback) that handles the data-saving work
    def parse_city_quality(self, task):
        if (task.result()):
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()
            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # record that this city's air quality has been fetched
            if (LOG_LEVEL == "ALL"):
                print("Completed task", self.TIMES, airPojo, url)
            # persist to the Excel model
            QualitySave.SaveQuality(airPojo, self.save_all_models)
        else:
            pass

    async def fireCity(self, province, url, city, semaphore):
        # Four parameters: the province being crawled, the target url,
        # the city name, and the semaphore that caps the pool size
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception as e:
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        while (self.timeout_task):
            if (LOG_LEVEL == "ALL"):
                print("Retrying timed-out connections:", len(self.timeout_task))
            tasks = []
            while (self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        global AIRQUALITYTOTAL
        start = time.time()
        if (not self.all_provinces):
            self.get_provinces()
        semaphore = None
        if (self.__now_thread()):
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)
        # build the queue of asynchronous tasks
        for province in self.all_provinces.keys():
            citys_info = self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks, city = city_info
                url = ""
                for url_mark in url_marks.split():
                    url += url_mark  # drop any whitespace embedded in the link
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        AIRQUALITYTOTAL = self.TasKLength
        self.loop.run_until_complete(asyncio.wait(self.tasks))
        self.check()
        if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
            print("Elapsed:", time.time() - start, "seconds")
            print("Total tasks:", self.TasKLength)
            print("Completed:", self.TIMES, "remaining timed-out tasks:", len(self.timeout_task))
        self.loop.close()

if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()
Storage and Parsing
Next up is this last part.
There honestly isn't much to it. The parsing already happened inside the crawler, and the result was wrapped in a pojo object.
All we have to do here is persist that object.
import datetime
import os
from Spider.Setting.Settings import *
import xlwt

class QualitySaveModel(object):

    def __init__(self, province: str, total: int):
        # Holds one province's air-quality rows until they are flushed to disk
        # Name of the output workbook
        self.name = str(datetime.datetime.now().strftime("%Y-%m-%d-%X")).replace(":", ".") + province
        # The workbook itself
        self.save_boke = xlwt.Workbook(encoding='utf-8', style_compression=0)
        # The sheet inside the workbook
        self.sheet_boke = self.save_boke.add_sheet(self.name, cell_overwrite_ok=True)
        # Total number of cities expected for this province
        self.total = total
        # Number of rows saved so far
        self.current_row = 0
        self.cols = ["Province", "City", "Time", "AQI", "PM10", "PM2.5", "CO", "NO2", "SO2", "O3"]
        for i in range(len(self.cols)):
            # write the header row
            self.sheet_boke.write(0, i, self.cols[i])

    def save(self):
        if (self.current_row >= self.total):
            path_root = AIRQualitySavePath
            if (not os.path.exists(path_root)):
                os.makedirs(path_root)
            path = path_root + "/" + self.name + ".xls"
            self.save_boke.save(path)
            if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
                print(path)

    def __str__(self):
        return "This is an Excel storage object"
Then the crawler just calls it:
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Setting.Settings import *

class QualitySave(object):

    @staticmethod
    def SaveQuality(data, savemodels):
        global AIRQUALITYSAVECURRENT
        savemodel = savemodels.get(data.get_Province())
        savemodel.current_row += 1
        savemodel.sheet_boke.write(savemodel.current_row, 0, data.get_Province())
        savemodel.sheet_boke.write(savemodel.current_row, 1, data.get_City())
        savemodel.sheet_boke.write(savemodel.current_row, 2, data.createTime)
        savemodel.sheet_boke.write(savemodel.current_row, 3, data.get_AQI())
        savemodel.sheet_boke.write(savemodel.current_row, 4, data.get_PM10())
        savemodel.sheet_boke.write(savemodel.current_row, 5, data.get_PM25())
        savemodel.sheet_boke.write(savemodel.current_row, 6, data.get_CO())
        savemodel.sheet_boke.write(savemodel.current_row, 7, data.get_NO2())
        savemodel.sheet_boke.write(savemodel.current_row, 8, data.get_SO2())
        savemodel.sheet_boke.write(savemodel.current_row, 9, data.get_O3())
        if (LOG_LEVEL == "ALL"):
            print(data.get_City(), "written to the sheet")
        savemodel.save()  # flush to disk once the province is complete
        AIRQUALITYSAVECURRENT += 1
You've already seen the call site back in the crawler code.
Finally, just let the crawler run, and that's it.
Honestly, I originally wanted to do proper extraction, encapsulation, and architecture the way a full spider framework does, but in the end it felt like too much trouble, so this is what we have. Writing Python as if it were Java is exhausting.
Reposted from: https://juejin.cn/post/7120123879783137288