实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

9c3b2697e8184a0dba0c6484990a93f6

异步计算:Web 服务器 + Dask

让我们想象一个简单的Web服务器,它既能提供快速加载的页面,也能在加载较慢的页面上执行一些计算。在我们的例子中,这将是一个简单的斐波那契服务应用程序,但你可以想象将 fib 函数替换为对某些输入数据运行机器学习模型、从数据库获取结果等。

[ ]:
import tornado.ioloop
import tornado.web

def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)


class FibHandler(tornado.web.RequestHandler):
    def get(self, n):
        result = fib(int(n))
        self.write(str(result))


class FastHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello!")


def make_app():
    return tornado.web.Application([
        (r"/fast", FastHandler),
        (r"/fib/(\d+)", FibHandler),
    ])


app = make_app()
app.listen(8000)

速度

我们知道用户将快速响应时间与权威内容和信任联系在一起,因此我们希望测量我们的页面加载速度。我们特别感兴趣的是在许多同时加载的情况下进行此操作,模拟我们的Web服务器在为许多用户提供服务时的响应情况。

[ ]:
import asyncio
import tornado.httpclient

client = tornado.httpclient.AsyncHTTPClient()

from time import time

async def measure(url, n=100):
    """ Get url n times concurrently.  Print duration. """
    start = time()
    futures = [client.fetch(url) for i in range(n)]
    results = await asyncio.gather(*futures)
    end = time()
    print(url, ', %d simultaneous requests, ' %  n, 'total time: ', (end - start))

时间安排

我们看到

  1. Tornado 的往返时间大约为 3-5 毫秒

  2. 它可以在大约100毫秒内运行100个这样的查询,因此有一些很好的并发性正在发生。

  3. 调用 fib 需要一段时间

  4. 调用 fib 100 次所需的时间大约是原来的 100 倍,没有那么多的并行性

[ ]:
await measure('http://localhost:8000/fast', n=1)
[ ]:
await measure('http://localhost:8000/fast', n=100)
[ ]:
await measure('http://localhost:8000/fib/28', n=1)
[ ]:
await measure('http://localhost:8000/fib/28', n=100)

阻塞异步

在下面的示例中,我们看到对缓慢的 fib/ 路由的一次调用将不幸地阻塞其他快得多的请求:

[ ]:
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b

讨论

这里有两个问题/机会:

  1. 我们所有的 fib 调用都是独立的,我们希望利用多核处理器或附近的集群并行运行这些计算。

  2. 我们计算密集型的 fib 请求可能会妨碍我们的快速请求。一个慢速用户可能会影响其他所有人。

使用 Dask 进行异步离线计算

为了解决这两个问题,我们将使用 Dask 将计算任务卸载到其他进程或计算机上。由于 Dask 是一个异步框架,它可以很好地与 Tornado 或 Asyncio 集成。

[ ]:
from dask.distributed import Client

dask_client = await Client(asynchronous=True)  # use local processes for now
dask_client
[ ]:

def fib(n): if n < 2: return n else: return fib(n - 1) + fib(n - 2) class FibHandler(tornado.web.RequestHandler): async def get(self, n): future = dask_client.submit(fib, int(n)) # submit work to happen elsewhere result = await future self.write(str(result)) class MainHandler(tornado.web.RequestHandler): async def get(self): self.write("Hello, world") def make_app(): return tornado.web.Application([ (r"/fast", MainHandler), (r"/fib/(\d+)", FibHandler), ]) app = make_app() app.listen(9000)

性能变化

通过将斐波那契计算卸载到 Dask,我们实现了两件事:

并行计算

我们现在可以在更短的时间内支持更多的请求。以下实验同时从20个请求中请求 fib(28)。在旧版本中,我们按顺序计算这些请求,耗时几秒钟(最后一个请求的人在浏览器完成时等待了几秒钟)。在新版本中,这些请求中的许多可以并行计算,因此每个人都可以在几百毫秒内得到答案。

[ ]:
# Before parallelism
await measure('http://localhost:8000/fib/28', n=20)
[ ]:
# After parallelism
await measure('http://localhost:9000/fib/28', n=20)

异步计算

之前,当一个请求正在评估 fib(...) 时,Tornado 被阻塞了。它无法处理任何其他请求。当我们的服务器同时提供昂贵的计算和廉价的计算时,这个问题尤为突出。廉价的请求被无谓地挂起。

因为 Dask 能够与 Tornado 或 Asyncio 等异步系统集成,我们的 Web 服务器可以在许多请求之间自由切换,即使在后台进行计算时也是如此。在下面的示例中,我们看到尽管慢速计算首先启动,但快速计算仅在几毫秒内返回。

[ ]:
# Before async
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
await a
[ ]:
# After async
a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))
await b
await a

其他选项

在这些情况下,人们今天倾向于使用 concurrent.futuresCelery

  • concurrent.futures 允许在单台机器上轻松实现并行化,并且与异步框架很好地集成。API 与我们上面展示的完全相同(Dask 实现了 concurrent.futures API)。然而,concurrent.futures 不容易扩展到集群中。

  • Celery 更容易扩展到多台机器,但延迟较高,不容易缩小规模,并且需要一些努力才能集成到异步框架中(至少这是我的理解,我在这方面的经验有限)

在这种情况下,Dask 提供了两者的一些优势。它在常见的单机情况下易于设置和使用,但也可以 扩展到集群。它与异步框架很好地集成,并且只增加了非常小的延迟。

[ ]:
async def f():
    start = time()
    result = await dask_client.submit(lambda x: x + 1, 10)
    end = time()
    print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))

await f()