目录

tornado netutil 实现解析

tornado netutil 实现解析

tornado.netutil.bind_sockets()

该方法创建绑定到给定端口和地址的监听套接字 socket。返回套接字对象的列表,比如给定的 address 参数映射到多个 IP 地址,则返回多个 socket,最常见的是混合使用 IPv4 与 IPv6,则会创建对应的两个 socket。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
                backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):

    # 当设置了端口复用时,会检查系统是否支持端口复用功能
    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
        raise ValueError("the platform doesn't support SO_REUSEPORT")

    sockets = []
    if address == "":
        address = None
    # 如果系统不支持IPv6并且参数family为AF_UNSPEC,则family选择IPv4协议
    if not socket.has_ipv6 and family == socket.AF_UNSPEC:
        family = socket.AF_INET
    if flags is None:
        flags = socket.AI_PASSIVE
    bound_port = None
    # 循环遍历获取的地址信息
    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                                        0, flags)):
        af, socktype, proto, canonname, sockaddr = res
        # 排除另类数据
        if (sys.platform == 'darwin' and address == 'localhost' and
                af == socket.AF_INET6 and sockaddr[3] != 0):
            continue

        try:
            # 创建socket对象
            sock = socket.socket(af, socktype, proto)
        except socket.error as e:
            if errno_from_exception(e) == errno.EAFNOSUPPORT:
                continue
            raise
        # 设置close-on-exec标志位
        set_close_exec(sock.fileno())
        # 设置SO_REUSEADDR
        if os.name != 'nt':
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 设置SO_REUSEPORT
        if reuse_port:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if af == socket.AF_INET6:
            # 在linux上,ipv6 socket也默认接受ipv4,但是这样就无法绑定到ipv4
            # 中的0.0.0.0和ipv6中的::。 
            # 在其他系统上,单独的套接字必须用于监听ipv4和ipv6。 
            # 为了保持一致性,请务必在ipv6套接字上禁用ipv4,并在需要时使用
            # 单独的ipv4套接字。

            # Windows上的Python 2.x没有IPPROTO_IPV6。
            if hasattr(socket, "IPPROTO_IPV6"):
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        # 当port参数为None时,会自动分配绑定端口,该端口应同时绑定在IPv4和IPv6。
        # 当第一次循环时,bound_port会记录系统自动分配的port,然后应用到接下来的循环
        host, requested_port = sockaddr[:2]
        if requested_port == 0 and bound_port is not None:
            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
        
        # 设置socket为非阻塞
        sock.setblocking(0)
        # 绑定socket
        sock.bind(sockaddr)
        # 记录下本次绑定的端口
        bound_port = sock.getsockname()[1]
        # 开启socket监听
        sock.listen(backlog)
        sockets.append(sock)
    return sockets

如上基本为 socket 的常规操作,主要操作放在了 IPv4 与 IPv6 的兼容方面。最终返回分别基于 IPv4 与 IPv6 两个 socket 对象组成的 list。

tornado.netutil.add_accept_handler()

正是该方法将 web 服务器与 IOLoop 连接了起来,主要用于添加一个 IOLoop 事件处理器来接受服务器 socket 上的新连接(来自客户端的连接)。当一个客户端连接被 accept,callback 函数将会被调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def add_accept_handler(sock, callback, io_loop=None):

    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        # 在我们处理回调时可能会有更多的连接; 为了防止其他任务的饥饿(如果该值过大,
        # 可能会导致程序执行当前队列时间过长,而后续任务无法被执行),我们必须限制
        # 我们一次接受的连接数。理想情况下,我们将接受输入此方法时等待的连接数,
        # 但是此信息不可用(而且在运行任何回调之前重新排列此方法以调用accept()
        # 多次可能会对多进程配置中的负载平衡产生不利影响)。 相反,我们使用(默认)
        # listen backlog作为我们可以合理接受的连接数的粗略启发式。
        for i in xrange(_DEFAULT_BACKLOG):
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK表示我们接受了每个有用的连接
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED表示有个链接已经被关闭了但任然在接受队列中
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            # 调用callback
            callback(connection, address)
    # 将服务器端sock以读事件注册到epoll,即epoll一直监听着服务器端socket,只要有
    # 客户端连接到服务器端socket,epoll就会触发,epoll.poll()方法就会返回,IOLoop
    # 就会调用accept_handler方法,最终调用callback(connection, address)
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)