从任务启动任务
内容
从任务启动任务¶
有时从其他任务启动任务会很方便。例如,你可能直到获得某些初始计算的结果后,才知道要运行哪些计算。
激励示例¶
我们想要下载一段数据并将其转换为列表。然后我们希望为该列表的每个元素提交一个任务。在我们获取数据之前,我们不知道列表会有多长。
因此,我们发送我们的原始 download_and_convert_to_list
函数,该函数在我们的一个工作机器上下载数据并将其转换为列表:
future = client.submit(download_and_convert_to_list, uri)
但现在我们需要为这些数据的各个部分提交新任务。我们有三种选择。
将数据收集回本地进程,然后从本地进程提交新任务
只收集足够的数据信息返回到本地进程,并从本地进程提交作业
提交一个任务到集群,该任务将直接从该工作节点提交其他任务
本地收集数据¶
如果数据量不大,我们可以将其带回到客户端,在我们的本地机器上执行必要的逻辑:
>>> data = future.result() # gather data to local process
>>> data # data is a list
[...]
>>> futures = e.map(process_element, data) # submit new tasks on data
>>> analysis = e.submit(aggregate, futures) # submit final aggregation task
这很简单,如果 data
很小,那么这可能是最简单的,因此是正确的选择。然而,如果 data
很大,那么我们必须选择另一个选项。
从客户端提交任务¶
我们可以在远程数据上运行小函数,以确定足够的信息来提交正确类型的任务。在下面的示例中,我们在远程计算 data
的 len
函数,然后将数据分解为其各个元素。
>>> n = client.submit(len, data) # compute number of elements
>>> n = n.result() # gather n (small) locally
>>> from operator import getitem
>>> elements = [client.submit(getitem, data, i) for i in range(n)] # split data
>>> futures = client.map(process_element, elements)
>>> analysis = client.submit(aggregate, futures)
我们在远程计算长度,收集回这个非常小的结果,然后使用它来提交更多任务以分解数据并在集群上处理。这更复杂,因为我们必须在集群和本地进程之间来回几次,但移动的数据非常小,因此这只会给我们的总处理时间增加几毫秒。
扩展示例¶
计算斐波那契数涉及一个递归函数。当函数运行时,它会使用已计算的值调用自身。我们将在整个文档中使用这个作为示例,来说明从任务中提交任务的不同技术。
def fib(n):
if n < 2:
return n
a = fib(n - 1)
b = fib(n - 2)
return a + b
print(fib(10)) # prints "55"
我们将使用这个示例来展示不同的接口。
从工作者提交任务¶
注意:此接口是新的且实验性的。它可能在未来的版本中未经警告而更改。
我们可以从其他任务中提交任务。这使我们能够在工作节点上做出决策。
要从工作者提交新任务,该工作者必须首先创建一个新的客户端对象,该对象连接到调度器。有三种选择:
dask.delayed
和dask.compute
get_client
与secede
和rejoin
worker_client
dask.delayed¶
Dask 延迟行为如常:它将函数提交到图表中,优化带宽/计算量,并收集结果。更多详情,请参见 dask.delayed。
from distributed import Client
from dask import delayed, compute
@delayed
def fib(n):
if n < 2:
return n
# We can use dask.delayed and dask.compute to launch
# computation from within tasks
a = fib(n - 1) # these calls are delayed
b = fib(n - 2)
a, b = compute(a, b) # execute both in parallel
return a + b
if __name__ == "__main__":
# these features require the dask.distributed scheduler
client = Client()
result = fib(10).compute()
print(result) # prints "55"
在工作者上获取客户端¶
The get_client
function provides a normal
Client object that gives full access to the dask cluster, including the ability
to submit, scatter, and gather results.
from distributed import Client, get_client, secede, rejoin
def fib(n):
if n < 2:
return n
client = get_client()
a_future = client.submit(fib, n - 1)
b_future = client.submit(fib, n - 2)
a, b = client.gather([a_future, b_future])
return a + b
if __name__ == "__main__":
client = Client()
future = client.submit(fib, 10)
result = future.result()
print(result) # prints "55"
然而,如果太多任务同时请求作业,这可能会导致调度器死锁。每个任务不会通知调度器它们正在等待结果,并且可以自由计算其他任务。如果每个调度槽都在运行一个任务并且它们都请求更多任务,这可能会导致集群死锁。
为了避免这种死锁问题,我们可以使用 secede
和 rejoin
。这些函数将分别从集群中移除并重新加入当前任务。
def fib(n):
if n < 2:
return n
client = get_client()
a_future = client.submit(fib, n - 1)
b_future = client.submit(fib, n - 2)
secede()
a, b = client.gather([a_future, b_future])
rejoin()
return a + b
与上下文管理器连接¶
函数 worker_client
执行与 get_client
相同的任务,但实现为上下文管理器。使用 worker_client
作为上下文管理器可以确保工作线程上的正确清理。
from dask.distributed import Client, worker_client
def fib(n):
if n < 2:
return n
with worker_client() as client:
a_future = client.submit(fib, n - 1)
b_future = client.submit(fib, n - 2)
a, b = client.gather([a_future, b_future])
return a + b
if __name__ == "__main__":
client = Client()
future = client.submit(fib, 10)
result = future.result()
print(result) # prints "55"
调用 worker_client
的任务被保守地假设为 长时间运行 。它们可能需要很长时间,等待其他任务完成,收集结果等。为了避免它们占用处理槽,每当任务调用 worker_client
时,以下操作就会发生。
在工作线程上运行此函数的线程 退出 线程池并自行运行。这允许线程池用新线程填充该槽位,并继续处理其他任务,而不将此长时间运行的任务计入其正常配额。
Worker 向调度器发送一条消息,暂时将其允许的任务数量增加一个。这同样使得调度器可以分配更多的任务给这个Worker,而不将这个长时间运行的任务计入其中。
建立与调度器的连接需要几毫秒的时间,因此使用此功能的计算持续时间至少应比这长几倍是明智的。