模式:使用管道技术来增加吞吐量#
如果你有多个工作项,每个工作项都需要多个步骤来完成,你可以使用 流水线 技术来提高集群利用率并增加系统的吞吐量。
备注
流水线化是提高性能的重要技术,并且被Ray库大量使用。例如,请参见 Ray数据。
示例用例#
应用程序的一个组件需要同时进行计算密集型工作和与其他进程通信。理想情况下,您希望重叠计算和通信以充分利用CPU并提高整体吞吐量。
代码示例#
import ray
@ray.remote
class WorkQueue:
def __init__(self):
self.queue = list(range(10))
def get_work_item(self):
if self.queue:
return self.queue.pop(0)
else:
return None
@ray.remote
class WorkerWithoutPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_queue.get_work_item.remote())
if work_item is None:
break
# Do work.
self.process(work_item)
@ray.remote
class WorkerWithPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
self.work_item_ref = self.work_queue.get_work_item.remote()
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_item_ref)
if work_item is None:
break
self.work_item_ref = self.work_queue.get_work_item.remote()
# Do work while we are fetching the next work item.
self.process(work_item)
work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())
work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())
在上面的例子中,一个工作角色从队列中取出工作,然后对其进行一些计算。在没有流水线的情况下,我们在请求工作项后立即调用 ray.get()
,因此在RPC传输过程中我们会被阻塞,导致CPU空闲时间。通过流水线,我们在处理当前工作项之前预先请求下一个工作项,这样我们可以在RPC传输过程中使用CPU,从而提高CPU利用率。