likes
comments
collection
share

如何不依赖框架,迅速写一个多线程爬虫

作者站长头像
站长
· 阅读数 39

现在程序员非常依赖框架,有的人离开框架就很难进行开发,但有时候很多一些临时的小需求,并且后续不需要维护的就不需要使用框架开发,因为框架还是体量比较大的,对于一些不重要的小项目只需要跑个脚本就行了。接下来我就给大家介绍一下,在爬虫领域如何不依赖一些框架也能写一些性能较好的爬虫代码。

这里主要用到3个库:网络请求requests,线程池ThreadPoolExecutor和任务队列Queue。requests就不介绍了,介绍一下ThreadPoolExecutor和Queue。

1.ThreadPoolExecutor

ThreadPoolExecutor线程池执行器允许您将工作任务提交给池,并限制最大并发线程数以及如何处理未使用的线程。它与传统的Thread比起来,线程池可以重用已经启动的线程,执行速度更快。并且可有效地管理系统资源,不会导致CPU和内存的过度消耗。可以简化编写并发代码的难度。

  1. 创建线程池 线程池的创建非常简单,只需调用ThreadPoolExecutor类的构造函数即可:
from concurrent.futures import ThreadPoolExecutor
# 创建一个包含3个线程的线程池
executor = ThreadPoolExecutor(max\_workers=3)

其中,max_workers参数指定线程池中最大的线程数,默认为None,表示不限制线程数。

  1. 提交任务

提交任务到线程池中非常方便,只需调用submit方法即可:

# 通过submit方法向线程池提交任务
future = executor.submit(func, *args, **kwargs)

其中,func为要执行的函数,*args和**kwargs为该函数的参数。submit方法返回一个Future对象,可以通过该对象获取任务执行结果。

  1. 处理任务结果

线程池中的任务是异步执行的,如果需要获取任务执行结果,可以使用as_completed方法:

# 通过submit方法向线程池提交任务
future = executor.submit(func, *args, **kwargs)

其中,futures为任务列表,as_completed方法会按照任务完成的顺序返回任务执行结果。

  1. 关闭线程池

线程池在不再使用时应该关闭,以释放系统资源。通过调用shutdown方法可以关闭线程池:

# 关闭线程池(等待所有任务执行完毕)
executor.shutdown(wait=True)

其中,wait参数表示是否等待所有任务执行完毕再关闭线程池。默认值为True,即等待所有任务执行完毕后关闭线程池。

由于多线程会造成线程安全问题,所以这里我们使用队列Queue,Queue 封装了可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突的数据类型。Queue的基本使用如下:

2.Queue

  1. 创建任务队列

Python内置的queue模块提供了一个简单易用的任务队列Queue。我们可以通过调用Queue类的构造函数来创建一个任务队列:

import queue

# 创建大小为5的任务队列
task_queue = queue.Queue(maxsize=5)

其中,maxsize参数指定队列中能够容纳的最大任务数。如果maxsize为0或None,则表示队列大小不限。

  1. 添加任务

往任务队列中添加任务非常简单,只需调用put方法即可:

# 向任务队列中添加任务
task_queue.put(task)

其中,task为要添加的任务对象。put方法的默认行为是阻塞的,直到队列有空闲位置才能继续添加任务。如果不希望阻塞,可以设置block参数为False,这样当队列已满时就会立刻抛出queue.Full异常。

  1. 获取任务

从任务队列中获取任务也非常简单,只需调用get方法即可:

# 从任务队列中获取任务
task = task_queue.get()

get方法的默认行为是阻塞的,直到队列中有任务可供获取。如果队列为空,get方法就会被阻塞。可以通过设置block参数为False来禁用阻塞行为,这样当队列为空时就会立刻抛出queue.Empty异常。

  1. 关闭任务队列

任务队列在不再使用时也应该关闭,以释放系统资源。我们可以判断队列的大小,如果为空就说明队列里数据已经跑完了,可以关闭了:

if self.queue.qsize() > 0:
    pass
else:
    break

接下来我们用这几个包来写一个demo:

class Crawler:
    def __init__(self):
        self.queue = Queue()
        
    def fetch(self, original_data):
        try:
            result = requests.get(original_data, headers='')
        except Exception as e:
            logger.warning(e)

    def worker(self):
        if self.queue.qsize() > 0:
            original_data = self.queue.get_nowait()
            try:
                self.fetch(original_data)
            except Exception as e:
                logger.warning(e)
            finally:
                logger.info('队列数量:' + str(self.queue.qsize()))
                self.queue.task_done()

    def start(self, num_threads):
        for page in range(1, 50):
            url = 'https://movie.douban.com/subject/26752088/comments?start={}&limit=20&status=P&sort=new_score'.format(page)
            self.queue.put(url)

        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            for _ in range(num_threads):
                executor.submit(self.worker)
        self.queue.join()

if __name__ == '__main__':
    results = Crawler().start(num_threads=10)
转载自:https://juejin.cn/post/7238200443581923385
评论
请登录