shield

asyncio.shield,用它可以屏蔽取消操作。一直到这里,我们还没有见识过Task的取消。

深究Python中的asyncio库-shield函数

看一个例子:

In:loop=asyncio.get_event_loop()
In:task1=loop.create_task(a())
In:task2=loop.create_task(b())
In:task1.cancel()
Out:True
In:awaitasyncio.gather(task1,task2)
Suspendinga
Suspendingb
---------------------------------------------------------------------------
CancelledErrorTraceback(mostrecentcalllast)
cell_nameinasync-def-wrapper()
CancelledError:

在上面的例子中,task1被取消了后再用asyncio.gather收集结果,直接抛CancelledError错误了。这里有个细节,gather支持return_exceptions参数:

In:awaitasyncio.gather(task1,task2,return_exceptions=True)
Out:[concurrent.futures._base.CancelledError(),'B']

可以看到,task2依然会执行完成,但是task1的返回值是一个CancelledError错误,也就是任务被取消了。如果一个创建后就不希望被任何情况取消,可以使用asyncio.shield保护任务能顺利完成。不过要注意一个陷阱,先看错误的写法:

In:task1=asyncio.shield(a())
In:task2=loop.create_task(b())
In:task1.cancel()
Out:True
In:awaitasyncio.gather(task1,task2,return_exceptions=True)
Suspendinga
Suspendingb
Resumingb
Out:[concurrent.futures._base.CancelledError(),'B']

可以看到依然是CancelledError错误,且协程a未执行完成,正确的用法是这样的:

In:task1=asyncio.shield(a())
In:task2=loop.create_task(b())
In:ts=asyncio.gather(task1,task2,return_exceptions=True)
In:task1.cancel()
Out:True
In:awaitts
Suspendinga
Suspendingb
Resuminga
Resumingb
Out:[concurrent.futures._base.CancelledError(),'B']

可以看到虽然结果是一个CancelledError错误,但是看输出能确认协程实际上是执行了的。所以正确步骤是:

先创建 GatheringFuture 对象 ts

取消任务

await ts

asynccontextmanager

如果你了解Python,之前可能听过或者用过contextmanager ,一个上下文管理器。通过一个计时的例子就理解它的作用:

fromcontextlibimportcontextmanager
asyncdefa():
awaitasyncio.sleep(3)
return'A'
asyncdefb():
awaitasyncio.sleep(1)
return'B'
asyncdefs1():
returnawaitasyncio.gather(a(),b())
@contextmanager
deftimed(func):
start=time.perf_counter()
yieldasyncio.run(func())
print(f'Cost:{time.perf_counter()-start}')

timed函数用了contextmanager装饰器,把协程的运行结果yield出来,执行结束后还计算了耗时:

In:fromcontextmanagerimport*
In:withtimed(s1)asrv:
...:print(f'Result:{rv}')
...:
Result:['A','B']
Cost:3.0052654459999992

大家先体会一下。在Python 3.7添加了asynccontextmanager,也就是异步版本的contextmanager,适合异步函数的执行,上例可以这么改:

@asynccontextmanager
asyncdefasync_timed(func):
start=time.perf_counter()
yieldawaitfunc()
print(f'Cost:{time.perf_counter()-start}')
asyncdefmain():
asyncwithasync_timed(s1)asrv:
print(f'Result:{rv}')
In:asyncio.run(main())
Result:['A','B']
Cost:3.00414147500004

async版本的with要用async with,另外要注意yield await func()这句,相当于yield + await func()

PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源码注释

发表回复