tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。因此,有必要去了解一下关于epoll的预备知识。
epoll
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
select select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。poll pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
Linux IO模式及select,poll,epoll详解
epoll操作过程
epoll操作过程需要三个接口,分别如下:
int epoll_create(
int size);
int epoll_ctl(
int epfd,
int op,
int fd,
struct epoll_event *
event);
int epoll_wait(
int epfd,
struct epoll_event * events,
int maxevents,
int timeout);
1.int epoll_create(int size) 创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。 当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 函数是对指定描述符fd执行op操作。 - epfd:是epoll_create()的返回值。 - op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。 - fd:是需要监听的fd(文件描述符) - epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event {
__uint32_t events;
epoll_data_t data;
};
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待epfd上的io事件,最多返回maxevents个事件。 参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
epoll工作模式
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下: LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。 ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
Tornado.IOLoop
了解完epoll,可以开始阅读源码了。在IOLoop.py中,主要有两个类,IOLoop和PollIOLoop。可是在IOLoop中,发现很多关键的方法都是作为了接口交由子类实现,比如PollIOLoop,而这两者的关系由一个Configurable类联系起来。另外值得注意的是IOLoop没有__init__,这一点同样可以在Configurable类中找到解释。
class Configurable(object):
__impl_class =
None
__impl_kwargs =
None
def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls
is base:
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
impl = cls
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
instance.initialize(*args, **init_kwargs)
return instance
@classmethod
def configurable_base(cls):
raise NotImplementedError()
@classmethod
def configurable_default(cls):
raise NotImplementedError()
def initialize(self):
@classmethod
def configure(cls, impl, **kwargs):
base = cls.configurable_base()
if isinstance(impl, (str, unicode_type)):
impl = import_object(impl)
if impl
is not None and not issubclass(impl, cls):
raise ValueError(
"Invalid subclass of %s" % cls)
base.__impl_class = impl
base.__impl_kwargs = kwargs
@classmethod
def configured_class(cls):
"""Returns the currently configured class."""
base = cls.configurable_base()
if cls.__impl_class
is None:
base.__impl_class = cls.configurable_default()
return base.__impl_class
@classmethod
def _save_configuration(cls):
base = cls.configurable_base()
return (base.__impl_class, base.__impl_kwargs)
@classmethod
def _restore_configuration(cls, saved):
base = cls.configurable_base()
base.__impl_class = saved[
0]
base.__impl_kwargs = saved[
1]
主要看Configurable类中的__new()__方法,这里base = cls.configurable_base(),但是cls.configurable_base()在这个类中是一个接口,显然是要在IOLoop中要实现的。
@classmethod
def configurable_base(cls):
return IOLoop
在IOLoop中,这个base类就是自己,于是走进了if判断中。于是关键就落在了impl这个变量上。接下来就轮到了cls.configured_class(),在configured_class()中又调用了configurable_default(),又是一个接口,再次回到IOLoop中查看
@classmethod
def configurable_default(cls):
if hasattr(select,
"epoll"):
from tornado.platform.epoll
import EPollIOLoop
return EPollIOLoop
if hasattr(select,
"kqueue"):
from tornado.platform.kqueue
import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select
import SelectIOLoop
return SelectIOLoop
ok,到这里就清楚了,这是一个工厂函数,根据操作系统选择不同的event loop,找到EPollIOLoop
class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
到这里已经理清了EPollIOLoop,PollIOLoop和IOLoop间的关系,可以进入关键方法阅读了。
IOLoop
from __future__
import absolute_import, division, print_function, with_statement
import datetime
import errno
import functools
import heapq
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math
from tornado.concurrent
import TracebackFuture, is_future
from tornado.log
import app_log, gen_log
from tornado.platform.auto
import set_close_exec, Waker
from tornado
import stack_context
from tornado.util
import PY3, Configurable, errno_from_exception, timedelta_to_seconds
try:
import signal
except ImportError:
signal =
None
if PY3:
import _thread
as thread
else:
import thread
_POLL_TIMEOUT =
3600.0
class TimeoutError(Exception):
pass
class IOLoop(Configurable):
_EPOLLIN =
0x001
_EPOLLPRI =
0x002
_EPOLLOUT =
0x004
_EPOLLERR =
0x008
_EPOLLHUP =
0x010
_EPOLLRDHUP =
0x2000
_EPOLLONESHOT = (
1 <<
30)
_EPOLLET = (
1 <<
31)
NONE =
0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
_instance_lock = threading.Lock()
_current = threading.local()
@staticmethod
def instance():
if not hasattr(IOLoop,
"_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop,
"_instance"):
IOLoop._instance = IOLoop()
return IOLoop._instance
@staticmethod
def initialized():
"""Returns true if the singleton instance has been
created."""
return hasattr(IOLoop,
"_instance")
def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self
@staticmethod
def clear_instance():
"""Clear the global `IOLoop` instance.
.. versionadded:: 4.0
"""
if hasattr(IOLoop,
"_instance"):
del IOLoop._instance
@staticmethod
def current(instance=True):
current = getattr(IOLoop._current,
"instance",
None)
if current
is None and instance:
return IOLoop.instance()
return current
def make_current(self):
IOLoop._current.instance = self
@staticmethod
def clear_current():
IOLoop._current.instance =
None
@classmethod
def configurable_base(cls):
return IOLoop
@classmethod
def configurable_default(cls):
if hasattr(select,
"epoll"):
from tornado.platform.epoll
import EPollIOLoop
return EPollIOLoop
if hasattr(select,
"kqueue"):
from tornado.platform.kqueue
import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select
import SelectIOLoop
return SelectIOLoop
def initialize(self, make_current=None):
if make_current
is None:
if IOLoop.current(instance=
False)
is None:
self.make_current()
elif make_current:
if IOLoop.current(instance=
False)
is not None:
raise RuntimeError(
"current IOLoop already exists")
self.make_current()
def close(self, all_fds=False):
raise NotImplementedError()
def add_handler(self, fd, handler, events):
raise NotImplementedError()
def update_handler(self, fd, events):
raise NotImplementedError()
def remove_handler(self, fd):
raise NotImplementedError()
def set_blocking_signal_threshold(self, seconds, action):
raise NotImplementedError()
def set_blocking_log_threshold(self, seconds):
self.set_blocking_signal_threshold(seconds, self.log_stack)
def log_stack(self, signal, frame):
gen_log.warning(
'IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold,
''.join(traceback.format_stack(frame)))
def start(self):
raise NotImplementedError()
def _setup_logging(self):
if not any(
[logging.getLogger().handlers,
logging.getLogger(
'tornado').handlers,
logging.getLogger(
'tornado.application').handlers]):
logging.basicConfig()
def stop(self):
raise NotImplementedError()
def run_sync(self, func, timeout=None):
future_cell = [
None]
def run():
try:
result = func()
if result
is not None:
from tornado.gen
import convert_yielded
result = convert_yielded(result)
except Exception:
future_cell[
0] = TracebackFuture()
future_cell[
0].set_exc_info(sys.exc_info())
else:
if is_future(result):
future_cell[
0] = result
else:
future_cell[
0] = TracebackFuture()
future_cell[
0].set_result(result)
self.add_future(future_cell[
0],
lambda future: self.stop())
self.add_callback(run)
if timeout
is not None:
timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
self.start()
if timeout
is not None:
self.remove_timeout(timeout_handle)
if not future_cell[
0].done():
raise TimeoutError(
'Operation timed out after %s seconds' % timeout)
return future_cell[
0].result()
def time(self):
return time.time()
...
类中首先定义了epoll 监听的事件,并重点对EPOLLIN 、 EPOLLOUT 、 EPOLLERR定义了处理事件。
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 这些方法都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop。
initialize,close,add_handler,update_handler,remove_handler,start,stop等比较核心的方法都在下面的PollIOLoop中实现。
值得一提的是run_sync方法,其作用是提供给利用gen.coroutine的自定义协程方法使用IOLoop执行。
tornado.ioloop.PollIOLoop
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
`tornado.platform.select.SelectIOLoop` (all platforms).
"""
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
if hasattr(self._impl,
'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func
or time.time
self._handlers = {}
self._events = {}
self._callbacks = collections.deque()
self._timeouts = []
self._cancellations =
0
self._running =
False
self._stopped =
False
self._closing =
False
self._thread_ident =
None
self._blocking_signal_threshold =
None
self._timeout_counter = itertools.count()
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
def close(self, all_fds=False):
self._closing =
True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler
in list(self._handlers.values()):
self.close_fd(fd)
self._waker.close()
self._impl.close()
self._callbacks =
None
self._timeouts =
None
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd,
None)
self._events.pop(fd,
None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug(
"Error deleting fd from IOLoop", exc_info=
True)
def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal,
"setitimer"):
gen_log.error(
"set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds
is not None:
signal.signal(signal.SIGALRM,
action
if action
is not None else signal.SIG_DFL)
def start(self):
if self._running:
raise RuntimeError(
"IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped =
False
return
old_current = getattr(IOLoop._current,
"instance",
None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running =
True
old_wakeup_fd =
None
if hasattr(signal,
'set_wakeup_fd')
and os.name ==
'posix':
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -
1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd =
None
except ValueError:
old_wakeup_fd =
None
try:
while True:
ncallbacks = len(self._callbacks)
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[
0].callback
is None:
heapq.heappop(self._timeouts)
self._cancellations -=
1
elif self._timeouts[
0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations >
512 and
self._cancellations > (len(self._timeouts) >>
1)):
self._cancellations =
0
self._timeouts = [x
for x
in self._timeouts
if x.callback
is not None]
heapq.heapify(self._timeouts)
for i
in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout
in due_timeouts:
if timeout.callback
is not None:
self._run_callback(timeout.callback)
due_timeouts = timeout =
None
if self._callbacks:
poll_timeout =
0.0
elif self._timeouts:
poll_timeout = self._timeouts[
0].deadline - self.time()
poll_timeout = max(
0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
if not self._running:
break
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
0,
0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception
as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold,
0)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError)
as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func =
None
finally:
self._stopped =
False
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
0,
0)
IOLoop._current.instance = old_current
if old_wakeup_fd
is not None:
signal.set_wakeup_fd(old_wakeup_fd)
def stop(self):
self._running =
False
self._stopped =
True
self._waker.wake()
def time(self):
return self.time_func()
def call_at(self, deadline, callback, *args, **kwargs):
timeout = _Timeout(
deadline,
functools.partial(stack_context.wrap(callback), *args, **kwargs),
self)
heapq.heappush(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
timeout.callback =
None
self._cancellations +=
1
def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if thread.get_ident() != self._thread_ident:
self._waker.wake()
else:
pass
def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():
self.add_callback(callback, *args, **kwargs)
initialize
在PollIOLoop中,initialize方法得到了具体的实现
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
if hasattr(self._impl,
'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func
or time.time
self._handlers = {}
self._events = {}
self._callbacks = collections.deque()
self._timeouts = []
self._cancellations =
0
self._running =
False
self._stopped =
False
self._closing =
False
self._thread_ident =
None
self._blocking_signal_threshold =
None
self._timeout_counter = itertools.count()
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
epoll操作
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd,
None)
self._events.pop(fd,
None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug(
"Error deleting fd from IOLoop", exc_info=
True)
由前面epoll介绍可知道,epoll_create,epoll_ctl,epoll_wait,epoll_close四个api即可使用epoll。
epoll_create: epoll 的生成在EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create。
epoll_ctl: add_handler, update_handler, remove_handler三个方法对应epoll_ctl中的三个参数add 、 modify 、 del。 在add_handler中,self.split_fd方法将文件描述符或者file-like object包装成(文件描述符,object),self._handlers字段保存文件描述符对应的处理器,然后将需要监视的I/O事件注册到select中。在Tornado中只关心READ, WRITE, 和 ERROR事件,其中ERROR事件是自动添加的。
epoll_wait: epoll_wait 操作在下面要介绍的start() 中:event_pairs = self._impl.poll(poll_timeout)。
epoll_close:epoll 的 close 在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。
start
def start(self):
if self._running:
raise RuntimeError(
"IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped =
False
return
old_current = getattr(IOLoop._current,
"instance",
None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running =
True
old_wakeup_fd =
None
if hasattr(signal,
'set_wakeup_fd')
and os.name ==
'posix':
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -
1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd =
None
except ValueError:
old_wakeup_fd =
None
try:
while True:
ncallbacks = len(self._callbacks)
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[
0].callback
is None:
heapq.heappop(self._timeouts)
self._cancellations -=
1
elif self._timeouts[
0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations >
512 and
self._cancellations > (len(self._timeouts) >>
1)):
self._cancellations =
0
self._timeouts = [x
for x
in self._timeouts
if x.callback
is not None]
heapq.heapify(self._timeouts)
for i
in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout
in due_timeouts:
if timeout.callback
is not None:
self._run_callback(timeout.callback)
due_timeouts = timeout =
None
if self._callbacks:
poll_timeout =
0.0
elif self._timeouts:
poll_timeout = self._timeouts[
0].deadline - self.time()
poll_timeout = max(
0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
if not self._running:
break
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
0,
0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception
as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold,
0)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError)
as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func =
None
finally:
self._stopped =
False
if self._blocking_signal_threshold
is not None:
signal.setitimer(signal.ITIMER_REAL,
0,
0)
IOLoop._current.instance = old_current
if old_wakeup_fd
is not None:
signal.set_wakeup_fd(old_wakeup_fd)
参考: IOLoop模块解析 深入理解 tornado 之 底层 ioloop 实现