如何不依赖框架,迅速写一个多线程爬虫
现在程序员非常依赖框架,有的人离开框架就很难进行开发,但有时候很多一些临时的小需求,并且后续不需要维护的就不需要使用框架开发,因为框架还是体量比较大的,对于一些不重要的小项目只需要跑个脚本就行了。接下来我就给大家介绍一下,在爬虫领域如何不依赖一些框架也能写一些性能较好的爬虫代码。
这里主要用到3个库:网络请求requests,线程池ThreadPoolExecutor和任务队列Queue。requests就不介绍了,介绍一下ThreadPoolExecutor和Queue。
1.ThreadPoolExecutor
ThreadPoolExecutor线程池执行器允许您将工作任务提交给池,并限制最大并发线程数以及如何处理未使用的线程。它与传统的Thread比起来,线程池可以重用已经启动的线程,执行速度更快。并且可有效地管理系统资源,不会导致CPU和内存的过度消耗。可以简化编写并发代码的难度。
- 创建线程池 线程池的创建非常简单,只需调用ThreadPoolExecutor类的构造函数即可:
from concurrent.futures import ThreadPoolExecutor
# 创建一个包含3个线程的线程池
executor = ThreadPoolExecutor(max\_workers=3)
其中,max_workers参数指定线程池中最大的线程数,默认为None,表示不限制线程数。
- 提交任务
提交任务到线程池中非常方便,只需调用submit方法即可:
# 通过submit方法向线程池提交任务
future = executor.submit(func, *args, **kwargs)
其中,func为要执行的函数,*args和**kwargs为该函数的参数。submit方法返回一个Future对象,可以通过该对象获取任务执行结果。
- 处理任务结果
线程池中的任务是异步执行的,如果需要获取任务执行结果,可以使用as_completed方法:
# 通过submit方法向线程池提交任务
future = executor.submit(func, *args, **kwargs)
其中,futures为任务列表,as_completed方法会按照任务完成的顺序返回任务执行结果。
- 关闭线程池
线程池在不再使用时应该关闭,以释放系统资源。通过调用shutdown方法可以关闭线程池:
# 关闭线程池(等待所有任务执行完毕)
executor.shutdown(wait=True)
其中,wait参数表示是否等待所有任务执行完毕再关闭线程池。默认值为True,即等待所有任务执行完毕后关闭线程池。
由于多线程会造成线程安全问题,所以这里我们使用队列Queue,Queue 封装了可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突的数据类型。Queue的基本使用如下:
2.Queue
- 创建任务队列
Python内置的queue模块提供了一个简单易用的任务队列Queue。我们可以通过调用Queue类的构造函数来创建一个任务队列:
import queue
# 创建大小为5的任务队列
task_queue = queue.Queue(maxsize=5)
其中,maxsize参数指定队列中能够容纳的最大任务数。如果maxsize为0或None,则表示队列大小不限。
- 添加任务
往任务队列中添加任务非常简单,只需调用put方法即可:
# 向任务队列中添加任务
task_queue.put(task)
其中,task为要添加的任务对象。put方法的默认行为是阻塞的,直到队列有空闲位置才能继续添加任务。如果不希望阻塞,可以设置block参数为False,这样当队列已满时就会立刻抛出queue.Full异常。
- 获取任务
从任务队列中获取任务也非常简单,只需调用get方法即可:
# 从任务队列中获取任务
task = task_queue.get()
get方法的默认行为是阻塞的,直到队列中有任务可供获取。如果队列为空,get方法就会被阻塞。可以通过设置block参数为False来禁用阻塞行为,这样当队列为空时就会立刻抛出queue.Empty异常。
- 关闭任务队列
任务队列在不再使用时也应该关闭,以释放系统资源。我们可以判断队列的大小,如果为空就说明队列里数据已经跑完了,可以关闭了:
if self.queue.qsize() > 0:
pass
else:
break
接下来我们用这几个包来写一个demo:
class Crawler:
def __init__(self):
self.queue = Queue()
def fetch(self, original_data):
try:
result = requests.get(original_data, headers='')
except Exception as e:
logger.warning(e)
def worker(self):
if self.queue.qsize() > 0:
original_data = self.queue.get_nowait()
try:
self.fetch(original_data)
except Exception as e:
logger.warning(e)
finally:
logger.info('队列数量:' + str(self.queue.qsize()))
self.queue.task_done()
def start(self, num_threads):
for page in range(1, 50):
url = 'https://movie.douban.com/subject/26752088/comments?start={}&limit=20&status=P&sort=new_score'.format(page)
self.queue.put(url)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for _ in range(num_threads):
executor.submit(self.worker)
self.queue.join()
if __name__ == '__main__':
results = Crawler().start(num_threads=10)
转载自:https://juejin.cn/post/7238200443581923385