Asyncio 模块的另一个重要的组件是 Future 类。它和 concurrent.futures.Futures 很像,但是针对Asyncio的事件循环做了很多定制。 asyncio.Futures 类代表还未完成的结果(有可能是一个Exception)。所以综合来说,它是一种抽象,代表还没有做完的事情。
实际上,必须处理一些结果的回调函数被加入到了这个类的实例中。
1. 准备工作
要操作Asyncio中的 Future ,必须进行以下声明:
import asyncio
future = asyncio.Future()
基本的方法有:
cancel()
: 取消future的执行,调度回调函数result()
: 返回future代表的结果exception()
: 返回future中的Exceptionadd_done_callback(fn)
: 添加一个回调函数,当future执行的时候会调用这个回调函数remove_done_callback(fn)
: 从“call whten done”列表中移除所有callback的实例set_result(result)
: 将future标为执行完成,并且设置result的值set_exception(exception)
: 将future标为执行完成,并设置Exception
2. 如何做…
下面的例子展示了 Future
类是如何管理两个协程的,第一个协程 first_coroutine
计算前n个数的和, 第二个协程second_coroutine
计算n的阶乘,代码如下:
# -*- coding: utf-8 -*-
"""
Asyncio.Futures - Chapter 4 Asynchronous Programming
"""
import asyncio
import sys
@asyncio.coroutine
def first_coroutine(future, N):
"""前n个数的和"""
count = 0
for i in range(1, N + 1):
count = count + i
yield from asyncio.sleep(3)
future.set_result("first coroutine (sum of N integers) result = " + str(count))
@asyncio.coroutine
def second_coroutine(future, N):
count = 1
for i in range(2, N + 1):
count *= i
yield from asyncio.sleep(4)
future.set_result("second coroutine (factorial) result = " + str(count))
def got_result(future):
print(future.result())
if __name__ == "__main__":
N1 = int(sys.argv[1])
N2 = int(sys.argv[2])
loop = asyncio.get_event_loop()
future1 = asyncio.Future()
future2 = asyncio.Future()
tasks = [
first_coroutine(future1, N1),
second_coroutine(future2, N2)]
future1.add_done_callback(got_result)
future2.add_done_callback(got_result)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
输出如下:
$ python asy.py 1 1
first coroutine (sum of N integers) result = 1
second coroutine (factorial) result = 1
$ python asy.py 2 2
first coroutine (sum of N integers) result = 3
second coroutine (factorial) result = 2
$ python asy.py 3 3
first coroutine (sum of N integers) result = 6
second coroutine (factorial) result = 6
$ python asy.py 4 4
first coroutine (sum of N integers) result = 10
second coroutine (factorial) result = 24
3. 讨论
在主程序中,我们通过定义future对象和协程联系在一起:
if __name__ == "__main__":
...
future1 = asyncio.Future()
future2 = asyncio.Future()
定义tasks的时候,将future对象作为变量传入协程中:
tasks = [
first_coroutine(future1, N1),
second_coroutine(future2, N2)]
最后,添加一个 future 执行时的回调函数:
def got_result(future):
print(future.result())
在我们传入future的协程中,在计算之后我们分别添加了3s、4s的睡眠时间:
yield from asyncio.sleep(4)
然后,我们将future标为完成,通过 future.set_result()
设置结果。
4. 了解更多
交换两个协程睡眠的时间,协程2会比1更早得到结果:
$ python asy.py 3 3
second coroutine (factorial) result = 6
first coroutine (sum of N integers) result = 6
$ python asy.py 4 4
second coroutine (factorial) result = 24
first coroutine (sum of N integers) result = 10
下一节:Celery 是一个 Python 框架,用来管理分布式任务的,遵循面向对象的中间件方法。它的主要 feature 是可以将许多小任务分布到一个大型的计算集群中,最后将任务的结果收集起来,组成整体的解决方案。