tornado concurrent 实现解析
用于处理线程和 Futures 的工具。Futures 是 python3.2 中 concurrent.futures 包引入的并发编程模式。在本模块中,定义了一个兼容的 Future 类,它被设计用于协同工作,还有一些用于与 concurrent.futures 包交互的实用函数。
Future 的设计目标是作为协程(coroutine)和 IOLoop 的媒介,从而将协程和 IOLoop 关联起来。
Future 是异步操作结果的占位符,用于等待结果返回。通常作为函数 IOLoop.add_future() 的参数或 gen.coroutine 协程中 yield 的返回值。
等到结果返回时,外部可以通过调用 set_result() 设置真正的结果,将结果保存在 Future 内存中,然后调用所有回调函数,恢复协程的执行,最后通过 result() 获取结果。
Future 类通过 self._done 的值来判断本次 Future 操作是否结束,默认为 False,当调用 set_result() 设置结果、或者调用 set_exc_info() 设置异常时会调用 _set_done() 修改其值,即表示本次操作已经完成。
tornado.concurrent.Future.set_result()
1
2
3
4
5
|
def set_result(self, result):
# 将结果保存到 self._result
self._result = result
# 修改 self._done,并调用所有回调函数
self._set_done()
|
tornado.concurrent.Future._set_done()
1
2
3
4
5
6
7
8
9
10
11
|
def _set_done(self):
# 修改 self._done 值,表示本次操作已完成
self._done = True
# 循环执行 self._callbacks 中的回调函数,通过 add_done_callback() 定义
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception('Exception in callback %r for %r',
cb, self)
self._callbacks = None
|
tornado.concurrent.Future.add_done_callback()
1
2
3
4
5
6
7
|
def add_done_callback(self, fn):
# 添加本次 Future 操作完成时的回调函数,如果 Future 还没结束,则将回调函数加入
# self._callbacks,以便结束时调用,如果已经结束,则直接运行回调函数
if self._done:
fn(self)
else:
self._callbacks.append(fn)
|
tornado.concurrent.Future.result()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
def result(self, timeout=None):
# 清理日志
self._clear_tb_log()
# 判断 self._result 结果是否为None,通过 set_result() 设置
if self._result is not None:
return self._result
# 判断是否有异常信息,通过 set_exc_info() 设置
if self._exc_info is not None:
try:
raise_exc_info(self._exc_info)
finally:
self = None
# 检查是否结束
self._check_done()
return self._result
|
tornado.concurrent.chain_future()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def chain_future(a, b):
# 将两个Future对象关联在一起,一个完成,另一个也完成
# “a”的结果(成功或失败)将被复制到“b”,除非在“a”结束之前,“b”已经完成或被取消。
def copy(future):
assert future is a
# 如果b已经完成或者取消,则直接返回
if b.done():
return
if (isinstance(a, TracebackFuture) and
isinstance(b, TracebackFuture) and
a.exc_info() is not None):
b.set_exc_info(a.exc_info())
elif a.exception() is not None:
b.set_exception(a.exception())
else:
b.set_result(a.result())
# 将 copy()添加到 a 的回调列表中
a.add_done_callback(copy)
|
首先会调用 a.add_done_callback(copy),若 a 已经完成则直接运行 copy 函数,否则加入到回调列表中等到 a 完成时再运行。copy 函数将两个 Future 对象联系到了一起,用作结果返回、超时处理。