likes
comments
collection
share

python并发编程之多线程和线程池

作者站长头像
站长
· 阅读数 11

为什么要引入并发编程

场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟 场景2:一个APP应用,优化前每次打开页面需要3秒,采用异步并发提升到打开每次200毫秒

其实引入并发就是为了提升程序的运行速度。

喜欢的朋友可以关注我的公众号,微信搜索:feelwow

python中对并发编程的支持

  • 多线程:threading模块,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴的等待IO完成
  • 多进程:multiprocessing模块,利用多核CPU的能力,真正的并行执行任务
  • 异步IO:asyncio模块,在单线程利用CPU和IO同时执行的原理,实现函数异步执行

同时python又提供了一些模块来辅助或者简化并发的运行。

  • 使用Lock对资源进行加锁,防止冲突访问
  • 使用Queue实现不同线程、进程之间的数据通信,实现生产者-消费者模型
  • 使用线程池、进程池Pool,简化线程、进程的任务提交,等待结束,获取结果

如何选择

  • CPU密集型:CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率特别高。典型的示例:压缩、解压缩,加密解密,正则表达式搜索等
  • IO密集型:IO密集型指的是系统运作大部分的状况是CPU在等待I/O,例如一些磁盘、内存、网络的读写,这种状况,CPU占用率不高,系统IO特别高。 典型的示例:文件处理程序,网络爬虫,读写数据库等

python中多进程、多线程、多协程的对比

  • 多线程:
    • 优点:相比进程,更加轻量级,占用资源少
    • 缺点:相比协程,启动数目有限,占用内存资源,有线程切换开销
    • 适用于:IO密集型计算,同时运行的任务数目要求不多
  • 多进程
    • 优点:可以利用多核CPU并行运算
    • 缺点:占用资源最多,可启动数目比线程少
    • 适用于:CPU密集型计算场景
  • 多协程
    • 优点:内存开销最少,启动协程数可以非常多
    • 缺点:支持的库有限,例如不能使用requests模块,代码实现复杂
    • 适用于:IO密集型计算、需要超多任务运行,有现成库支持的场景

python全局解释器锁GIL

python速度慢的两个原因?

相比其他语言,例如:C/C++/java/golang,python确实很慢,在一些特殊场景下,python要比C++慢100~200倍

那么python慢的原因到底是什么?

  1. 动态类型语言,边解释边执行
  2. GIL无法利用多核CPU并发执行

那么GIL是什么?

GIL:全局解释器锁,是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程。

出现GIL的原因?

python设计初期,为了规避并发问题,解决多线程之间数据完整性和状态同步问题,因此引入了GIL。

由于python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。

比如:有两个线程A和B都想引用对象obj,并对该对象做撤销处理,线程A先执行了撤销,将对象obj做了减一处理,此时发生了多线程的调度切换,线程B也做了obj的撤销处理,obj此时又减一,这个时候又发生了多线程调度切换,此时对象obj的计数已经为0,此时Python会释放此对象,这个时候可能会破坏内存。

而多线程在执行期间,线程会释放GIL,实现CPU和IO的并行执行,因此多线程对于IO密集型的运行效率会有很大的提升。

创建多线程的方法

创建多线程的流程

  1. 先准备一个执行函数,例如:
def my_func(a, b):
    do_something(a, b)
  1. 创建一个线程
import threading
t = threading.Thread(target=my_func, args=(100, 200,))
  1. 启动线程
t.start()
  1. 等待结束
t.join()

爬虫示例

测试示例来源于爬取的北京新发地菜价信息,地址如下:www.xinfadi.com.cn/priceDetail…

浏览器f12抓包分析,可以看到,价格信息是通过www.xinfadi.com.cn/getPriceDat… 这个请求拿到的,请求方法为POST,我们试着拿第一页的数据信息,代码如下:

import requests

url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1):
    data = {
        "limit": 20,
        "current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    price_list = resp.json()['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    print(res_data)
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(res)

执行结果如下:

[('大白菜', '冀陕辽', '1.15'), ('娃娃菜', '冀', '1.25'), ('小白菜', '', '2.75'), ('圆白菜', '冀', '2.5'), ('圆白菜', '鲁', '1.9'), ('紫甘蓝', '冀', '0.75'), ('芹菜', '鲁', '2.65'), ('西芹', '辽', '2.9'), ('菠菜', '蒙', '6.5'), ('莴笋', '冀', '2.25'), ('团生菜', '冀', '4.5'), ('散叶生菜', '京辽', '4.75'), ('罗马生菜', '冀', '3.25'), ('油菜', '冀', '2.9'), ('香菜', '冀', '6.0'), ('茴香', '冀', '6.5'), ('韭菜', '粤冀', '2.85'), ('苦菊', '辽', '4.5'), ('油麦菜', '辽', '6.0'), ('黄心菜', '皖', '1.55')]

下面看下单线程爬取五十页的菜价信息时的用时情况吧:

为了测试方便,这里写一个统计程序运行时间的装饰器,最终单线程运行时的代码为:

import time
import xinfadi_spider
import threading
from functools import wraps


def count_time(func):
    @wraps(func)
    def _wraper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print(f"运行时间: {end - start}")
        return res

    return _wraper


@count_time
def single_thread():
    for page in range(1, 51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)


if __name__ == '__main__':
    single_thread()

运行时间最终为:

运行时间: 13.166715621948242

下面再看下多线程时的运行代码:

@count_time
def single_thread():
    for page in range(1, 51):
        xinfadi_spider.get_resource(xinfadi_spider.url, page)
    return


@count_time
def multi_thread():
    t = []
    for page in range(1, 51):
        t.append(
            threading.Thread(target=xinfadi_spider.get_resource, args=(xinfadi_spider.url, page,))
        )
    for thread in t:
        thread.start()
    for thread in t:
        thread.join()


if __name__ == '__main__':
    multi_thread()

最终运行速度提升了10倍左右,结果为:

运行时间: 1.8293204307556152

线程安全

线程安全指的是某个函数、函数库在多线程环境中被调用时,能正确的处理多个线程的共享变量,使程序功能正确完成

为了保证线程安全,python提供了QueueLock这两种方式,下面会结合一些案例进行介绍。

Queue队列

Queue是python专门提供的一种数据类型,其特点就是队列会通过先进先出,或者先进后出的模式,保证单个数据不会同时被多个线程进行访问,因此使用Queue可以安全的访问数据,而不会造成数据共享冲突。

另外使用Queue还能够实现生产者和消费者模型,进行程序间的解耦,可以用较少的资源解决高并发的一些问题。

Queue的一些常用方法:

  1. 导入模块: import queue
  2. 创建队列: q = queue.Queue()
  3. 添加元素: q.put(item)
  4. 获取元素: q.get()
  5. 查询元素的个数: q.qsize()
  6. 判断是否为空: q.empty()
  7. 判断是否已满: q.full()

这里结合上面的爬虫案例,使用Queue来进行并发解析,整体流程如下:

python并发编程之多线程和线程池

首先改造一下我们之前的爬虫程序,将解析获取网页数据和解析网页数据分开

import requests


url = 'http://www.xinfadi.com.cn/getPriceData.html'


def get_resource(url, page=1):
    data = {
        "limit": 20,
        "current": page
    }
    resp = requests.post(url, data=data)
    resp.encoding = 'utf-8'
    return resp.json()


def parse_resource(resource_data):
    price_list = resource_data['list']
    res_data = [
        (info['prodName'], info['place'], info['avgPrice']) for info in price_list
    ]
    return res_data


if __name__ == '__main__':
    res = get_resource(url)
    print(parse_resource(res))

然后编写生产者和消费者

import queue
import threading
import xinfadi_spider


def producer(page_queue: queue.Queue, resource_queue: queue.Queue):
    while True:
        page = page_queue.get()
        resource = xinfadi_spider.get_resource(xinfadi_spider.url, page)
        resource_queue.put(resource)
        print(f"{threading.current_thread().name} 爬取第{page}页内容, 当前队列大小为: "
              f"{page_queue.qsize()}")


def consumer(resource_queue: queue.Queue):
    while True:
        resouce = resource_queue.get()
        parse_resource = xinfadi_spider.parse_resource(resouce)
        print(f"{threading.current_thread().name} 解析数据, "
              f"队列大小为: {resource_queue.qsize()}")



if __name__ == '__main__':
    page_queue = queue.Queue()
    resource_queue = queue.Queue()

    for page in range(1, 51):
        page_queue.put(page)

    for i in range(3):
        t = threading.Thread(name=f'producer-{i}', target=producer, args=(page_queue, resource_queue,))
        t.start()

    for i in range(2):
        t = threading.Thread(name=f'consumer-{i}', target=consumer, args=(resource_queue,))
        t.start()

最终的结果如下:

producer-1 爬取第2页内容, 当前队列大小为: 47
producer-0 爬取第1页内容, 当前队列大小为: 47
producer-2 爬取第3页内容, 当前队列大小为: 45consumer-1 解析数据, 队列大小为: 2
consumer-1 解析数据, 队列大小为: 1
consumer-1 解析数据, 队列大小为: 0

producer-0 爬取第5页内容, 当前队列大小为: 44consumer-1 解析数据, 队列大小为: 0
........
........
........
producer-0 爬取第47页内容, 当前队列大小为: 0
consumer-0 解析数据, 队列大小为: 0
producer-1 爬取第50页内容, 当前队列大小为: 0
producer-2 爬取第49页内容, 当前队列大小为: 0consumer-0 解析数据, 队列大小为: 1
consumer-0 解析数据, 队列大小为: 0

Lock锁

而使用Lock时,在线程函数执行前,会先加锁,执行完成后,会释放锁,确保每次只有一个线程占有该锁

下面结合一个经典的银行取钱的例子,来说一下为什么要引入Lock

import threading


class Account:
    def __init__(self, account, balance):
        # 账户
        self.account = account
        # 账户余额
        self.balance = balance


def draw_money(account, money):
    if account.balance >= money:
        print(f"{threading.current_thread().name} 取钱成功,取出金额为: {money}")
        account.balance -= money
        print(f"当前账户余额为: {account.balance}")
    else:
        print(f"{threading.current_thread().name} 取钱失败,余额不足!")


if __name__ == '__main__':
    account = Account("tom", 1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()

正常的结果输出应该为下面:

A 取钱成功,取出金额为: 800
当前账户余额为: 200
B 取钱失败,余额不足!

创建了两个线程A和B,那么当A取完钱之后,账户余额就变成了200,因此当B再去执行时,账户余额不足,因此会提示余额不足,但是当我们多运行几次后,你会发现运行结果会出现下面的情况:

A 取钱成功,取出金额为: 800
B 取钱成功,取出金额为: 800
当前账户余额为: 200
当前账户余额为: -600

出现这种情况的原因是因为run()方法不具有线程的安全性,在线程去修改account账户余额时,恰好发生了线程切换,即另一个线程B去修改了account的账户,因此就出现了上面的结果。

当我们在代码里,更改余额的逻辑前面加一个阻塞time.sleep(0.5)之后

time.sleep(0.5)
account.balance -= money

你会看到每次的运行结果总会是错误的

A 取钱成功,取出金额为: 800
B 取钱成功,取出金额为: 800
当前账户余额为: 200
当前账户余额为: -600

那么python为了解决这个问题,引入了互斥锁Lock, 使用方法如下:

  1. 创建线程锁: lock = threading.Lock()
  2. 创建递归锁: lock = threading.RLock()
  3. 加锁: lock,acquire()
  4. 释放锁: locak.release()

对应的代码写法有两种,一种是通过try...finally...方式,另一种是通过with方式,如下:

import threading

lock = threading.Lock()

lock.acquire()
try:
    do something
finally:
    lock.release()

import threading

lock = threading.Lock()

with lock:
    do something

这里直接使用with的方式(使用起来更简洁),结合上面的示例就是:

import threading
import time

class Account:
    def __init__(self, account, balance):
        # 账户
        self.account = account
        # 账户余额
        self.balance = balance
        # 定义互斥锁
        self.lock = threading.Lock()


def draw_money(account, money):
    with account.lock:
        if account.balance >= money:
            print(f"{threading.current_thread().name} 取钱成功,取出金额为: {money}")
            time.sleep(0.5)
            account.balance -= money
            print(f"当前账户余额为: {account.balance}")
        else:
            print(f"{threading.current_thread().name} 取钱失败,余额不足!")


if __name__ == '__main__':
    account = Account("tom", 1000)
    threading.Thread(name='A', target=draw_money, args=(account, 800,)).start()
    threading.Thread(name='B', target=draw_money, args=(account, 800,)).start()

无论在逻辑里面添加任何阻塞,最终的结果都是正确的

A 取钱成功,取出金额为: 800
当前账户余额为: 200
B 取钱失败,余额不足!

线程池

首先了解一下线程的生命周期,如下图所示:

python并发编程之多线程和线程池

从上图可以看到新建线程时,系统需要分配资源,终止线程系统需要回收资源,因此这就会产生一定新建和终止的开销,如果可以重用线程,那么就可以减少系统开销,所以就有了线程池,那么使用线程池有哪些优势呢?

  1. 提升性能,减少了大量的新建、终止线程的开销,重用线程资源
  2. 适用于处理突发性大量请求或需要大量线程来完成任务,但实际任务处理时间较短的场景
  3. 能有效避免系统因为创建线程过多,导致系统负荷较高而变慢的问题
  4. 使用线程池,比单独使用线程要更加简洁

使用方法如下:

  1. map函数方式

map的结果和入参顺序是固定的

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
    # func 是目标函数
    # args_list 是一个参数列表
    results = pool.map(func, args_list)
    # 获取执行的返回结果
    for result in results:
        print(result)
  1. future模式

as_completed顺序是不固定的

from concurrent.futures import ThreadPoolExecutor, as_completed


with ThreadPoolExecutor() as pool:
    # arg 是指一个参数
    futures = [pool.submit(func, arg) for arg in args_list]
    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())

下面结合上面的爬虫案例进行改造,首先是通过submit方式来看下

import xinfadi_spider
from concurrent.futures import ThreadPoolExecutor


# 新建一个线程来获取所有url资源
with ThreadPoolExecutor() as p1:
    futures = {
        page: p1.submit(xinfadi_spider.get_resource, xinfadi_spider.url, page)
        for page in range(1, 51)
    }
    for k, v in futures.items():
        print(k, v.result())

with ThreadPoolExecutor() as p2:
    futures_parse = {}
    for resource in futures.values():
        res = p2.submit(xinfadi_spider.parse_resource, resource.result())
        futures_parse[res] = resource
    for k, v in futures_parse.items():
        print(res.result())

注意:当使用submit时,返回的是一个future对象,可以通过result()获取返回结果,

而使用map提交任务时,相当于启动了len(iterlables)个线程来并发的去执行func函数

with ThreadPoolExecutor() as p3:
    res = p3.map(xinfadi_spider.get_resource, [xinfadi_spider.url] * 50, [i for i in range(1, 51)])

with ThreadPoolExecutor() as p4:
    p4.map(xinfadi_spider.parse_resource, [r for r in res])

需要注意的是,使用map时,传入多个参数时,需要保证传入的变量是一个可迭代的对象,例如数组、元祖等,并且需要保证参数的个数是一致的。

如果不使用with关键字来创建线程池时,例如直接通过pool = ThreadPoolExecutor()创建线程池时,需要在结束时使用pool.shutdown()来关闭线程池

多进程

多线程和协程本质上还是在单核上进行,而多进程是真正意义上的并行,利用了多进程在多核CPU上并行执行。

由于多进程和多线程写法几乎一样,所以这里不在做过多的讲解,只列出一些创建方法和使用方法。

  1. 导入模块
# 多进程
from multiprocessing import Process

# 多线程
from threading import Thread
  1. 新建、启动、等待结束
# 多进程
p = Process(target=func, args=(1,))
p.start()
p.join()

# 多线程
t = Thread(target=func, args=(1,))
t.start()
t.join()
  1. 数据通信
# 多进程
from multiprocessing import Queue
q = Queue()
q.put([1,2,3])
item = q.get()

# 多线程
import queue
q = Queue()
q.put([1,2,3])
item = q.get()
  1. 线程安全加锁
# 多进程
from multiprocessing import Lock
lock = Lock()
with lock:
  do_something()
  
# 多线程
from threading import Lock
lock = Lock()
with lock:
  do_something()
# 多进程
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
    # 方法一
    res = pool.map(func, *iterables)
    # 方法二
    res = pool.submit(func, arg)
    result = res.result()
    
# 多线程
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
    # 方法一
    res = pool.map(func, *iterables)
    # 方法二
    res = pool.submit(func, arg)
    result = res.result()

协程

关于协程这部分,需要很多的内容去描述,因此放到下一篇进行学习和介绍。

喜欢的朋友可以关注我的公众号,微信搜索:feelwow