Python Syntax: Multiprocessing, Multithreading, and Coroutines (Async IO)

Related Concepts

Concurrency and Parallelism

  • Concurrency: the number of programs a single CPU (CPU core) can make progress on within a period of time, by switching between them.
  • Parallelism: multiple programs running at the same instant on multiple CPUs (CPU cores); it depends on the number of CPUs/cores available.

The distinction exists because a single CPU core can only execute one program at any given moment.

Synchronous and Asynchronous

  • Synchronous: the caller must wait for the operation to finish before the remaining logic can run.
  • Asynchronous: the caller does not wait for the operation to finish; the remaining logic continues immediately, as the sketch below illustrates.
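A minimal sketch of the difference, using time.sleep for the synchronous call and asyncio.sleep for the asynchronous one (the function names are illustrative only):

import asyncio
import time


def sync_task():
    time.sleep(1)           # the caller is stuck here for a full second
    return "sync done"


async def async_task():
    await asyncio.sleep(1)  # control goes back to the event loop while waiting
    return "async done"


async def main():
    # Two awaited sleeps run concurrently and finish in about 1 s total.
    print(await asyncio.gather(async_task(), async_task()))


print(sync_task())   # blocks for ~1 s before returning
asyncio.run(main())  # both coroutines finish in ~1 s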

Blocking and Non-blocking

  • Blocking: the calling thread is suspended until the call returns.
  • Non-blocking: the calling thread is not suspended; the call returns immediately.

CPU-bound and I/O-bound

CPU-bound:

CPU-bound work (also called compute-bound) spends very little time on I/O; most of the time goes into computation, so CPU usage is high.

Examples: compression and decompression, encryption and decryption, regular-expression matching.

I/O-bound:

I/O-bound work spends most of its running time waiting for I/O (disk, network) reads and writes rather than using the CPU, so CPU usage is low.

Examples: file reads and writes, web crawlers, database queries.

Comparison of Multiprocessing, Multithreading, and Coroutines

Multiprocessing: Process (multiprocessing)

  • Pros: can run in parallel on multiple CPU cores.
  • Cons: uses the most resources; fewer processes can be started than threads.
  • Best for: CPU-bound computation.

Multithreading: Thread (threading)

  • Pros: lighter than a process and uses fewer resources.
  • Cons: because of the GIL, threads can only run concurrently, not in parallel across CPUs; compared with coroutines the number of threads is limited, they use more memory, and thread switching has overhead.
  • Best for: I/O-bound computation with a moderate number of simultaneous tasks.

Coroutines: Coroutine (asyncio)

  • Pros: lowest memory overhead; the largest number of tasks can be started.
  • Cons: limited to libraries that support async; code is more complex to write.
  • Best for: I/O-bound computation with many simultaneous tasks.

GIL stands for Global Interpreter Lock.

(Figure: GIL operation. Threads take turns acquiring the GIL; only the thread holding it executes Python bytecode.)

Python's multithreading is "pseudo" multithreading: only one thread executes Python bytecode at any given moment, as the sketch below demonstrates.

A process can start N threads; the number is limited by the operating system.

A thread can start N coroutines; the number is essentially unlimited.
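A quick way to see the GIL's effect is to time a pure-Python CPU-bound loop run twice in a row and then in two threads. The countdown helper below is a hypothetical example used only for illustration; on CPython the threaded version takes roughly as long as the serial one because only one thread can hold the GIL at a time.

import time
from threading import Thread


def countdown(n):
    # Pure-Python CPU-bound loop; it holds the GIL while running.
    while n > 0:
        n -= 1


N = 10_000_000

start = time.perf_counter()
countdown(N)
countdown(N)
print(f"serial:   {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
t1 = Thread(target=countdown, args=(N,))
t2 = Thread(target=countdown, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"threaded: {time.perf_counter() - start:.2f}s  # about the same on CPython")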

How to Choose

In other languages, multithreading can use multiple CPU cores at the same time, so it suits CPU-bound work; in Python, however, the GIL restricts execution to one thread at a time, so multithreading only helps with I/O-bound work.

So for Python:

  • For I/O-bound work, prefer coroutines whenever the libraries you need support async; fall back to multithreading only when they do not.

  • For CPU-bound work, multiprocessing is the only real option. (A sketch that combines asyncio with a process pool follows.)
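In practice the two choices can also be combined: an asyncio program can hand CPU-bound work to a process pool and await the result. A minimal sketch, assuming a toy fib function chosen purely for illustration:

import asyncio
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    # CPU-bound work runs in worker processes, each with its own GIL.
    return n if n < 2 else fib(n - 1) + fib(n - 2)


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns awaitable futures, so gather works as usual.
        results = await asyncio.gather(
            loop.run_in_executor(pool, fib, 30),
            loop.run_in_executor(pool, fib, 31),
        )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())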

Coroutines (Async IO)

Simple Example

import asyncio


async def test():
    await asyncio.sleep(3)
    return "123"


async def main():
    result = await test()
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

Single request, checking the result

import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)
    return index


def getfuture(future):
    print(f"Result: {future.result()}")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    future.add_done_callback(getfuture)
    loop.run_until_complete(future)
    loop.close()

Or:

import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)
    return index


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    loop.run_until_complete(future)
    print(f"Result: {future.result()}")
    loop.close()

Batch requests, checking the results

import threading
import asyncio


async def myfun(index):
    print(f'Thread ({threading.current_thread().name}), argument ({index})')
    await asyncio.sleep(1)
    return index


loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
    future = asyncio.ensure_future(myfun(item))
    future_list.append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
    print(f"Result: {future.result()}")
loop.close()

asyncio.wait and asyncio.gather

import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# On Python 3.11+, asyncio.wait() only accepts Task/Future objects,
# so wrap the coroutines explicitly instead of passing them directly.
tasks = [asyncio.ensure_future(myfun(1)), asyncio.ensure_future(myfun(2))]
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Differences between asyncio.gather and asyncio.wait:

Internally, wait() keeps the Task instances it creates in a set. Because a set is unordered, the tasks do not complete in submission order. wait() returns a tuple of two sets: the finished tasks and the unfinished ones. Its second argument is a timeout; once the timeout is reached, any unfinished tasks are left in the pending state, and if the program exits while tasks are still pending you will see an error message about them. A minimal sketch of this API follows.
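A minimal sketch of wait() with a timeout, assuming three toy tasks that sleep for different durations (names are illustrative only):

import asyncio


async def job(seconds):
    await asyncio.sleep(seconds)
    return seconds


async def main():
    tasks = [asyncio.create_task(job(s)) for s in (1, 2, 3)]
    # Wait at most 1.5 s; wait() returns two sets of tasks.
    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print([t.result() for t in done])   # only the 1 s task has finished
    print(len(pending))                 # the 2 s and 3 s tasks are still pending
    for t in pending:                   # cancel whatever is left
        t.cancel()


asyncio.run(main())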

Using gather

gather serves a similar purpose to wait, with these differences (see the sketch after this list):

  1. gather returns a single future, so you cannot selectively cancel individual tasks through its return value; cancelling the gather future cancels them all.
  2. Its return value is a list of results.
  3. The results come back in the same order as the awaitables passed in.
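A minimal sketch showing that gather returns a list of results in the order the coroutines were passed in, regardless of completion order:

import asyncio


async def job(seconds):
    await asyncio.sleep(seconds)
    return seconds


async def main():
    # gather accepts coroutines directly and preserves argument order.
    results = await asyncio.gather(job(3), job(1), job(2))
    print(results)  # [3, 1, 2] even though job(1) finished first


asyncio.run(main())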

Combining Coroutines with Multithreading

Multiple requests at once

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    r = requests.get(url)
    print(r.text)
    return r.text


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
    tasks = []
    start_time = time.time()
    for url in urls:
        task = loop.run_in_executor(executor, myquery, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(f"Elapsed: {time.time() - start_time}")

Result

{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}
Elapsed: 0.11207175254821777

Single request with a callback

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    print(f"Request thread: {threading.current_thread().name}")
    r = requests.get(url)
    return r.text


def myfuture(future):
    print(f"Callback thread: {threading.current_thread().name}")
    print(future.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    url = "https://www.psvmc.cn/userlist.json"
    tasks = []
    start_time = time.time()
    task = loop.run_in_executor(executor, myquery, url)
    future = asyncio.ensure_future(task)
    future.add_done_callback(myfuture)
    loop.run_until_complete(future)
    print(f"Elapsed: {time.time() - start_time}")

Multithreading

Importing the module

from threading import Thread


def func(num):
    return num


t = Thread(target=func, args=(100,))
t.start()
t.join()

Data communication

import queue

q = queue.Queue()
q.put(1)
item = q.get()

from threading import Lock

lock = Lock()
with lock:
    pass
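To make the two primitives above concrete, here is a minimal producer/consumer sketch; the None sentinel and the shared counter are illustrative choices. queue.Queue is already thread-safe, so the Lock only protects the shared total.

import queue
from threading import Thread, Lock

q = queue.Queue()
total = 0
lock = Lock()


def producer():
    for i in range(5):
        q.put(i)      # Queue is thread-safe; no extra locking needed
    q.put(None)       # sentinel: tell the consumer to stop


def consumer():
    global total
    while True:
        item = q.get()
        if item is None:
            break
        with lock:    # protect the shared counter
            total += item


t1, t2 = Thread(target=producer), Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
print(total)  # 0 + 1 + 2 + 3 + 4 = 10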

Pooling

from concurrent.futures import ThreadPoolExecutor


def func(num):
    return num


with ThreadPoolExecutor() as executor:
    # Option 1
    results = executor.map(func, [1, 2, 3])
    # Option 2
    future = executor.submit(func, 1)
    result = future.result()

Functions with a single argument

from concurrent.futures import ThreadPoolExecutor
import threading
import time


# Function to be run as a thread task
def action(num):
    print(threading.current_thread().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # Create a pool with 3 threads
    with ThreadPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"Single task result: {future1.result()}")

        print('------------------------------')
        # Run a batch of tasks with map()
        results = pool.map(action, (1, 3, 5))
        for r in results:
            print(f"Batch task result: {r}")

Result

ThreadPoolExecutor-0_0
Single task result: 103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
Batch task result: 101
Batch task result: 103
Batch task result: 105

Functions with multiple arguments

Single request

from concurrent.futures import ThreadPoolExecutor


# Function to be run as a thread task
def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Single request
        future1 = executor.submit(myfun, *(1, 2))
        # Or:
        # future1 = executor.submit(lambda paras: myfun(*paras), (1, 2))
        future1.result()
        print(f"Single task result: {future1.result()}")

Batch requests

from concurrent.futures import ThreadPoolExecutor


# Function to be run as a thread task
def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Batch requests via map()
        results = executor.map(lambda paras: myfun(*paras), [(1, 2), (2, 3), (3, 4)])
        for r in results:
            print(f"Batch task result: {r}")

Batch requests: print after all have finished

from concurrent.futures import ThreadPoolExecutor, wait


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Batch requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        wait(future_list)
        for future in future_list:
            print(f"Batch task result: {future.result()}")

Batch requests: print as each one finishes

from concurrent.futures import ThreadPoolExecutor, as_completed


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Batch requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        for future in as_completed(future_list):
            print(f"Batch task result: {future.result()}")

Multiprocessing

Importing the module

from multiprocessing import Process


def func(num):
    return num


if __name__ == "__main__":
    # The __main__ guard is required on platforms that spawn new processes.
    t = Process(target=func, args=(100,))
    t.start()
    t.join()

Data communication

import multiprocessing

q = multiprocessing.Queue()
q.put(1)
item = q.get()

from multiprocessing import Lock

lock = Lock()
with lock:
    pass
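A minimal sketch of passing results back from worker processes through a multiprocessing.Queue; the worker function is a hypothetical example:

import multiprocessing


def worker(q, num):
    # Runs in a separate process; results come back through the queue.
    q.put(num * num)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q, n)) for n in range(3)]
    for p in procs:
        p.start()
    results = [q.get() for _ in procs]  # one result per worker
    for p in procs:
        p.join()
    print(results)  # e.g. [0, 1, 4]; the order may vary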

Pooling

from concurrent.futures import ProcessPoolExecutor


def func(num):
    return num


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Option 1
        results = executor.map(func, [1, 2, 3])
        # Option 2
        future = executor.submit(func, 1)
        result = future.result()

Functions with a single argument

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


# Function to be run as a process task
def action(num):
    print(multiprocessing.current_process().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # Create a pool with 3 worker processes
    with ProcessPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"Single task result: {future1.result()}")

        print('------------------------------')
        # Run a batch of tasks with map()
        results = pool.map(action, [1, 3, 5])
        for r in results:
            print(f"Batch task result: {r}")

Result

SpawnProcess-1
Single task result: 103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
Batch task result: 101
Batch task result: 103
Batch task result: 105

Functions with multiple arguments

Single request

from concurrent.futures import ProcessPoolExecutor


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Single request
        future1 = executor.submit(myfun, *(1, 2))
        # The lambda variant used with ThreadPoolExecutor does not work here,
        # because a lambda cannot be pickled and sent to a worker process:
        # future1 = executor.submit(lambda paras: myfun(*paras), (1, 2))
        future1.result()
        print(f"Single task result: {future1.result()}")

Batch requests: print after all have finished

from concurrent.futures import ProcessPoolExecutor, wait


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Batch requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        wait(future_list)
        for future in future_list:
            print(f"Batch task result: {future.result()}")

Batch requests: print as each one finishes

from concurrent.futures import ProcessPoolExecutor, as_completed


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Batch requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        for future in as_completed(future_list):
            print(f"Batch task result: {future.result()}")

Multiprocessing vs. Multithreading vs. Coroutines

Async IO (asyncio) vs. multiprocessing vs. multithreading

In an I/O-bound application the CPU spends far more time waiting for I/O than doing its own work, which is wasteful.

Typical I/O-bound workloads include browser interaction, disk requests, web crawlers, and database queries.

The Python world offers three ways to improve concurrency in I/O-bound scenarios: multiprocessing, multithreading, and coroutines.

In theory asyncio should perform best, for the following reasons (a sketch illustrating the last point follows the list):

  1. Processes and threads incur CPU context switches.
  2. Processes and threads require switching between kernel space and user space, which is expensive; coroutines are invisible to the kernel and run entirely in user space.
  3. Processes and threads cannot be created without limit; a common rule of thumb is roughly 2x the number of CPU cores. Coroutines scale much further: the practical upper bound is the number of file descriptors the operating system's I/O multiplexer (epoll on Linux) can register.
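To illustrate the third point, the sketch below starts 10,000 coroutines that each sleep for one second; the whole batch still finishes in roughly one second, which would be impractical with 10,000 OS threads:

import asyncio
import time


async def job(i):
    await asyncio.sleep(1)  # simulate an I/O wait
    return i


async def main():
    tasks = [asyncio.create_task(job(i)) for i in range(10_000)]
    await asyncio.gather(*tasks)


start = time.perf_counter()
asyncio.run(main())
print(f"10,000 coroutines finished in {time.perf_counter() - start:.2f}s")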

Is asyncio really as strong in practice as the theory suggests, and by how much? I set up the following test scenario:

Issue 10 requests, each sleeping 1 s to simulate a business query.

  • Method 1: sequential execution
  • Method 2: multiprocessing
  • Method 3: multithreading
  • Method 4: asyncio
  • Method 5: asyncio + uvloop

The main difference between asyncio + uvloop and stock asyncio is that uvloop reimplements the asyncio event loop in Cython on top of libuv.

According to the uvloop project's own benchmarks, it is about twice as fast as node.js and on par with golang.

Sequential execution

import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    for h in range(10):
        query(h)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")

Multiprocessing

from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ProcessPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")

Multithreading

from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ThreadPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")

asyncio

import asyncio
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(num)) for num in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")

asyncio+uvloop

Note

uvloop is not supported on Windows.
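A cross-platform script can therefore guard the install call; a minimal sketch:

import sys

if sys.platform != "win32":
    import uvloop
    uvloop.install()  # asyncio.run() will now use uvloop's event loop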

Example

import asyncio
import uvloop
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(host)) for host in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    uvloop.install()
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")

Runtime comparison

Method            Runtime
Sequential        10.0750972 s
Multiprocessing   1.1638731999999998 s
Multithreading    1.0146456 s
asyncio           1.0110082 s
asyncio + uvloop  1.01 s

As the table shows, multiprocessing, multithreading, and asyncio all dramatically improve concurrency for I/O-bound workloads, with asyncio + uvloop delivering the best performance.