
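When implementing backend logic in Tornado, you may need to request data from several third-party sources at once, for example fetching from several URLs whose results do not depend on each other. The simplest approach is to write several yields: once the first yield returns, the second request is issued, and so on.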

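This does not hurt overall throughput, because while one yield is pending the program can go on serving other requests instead of waiting. From the point of view of that single request, however, the third-party data is fetched one source after another before it can return, so the request takes roughly the sum of all the fetch times.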

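So does Tornado offer a way for a single request to send out several independent external requests at the same time and continue only after all the data has come back? It is like the arithmetic puzzle from primary school: brushing your teeth takes 2 minutes and making instant noodles takes 3 minutes. You don't have to finish brushing before starting the noodles; you can brush while the noodles steep, which cuts the whole routine from 5 minutes to 3. The answer is yes: Tornado has exactly this kind of mechanism.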

In Tornado, n requests can be issued concurrently like this:

response1, response2, ..., responseN = yield [http_client.fetch(url1), http_client.fetch(url2), ..., http_client.fetch(urlN)]

Control returns to the program only after all n requests have responded.
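To make the time saving concrete, here is a minimal sketch in plain Python using the standard asyncio library rather than Tornado: asyncio.gather plays the role that yielding a list plays in a Tornado coroutine, and fake_fetch is a hypothetical stand-in for http_client.fetch that only sleeps. The delays are arbitrary.

```python
import asyncio
import time

async def fake_fetch(name, delay):
    # Hypothetical stand-in for a network call such as http_client.fetch(url);
    # the sleep simulates the remote server's response time.
    await asyncio.sleep(delay)
    return name

async def sequential():
    # One await after another: total time is roughly the SUM of the delays.
    r1 = await fake_fetch("teeth", 0.2)
    r2 = await fake_fetch("noodles", 0.3)
    return [r1, r2]

async def concurrent():
    # Both operations are started before either is awaited:
    # total time is roughly the LONGEST single delay.
    return await asyncio.gather(fake_fetch("teeth", 0.2),
                                fake_fetch("noodles", 0.3))

def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start

seq_result, seq_elapsed = timed(sequential)
con_result, con_elapsed = timed(concurrent)
print(seq_result, round(seq_elapsed, 2))
print(con_result, round(con_elapsed, 2))
```

The sequential version takes about 0.5 s and the concurrent one about 0.3 s, the same "5 minutes versus 3 minutes" effect as in the analogy.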

I found an article online that explains the underlying mechanism in detail.

Everything below is reposted; the original article is at: http://www.pulpcode.cn/2016/03/06/tornado-yield-futures-run-in-parallel/

How Tornado runs multiple asynchronous operations in parallel

Background

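I had been using Tornado's asynchronous features for a while and roughly understood how Tornado coroutines and yield work, but I never knew that Tornado could run several asynchronous operations concurrently and return only once all of their results were in. The official documentation gives this example: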

You can also yield a list or dict of Futures, which will be started at the same time and run in parallel; a list or dict of results will be returned when they are all finished:

@gen.coroutine
def get(self):
    http_client = AsyncHTTPClient()
    response1, response2 = yield [http_client.fetch(url1),
                                  http_client.fetch(url2)]
    response_dict = yield dict(response3=http_client.fetch(url3),
                               response4=http_client.fetch(url4))
    response3 = response_dict['response3']
    response4 = response_dict['response4']

So this post digs into how that works.

Prerequisites

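To follow this article you should know what yield, asynchronous programming and coroutines are, and have some understanding of how Tornado's asynchrony works: the Future objects and coroutines it uses, and how Tornado's IOLoop switches between them.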

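A function that contains yield is called a generator function. Unlike an ordinary function, it can be suspended. What does that mean? When you call an ordinary function, you can only wait for its result and do nothing else, because control has passed to the function. A step up from that is passing a callback into the function, so that it can call the callback when certain events happen.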

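yield goes further still: partway through its execution the function can hand control back to its caller, and after the caller has done something else, the function can be resumed where it left off. That is the real essence of yield, and it is far more powerful than lazily producing values the way Python 2's xrange does.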
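A minimal sketch of a generator handing control back and forth with its caller (the worker function and the log list are made up for illustration):

```python
def worker(log):
    # Each bare yield suspends the function and hands control back to the caller.
    log.append("worker: step 1")
    yield
    log.append("worker: step 2")
    yield
    log.append("worker: done")

log = []
gen = worker(log)      # calling a generator function runs none of its body yet
next(gen)              # runs until the first yield
log.append("caller: doing other work")
next(gen)              # resumes right after the first yield
log.append("caller: more work")
try:
    next(gen)          # runs to the end of the function
except StopIteration:
    log.append("caller: worker finished")
print("\n".join(log))
```

The log shows the worker and the caller taking turns, which is exactly the kind of interleaving a coroutine runner exploits.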

Passing values

Suppose a function contains a statement like this:

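m = yield n

This means that, in the communication between the generator and the outside world, n is sent out by the generator and m is passed in by the caller (via send).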
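A small sketch of the two directions of m = yield n; echo_doubler is a hypothetical example, not Tornado code:

```python
def echo_doubler():
    n = 1
    while True:
        # n goes OUT to the caller; m is whatever the caller passes IN via send().
        m = yield n
        n = m * 2

gen = echo_doubler()
first = next(gen)       # prime the generator: runs to the first yield, receives n == 1
second = gen.send(10)   # inside, m = 10; the loop yields n == 20
third = gen.send(7)     # inside, m = 7; the loop yields n == 14
print(first, second, third)  # 1 20 14
```

Note that the first call has to be next() (or send(None)): there is no paused yield yet to receive a value.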

The corresponding Tornado statement is response = yield http_client.fetch(url).

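Here http_client.fetch(url) actually returns a Future object. Once the handler method (for example get) is wrapped by the decorator, the decorator starts the generator produced by calling it with next() (generator.next() in Python 2), and then drives it forward with the IOLoop and generator.send(value), which is how asynchronous execution is achieved.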
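The next()/send() protocol the decorator relies on can be sketched without any event loop. In this toy version, assumed for illustration only, the generator yields URL strings instead of Futures, and drive looks the responses up in a hypothetical dict instead of waiting on the IOLoop. It uses return inside a generator, which needs Python 3.3+; Python 2-era Tornado code used raise gen.Return(value) instead.

```python
# Hypothetical canned responses standing in for real network replies.
FAKE_RESPONSES = {"url1": "page one", "url2": "page two"}

def handler():
    # Written like a coroutine body: each yield hands a "request" to the driver
    # and receives the response back as the value of the yield expression.
    a = yield "url1"
    b = yield "url2"
    return a + " | " + b

def drive(gen):
    # Toy synchronous driver: next() starts the generator, send() resumes it.
    # A real runner would wait for a Future on the IOLoop instead of a dict lookup.
    try:
        request = next(gen)
        while True:
            request = gen.send(FAKE_RESPONSES[request])
    except StopIteration as e:
        # The generator's return value travels in StopIteration.value.
        return e.value

result = drive(handler())
print(result)  # page one | page two
```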

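In Tornado's coroutine-based asynchronous handling, asynchronous callbacks are all wrapped in Future objects. Seeing Future, you may think of the concurrent.futures module added in Python 3.2. The functionality is indeed similar, or at least the idea is the same: a Future is a placeholder for the result of an operation that runs asynchronously. A simple way to remember it: a Tornado asynchronous function returns a Future object, and yielding that object gives you the final result.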

Future objects

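A Future object has a _done flag. Calling set_result(self, result) stores the result and calls the internal _set_done() method, which sets that flag; only then are the callbacks registered with add_done_callback run. After that, the final value can be retrieved with the result() method.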
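A stripped-down sketch of that behaviour. MiniFuture is a hypothetical class that only mirrors the flag/result/callback mechanics described above; it is not Tornado's actual Future and has no exception support.

```python
class MiniFuture:
    """Hypothetical, simplified future: a _done flag, a result and callbacks."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready yet")
        return self._result

    def add_done_callback(self, fn):
        # Run immediately if already finished, otherwise remember it for later.
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        # Flip the flag first, then fire every callback that was waiting.
        self._done = True
        for fn in self._callbacks:
            fn(self)
        self._callbacks = []

seen = []
fut = MiniFuture()
fut.add_done_callback(lambda f: seen.append(f.result()))
print(fut.done(), seen)  # False []
fut.set_result("test")
print(fut.done(), seen)  # True ['test']
```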

A simple Future example:

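from tornado import gen
from tornado.concurrent import Future
from tornado.web import RequestHandler

class HelloHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        x = yield self.do_test()  # the Future is already done, so x is "test"
        self.render("xxxx")

    # do_test is a method of the handler, not a function nested inside get
    def do_test(self):
        fut = Future()
        fut.set_result("test")
        return fut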

Tracking down the answer

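With the basics covered, let's return to the original question and find the code that supports running several asynchronous operations concurrently and returning once all their results are in.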

First, in the Tornado source (the excerpts below are from Tornado 4.x), coroutine is actually implemented like this:

def _make_coroutine_wrapper(func, replace_callback):
    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
    The two decorators differ in their treatment of the ``callback``
    argument, so we cannot simply implement ``@engine`` in terms of
    ``@coroutine``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))
        try:
            result = func(*args, **kwargs)  # a plain value if func is an ordinary function, a generator object if it contains yield
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):  # func was a generator function
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)  # run the generator to its first yield; yielded is usually a Future
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        future.set_result(result)
        return future
    return wrapper

The key lines are these:

result = func(*args, **kwargs)
yielded = next(result)
Runner(result, future, yielded)

Put simply, this function captures the generator object returned by the decorated function and hands it to Runner.

Simplified, the Runner code looks like this:

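class Runner(object):
    # Simplified: the real class also handles YieldPoints, stack contexts
    # and exceptions thrown back into the generator with gen.throw().
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen                      # the coroutine's generator
        self.result_future = result_future  # the Future returned by wrapper()
        self.io_loop = IOLoop.current()
        self.finished = False
        # handle_yield converts first_yielded to a Future and, if it is not
        # done yet, registers lambda f: self.run() with io_loop.add_future.
        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        while True:
            future = self.future
            if not future.done():
                return                      # run() is called again once it is done
            try:
                value = future.result()
                yielded = self.gen.send(value)  # resume the generator
            except (StopIteration, Return) as e:
                self.finished = True
                self.result_future.set_result(_value_from_stopiteration(e))
                return
            except Exception:
                self.finished = True
                self.result_future.set_exc_info(sys.exc_info())
                return
            if not self.handle_yield(yielded):
                return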

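The Runner registers the Future object with the io_loop; in terms of the earlier example, we can say it registers the asynchronous fetch with the IOLoop. When fetch completes, it invokes its own internal callback (we are discussing the case where no callback is passed to fetch; see the definition of AsyncHTTPClient), which sets the future's result. The io_loop then calls the callback lambda f: self.run(), which assigns future.result() to value. This value is sent into the generator, which then runs on to its next yield point and produces the next yielded object.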

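The Runner works iteratively: it takes the yielded objects one at a time and, after each one, uses handle_yield to decide whether there is another Future to wait on and a callback to register, until every future is done. (It iterates rather than recurses because recursion in Python is slow and limited in depth.)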
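To see the whole loop in motion, here is a self-contained toy model. MiniFuture, MiniLoop, MiniRunner and fake_fetch are hypothetical names; the "event loop" is just a queue of callbacks, and error handling is left out. It follows the same pattern as the simplified Runner: the wrapper starts the generator with next(), then the runner resumes it with send() each time the current future finishes.

```python
from collections import deque

class MiniFuture:
    # Hypothetical minimal future (not Tornado's): result plus done-callbacks.
    def __init__(self):
        self._done, self._result, self._callbacks = False, None, []
    def done(self):
        return self._done
    def result(self):
        return self._result
    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)
    def set_result(self, value):
        self._result, self._done = value, True
        for fn in self._callbacks:
            fn(self)
        self._callbacks = []

class MiniLoop:
    # Toy event loop: a FIFO of callbacks standing in for the IOLoop.
    def __init__(self):
        self.ready = deque()
    def add_callback(self, fn):
        self.ready.append(fn)
    def add_future(self, future, fn):
        # Like IOLoop.add_future: schedule fn(future) on the loop once it resolves.
        future.add_done_callback(lambda f: self.add_callback(lambda: fn(f)))
    def run_until_empty(self):
        while self.ready:
            self.ready.popleft()()

class MiniRunner:
    # Iterative driver modelled on the simplified Runner.
    def __init__(self, loop, gen, result_future, first_yielded):
        self.loop, self.gen, self.result_future = loop, gen, result_future
        if self.handle_yield(first_yielded):
            self.run()

    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():
            # Not ready: the loop will call run() again when it finishes.
            self.loop.add_future(self.future, lambda f: self.run())
            return False
        return True  # already done: keep looping without involving the loop

    def run(self):
        while True:
            if not self.future.done():
                return
            try:
                yielded = self.gen.send(self.future.result())
            except StopIteration as e:
                self.result_future.set_result(e.value)
                return
            if not self.handle_yield(yielded):
                return

def fake_fetch(loop, value):
    # Hypothetical async operation: the result arrives in a later loop callback.
    fut = MiniFuture()
    loop.add_callback(lambda: fut.set_result(value))
    return fut

def coroutine_body(loop):
    a = yield fake_fetch(loop, 1)
    b = yield fake_fetch(loop, 2)
    return a + b

loop = MiniLoop()
gen = coroutine_body(loop)
final = MiniFuture()
MiniRunner(loop, gen, final, next(gen))  # like the wrapper: next(), then Runner
loop.run_until_empty()
print(final.result())  # 3
```

Because run() keeps looping while the yielded future is already done, a coroutine that yields many finished futures is processed without the call stack growing; the loop is only involved when a future is still pending.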

Next, let's turn our attention to the handle_yield function.

def handle_yield(self, yielded):
    # Lists containing YieldPoints require stack contexts;
    # other lists are handled in convert_yielded.
    if _contains_yieldpoint(yielded):
        yielded = multi(yielded)
    if isinstance(yielded, YieldPoint):
        # YieldPoints are too closely coupled to the Runner to go
        # through the generic convert_yielded mechanism.
        self.future = TracebackFuture()
        def start_yield_point():
            try:
                yielded.start(self)
                if yielded.is_ready():
                    self.future.set_result(
                        yielded.get_result())
                else:
                    self.yield_point = yielded
            except Exception:
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())
        if self.stack_context_deactivate is None:
            # Start a stack context if this is the first
            # YieldPoint we've seen.
            with stack_context.ExceptionStackContext(
                    self.handle_exception) as deactivate:
                self.stack_context_deactivate = deactivate
                def cb():
                    start_yield_point()
                    self.run()
                self.io_loop.add_callback(cb)
                return False
        else:
            start_yield_point()
    else:
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError:
            self.future = TracebackFuture()
            self.future.set_exc_info(sys.exc_info())
    if not self.future.done() or self.future is moment:
        self.io_loop.add_future(
            self.future, lambda f: self.run())
        return False
    return True

Note these lines:

if not self.future.done() or self.future is moment:
	self.io_loop.add_future(
		self.future, lambda f: self.run())

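This is how the runner keeps advancing to the next yield: if the future is not done yet, run() is registered as its callback and handle_yield returns False; otherwise the while loop in run() simply continues. Note also that the YieldPoint mentioned in the code is deprecated; since Tornado 4.0, Futures are the recommended type.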

Now look at the convert_yielded function.

def convert_yielded(yielded):
    """Convert a yielded object into a `.Future`.
    The default implementation accepts lists, dictionaries, and Futures.
    If the `~functools.singledispatch` library is available, this function
    may be extended to support additional types. For example::
        @convert_yielded.register(asyncio.Future)
        def _(asyncio_future):
            return tornado.platform.asyncio.to_tornado_future(asyncio_future)
    .. versionadded:: 4.1
    """
    # Lists and dicts containing YieldPoints were handled earlier.
    if isinstance(yielded, (list, dict)):
        return multi(yielded)
    elif is_future(yielded):
        return yielded
    elif isawaitable(yielded):
        return _wrap_awaitable(yielded)
    else:
        raise BadYieldError("yielded unknown object %r" % (yielded,))
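The docstring notes that, when functools.singledispatch is available, convert_yielded can be extended by type. Here is a sketch of that dispatch pattern under simplified assumptions: ToyFuture and ThirdPartyResult are hypothetical, futures are assumed to be already resolved, and the list/dict branches are toy stand-ins for multi() rather than Tornado's implementation.

```python
from functools import singledispatch

class ToyFuture:
    """Hypothetical stand-in for a Future that is already resolved."""
    def __init__(self, value):
        self.value = value

class BadYieldError(Exception):
    pass

@singledispatch
def convert_yielded(yielded):
    # Default branch: unknown types are rejected, like the final else clause.
    raise BadYieldError("yielded unknown object %r" % (yielded,))

@convert_yielded.register(ToyFuture)
def _(yielded):
    return yielded  # futures pass through unchanged

@convert_yielded.register(list)
def _(yielded):
    # Toy stand-in for multi(): convert every child, keep list order.
    return ToyFuture([convert_yielded(child).value for child in yielded])

@convert_yielded.register(dict)
def _(yielded):
    # Toy stand-in for multi() on a dict: same keys, converted values.
    return ToyFuture({k: convert_yielded(v).value for k, v in yielded.items()})

# Extension point: a hypothetical third-party result type gets its own
# converter without editing any of the functions above.
class ThirdPartyResult:
    def __init__(self, payload):
        self.payload = payload

@convert_yielded.register(ThirdPartyResult)
def _(yielded):
    return ToyFuture(yielded.payload)

print(convert_yielded([ToyFuture(1), ThirdPartyResult(2)]).value)  # [1, 2]
```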

Notice the call to multi(yielded). Following it, we finally arrive at this code, which is the answer:

def multi(children, quiet_exceptions=()):
    """Runs multiple asynchronous operations in parallel.
    ``children`` may either be a list or a dict whose values are
    yieldable objects. ``multi()`` returns a new yieldable
    object that resolves to a parallel structure containing their
    results. If ``children`` is a list, the result is a list of
    results in the same order; if it is a dict, the result is a dict
    with the same keys.
    That is, ``results = yield multi(list_of_futures)`` is equivalent
    to::
        results = []
        for future in list_of_futures:
            results.append(yield future)
    If any children raise exceptions, ``multi()`` will raise the first
    one. All others will be logged, unless they are of types
    contained in the ``quiet_exceptions`` argument.
    If any of the inputs are `YieldPoints <YieldPoint>`, the returned
    yieldable object is a `YieldPoint`. Otherwise, returns a `.Future`.
    This means that the result of `multi` can be used in a native
    coroutine if and only if all of its children can be.
    In a ``yield``-based coroutine, it is not normally necessary to
    call this function directly, since the coroutine runner will
    do it automatically when a list or dict is yielded. However,
    it is necessary in ``await``-based coroutines, or to pass
    the ``quiet_exceptions`` argument.
    This function is available under the names ``multi()`` and ``Multi()``
    for historical reasons.
    .. versionchanged:: 4.2
       If multiple yieldables fail, any exceptions after the first
       (which is raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.
    .. versionchanged:: 4.3
       Replaced the class ``Multi`` and the function ``multi_future``
       with a unified function ``multi``. Added support for yieldables
       other than `YieldPoint` and `.Future`.
    """
    if _contains_yieldpoint(children):
        return MultiYieldPoint(children, quiet_exceptions=quiet_exceptions)
    else:
        return multi_future(children, quiet_exceptions=quiet_exceptions)

Next comes the definition of multi_future:

def multi_future(children, quiet_exceptions=()):
    """Wait for multiple asynchronous futures in parallel.
    This function is similar to `multi`, but does not support
    `YieldPoints <YieldPoint>`.
    .. versionadded:: 4.0
    .. versionchanged:: 4.2
       If multiple ``Futures`` fail, any exceptions after the first (which is
       raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.
    .. deprecated:: 4.3
       Use `multi` instead.
    """
    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = list(map(convert_yielded, children))
    assert all(is_future(i) for i in children)
    unfinished_children = set(children)
    future = Future()
    if not children:
        future.set_result({} if keys is not None else [])
    def callback(f):
        unfinished_children.remove(f)
        if not unfinished_children:
            result_list = []
            for f in children:
                try:
                    result_list.append(f.result())
                except Exception as e:
                    if future.done():
                        if not isinstance(e, quiet_exceptions):
                            app_log.error("Multiple exceptions in yield list",
                                          exc_info=True)
                    else:
                        future.set_exc_info(sys.exc_info())
            if not future.done():
                if keys is not None:
                    future.set_result(dict(zip(keys, result_list)))
                else:
                    future.set_result(result_list)
    listening = set()
    for f in children:
        if f not in listening:
            listening.add(f)
            f.add_done_callback(callback)
    return future

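This is the key code behind parallel asynchronous execution. The wrapper Future returned by multi_future tracks its child futures in unfinished_children (the listening set only ensures that each distinct child gets the callback attached once). Every time a child future completes, callback runs and removes it from unfinished_children; only when the last child has finished does the wrapper Future's set_result actually get called, with the results collected in the original order (or under the original keys).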
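The bookkeeping in multi_future can be reproduced in a few lines. This sketch uses a hypothetical MiniFuture and a hypothetical mini_multi, and drops exception handling and convert_yielded; it shows that the combined future completes only when the last child does, and that results come back in the original order, or under the original keys, regardless of completion order.

```python
class MiniFuture:
    # Hypothetical minimal future (not Tornado's): result plus done-callbacks.
    def __init__(self):
        self._done, self._result, self._callbacks = False, None, []
    def done(self):
        return self._done
    def result(self):
        return self._result
    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)
    def set_result(self, value):
        self._result, self._done = value, True
        for fn in self._callbacks:
            fn(self)
        self._callbacks = []

def mini_multi(children):
    # Same bookkeeping idea as multi_future, without exception handling.
    keys = None
    if isinstance(children, dict):
        keys = list(children.keys())
        children = list(children.values())
    else:
        children = list(children)
    unfinished = set(children)
    combined = MiniFuture()
    if not children:
        combined.set_result({} if keys is not None else [])

    def callback(f):
        unfinished.discard(f)
        if not unfinished:
            # Results are read in the ORIGINAL order, not completion order.
            results = [child.result() for child in children]
            combined.set_result(dict(zip(keys, results)) if keys is not None else results)

    for f in set(children):  # attach one callback per distinct child
        f.add_done_callback(callback)
    return combined

a, b, c = MiniFuture(), MiniFuture(), MiniFuture()
combined = mini_multi([a, b, c])
c.set_result("third")     # children finish out of order
a.set_result("first")
print(combined.done())    # False: b is still pending
b.set_result("second")
print(combined.result())  # ['first', 'second', 'third']

d, e = MiniFuture(), MiniFuture()
by_name = mini_multi({"response3": d, "response4": e})
e.set_result(4)
d.set_result(3)
print(by_name.result())   # {'response3': 3, 'response4': 4}
```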