异步计算:Web 服务器 + Dask
内容
实时笔记本
你可以在 live session 中运行这个笔记本,或者在 Github 上查看它。
异步计算:Web 服务器 + Dask¶
让我们想象一个简单的Web服务器,它既能提供快速加载的页面,也能在加载较慢的页面上执行一些计算。在我们的例子中,这将是一个简单的斐波那契服务应用程序,但你可以想象将 fib
函数替换为对某些输入数据运行机器学习模型、从数据库获取结果等。
[ ]:
import tornado.ioloop
import tornado.web
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
def get(self, n):
result = fib(int(n))
self.write(str(result))
class FastHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello!")
def make_app():
return tornado.web.Application([
(r"/fast", FastHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(8000)
速度¶
我们知道用户将快速响应时间与权威内容和信任联系在一起,因此我们希望测量我们的页面加载速度。我们特别感兴趣的是在许多同时加载的情况下进行此操作,模拟我们的Web服务器在为许多用户提供服务时的响应情况。
[ ]:
import asyncio
import tornado.httpclient
client = tornado.httpclient.AsyncHTTPClient()
from time import time
async def measure(url, n=100):
""" Get url n times concurrently. Print duration. """
start = time()
futures = [client.fetch(url) for i in range(n)]
results = await asyncio.gather(*futures)
end = time()
print(url, ', %d simultaneous requests, ' % n, 'total time: ', (end - start))
时间安排¶
我们看到
Tornado 的往返时间大约为 3-5 毫秒
它可以在大约100毫秒内运行100个这样的查询,因此有一些很好的并发性正在发生。
调用 fib 需要一段时间
调用 fib 100 次所需的时间大约是原来的 100 倍,没有那么多的并行性
[ ]:
await measure('http://localhost:8000/fast', n=1)
[ ]:
await measure('http://localhost:8000/fast', n=100)
[ ]:
await measure('http://localhost:8000/fib/28', n=1)
[ ]:
await measure('http://localhost:8000/fib/28', n=100)
阻塞异步¶
在下面的示例中,我们看到对缓慢的 fib/
路由的一次调用将不幸地阻塞其他快得多的请求:
[ ]:
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
讨论¶
这里有两个问题/机会:
我们所有的
fib
调用都是独立的,我们希望利用多核处理器或附近的集群并行运行这些计算。我们计算密集型的
fib
请求可能会妨碍我们的快速请求。一个慢速用户可能会影响其他所有人。
使用 Dask 进行异步离线计算¶
为了解决这两个问题,我们将使用 Dask 将计算任务卸载到其他进程或计算机上。由于 Dask 是一个异步框架,它可以很好地与 Tornado 或 Asyncio 集成。
[ ]:
from dask.distributed import Client
dask_client = await Client(asynchronous=True) # use local processes for now
dask_client
[ ]:
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
async def get(self, n):
future = dask_client.submit(fib, int(n)) # submit work to happen elsewhere
result = await future
self.write(str(result))
class MainHandler(tornado.web.RequestHandler):
async def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application([
(r"/fast", MainHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(9000)
性能变化¶
通过将斐波那契计算卸载到 Dask,我们实现了两件事:
并行计算¶
我们现在可以在更短的时间内支持更多的请求。以下实验同时从20个请求中请求 fib(28)
。在旧版本中,我们按顺序计算这些请求,耗时几秒钟(最后一个请求的人在浏览器完成时等待了几秒钟)。在新版本中,这些请求中的许多可以并行计算,因此每个人都可以在几百毫秒内得到答案。
[ ]:
# Before parallelism
await measure('http://localhost:8000/fib/28', n=20)
[ ]:
# After parallelism
await measure('http://localhost:9000/fib/28', n=20)
异步计算¶
之前,当一个请求正在评估 fib(...)
时,Tornado 被阻塞了。它无法处理任何其他请求。当我们的服务器同时提供昂贵的计算和廉价的计算时,这个问题尤为突出。廉价的请求被无谓地挂起。
因为 Dask 能够与 Tornado 或 Asyncio 等异步系统集成,我们的 Web 服务器可以在许多请求之间自由切换,即使在后台进行计算时也是如此。在下面的示例中,我们看到尽管慢速计算首先启动,但快速计算仅在几毫秒内返回。
[ ]:
# Before async
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
await a
[ ]:
# After async
a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))
await b
await a
其他选项¶
在这些情况下,人们今天倾向于使用 concurrent.futures 或 Celery。
concurrent.futures 允许在单台机器上轻松实现并行化,并且与异步框架很好地集成。API 与我们上面展示的完全相同(Dask 实现了 concurrent.futures API)。然而,concurrent.futures 不容易扩展到集群中。
Celery 更容易扩展到多台机器,但延迟较高,不容易缩小规模,并且需要一些努力才能集成到异步框架中(至少这是我的理解,我在这方面的经验有限)
在这种情况下,Dask 提供了两者的一些优势。它在常见的单机情况下易于设置和使用,但也可以 扩展到集群。它与异步框架很好地集成,并且只增加了非常小的延迟。
[ ]:
async def f():
start = time()
result = await dask_client.submit(lambda x: x + 1, 10)
end = time()
print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))
await f()