异步爬取实战之爬取全国天气质量
前言
今天要做的呢是爬取这个全国的一个空气质量,别问为什么,问就是有项目需要。
需求分析
www.tianqihoubao.com/aqi/ 爬取这个网站的全部城市的信息。
点击进去之后,是这样的页面
我们要做的就是把这些个框起来的信息全部保存起来。
提取分析
现在明确了需求,那么接下来就是如何提取到这些信息。 首先我们需要分析首页获取全国城市。
获取全国城市
点击元素定位很快就能找到这玩意
于是我们此时可以这样编写我们的xpath表达式,把这些信息提取出来
首先我们先设计一下我们的首页的信息存储格式。
这里的话我们定义一个字典 存储的数据是这样的。
{"河北":[(xxx市,url),(),()],“江西”:[(),()]}
因为我们后面还要保存到excel表格里面,需要做一个分类,不过这一块,我们后面再说,不敢先写了,玩意需求变了我就完蛋了。
所以我们这边提取省份的名字
Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
提取对应的城市名字和链接
Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")
提取质量信息
那么接下来就到了如何提取空气的质量问题了。
随便点击近一个链接去看看
www.tianqihoubao.com/aqi/shijiaz…
同样的我们先定位一下
很快我们就定位到了信息,并且我们展开可以发现其他的信息都藏在了div标签下面
那么接下来我们依然只需要提取就行了。
AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
编码
那么接下来就是编码了。
这里的话由于是一个小项目嘛,所以我这里的项目结构是这样的
这里的Spider只是项目的一个module
信息存储
由于为了方便信息的解析和存储,这里我定义了一个pojo实体类,这样的话可以更好地存储我们的数据。
class AIRPojo(object):
__AQI = None
__PM25 = None
__CO = None
__SO2=None
__PM10 = None
__O3 = None
__NO2 = None
__City = None
__Province=None
__COUNT = 0
def get_AQI(self):
if (self.__AQI):
return self.__AQI
def get_PM25(self):
if (self.__PM25):
return self.__PM25
def get_CO(self):
if (self.__CO):
return self.__CO
def get_SO2(self):
if(self.__SO2):
return self.__SO2
def get_PM10(self):
if (self.__PM10):
return self.__PM10
def get_O3(self):
if (self.__O3):
return self.__O3
def get_NO2(self):
if (self.__NO2):
return self.__NO2
def get_City(self):
if(self.__City):
return self.__City
def get_Province(self):
if(self.__Province):
return self.__Province
def set_AQI(self, AQI):
if(self.__COUNT==0):
self.__AQI = AQI
self.__COUNT+=1
def set_PM25(self, PM25):
if(self.__COUNT==1):
self.__PM25 = PM25
self.__COUNT+=1
def set_CO(self, CO):
if(self.__COUNT==2):
self.__CO = CO
self.__COUNT+=1
def set_SO2(self,SO2):
if(self.__COUNT==3):
self.__SO2=SO2
self.__COUNT+=1
def set_PM10(self, PM10):
if(self.__COUNT==4):
self.__PM10 = PM10
self.__COUNT+=1
def set_O3(self, O3):
if(self.__COUNT==5):
self.__O3 = O3
self.__COUNT+=1
def set_NO2(self, NO2):
if(self.__COUNT==6):
self.__NO2 = NO2
self.__COUNT+=1
def set_City(self,City):
if(self.__COUNT==7):
self.__City = City
self.__COUNT+=1
def set_Province(self,Province):
if(self.__COUNT==8):
self.__Province = Province
self.__COUNT+=1
def __str__(self):
if(self.__COUNT>=8):
return "AQI:"+self.__AQI+"-PM2.5:"+self.__PM25+"-CO:"+self.__CO+"-SO2:"+self.__SO2+"-PM10:"+self.__PM10+"-O3:"+self.__O3+"-NO2:"+self.__NO2+"-city:"+self.__City+"-Province"+self.__Province
else:
return "数据未保存完毕,无法输出全部结果"
if __name__ == '__main__':
air = AIRPojo()
print(air)
这一点还是java用起来舒服,但是没办法谁让我看上人家的协程了呢。
日志输出
这个毕竟是一个项目,所以还是有一些设置文件的,当然现在我们就只有一个
当然由于这里的数据源是确定好了的,所以就没有必要写在配置里面了,因为如果你改了,我这也没法解析呀。
爬虫编写
首页城市爬取
首先我们要爬取全部信息的话,那么显然我们是需要那个先对首页进行解析的,这样才做全站爬取呀。
这里的话,由于只是爬取首页,所以这里的话我没有先写异步,而是直接写的requests,来做,顺便看一下这个网站访问的速度怎么样。毕竟我们这里不是调取api,而且对一个网站爬取,这样的话必然是会慢一点的。
这里的话就很简单了
def get_provinces(self):
response = requests.get(url=self.rootUrl)
response_data = response.content.decode(self.encoding)
html = self.parse.HTML(response_data)
Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
for Province in Provinces:
temp = list()
Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")
for city_link,city_name in zip(Province_citys_link,Province_citys_name):
temp.append((self.baseUrl+city_link,city_name))
self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp
这样一来就把我想要的信息都保存了。同时也发现,这个网站爬取访问是确实慢,大概2秒左右才能出一个结果,那么这样一来279个任务是必然需要使用到协程,异步的,开线程,用线程池,资源消耗高后面还有个数据读写安全的问题,加个锁那直接完犊子,单核直接给你拉满,性能不存在的。
异步爬取
那么这个时候就需要异步了。 这里也是分两个部分嘛,一个是回调函数,一个是异步请求。
async def fireCity(self,province,url,city,semaphore):
#传递四个参数,一个是当前爬取的省份,一个是需要爬取的url接口,city最后是那个池的限制大小
async with semaphore:
timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url,timeout=timeout) as response:
data = await response.text(encoding=self.encoding)
return (data,province,city,url)
except Exception as e:
self.timeout_task.append((province,url,city,semaphore))
def parse_city_quality(self,task):
if(task.result()):
data,province,city,url= task.result()
html = self.parse.HTML(data)
airPojo= AIRPojo()
AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
airPojo.set_AQI(AQI)
ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
airPojo.set_City(city)
airPojo.set_Province(province)
self.TIMES+=1 #这里完成一个记录,说明此时的这个城市的天气质量被我们获取到了
if(LOG_LEVEL=="ALL"):
print("当前完成任务",self.TIMES,airPojo,url)
else:
pass
超时问题
前面我们说了这个访问是很慢的,那么这样一来必然会超时,那么这个时候就需要重新分配任务了。但是这个
它不支持动态添加任务,不然的话我们完全可以把超时的任务重新放在里面。
所以为了解决这个问题,我又搞了一个超时队列。
超时的时候,把这个放在这个队列里面。当我们的第一次任务执行玩后
查看这个有没有超时的,如果有
.
def check(self):
while(self.timeout_task):
if(LOG_LEVEL=="ALL"):
print("正在处理超时连接",len(self.timeout_task))
tasks = []
while(self.timeout_task):
province, url, city, semaphore = self.timeout_task.pop(0)
c = self.fireCity(province, url, city, semaphore)
task = asyncio.ensure_future(c)
task.add_done_callback(self.parse_city_quality)
tasks.append(task)
self.loop.run_until_complete(asyncio.wait(tasks))
再次构建任务。直到没有为止,然后这个超时任务队列也是我直接使用list实现的。
完整爬虫代码
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
class AIR_Quality(object):
def __init__(self):
self.all_provinces={}
self.headers = {
"Referer":"http://www.tianqihoubao.com/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
}
self.parse = etree
self.encoding = "gbk"
self.baseUrl = "http://www.tianqihoubao.com"
self.rootUrl = "http://www.tianqihoubao.com/aqi"
self.TIMES = 0
self.PoolSize = 500 #设置系统上限500
self.tasks = [] # 任务队列
self.timeout_task = [] #超时队列
self.loop = None
self.TasKLength = 0
def __now_thread(self)->bool:
#判断当前的线程是否为主线程
current_name = threading.current_thread().getName()
if(current_name=="MainThread"):
return True
return False
def get_provinces(self):
response = requests.get(url=self.rootUrl)
response_data = response.content.decode(self.encoding)
html = self.parse.HTML(response_data)
Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
for Province in Provinces:
temp = list()
Province_citys_link = Province.xpath("""./dd/a/@href""")
Province_citys_name = Province.xpath("""./dd/a/text()""")
for city_link,city_name in zip(Province_citys_link,Province_citys_name):
temp.append((self.baseUrl+city_link,city_name))
self.all_provinces[Province.xpath("./dt/b/text()")[0]] = temp
#这里做一个内部切面来做数据保存工作
def parse_city_quality(self,task):
if(task.result()):
data,province,city,url= task.result()
html = self.parse.HTML(data)
airPojo= AIRPojo()
AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
airPojo.set_AQI(AQI)
ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
airPojo.set_City(city)
airPojo.set_Province(province)
self.TIMES+=1 #这里完成一个记录,说明此时的这个城市的天气质量被我们获取到了
if(LOG_LEVEL=="ALL"):
print("当前完成任务",self.TIMES,airPojo,url)
else:
pass
async def fireCity(self,province,url,city,semaphore):
#传递四个参数,一个是当前爬取的省份,一个是需要爬取的url接口,city最后是那个池的限制大小
async with semaphore:
timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url,timeout=timeout) as response:
data = await response.text(encoding=self.encoding)
return (data,province,city,url)
except Exception as e:
self.timeout_task.append((province,url,city,semaphore))
def check(self):
while(self.timeout_task):
if(LOG_LEVEL=="ALL"):
print("正在处理超时连接",len(self.timeout_task))
tasks = []
while(self.timeout_task):
province, url, city, semaphore = self.timeout_task.pop(0)
c = self.fireCity(province, url, city, semaphore)
task = asyncio.ensure_future(c)
task.add_done_callback(self.parse_city_quality)
tasks.append(task)
self.loop.run_until_complete(asyncio.wait(tasks))
def run(self):
start = time.time()
if(not self.all_provinces):
self.get_provinces()
semaphore = None
if (self.__now_thread()):
self.loop = asyncio.get_event_loop()
semaphore = asyncio.Semaphore(self.PoolSize)
else:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
semaphore = asyncio.Semaphore(self.PoolSize)
#创建异步队列
for province in self.all_provinces.keys():
citys_info= self.all_provinces.get(province)
for city_info in citys_info:
url_marks,city = city_info
url = ""
for url_mark in url_marks.split():
url+=url_mark
c = self.fireCity(province,url,city,semaphore)
task = asyncio.ensure_future(c)
task.add_done_callback(self.parse_city_quality)
self.tasks.append(task)
self.TasKLength = len(self.tasks)
self.loop.run_until_complete(asyncio.wait(self.tasks))
self.check()
if(LOG_LEVEL=="ALL" or LOG_LEVEL=="NONE"):
print("耗时:", time.time() - start, "秒")
print("任务总量:",self.TasKLength)
print("执行完毕量",self.TIMES,"剩余超时任务:",len(self.timeout_task))
if __name__ == '__main__':
start = time.time()
air_quality = AIR_Quality()
air_quality.get_provinces()
# print(air_quality.all_provinces)
air_quality.run()
测试
数据ok,那么就先这样吧,算法蓝桥杯还没怎么刷呢,靠北啦~
转载自:https://juejin.cn/post/7074105217754595358