# asyncio笔记 **Repository Path**: HaiXiuDeDXianSheng/asyncio-notes ## Basic Information - **Project Name**: asyncio笔记 - **Description**: python异步编程相关知识点asyncio,介绍asyncio,库的使用。该项目对比了python3.6版本和3.7以上版本在使用asyncio 时的不同语法。 - **Primary Language**: Python - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 6 - **Forks**: 4 - **Created**: 2022-02-06 - **Last Updated**: 2025-04-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: Python, 异步编程 ## README - [协程](#协程) - [概念](#概念) - [对yield的理解](#对yield的理解) - [异步IO](#异步io) - [asyncio](#asyncio) - [事件循环](#事件循环) - [快速上手](#快速上手) - [创建协程](#创建协程) - [await关键字](#await关键字) - [协程嵌套](#协程嵌套) - [tsak对象](#tsak对象) - [创建一个task](#创建一个task) - [协程并发](#协程并发) - [asyncio的future对象(asyncio.Future)](#asyncio的future对象asynciofuture) - [concurrent的future对象(concurrent.futures.Future)](#concurrent的future对象concurrentfuturesfuture) - [异步和非异步模块混合的案例](#异步和非异步模块混合的案例) - [asyncio异步迭代器(了解即可)](#asyncio异步迭代器了解即可) - [什么是异步迭代器](#什么是异步迭代器) - [什么是异步可迭代对象](#什么是异步可迭代对象) - [绑定回调(为了得到协程对象的返回值)](#绑定回调为了得到协程对象的返回值) - [future与result](#future与result) - [阻塞与await](#阻塞与await) - [并发和并行](#并发和并行) - [异步上下文管理器](#异步上下文管理器) - [uvloop](#uvloop) - [实战案例](#实战案例) - [异步redis](#异步redis) - [异步mysql](#异步mysql) - [总结](#总结) # 协程 https://zhuanlan.zhihu.com/p/72887901 https://zhuanlan.zhihu.com/p/73568282 ## 概念 在同一线程内,一段执行代码过程中,可以中断并跳转到另一段代码中,接着之前中断的地方继续执行。(和多线程时的情况类似) - 优点: - 无需线程上下文切换,协程避免了无意义的调度,可以提高性能。(但因此,程序员必须自己承担调度责任。同时,协程也失去了标准线程使用多CPU的能力) - 无需原子操作锁定及同步开销。 - 方便切换控制流,简化编程模型 - 高并发+高扩展性+低成本,一个CPU支持上万的协程不是问题。很适合用于高并发处理 - 缺点 - 无法利用多核资源。协程的本质时单线程,不能同时将单个CPU的多个核用上,协程需要进程配合才能运行在多CPU上(不过我们日常编程不会有这个问题,除非是CPU密集型应用) - 进行阻塞操作(如IO时)会阻塞掉整个程序 ## 对yield的理解 >```python > def fun(): > print('---------start---------') > while True: > res = yield 4 > print('res:',res) > > g=fun() > print(next(g)) > print('*'*20) > print(next(g)) > ``` 得到结果如下所示: >``` >---------start--------- >4 >******************** >res: None >4 >``` 用yield实现协程(子程序间的切换) >```python >import time >def A(): > while True: > print('-------A-------') > yield > time.sleep(0.5) > >def B(c): > while True: > print('---------B------------') > c.__next__() > >a=A() >B(a) >``` 得到的结果就是函数A()和函数B()不断交替执行。 >``` >---------B------------ >-------A------- >---------B------------ >-------A------- >---------B------------ >-------A------- >---------B------------ >``` ## 异步IO - 同步:先执行第一个事务,如果遇到阻塞,则进行等待直到第一个事务执行完毕,再执行第二个事务 - 异步:执行第一个事务之后,如果遇到阻塞,则会执行第二个事务,不会等待。可以通过状态、通知、回调来调用处理结果 >```python >import time >def foo(): > time.sleep(1) > >now = lambda :time.time() >start=now() >for i in range(5): > foo() >print('同步所花费的时间: %f s'%(now() - start)) >``` 得到结果如下: >``` >同步所花费的时间: 5.002217 s >``` 异步IO需要使用asyncio模块 >```python >import time >import asyncio >now = lambda :time.time() >async def fun(): > asyncio.sleep(1) >start = now() >loop =asyncio.get_event_loop() >for i in range(5): > loop.run_until_complete(fun()) >print('异步所花费的时间: %f s'%(now() - start)) >``` 得到结果如下: >``` >异步所花费的时间: 0.002996 s >``` 多线程、多进程主要适合科学计算等CPU密集型程序。异步IO主要适合IO密集型场景,例如网络爬虫。 # asyncio asyncio是python3.4之后引入的标准库,内置对异步IO的支持。asyncio的编程模型是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。 1. event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。 2. coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。 3. task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。 4. future: 代表将来执行或者没有执行的任务的结果,它和task没有本质上的区别 5. async/await 关键字:python3.5用于定义协程的关键字,**async定义一个协程,await用于挂起阻塞的异步调用接口。** ## 事件循环 理解成一个死循环,回去检测某些代码 >``` > # 伪代码 > 任务列表 =[任务1、任务2....] > while True: > 可执行的任务列表,执行完成的任务列表 = 去任务列表中检查所有的任务,将“可执行”和“已完成”的任务返回 > for 就绪任务 in 可执行任务列表: > 执行就绪任务 > for 已完成的任务 in 执行完成的任务列表: > 任务列表中移除 已完成的任务 > 如果 任务列表 中任务都完成,则终止循环 >``` 代码 >```python >import asyncio > # 创建或获取一个事件循环 > loop=asyncio.get_event_loop() > # 将任务放到“任务列表” > loop.run_until_complete(asyncio.wait(tasks)) >``` ## 快速上手 协程函数:定义函数的时候前面有async 关键字 协程对象:执行 协程函数() 得到的是协程对象 ### 创建协程 >```python >import asyncio > async def do_work(): > print('打我') > result = do_work() 执行协程函数创建协程对象,内部代码不运行。 >```python > # 创建或获取一个事件循环 > loop=asyncio.get_event_loop() > # 将任务放到“任务列表” > loop.run_until_complete(result) >``` 上面是python 3.6的写法,在python 3.7之后有更简洁的写法 >```python > asyncio.run(result) # 直接这么写就行了 >``` ## await关键字 await后面跟的是一个可等待的对象。**await的作用就是等待后面的函数返回结果,只有后面的函数返回结果了,程序才会继续往下执行。** - 可等待的对象包括: - 协程对象 - future对象 - task对象 (可以暂时理解成IO等待) **await在等什么?在等其后面的对象返回的值** ## 协程嵌套 >```python >import asyncio > >async def others(): > print('start') > await asyncio.sleep(2) > print('end') > return '返回值' > >async def func(): > print('执行协程函数内部代码') > # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 > # 当前协程挂起时,事件循环可以执行其他协程 > response = await others() # others()是一个协程对象 > print('IO操作结果为:',response) > >loop=asyncio.get_event_loop() >loop.run_until_complete(func()) >``` 一个协程函数中也可以有多个await >```python >import asyncio > >async def others(): > print('start') > await asyncio.sleep(2) # 只有外部线程等待了(await)才会切换到这里来执行 > print('end') > return '返回值' > >async def func(): > print('执行协程函数内部代码') > # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 > # 当前协程挂起时,事件循环可以执行其他协程 > response1 = await others() > print('IO操作结果为:',response1) > > response2 = await others() > print('IO操作结果为:',response2) > >loop=asyncio.get_event_loop() >loop.run_until_complete(func()) >``` 代码先执行func中的 print('执行协程函数内部代码'),然后遇到 await,协程挂起,等待await后面的程序全部执行完毕并返回结果。发现此处await的是一个协程,程序进入协程others() 中,执行print('start'),又遇到了await,此次await的是一个IO操作,则程序挂起,去执行其他的协程。(这里没有其他的协程,于是只能老实等待)。asyncio.sleep(2)执行结束后,回到others()里面继续执行挂起之后的代码。response1结束。response2同理。 得到结果如下: >``` >执行协程函数内部代码 >start >end >IO操作结果为: 返回值 >start >end >IO操作结果为: 返回值 >``` ## tsak对象 ### 创建一个task 就是往事件循环中加入任务用的。Tasks用于开发调度协程,通过asyncio.create_task(协程对象)创建(python 3.7 之后有这个函数),也可以用**asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)创建一个task**,run_until_complete的参数是一个future对象,当传入一个协程,其内部会自动封装成task。不建议手动实例化task对象。 >```python >import asyncio > >async def others(): > print('start') > await asyncio.sleep(2) # 只有外部线程等待了(await)才会切换到这里来执行 > print('end') > return '返回值' > >async def func(): > print('执行协程函数内部代码') > # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 > # 当前协程挂起时,事件循环可以执行其他协程 > task1=asyncio.ensure_future(others()) > task2=asyncio.ensure_future(others()) > response1 = await task1 > print('IO操作结果为:',response1) > response2 = await task2 > print('IO操作结果为:',response2) > >loop=asyncio.get_event_loop() >loop.run_until_complete(func()) >``` 结果如下: >``` >执行协程函数内部代码 >start >start >end >end >IO操作结果为: 返回值 >IO操作结果为: 返回值 >``` 发现这里程序运行的结果和前面在await后面加协程对象的时候不太一样。这是为什么呢?**主要原因是加了两个 asyncio.ensure_future() 语句,添加了两个task对象。** - **程序执行的逻辑如下:** - 创建事件循环asyncio.get_event_loop() **[# 必须先创建事件循环]** - 运行主函数func(),执行print(),创建task1,创建task2 - 程序来到 response1=await task1,response1 一直在等待await后面的task1传回返回值。至此,由于遇到了await,func()主程序挂起. - 进入task1中执行。在await中执行print('start')后,遇到await阻塞,此时task1挂起,等待await后面的程序执行完毕。与此同时,发现除了task1外还存在另一个任务task2,于是去执行task2里面的程序。 - 执行task2中的print('start')后又遇到await等待,但此时已经没有其他任务了,因此只能等待task1和task2中挂起的程序执行完毕。 - task1先结束挂起,执行print('end'),并return返回值。**[谁先结束挂起,先执行谁那边的后续代码]** - 在task1执行完print('end')之后,return返回值之前,task2也结束挂起了。执行了print('end'),return返回值。**[这里虽然返回了值,但是response2只能等到await的时候才能接收到这个值]** - 函数返回至func()中,response1 获得了 await task1返回的值,执行了print() - response2获得了await task2的返回值,执行了print() 将程序改成如下的形式可以更加深刻理解程序执行的过程: >```python >import asyncio > >async def others(time,name): > print('start',name) > await asyncio.sleep(time) > print('end',name) > return '返回值'+name > >async def func(): > print('执行协程函数内部代码') > # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 > # 当前协程挂起时,事件循环可以执行其他协程 > task1=asyncio.ensure_future(others(3,'task1')) > task2=asyncio.ensure_future(others(2,'task2')) > response1 = await task1 > print('IO操作结果为:',response1) > response2 = await task2 > print('IO操作结果为:',response2) > >loop=asyncio.get_event_loop() >loop.run_until_complete(func()) >``` 得到结果如下: >``` >执行协程函数内部代码 >start task1 >start task2 >end task2 # 这边task2比task1先结束,这里之所以不会先print('IO操作结果为:',response2)是因为func()函数需要先等response1结束挂起才能执行后续。 >end task1 >IO操作结果为: 返回值task1 >IO操作结果为: 返回值task2 >``` 倘若交换task1和task2的sleep时间 >```python >import asyncio > >async def others(time,name): > print('start',name) > await asyncio.sleep(time) > print('end',name) > return '返回值'+name > >async def func(): > print('执行协程函数内部代码') > # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 > # 当前协程挂起时,事件循环可以执行其他协程 > task1=asyncio.ensure_future(others(2,'task1')) > task2=asyncio.ensure_future(others(3,'task2')) > response1 = await task1 > print('IO操作结果为:',response1) > response2 = await task2 > print('IO操作结果为:',response2) > >loop=asyncio.get_event_loop() >loop.run_until_complete(func()) >``` 得到结果如下: >``` >执行协程函数内部代码 >start task1 >start task2 >end task1 >IO操作结果为: 返回值task1 >end task2 >IO操作结果为: 返回值task2 >``` ## 协程并发 上述例子的代码写法一般用的很少,只是为了便于理解,正式写法如下: 使用asyncio.wait(task_list)即可。 >```python >import time >import asyncio > ># 使用async 来定义一个协程对象 >async def do_work(x): > print('waiting:',x) > await asyncio.sleep(x) # 模拟一个IO操作 > return 'Done after {}s'.format(x) > >async def main(): ># 创建多个协程对象 > coroutine1=do_work(1) > coroutine2=do_work(2) > coroutine3=do_work(3) > tasks=[ > asyncio.ensure_future(coroutine1), > asyncio.ensure_future(coroutine2), > asyncio.ensure_future(coroutine3) > ] > dones,pendings=await asyncio.wait(tasks,timeout=None) > # time_out=2 表示最多等2秒,假如time_out=None,则表示默认等到所有结果都完成 > ## 用gather()方法也行 > for task in dones: > print('task result:',task.result()) > >now = lambda :time.time() >start = now() ># 1.创建事件循环 >loop = asyncio.get_event_loop() ># 将协程对象加入到事件循环中 >loop.run_until_complete(main()) > >print('Time:',now()-start) >``` 得到结果如下 >``` >waiting: 1 >waiting: 2 >waiting: 3 >task result: Done after 3s >task result: Done after 1s >task result: Done after 2s >Time: 3.007572889328003 >``` 在python 3.7 之后我们可以在创建task的时候给task起名字 >```python >coroutine1=do_work(1) >coroutine2=do_work(2) >coroutine3=do_work(3) >tasks=[ > asyncio.create_task(coroutine1,name='MY_COROUTINE_1'), > asyncio.create_task(coroutine2,name='MY_COROUTINE_2'), > asyncio.create_task(coroutine3,name='MY_COROUTINE_3') >] >``` ## asyncio的future对象(asyncio.Future) future类是task类的基类,task对象只有运算得到返回值后,await的对象才能传回值并且向下运行。这个功能就是future对象来实现的。future源码中存在一个_state,一旦_state值变成“finished”,await就不再继续等待。future一般不手动去写它。 **案例1**: 若执行以下代码,则程序一直会处于等待状态。(卡死) >```python >import asyncio > >async def main(): > # 获取当前事件循环 > loop = asyncio.get_running_loop() # python3.7以后 > # 创建一个任务(future对象),什么也不干 > fut = loop.create_future() > # await会等待任务最终结果,没有结果则会一直等待下去 > await fut > >loop = asyncio.get_event_loop() >loop.run_until_complete(main()) >``` 示例2: >```python >import asyncio > >async def set_after(fut): > await asyncio.sleep(2) > fut.set_result('666') > >async def main(): > # 获取当前事件循环 > loop = asyncio.get_running_loop() # python3.7以后 > # 创建一个任务(future对象),什么也不干 > fut = loop.create_future() > # 等待任务最终结果,没有结果则会一直等待下去 > # 手动设置future的最终结果,那样fut就可以结束了 > await loop.create_task(set_after(fut)) > data = await fut # 等待future对象获取最终结果,否则一直等待下去 > print(data) > >loop = asyncio.get_event_loop() >loop.run_until_complete(main()) >``` 以上两段代码没有实际意义,只是加深对于future的理解 ## concurrent的future对象(concurrent.futures.Future) 在python中还有一个concurrent模块,concurrent模块也有对应的java接口。concurrent.future.Future对象和asyncio.Future对象没有任何关系,是完全不同的两个东西。concurrent.futures.Future是在利用进程池、线程池来实现异步操作时来使用的对象。 >```python >import time >from concurrent.futures import Future >from concurrent.futures.thread import ThreadPoolExecutor # 用线程执行异步 >from concurrent.futures.process import ProcessPoolExecutor # 用进程执行异步 > >def func(value): > time.sleep(1) > print(value) > return 123 > ># 创建线程池 >pool = ThreadPoolExecutor(max_workers=2) ># 或者创建进程池 ># pool = ProcessPoolExecutor(max_workers=5) > >for i in range(4): > fut = pool.submit(func,1) # 返回的是一个Future对象,fut.result()才是返回结果 > print(fut) >``` 实现得到的结果如下 >``` > > > > >1 >1 >1 >1 >``` 这边可以发现fut对象是并行创建的。 - 两者的相同点: - 两种Future都是用来等待结果的返回 **日后写代码可能会存在协程和线程、进程下的两种Future交叉使用的情况。** **[在编程的时候,遇到某个第三方模块不支持协程异步的时候用]** 一般情况下,不会交叉使用。一般情况下,要么用asyncio这种协程式的来实现异步,要么统一用进程池或线程池来实现异步。有些情况下会交叉使用,例如:crm项目中80%是基于协程异步编程+Mysql,这种情况下只有Mysql内部也支持async异步,两者才能实现无缝衔接。假如Mysql不支持协程,则需要考虑对此用线程、进程来实现了。 - **asyncio.run_in_executor()实现异步的内部原理**(以案例1为例) - 第一步:内部会先调用TreadPoolExector的submit方法去线程中申请一个func1函数,并返回一个concurrent.futures.Future对象 (与asyncio没有任何关系) - 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装成asyncio.Future对象 - 因为concurrent.futures.Future对象不支持await语法,因此需要转换成asyncio.Future对象才能进行使用 **案例1:** >```python >import time >import asyncio >from concurrent.futures import Future >from concurrent.futures.thread import ThreadPoolExecutor # 用线程执行异步 >from concurrent.futures.process import ProcessPoolExecutor # 用进程执行异步 > >def func1(): > # 某个耗时操作 > time.sleep(1) > return 'SB' > >async def main(): > loop = asyncio.get_running_loop() > #1.run in default loop's exector (默认TreadPoolExecutor) > # 第一步:内部会先调用TreadPoolExector的submit方法去线程中申请一个func1函数,并返回一个concurrent.futures.Future对象 > # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装成asyncio.Future对象 > # 因为concurrent.futures.Future对象不支持await语法,因此需要转换成asyncio.Future对象才能进行使用 > > fut = loop.run_in_executor(None,func1) # 在指定的执行器中调用func()函数,executor参数必须是concurrent.futures.Exector对象 > result = await fut > print('default thread pool',result) > ># asyncio.run(main()) >loop = asyncio.get_event_loop() >loop.run_until_complete(main()) >``` 在使用线程池、进程池的时候,使用如下代码代替 >```python >import time >import asyncio >from concurrent.futures import Future >from concurrent.futures.thread import ThreadPoolExecutor # 用线程执行异步 >from concurrent.futures.process import ProcessPoolExecutor # 用进程执行异步 > >def func1(): > # 某个耗时操作 > time.sleep(1) > return 'SB' > >async def main(): > loop = asyncio.get_running_loop() > > # 2.RUN IN CUSTOM THREAD POOL > with ThreadPoolExecutor() as pool: > result = await loop.run_in_executor(pool,func1) > print('custom thread pool result',result) > > # 3.run in custom process pool > # with ProcessPoolExecutor() as pool: > # result = await loop.run_in_executor(pool,func1) > # print('custom process pool result',result) > ># asyncio.run(main()) >loop = asyncio.get_event_loop() >loop.run_until_complete(main()) >``` ## 异步和非异步模块混合的案例 案例:asyncio + 不支持异步的模块 >```python >import requests >import asyncio > >async def download_image(url): > # 发送网络请求下载图片 > print('开始下载:',url) > loop = asyncio.get_running_loop() > # request 不支持异步,使用线程池来辅助实现 > future = loop.run_in_executor(None,requests.get,url) > response = await future > print('下载完成') > # 图片保存 > file_name = url.rsplit('_')[-1] > with open(file_name,'wb') as fp: > fp.write(response.content) > >if __name__ =='__main__': > url_list=[ > 'www.xxxx1.com', > 'www.xxxx2.com', > 'www.xxxx2.com' > ] > > tasks = [download_image(url) for url in url_list] > loop = asyncio.get_event_loop() > loop.run_until_complete(asyncio.wait(tasks)) >``` - 上述程序的运行逻辑 - 一开始创建了三个url的任务 - 针对第一个url,执行download_image()函数,执行run_in_executor(),下面遇到await - 遇到await后协程挂起,去执行第二个url的任务,又创建一个线程,又await,然后转向第三个url,然后又创建线程,又等待。 以上的程序和纯依靠asyncio支持的协程实现异步存在一个区别就是,纯aysncio实现协程只存在一个线程,所有的I/O等待都是协程在等待。而上述过程,在实现异步的过程中,建立了三个线程,I/O等待是线程在进行等待,因此有较大的资源浪费。 ## asyncio异步迭代器(了解即可) ### 什么是异步迭代器 实现了__aiter__() 和__anext__() 方法的对象,而__anext__() 必须返回一个 awaitable 对象,async for 会处理异步迭代器的__anext__() 方法所返回的可等待对象,直到其引发一个StopAsyncIteration 异常。 ### 什么是异步可迭代对象 可在 async for 语句中被使用的对象,必须通过它的__aiter__() 方法返回一个 asynchronous iterator >```python >import asyncio > >class Reader: > '''自定义异步迭代器(同时也是异步可迭代对象) ''' > def __init__(self): > self.count = 0 > > async def readline(self): > # await asyncio.sleep(1) > self.count+=1 > if self.count==100: > return None > return self.count > > def __aiter__(self): > return self > > async def __anext__(self): > val = await self.readline() > if val == None: > raise StopAsyncIteration > return val >``` 上述代码,会发现假如采用如下迭代会报错: >```python >async for i in Reader(): > print(i) >``` 这是因为 **async for 必须放在协程函数内**,按照以下方式实现异步迭代 >```python >async def func(): > async for i in Reader(): > print(i) > >loop = asyncio.get_event_loop() >loop.run_until_complete(func()) >``` ## 绑定回调(为了得到协程对象的返回值) 绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值,如果回调需要多个参数,可以通过偏函数导入。 >```python >import asyncio > ># 使用async 来定义一个协程对象 >async def do_work(x): > print('waiting:',x) > return 'Done after {}s'.format(x) > ># 获取协程对象 >coroutine = do_work(3) ># 1.创建事件循环 >loop = asyncio.get_event_loop() ># 创建task >task=loop.create_task(coroutine) ># 给任务添加回调函数 >def callback(future): > print('callback:',future.result()) > >task.add_done_callback(callback) ># 将协程对象加入到事件循环中 >loop.run_until_complete(task) >``` 得到结果 >``` >waiting: 3 >callback: Done after 3s >``` ## future与result 回调一直是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码,以避免回调的问题。回调中我们使用了future的result()方法,前面不绑定回调的例子中,可以看到task的finished状态.在那个时候,可以直接读取task的result方法。 >```python >import asyncio > ># 使用async 来定义一个协程对象 >async def do_work(x): > print('waiting:',x) > return 'Done after {}s'.format(x) > ># 获取协程对象 >coroutine = do_work(3) ># 1.创建事件循环 >loop = asyncio.get_event_loop() ># 创建task >task=loop.create_task(coroutine) > ># 将协程对象加入到事件循环中 >loop.run_until_complete(task) ># 直接调用task中的result()来获取返回结果 >print('直接获取返回结果:',task.result()) >``` ## 阻塞与await 使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器中的yield一样,函数交出控制权。**协程遇到await,事件循环就会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再执行下一个协程。** >```python >import time >import asyncio > ># 使用async 来定义一个协程对象 >async def do_work(x): > print('waiting:',x) > await asyncio.sleep(x) # 模拟一个IO操作 > return 'Done after {}s'.format(x) > >now = lambda :time.time() >start = now() ># 获取协程对象 >coroutine = do_work(3) ># 1.创建事件循环 >loop = asyncio.get_event_loop() ># 创建task >task=loop.create_task(coroutine) ># 将协程对象加入到事件循环中 >loop.run_until_complete(task) >print('task result:',task.result()) >print('Time:',now()-start) >``` 得到结果如下: >``` >waiting: 3 >task result: Done after 3s >Time: 3.0025010108947754 >``` ## 并发和并行 >```python > import time >import asyncio >now = lambda :time.time() >async def fun(): > task1=asyncio.ensure_future(asyncio.sleep(1)) > task2=asyncio.ensure_future(asyncio.sleep(1)) > await task1 > await task2 >start = now() >loop =asyncio.get_event_loop() >for i in range(5): > loop.run_until_complete(fun()) >print('异步所花费的时间: %f s'%(now() - start)) >``` 得到结果如下; >``` >异步所花费的时间: 5.003474 s >``` >```python >import time >import asyncio >now = lambda :time.time() >async def fun(): > await asyncio.ensure_future(asyncio.sleep(1)) > await asyncio.ensure_future(asyncio.sleep(1)) >start = now() >loop =asyncio.get_event_loop() >for i in range(5): > loop.run_until_complete(fun()) >print('异步所花费的时间: %f s'%(now() - start)) >``` 得到的结果如下: >``` >异步所花费的时间: 10.006665 s >``` 因此,直接对asyncio.ensure_future()创建的task对象进行await是不能实现并发的,必须将创建的对象返回给一个变量,在对该变量绑定的task对象进行await才可实现。更好的方法是使用asyncio.gather()方法或者aysncio.wait()方法来实现并发. - 并发和并行的概念 https://www.zhihu.com/question/33515481 - **串行:**喂?你在做什么呢?买菜啊?好的,到家了说一声。啊?到家了?那你到幼儿园接娃吧。 - 串行的特点:前一个任务没搞定,下一个任务就只能等着。 - **并行:**来,这是你的盖浇饭,这是我的胡辣汤。咱俩一起吃。 - 并行的特点:两个任务在同一时刻**互不干扰**的**同时**执行。 - **并发:**你去买个菜,顺路把邮件发了;路过幼儿园时带娃回家。 - 并发的特点:同时安排若干个任务,这些**任务可以彼此穿插着进行**;有些任务可能是并行的,比如买菜、发邮件和去幼儿园的某些路途是重叠的,这时你的确同时在做三件事;但进菜市场和发邮件和接娃三者是互斥的,每个时刻只能完成其中一件。换句话说,**并发允许两个任务彼此干扰。** **要点:**串行还是并发,这都是**任务安排者**视角看到的东西。前者要求你看到前一个任务结束了,下一个任务才能安排;而后者呢,你可以同时提交许多任务,执行者(们)之间会相互协调并自己安排执行顺序(但未必合理,比如可能出现死锁)——总之,你把任务安排下去就不用管了。相比之下,“并行”是**任务执行者**视角的东西,和前两者所处平面不同。 **并发是对需求侧的描述,并行才是对实现侧的描述,这两根本不是同一个范畴的东西** # 异步上下文管理器 https://blog.csdn.net/tinyzhao/article/details/52684473 这种对象通过定义__aenter__()和__aexit__()方法来对asyncio with语句中的环境进行控制,叫做异步上下文管理器 >```python >import asyncio > >class AsyncContextManager: > def __init__(self,conn): > self.conn=conn > > async def do_something(self): > # 异步读取数据库操作 > return 666 > > async def __aenter__(self): > self.conn = await asyncio.sleep(1) > return self > > async def __aexit__(self,exec_type,exc_val,exc_tb): > """ > with语句运行结束之后触发此方法的运行 > exc_type:如果抛出异常,这里获取异常类型 > exc_val:如果抛出异常,这里显示异常内容 > exc_tb:如果抛出异常,这里显示所在位置,traceback > """ > # 异步关闭数据库连接 > await asyncio.sleep(1) > >async def func(): > async with AsyncContextManager() as fp: > result = await fp.do_something() > print(result) >loop = asyncio.get_event_loop() >loop.run_until_complete(func()) >``` # uvloop uvloop是asyncio事件循环的替代方案,是一个第三方的库,但是用了uvloop可以在一定程度上提高事件循环的效率。uvloop事件循环效率 > 默认asyncio 的事件循环效率,比其他框架效率至少提高两倍,性能比肩go语言。uvloop貌似暂不支持windows >```python >import asyncio >import uvloop > # 将asyncio中的事件循环替换成uvloop中的事件循环 >asyncio.set_event_loop(uvloop.EventLoopPolicy()) ># 上下代码和以前一样 >... >loop = asyncio.get_event_loop() >loop.run_until_complete() >``` 注意:一个知名的asgi->uvcorn,速度快是因为用了uvloop # 实战案例 ## 异步redis 在使用python代码操作redis时,**连接/操作/断开都是网络IO** >```python > pip install aioredis > ``` ## 异步mysql >```python > pip install aiomysql > ``` 示例1: >```python >import asyncio >import aiomysql > >async def execute(): > # 网络IO操作,连接mysql > conn = await aiomysql.connect(host='127.0.0.1', port=1306, user='root', password='password',db='test') > # 网络IO操作,创建cursor() > cur = await conn.cursor() > # 网络IO操作,执行sql语句 > await cur.execute('SELECT host.user FROM user') > # 网络IO操作,获取sql结果 > result = cur.fetchall() > print(result) > # 网络IO操作,关闭链接 > await cur.close() > conn.close() > >loop = asyncio.get_event_loop() >loop.run_until_complete(execute) >``` 示例2:连接多个mysql >```python >import asyncio >import aiomysql > >async def execute(host,password): > # 网络IO操作,连接mysql > conn = await aiomysql.connect(host=host, port=1306, user='root', password=password,db='test') > # 网络IO操作,创建cursor() > cur = await conn.cursor() > # 网络IO操作,执行sql语句 > await cur.execute('SELECT host.user FROM user') > # 网络IO操作,获取sql结果 > result = cur.fetchall() > print(result) > # 网络IO操作,关闭链接 > await cur.close() > conn.close() > >async def main(): > task_list=[ > asyncio.ensure_future(execute('127.0.0.1','password')), > asyncio.ensure_future(execute('40.0.0.1','password')) > ] > > await asyncio.wait(task_list) > >loop = asyncio.get_event_loop() >loop.run_until_complete(main()) >``` # 总结 异步编程最大的意义:通过一个线程来实现并发行为。利用其IO等待的时间来做其他的事情