tornado iostream 实现解析
封装了对 socket fd 底层数据的读取、写入操作,针对不同的情况实现了几种不同的读取方式。
tornado.iostream.BaseIOStream.read_until_regex()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
def read_until_regex(self, regex, callback=None, max_bytes=None):
# 设置读取完成后的回调函数
future = self._set_read_callback(callback)
# 编译正则表达式,并保存到self._read_regex
self._read_regex = re.compile(regex)
# 保存最大读取数据长度到self._read_max_bytes
self._read_max_bytes = max_bytes
try:
self._try_inline_read()
except UnsatisfiableReadError as e:
# Handle this the same way as in _handle_events.
gen_log.info("Unsatisfiable read, closing connection: %s" % e)
self.close(exc_info=True)
return future
except:
if future is not None:
# Ensure that the future doesn't log an error because its
# failure was never examined.
future.add_done_callback(lambda f: f.exception())
raise
return future
|
通过正则表达式匹配客户端发来的数据,http协议规定了数据结构,每行数据后面都会有一个 CRLF,而请求行与请求头数据结尾也会有一个 CRLF,即请求头数据后面有两个 CRLF,请求体也是如此。
CRLF 即为回车(Carriage-Return,CR,\r
)、换行(Line-Feed,LF,\n
)。Windows 下 CRLF 表示:\r\n
;Unix 下 CRLF 表示:\n
。
tornado.http1connection.HTTP1Connection._read_message() 方法调用本方法传入的 regex 参数为 “\r?\n\r?\n”,刚好兼容了 Windows 和 Unix。
tornado.iostream.BaseIOStream._set_read_callback()
1
2
3
4
5
6
7
8
|
def _set_read_callback(self, callback):
assert self._read_callback is None, "Already reading"
assert self._read_future is None, "Already reading"
if callback is not None:
self._read_callback = stack_context.wrap(callback)
else:
self._read_future = TracebackFuture()
return self._read_future
|
通过 read_until_regex 方法读取请求数据时,是没有 callback 的,即 callback 为 None,所以_set_read_callback 会返回一个 Future 对象实例,用于返回异步执行结果。
tornado.iostream.BaseIOStream._try_inline_read()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
def _try_inline_read(self):
# 尝试从缓存数据中完成当前读操作。
# 如果此次读操作能在无阻塞的情况下完成,则在下次IOLoop迭代中执行读回调函数,
# 否则为此次读事件在套接字上启动监听
# 查看是否从前一次的读操作中获取到了数据
# 第一部分
self._run_streaming_callback()
pos = self._find_read_pos()
if pos is not None:
self._read_from_buffer(pos)
return
# 第二部分
self._check_closed()
try:
pos = self._read_to_buffer_loop()
except Exception:
# If there was an in _read_to_buffer, we called close() already,
# but couldn't run the close callback because of _pending_callbacks.
# Before we escape from this function, run the close callback if
# applicable.
self._maybe_run_close_callback()
raise
if pos is not None:
self._read_from_buffer(pos)
return
# 第三部分
# We couldn't satisfy the read inline, so either close the stream
# or listen for new data.
if self.closed():
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
|
方法可以分为三部分:
首先会去检测前一次是否将数据读取到了缓存,即通过 self._find_read_pos() 获取到的 pos 是否为 None,如果不为 None,则表示已经读取数据完成(读取到足够大小的数据:max_bytes 参数、正则表达式匹配成功:regex 参数,或者遇到指定的分隔符:read_until() 方法中的 delimiter 参数),然后调用 self._read_from_buffer(pos) 将_read_future 加入 IOLoop;
调用 self._read_to_buffer_loop() 循环读取请求数据,方法中封装了第一部分的部分操作,接下来与第一部分操作一致;
判断该stream连接是否断开,如果已经断开调用关闭回调函数,否则将该连接再次放到 IOLoop 中,继续读取数据。
tornado.iostream.BaseIOStream._find_read_pos()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
def _find_read_pos(self):
# 试图在读取缓冲区中找到满足当前待读取的位置
# 如果能够满足当前读取,则返回缓冲区中的位置;如果不能,则返回None。
if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
return num_bytes
elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two
# chunks in the read buffer, so we can't easily find them
# without collapsing the buffer. However, since protocols
# using delimited reads (as opposed to reads of a known
# length) tend to be "line" oriented, the delimiter is likely
# to be in the first few chunks. Merge the buffer gradually
# since large merges are relatively expensive and get undone in
# _consume().
if self._read_buffer:
loc = self._read_buffer.find(self._read_delimiter,
self._read_buffer_pos)
if loc != -1:
loc -= self._read_buffer_pos
delimiter_len = len(self._read_delimiter)
self._check_max_bytes(self._read_delimiter,
loc + delimiter_len)
return loc + delimiter_len
self._check_max_bytes(self._read_delimiter,
self._read_buffer_size)
elif self._read_regex is not None:
# self._read_buffer在_read_to_buffer方法中定义
if self._read_buffer:
# 在已读取的数据中从上一次的位置开始匹配正则,self._read_buffer_pos在
# _consume方法中定义
m = self._read_regex.search(self._read_buffer,
self._read_buffer_pos)
# 匹配成功
if m is not None:
# 获取本次读取的数据大小
loc = m.end() - self._read_buffer_pos
# 判断本次读取的数据是否超过了最大读取数据量,如果是则
# 报错UnsatisfiableReadError
self._check_max_bytes(self._read_regex, loc)
return loc
# 如果没有匹配到正则表达式,则检查读取的数据量是否超过了最大读取数据量
# self._read_buffer_size在_read_to_buffer方法中定义
self._check_max_bytes(self._read_regex, self._read_buffer_size)
return None
|
在 read_until_regex() 方法中,self._read_regex 是存在的,self._read_delimiter 是针对 read_until() 方法的操作。在此方法中可知,传入 read_until_regex() 的 max_bytes 必须要大于 self.read_chunk_size(socket.recv 每次循环接收的数据量),否则会直接报错。
tornado.iostream.BaseIOStream._read_to_buffer_loop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
def _read_to_buffer_loop(self):
# This method is called from _handle_read and _try_inline_read.
try:
# 根据不同的方法调用获取对应的目标数据量
if self._read_bytes is not None:
target_bytes = self._read_bytes
elif self._read_max_bytes is not None:
target_bytes = self._read_max_bytes
elif self.reading():
# 对于没有max_bytes参数的read_until或者read_until_close,应在扫描分
# 隔符之前尽可能多地进行读取。
target_bytes = None
else:
target_bytes = 0
next_find_pos = 0
# 假装有一个挂起的回调,以便_read_to_buffer中的EOF不会触发立即关闭回调。
# 在这个方法(_try_inline_read)的最后,我们要么通过_read_from_buffer
# 建立一个真正的等待回调,要么运行关闭回调。避免程序中途退出。因为
# 在_maybe_run_close_callback、_maybe_add_error_listener等方法中都
# 会比较self._pending_callbacks参数值,如果该值不为0,则表示该loop过程
# 还在执行,不能去运行关闭回调。最后的finally块中会递减该值。
self._pending_callbacks += 1
while not self.closed():
# 从socket中读数据,直到得到EWOULDBLOCK(当一个非阻塞的操作没有数据
# 操作时,如读操作时,缓存区空了,此时没有数据读了,写操作时,缓存区已
# 满,无法再写入)或者类似的错误。在Windows上为EWOULDBLOCK,Linux
# 上为EAGAIN
if self._read_to_buffer() == 0:
break
self._run_streaming_callback()
# 如果已经读完了所有可以使用的字节,就跳出这个循环。不能在
# 这里调用read_from_buffer,因为它与pending_callback和
# error_listener机制的微妙交互。
# 如果已经达到target_bytes,则已经读取完成了。
if (target_bytes is not None and
self._read_buffer_size >= target_bytes):
break
# 否则,需要调用更加昂贵的find_read_pos。 在每次读取时这样做
# 效率不高,所以在第一次读取以及每当读取缓冲区大小加倍时都要这样做。
if self._read_buffer_size >= next_find_pos:
pos = self._find_read_pos()
if pos is not None:
return pos
next_find_pos = self._read_buffer_size * 2
return self._find_read_pos()
finally:
# 递减该属性值,以便能执行关闭回调等操作
self._pending_callbacks -= 1
|
读取底层 socket fd 中的数据,将其保存到缓存中。通过判断该 socket 连接是否断开进行循环读取操作,最主要的就是 self._read_to_buffer() 方法,封装了对 socket 中读取到的数据处理过程。最终会调用 socket 原生的读取函数 socket.recv(buffer),即 read_from_fd() 函数,此函数在 IOStream 中被实现。
tornado.iostream.BaseIOStream._read_from_buffer()
1
2
3
4
5
6
7
|
def _read_from_buffer(self, pos):
# 尝试从缓冲区中完成当前正在等待的读取
# 重置参数
self._read_bytes = self._read_delimiter = self._read_regex = None
self._read_partial = False
self._run_read_callback(pos, False)
|
tornado.iostream.BaseIOStream._run_read_callback()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def _run_read_callback(self, size, streaming):
# 判断是否要运行stream回调函数,read_until_regex没有该函数
if streaming:
callback = self._streaming_callback
else:
callback = self._read_callback
self._read_callback = self._streaming_callback = None
# 如果self._read_future不为空
if self._read_future is not None:
assert callback is None
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
if callback is not None:
assert (self._read_future is None) or streaming
self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
self._maybe_add_error_listener()
|
首先通过 streaming 参数,决定 callback 是何值。streaming 参数针对 read_bytes、read_until_close 这两个读取方法,而 read_until、read_until_regex 没有 streaming_callback。针对没有 streaming_callback 的方法,会判断 self._read_callback 和 self._read_future,如果 self._read_future 不会空,则会使用 Future 对象将数据返回,否则通过 callback 回调返回。Future 详解参考:tornado_concurrent
tornado.iostream.BaseIOStream._consume()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def _consume(self, loc):
# 消耗缓存区的loc数量数据,并将其返回
if loc == 0:
return b""
assert loc <= self._read_buffer_size
# 获取已读取到缓存区的总数据self._read_buffer,并截取其中从位置
# self._read_buffer_pos(最开始为0)到self._read_buffer_pos + loc
# 中的数据
b = (memoryview(self._read_buffer)
[self._read_buffer_pos:self._read_buffer_pos + loc]
).tobytes()
self._read_buffer_pos += loc
self._read_buffer_size -= loc
# Amortized O(1) shrink
# (this heuristic is implemented natively in Python 3.4+
# but is replicated here for Python 2)
if self._read_buffer_pos > self._read_buffer_size:
del self._read_buffer[:self._read_buffer_pos]
self._read_buffer_pos = 0
return b
|