likes
comments
collection
share

python线程数量与线程池

作者站长头像
站长
· 阅读数 39

本文首发于知乎本文分为以下几个部分

  • 两个线程抓10个网页
  • 线程数量试验
  • 参考资料
  • 线程数量控制
  • 线程池

两个线程抓10个网页

之前我们有过循环抓取10页豆瓣电影数据的例子,当时是每次循环都新建了一个线程,但是如果我们想要只用两个线程怎么办呢?

首先声明,1个线程分5个的想法是不行的,因为每个线程运行时间带有随机性,如果任务平均分配,则很可能出现一个线程还在苦苦工作,而另一个线程已经完成,却无法帮助前一个线程分担的情况,这无疑会降低运行效率。

所以比较好的方法是维护一个队列,两个线程都从中获取任务,直到把这个队列中的任务都做完。这个过程其实就是特殊的生产消费模式,只不过没有生产者,任务量是固定的而已。

import threadingimport requestsfrom bs4 import BeautifulSoupfrom queue import Queueclass MyThread(threading.Thread):    def __init__(self, queue):        threading.Thread.__init__(self)        self.queue = queue    def run(self):        while not self.queue.empty(): # 如果while True 线程永远不会终止            url = self.queue.get()            print(self.name, url)            url_queue.task_done()            r = requests.get(url)            soup = BeautifulSoup(r.content, 'html.parser')            lis = soup.find('ol', class_='grid_view').find_all('li')            for li in lis:                title = li.find('span', class_="title").text                print(title)url_queue = Queue()for i in range(10):    url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)    url_queue.put(url)th1 = MyThread(url_queue)th2 = MyThread(url_queue)th1.start()th2.start()th1.join()th2.join()url_queue.join()print('finish')

这里注意几点

  • Queue.empty()表示如果队列是空则为True,否则是False
  • Queue.join()Queue.task_done()是相互配合使用的。这里的join和线程的join作用是类似的,它表示直到队列全部操作完成再执行后面的代码,而只有前面每次操作队列都运行一次Queue.task_done()join才能通过
  • 不过Queue.join()Queue.task_done()一起删除不会影响当前程序,只是为了更安全规范最好都加上
  • 如果不是每次循环新建一个线程,则运行的函数经常以whilewhile True开始,因为一个线程要处理多个任务,它对应的函数需要是能不断去获取任务的,必须是一个循环

在使用多线程时,我们现在看到了两种形式

  • for循环,每一项开启一个线程
  • 构建队列,开启少量线程,每个线程从队列中获取任务

我们可以对比一下这两种形式,第一种开启了更多线程,速度会更快。但是当任务成千上万的时候,还可以用第一种吗?线程数量有没有限制?

我们首先自己测试一下,然后再查阅资料

线程数量试验

首先用一个简单的函数,运行时显示当前的线程数量

import timeimport threadingimport randomthread_num = 1000def run():    print('first, there are', threading.activeCount(), 'threads running')    time.sleep(thread_num/1000 * random.random())    print('second, there are ', threading.activeCount(), 'threads running')for i in range(thread_num):    th = threading.Thread(target = run)    th.start()

thread_num变量表示开启线程数量,通过time.sleep延长程序运行时间。同时运行的线程数量应该比thread_num小,因为有些线程结束时,有些线程还没开始。

这个值我测到100000都没有出什么问题,只是CPU运行全满,不敢再加大。

下面我们测试一下网页抓取

import threadingimport requestsimport jsonthread_num = 100def run():    print('first, there are', threading.activeCount(), 'threads running')    r = requests.post("http://httpbin.org/post",     data = 'second there are {} threading running'.format(threading.activeCount()))    print(r.json()['data'])for i in range(thread_num):    th = threading.Thread(target = run)    th.start()

这个爬虫的功能也是请求之前输出当前线程数量,抓取结束返回(请求当时的)线程数量(主要是为了保证抓取是成功的)。线程数测试到1000也没出什么问题,最多有400多个线程同时运行,1000次抓取几秒就跑完了,感觉比不使用多线程时抓取10个页面还快。所以说只要对面网站不因为你请求过快把你封掉,你对每次循环开一个多线程是没有问题的。

参考资料

这个问题在stackoverflow上有讨论,见这里,最后这个问题因为没有一个统一标准答案而被关闭。从回答者的观点来看,不要揣测线程的最大承受数量,可以切实去尝试,去触碰它的上界。也就是通过试验,找到运行最快,又不出任何问题的那个数量,即使这个数量非常大,也不用无畏地担心。

线程数量控制

使用threading.Semaphore可以控制最多允许多少个线程同时进行,超出的部分自动等待

import threadingimport requestsfrom bs4 import BeautifulSoupclass MyThread(threading.Thread):    def __init__(self, i):        threading.Thread.__init__(self)        self.i = i    def run(self):        with thread_max_num:            print(self.name, 'start')            url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)            r = requests.get(url)            soup = BeautifulSoup(r.content, 'html.parser')            lis = soup.find('ol', class_='grid_view').find_all('li')            for li in lis:                title = li.find('span', class_="title").text                print(title)thread_max_num = threading.Semaphore(2)for i in range(10):    th = MyThread(i)    th.start()

threading.Semaphore的使用只需要初始化,再在run中用上下文管理形式,即可保证当开启新线程时,如果同时运行线程数会超过设置的最大值,则start等待不执行,直到前面的线程运行结束才可以开始。

线程池

线程池使用进程模块multiprocessing中的方法,代码如下

import requestsfrom bs4 import BeautifulSoupfrom multiprocessing.dummy import Pool as ThreadPooldef get_title(i):    # print(i)    title_list = []    url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)    r = requests.get(url)    soup = BeautifulSoup(r.content, 'html.parser')    lis = soup.find('ol', class_='grid_view').find_all('li')    for li in lis:        title = li.find('span', class_="title").text        # return title        title_list.append(title)        print(title)    return(title_list)pool = ThreadPool()print(pool.map(get_title, range(10)))

其实就是map对列表每一项进行相同操作,我们知道,python中有一个直接的map函数也可以实现相同的效果,下面我们来测试一下他们之间的差异,以及它们和正常开启多线程之间的差异

首先导入所有需要的模块和函数

import threadingimport requestsfrom bs4 import BeautifulSoupfrom multiprocessing.dummy import Pool as ThreadPoolimport timedef get_title(i):    # print(i)    title_list = []    url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)    r = requests.get(url)    soup = BeautifulSoup(r.content, 'html.parser')    lis = soup.find('ol', class_='grid_view').find_all('li')    for li in lis:        title = li.find('span', class_="title").text        # return title        title_list.append(title)        print(title)    return(title_list)

正常循环

start = time.time()for i in range(10):    get_title(i)print('no thread', time.time() - start, 'seconds')

map函数

start = time.time()print(list(map(get_title, list(range(10)))))print('map total', time.time() - start, 'seconds')

线程池

start = time.time()pool = ThreadPool()print(pool.map(get_title, range(10)))print('threadpool total', time.time() - start, 'seconds')

正常多线程

start = time.time()ths = []for i in range(10):    th = threading.Thread(target = get_title, args = (i, ))    th.start()    ths.append(th)for th in ths:    th.join()print('thread total', time.time() - start, 'seconds')

测试结果如下

no thread 5.3201446533203125 secondsmap total 5.255042791366577 secondsthreadpool total 3.0299293994903564 secondsthread total 1.9949142932891846 seconds

所以说内置map函数几乎没有改进效率,线程池改善了一部分,每次循环新建一个线程效率提升最为明显。

不过如果我们指定线程池的线程数(其实就是同时可开启的最大线程数量,超过则等待),指定为10以上,效率提升可以和循环情况相同

start = time.time()pool = ThreadPool(10)print(pool.map(get_title, range(10)))print('threadpool total', time.time() - start, 'seconds')

线程池有一个很大的好处就是编写更加简单,但是相比于正常写线程来说不够灵活,详情见这里

欢迎关注我的知乎专栏

专栏主页:python编程

专栏目录:目录

版本说明:软件及包版本说明

转载自:https://juejin.cn/post/6844903574950920206
评论
请登录