asyncio异步编程
# 协程
协程不是计算机提供,程序员人为创造的
协程(Coroutine),也可以被称之为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过程序实现代码块切换执行。例如:
def func1():
print(1)
def func2():
print(2)
def func3():
print(3)
func1()
func2()
func3()
2
3
4
5
6
7
8
9
10
11
12
13
实现协程有这么几种方法:
- greenlet,早期的模块
- yield 关键字,生成器,具有保存代码,保存状态,切换到其他函数去执行的特性
- python3.4 之后引入的
asyncio
模块- 通过它的装饰器和 yield
async,await
关键字(python3.5 之后)【推荐】
# greenlet 实现协程
安装
pip install greenlet
pip3 install greenlet
python -m pip install greenlet
2
3
4
5
实现
from greenlet import greenlet
def func1():
print(1) # 第二步:输出1
gr2.switch() # 第三步:切换到func2函数,从上一次执行的位置继续向后执行func2函数
print(2) # 第六步:输出2
gr2.switch() # 第七步:切换到func2函数,从上一次执行的位置继续向后执行func2函数
def func2():
print(3) # 第四步:输出3
gr1.switch() # 第五步:切换回func1函数,从上一次执行的位置继续向后执行func1函数
print(4) # 第八步:输出4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第一步:去执行func1函数
# 输出结果 1 3 2 4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# yield 关键字
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
2
3
4
5
6
7
8
9
10
# asyncio 模块
在 python3.4 及以后的版本。
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
注意:遇到 IO 阻塞自动切换,这就是牛逼之处
# async & await 关键字
在 python3.5 及之后的版本,本质上和上面的没啥区别
import asyncio
async def func1():
print(1)
await asyncio.sleep(2)
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 协程的意义
在一个线程中如果遇到 IO 等待的时间,线程不会在这傻傻等着,利用 IO 等待的时间再去做点别的事情;
案例:去下载 3 张图片(网络 IO)
普通方式(同步的方式)
import requests
1def download_images(url): print("开始下载:", url) # 发送网络请求,下载图片 response = requests.get(url) print("下载完成") # 图片保存到本地 file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as f: f.write(response.content)
if name == 'main': url_list = [ "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQG0Hl2WQnmhO_Sp_BAyjjA4y4LJLwu5M9POA&usqp=CAU", "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTvESXKroHI5muX_tRMN8UFOLVP1KRXmLzE-Q&usqp=CAU", "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTGI_d5Jb-PsXf5lPHB0P9BGxsWd8q3tCUOow&usqp=CAU" ]
for item in url_list: download_images(item)
1协程方式(异步的方式)
import asyncio import aiohttp
1
2async def fetch(session, url): print("发送请求", url) async with session.get(url, verify_ssl=False) as response: content = await response.content.read() file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as f: f.write(content) print("下载完成", url)
async def main(): async with aiohttp.ClientSession() as session: url_list = [ "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQG0Hl2WQnmhO_Sp_BAyjjA4y4LJLwu5M9POA&usqp=CAU", "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTvESXKroHI5muX_tRMN8UFOLVP1KRXmLzE-Q&usqp=CAU", "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTGI_d5Jb-PsXf5lPHB0P9BGxsWd8q3tCUOow&usqp=CAU" ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if name == 'main': asyncio.run(main())
1
# 异步编程
# 事件循环
理解成为一个死循环,去检测并执行某些代码。
伪代码:
# 伪代码
任务列表 = [任务1, 任务2, 任务3...]
while True:
可执行任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将可执行和已完成的任务返回
for 就绪任务 in 已准备就绪的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除已完成的任务
如果任务列表中的任务都已完成,则终止循环
2
3
4
5
6
7
8
9
import asyncio
# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 可以理解为死循环在执行了
# 将任务放到任务列表
loop.run_until_complete(任务)
2
3
4
5
6
# 快速上手
协程函数,定义函数的时候,加上async def 函数名
协程对象,执行协程函数() 得到的协程对象。
asyncio 也支持旧式的 基于生成器的 (opens new window) 协程。
# 协程函数async
def fuc():
pass
result = func() # 协程对象
2
3
4
注意:执行协程函数创建的协程对象,函数内部代码是不会执行的。
如果想要运行,就得借助事件循环,让事件循环去执行协程对象
import asyncioasync
def func():
print("hello world")
result = func()
# loop = asyncio.get_event_loop()
loop.run_until_complete(result)
asyncio.run(result)
# python3.7有的 更简单了
2
3
4
5
6
7
8
# await 关键字
await + 可等待的对象,只能跟下面 3 种
- 协程对象
- Future 对象
- Task 对象
简单粗暴理解:都是 IO 等待
示例 1
import asyncioasync
def func():
print("来玩啊")
response = await asyncio.sleep(2)
# 模拟等待IO
print("结束", response)
asyncio.run(func())
2
3
4
5
6
7
8
示例 2
import asyncioasync
def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后再继续往下执行,当前协程挂起时,事件循环可以去执行其他协程(任务)。
response = await others()
print("IO请求结束,结果为", response)
asyncio.run(func())
2
3
4
5
6
7
8
9
10
11
12
13
示例 3
import asyncioasync
def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后再继续往下执行,当前协程挂起时,事件循环可以去执行其他协程(任务)。
response1 = await others()
print("IO请求结束,结果为", response1)
response2 = await others()
print("IO请求结束,结果为", response2)
asyncio.run(func())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
遇到 await,就是等待对象的值得到结果之后再继续向下走
# Task 对象
任务是被用来"并行的"调度协程
当一个协程通过
asyncio.create_task(协程对象)
等函数被封装为一个任务,该协程会被自动调度执行。白话:帮助我们在事件循环中并发的添加多个任务。
文档示例:
import asyncioasync
def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await taskasyncio.run(main())
2
3
4
5
6
7
8
9
10
11
示例 1:
import asyncioasync
def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
async def main():
print("main开始")
# 创建task对象,并将当前执行func函数任务添加到事件循环
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
print("main结束")
# 当执行某协程时遇到IO操作,会自动切换其他任务
# 此处的await是等待相对应的协程全部执行完毕并获取返回结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
示例 2
import asyncioasync
def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
async def main():
print("main开始")
task_list = [ asyncio.create_task(func(), name="n1"),
# 起名
asyncio.create_task(func(), name="n2") ]
print("main结束")
# 当执行某协程时遇到IO操作,会自动切换其他任务
# 此处的await是等待相对应的协程全部执行完毕并获取返回结果
# ret1 = await task1
# ret2 = await task2
done, pending = await asyncio.wait(task_list, timeout=2)
# 最多等2秒
print(done, pending)asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
示例 3
import asyncioasync
def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'
# 写在外边注意,只能将协程函数放进列表里
task_list = [func()func()]done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
2
3
4
5
6
7
8
9
10
# asyncio.Future 对象
更偏向底层,是 Task 类的基类
Future
(opens new window) 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
2
3
4
一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()
(opens new window)。
示例 1
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务 Future对象 , 这个任务什么都不干
fut = loop.create_future()
# 等待任务最终结果, 没有结果会一直等待下去
await fut
asyncio.run(main())
2
3
4
5
6
7
8
# concurrent.futures.Future 对象
实现线程池、进程池实现异步操作时用到的对象
import timefrom concurrent.futures
import Futurefrom concurrent.futures.thread
import ThreadPoolExecutorfrom concurrent.futures.process
import ProcessPoolExecutor
def func(val):
time.sleep(1)
print(val)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 或者# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
以后写代码可能会存在交叉使用,一部分用协程,一部分用线程池。
例如:crm 项目,内部的 80%都是基于协程异步编程。比如操作 MySQL(如果不支持)就会使用线程池或者进程池做异步编程。
交叉使用示例:
import timeimport asyncioimport concurrent.futures
def func1():
# 某个耗时操作
time.sleep(2)
return "SB"
async def main():
loop = asyncio.get_running_loop()
# 将不支持协程的进行转换
fut = loop.run_in_executor(None, func1)
result = await fut
print("default thread pool", result)
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
案例:asyncio + 不支持异步的一个模块
import asyncioimport requestsasync
def download_image(url):
# 发生网络请求,下载图片,(遇到网络下载的IO请求,自动切换到其他任务)
print("开始下载:", url)
loop = asyncio.get_event_loop()
# requests模块不支持异步操作,所以就使用线程池来配合实现
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("下载完成")
# 图片保存到本地
file_name = url.rsplit('_')[-1]
with open(file_name, mode="wb") as file_object:
file_object.write(response.content)if __name__ == '__main__':
url_list = [ '图片.jpg' ]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 异步迭代器
实现了__aiter__()
和__anext__()
方法的对象。__anext__
必须返回一个awaitable
对象。async for
会处理异步迭代器的__anext__()
方法所返回的可等待对象,直到其引发一个StopAsyncIteration
异常。
什么是异步可迭代对象?
可在async for
语句中被使用的对象。必须通过它的__aiter__()
方法返回一个asynchronous_iterator
。
import asyncioclass Reader(object):
""" 自定义异步迭代器,同时也是异步可迭代对象 """
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(2)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return valasync def func():
obj = Reader()
# 必须写在协程函数内
async for item in obj:
print(item)asyncio.run(func())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 异步上下文管理器
此种对象通过定义__aenter__()
和__aexit__()
方法来对async with
语句中的环境进行控制。
import asyncioclass AsyncContextManager:
def __init__(self):
# self.conn = conn
pass
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步连接数据库
# self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭数据库
await asyncio.sleep(1)async def func():
obj = AsyncContextManager()
async with obj:
result = await obj.do_something()
print(result)
asyncio.run(func())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# uvloop
是 asyncio 的事件循环的替代方案。事件循环 > 默认 asyncio 的事件循环。
安装
pip3 install uvloop
import asyncioimport uvloop
# 主要是这一句
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码
# 内部的事件循环自动化会变成
uvloopasyncio.run(...)
2
3
4
5
6
**注意:**Django3 和 FastAPI 一个asgi
-> uvicorn
本质上快都是用了 uvloop。