并行化数据导入流程¶
在这个笔记本中,我们将演示如何使用并行进程执行摄入管道。使用IngestionPipeline
可以实现批量并行执行的同步和异步版本。
%pip install llama-index-embeddings-openai
import nest_asyncio
nest_asyncio.apply()
import cProfile, pstats
from pstats import SortKey
加载数据¶
对于这个笔记本,我们将从llamahub加载PatronusAIFinanceBenchDataset
数据集。
!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data
Successfully downloaded PatronusAIFinanceBenchDataset to ./data
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
定义我们的数据摄取管道¶
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
# 创建具有转换的管道
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=1024, chunk_overlap=20),
TitleExtractor(),
OpenAIEmbedding(),
]
)
# 由于我们将测试性能,使用timeit和cProfile
# 我们将禁用缓存
pipeline.disable_cache = True
并行执行¶
一次运行。将num_workers
设置为大于1的值将调用并行执行。
nodes = pipeline.run(documents=documents, num_workers=4)
100%|██████████| 5/5 [00:01<00:00, 3.74it/s] 100%|██████████| 5/5 [00:01<00:00, 4.25it/s] 100%|██████████| 5/5 [00:01<00:00, 4.54it/s] 100%|██████████| 5/5 [00:01<00:00, 3.27it/s] 100%|██████████| 2/2 [00:00<00:00, 2.43it/s]
len(nodes)
5297
%timeit pipeline.run(documents=documents, num_workers=4)
100%|██████████| 5/5 [00:01<00:00, 3.65it/s] 100%|██████████| 5/5 [00:01<00:00, 4.05it/s] 100%|██████████| 5/5 [00:01<00:00, 4.14it/s] 100%|██████████| 5/5 [00:01<00:00, 2.83it/s] 100%|██████████| 2/2 [00:00<00:00, 2.07it/s] 100%|██████████| 5/5 [00:01<00:00, 3.64it/s] 100%|██████████| 5/5 [00:00<00:00, 5.26it/s] 100%|██████████| 5/5 [00:00<00:00, 6.17it/s] 100%|██████████| 5/5 [00:01<00:00, 3.80it/s] 100%|██████████| 2/2 [00:00<00:00, 2.12it/s] 100%|██████████| 5/5 [00:01<00:00, 3.56it/s] 100%|██████████| 5/5 [00:00<00:00, 6.90it/s] 100%|██████████| 5/5 [00:01<00:00, 2.86it/s] 100%|██████████| 5/5 [00:01<00:00, 3.15it/s] 100%|██████████| 2/2 [00:00<00:00, 2.10it/s] 100%|██████████| 5/5 [00:00<00:00, 5.10it/s] 100%|██████████| 5/5 [00:01<00:00, 3.17it/s] 100%|██████████| 5/5 [00:01<00:00, 4.75it/s] 100%|██████████| 5/5 [00:01<00:00, 3.46it/s] 100%|██████████| 2/2 [00:00<00:00, 2.03it/s] 100%|██████████| 5/5 [00:00<00:00, 7.27it/s] 100%|██████████| 5/5 [00:01<00:00, 3.25it/s] 100%|██████████| 5/5 [00:01<00:00, 4.09it/s] 100%|██████████| 5/5 [00:01<00:00, 3.93it/s] 100%|██████████| 2/2 [00:00<00:00, 2.51it/s] 100%|██████████| 5/5 [00:00<00:00, 6.94it/s] 100%|██████████| 5/5 [00:01<00:00, 3.34it/s] 100%|██████████| 5/5 [00:01<00:00, 4.66it/s] 100%|██████████| 5/5 [00:01<00:00, 3.84it/s] 100%|██████████| 2/2 [00:00<00:00, 2.64it/s] 100%|██████████| 5/5 [00:01<00:00, 3.69it/s] 100%|██████████| 5/5 [00:01<00:00, 4.43it/s] 100%|██████████| 5/5 [00:00<00:00, 5.24it/s] 100%|██████████| 5/5 [00:01<00:00, 4.01it/s] 100%|██████████| 2/2 [00:00<00:00, 2.52it/s] 100%|██████████| 5/5 [00:01<00:00, 4.82it/s] 100%|██████████| 5/5 [00:00<00:00, 5.39it/s] 100%|██████████| 5/5 [00:01<00:00, 4.57it/s] 100%|██████████| 5/5 [00:01<00:00, 3.55it/s] 100%|██████████| 2/2 [00:00<00:00, 2.58it/s]
29 s ± 1.56 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
cProfile.run(
"pipeline.run(documents=documents, num_workers=4)",
"newstats",
)
p = pstats.Stats("newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
100%|██████████| 5/5 [00:01<00:00, 4.26it/s] 100%|██████████| 5/5 [00:01<00:00, 3.44it/s] 100%|██████████| 5/5 [00:01<00:00, 4.14it/s] 100%|██████████| 5/5 [00:01<00:00, 3.31it/s] 100%|██████████| 2/2 [00:00<00:00, 2.72it/s]
Tue Jan 9 14:59:20 2024 newstats 2048 function calls in 29.897 seconds Ordered by: cumulative time List reduced from 214 to 15 due to restriction <15> ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 29.897 29.897 {built-in method builtins.exec} 1 0.057 0.057 29.896 29.896 <string>:1(<module>) 1 0.000 0.000 29.840 29.840 pipeline.py:378(run) 12 0.000 0.000 29.784 2.482 threading.py:589(wait) 12 0.000 0.000 29.784 2.482 threading.py:288(wait) 75 29.784 0.397 29.784 0.397 {method 'acquire' of '_thread.lock' objects} 1 0.000 0.000 29.783 29.783 pool.py:369(starmap) 1 0.000 0.000 29.782 29.782 pool.py:767(get) 1 0.000 0.000 29.782 29.782 pool.py:764(wait) 1 0.000 0.000 0.045 0.045 context.py:115(Pool) 1 0.000 0.000 0.045 0.045 pool.py:183(__init__) 1 0.000 0.000 0.043 0.043 pool.py:305(_repopulate_pool) 1 0.000 0.000 0.043 0.043 pool.py:314(_repopulate_pool_static) 4 0.000 0.000 0.043 0.011 process.py:110(start) 4 0.000 0.000 0.043 0.011 context.py:285(_Popen)
<pstats.Stats at 0x139cbc1f0>
异步并行执行
在许多情况下,我们希望同时执行多个异步任务,并在所有任务完成后收集结果。这可以通过使用异步并行执行来实现。在Python中,我们可以使用asyncio.gather
函数来实现这一目的。这个函数接受多个协程对象作为参数,并在它们都完成后返回结果。
下面是一个示例,演示了如何使用asyncio.gather
来同时执行多个异步任务:
import asyncio
async def coro1():
await asyncio.sleep(1)
return 'coro1 done'
async def coro2():
await asyncio.sleep(2)
return 'coro2 done'
async def main():
result = await asyncio.gather(coro1(), coro2())
print(result)
asyncio.run(main())
在这个示例中,我们定义了两个异步协程coro1
和coro2
,它们分别模拟了耗时的异步任务。然后,我们定义了一个main
协程,它使用asyncio.gather
同时执行这两个异步任务,并在所有任务完成后打印结果。
当我们运行这段代码时,我们会看到coro1
和coro2
几乎同时完成,并打印出它们的结果。这展示了异步并行执行的能力,可以显著提高程序的性能和效率。
这里使用了concurrent.futures
中的ProcessPoolExecutor
来异步执行进程。任务是阻塞的,但也在各个进程上异步执行。
nodes = await pipeline.arun(documents=documents, num_workers=4)
100%|██████████| 5/5 [00:01<00:00, 3.78it/s] 100%|██████████| 5/5 [00:01<00:00, 4.33it/s] 100%|██████████| 5/5 [00:01<00:00, 4.96it/s] 100%|██████████| 5/5 [00:01<00:00, 3.73it/s] 100%|██████████| 2/2 [00:00<00:00, 2.26it/s]
len(nodes)
5297
import asyncio
loop = asyncio.get_event_loop()
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))
100%|██████████| 5/5 [00:01<00:00, 4.61it/s] 100%|██████████| 5/5 [00:00<00:00, 6.02it/s] 100%|██████████| 5/5 [00:01<00:00, 4.78it/s] 100%|██████████| 5/5 [00:01<00:00, 3.78it/s] 100%|██████████| 2/2 [00:00<00:00, 2.45it/s] 100%|██████████| 5/5 [00:01<00:00, 4.30it/s] 100%|██████████| 5/5 [00:00<00:00, 5.27it/s] 100%|██████████| 5/5 [00:01<00:00, 4.55it/s] 100%|██████████| 5/5 [00:02<00:00, 1.92it/s] 100%|██████████| 2/2 [00:00<00:00, 2.53it/s] 100%|██████████| 5/5 [00:00<00:00, 5.50it/s] 100%|██████████| 5/5 [00:01<00:00, 3.81it/s] 100%|██████████| 5/5 [00:01<00:00, 3.69it/s] 100%|██████████| 5/5 [00:02<00:00, 2.26it/s] 100%|██████████| 2/2 [00:00<00:00, 2.78it/s] 100%|██████████| 5/5 [00:01<00:00, 3.70it/s] 100%|██████████| 5/5 [00:01<00:00, 4.99it/s] 100%|██████████| 5/5 [00:01<00:00, 4.44it/s] 100%|██████████| 5/5 [00:01<00:00, 3.45it/s] 100%|██████████| 2/2 [00:00<00:00, 2.60it/s] 100%|██████████| 5/5 [00:01<00:00, 3.81it/s] 100%|██████████| 5/5 [00:01<00:00, 4.67it/s] 100%|██████████| 5/5 [00:01<00:00, 4.97it/s] 100%|██████████| 5/5 [00:01<00:00, 2.70it/s] 100%|██████████| 2/2 [00:00<00:00, 2.52it/s] 100%|██████████| 5/5 [00:01<00:00, 4.20it/s] 100%|██████████| 5/5 [00:01<00:00, 4.31it/s] 100%|██████████| 5/5 [00:01<00:00, 3.84it/s] 100%|██████████| 5/5 [00:01<00:00, 3.06it/s] 100%|██████████| 2/2 [00:00<00:00, 2.65it/s] 100%|██████████| 5/5 [00:01<00:00, 4.39it/s] 100%|██████████| 5/5 [00:01<00:00, 4.78it/s] 100%|██████████| 5/5 [00:01<00:00, 3.68it/s] 100%|██████████| 5/5 [00:01<00:00, 4.64it/s] 100%|██████████| 2/2 [00:00<00:00, 2.36it/s] 100%|██████████| 5/5 [00:01<00:00, 4.88it/s] 100%|██████████| 5/5 [00:00<00:00, 6.65it/s] 100%|██████████| 5/5 [00:01<00:00, 4.55it/s] 100%|██████████| 5/5 [00:01<00:00, 3.25it/s] 100%|██████████| 2/2 [00:00<00:00, 3.87it/s]
20.3 s ± 6.01 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
loop = asyncio.get_event_loop()
cProfile.run(
"loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))",
"async-newstats",
)
p = pstats.Stats("async-newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
100%|██████████| 5/5 [00:01<00:00, 3.55it/s] 100%|██████████| 5/5 [00:01<00:00, 4.64it/s] 100%|██████████| 5/5 [00:01<00:00, 4.65it/s] 100%|██████████| 5/5 [00:01<00:00, 2.83it/s] 100%|██████████| 2/2 [00:00<00:00, 3.81it/s]
Tue Jan 9 15:02:31 2024 async-newstats 2780 function calls in 21.186 seconds Ordered by: cumulative time List reduced from 302 to 15 due to restriction <15> ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 21.186 21.186 {built-in method builtins.exec} 1 0.046 0.046 21.186 21.186 <string>:1(<module>) 1 0.000 0.000 21.140 21.140 nest_asyncio.py:87(run_until_complete) 14 0.000 0.000 21.140 1.510 nest_asyncio.py:101(_run_once) 14 0.000 0.000 20.797 1.486 selectors.py:554(select) 14 20.797 1.486 20.797 1.486 {method 'control' of 'select.kqueue' objects} 27 0.000 0.000 0.343 0.013 events.py:78(_run) 27 0.000 0.000 0.342 0.013 {method 'run' of '_contextvars.Context' objects} 2 0.000 0.000 0.342 0.171 nest_asyncio.py:202(step) 2 0.000 0.000 0.342 0.171 tasks.py:215(__step) 2 0.000 0.000 0.342 0.171 {method 'send' of 'coroutine' objects} 2 0.000 0.000 0.342 0.171 pipeline.py:478(arun) 66 0.245 0.004 0.245 0.004 {method 'acquire' of '_thread.lock' objects} 1 0.000 0.000 0.244 0.244 tasks.py:302(__wakeup) 1 0.000 0.000 0.244 0.244 _base.py:648(__exit__)
<pstats.Stats at 0x1037abb80>
顺序执行
默认情况下,num_workers
被设置为 None
,这将调用顺序执行。
nodes = pipeline.run(documents=documents)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.10it/s]
len(nodes)
5297
%timeit pipeline.run(documents=documents)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 5.96it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.80it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.58it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.14it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.19it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.41it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.28it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 2.75it/s]
1min 11s ± 3.37 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
cProfile.run("pipeline.run(documents=documents)", "oldstats")
p = pstats.Stats("oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.95it/s]
Tue Jan 9 15:14:23 2024 oldstats 5514413 function calls (5312843 primitive calls) in 74.119 seconds Ordered by: cumulative time List reduced from 1253 to 15 due to restriction <15> ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 74.125 74.125 {built-in method builtins.exec} 1 0.057 0.057 74.125 74.125 <string>:1(<module>) 1 0.000 0.000 74.068 74.068 pipeline.py:378(run) 1 0.000 0.000 74.068 74.068 pipeline.py:53(run_transformations) 1 0.010 0.010 66.055 66.055 base.py:334(__call__) 1 0.007 0.007 65.996 65.996 base.py:234(get_text_embedding_batch) 53 0.000 0.000 65.976 1.245 openai.py:377(_get_text_embeddings) 53 0.000 0.000 65.975 1.245 __init__.py:287(wrapped_f) 53 0.003 0.000 65.975 1.245 __init__.py:369(__call__) 53 0.001 0.000 65.966 1.245 openai.py:145(get_embeddings) 53 0.001 0.000 65.947 1.244 embeddings.py:33(create) 53 0.001 0.000 65.687 1.239 _base_client.py:1074(post) 53 0.000 0.000 65.680 1.239 _base_client.py:844(request) 53 0.002 0.000 65.680 1.239 _base_client.py:861(_request) 53 0.001 0.000 64.171 1.211 _client.py:882(send)
<pstats.Stats at 0x154f45060>
在主处理器上进行异步操作。
与同步情况一样,num_workers
默认为 None
,这将导致异步任务的单批执行。
nodes = await pipeline.arun(documents=documents)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.18it/s]
len(nodes)
5297
%timeit loop.run_until_complete(pipeline.arun(documents=documents))
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.11it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.18it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.60it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.93it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.19it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.22it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 4.83it/s] 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.85it/s]
20.5 s ± 7.02 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
cProfile.run(
"loop.run_until_complete(pipeline.arun(documents=documents))",
"async-oldstats",
)
p = pstats.Stats("async-oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00, 3.31it/s]
Tue Jan 9 15:17:38 2024 async-oldstats 6967591 function calls (6754866 primitive calls) in 28.185 seconds Ordered by: cumulative time List reduced from 1210 to 15 due to restriction <15> ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 28.191 28.191 {built-in method builtins.exec} 1 0.000 0.000 28.191 28.191 <string>:1(<module>) 1 0.008 0.008 28.191 28.191 nest_asyncio.py:87(run_until_complete) 5111 0.046 0.000 28.181 0.006 nest_asyncio.py:101(_run_once) 5111 0.031 0.000 18.727 0.004 selectors.py:554(select) 8561 18.696 0.002 18.696 0.002 {method 'control' of 'select.kqueue' objects} 8794 0.010 0.000 9.356 0.001 events.py:78(_run) 8794 0.007 0.000 9.346 0.001 {method 'run' of '_contextvars.Context' objects} 4602 0.007 0.000 9.154 0.002 nest_asyncio.py:202(step) 4602 0.024 0.000 9.147 0.002 tasks.py:215(__step) 4531 0.003 0.000 9.093 0.002 {method 'send' of 'coroutine' objects} 16 0.000 0.000 6.004 0.375 pipeline.py:478(arun) 16 0.000 0.000 6.004 0.375 pipeline.py:88(arun_transformations) 1 0.000 0.000 5.889 5.889 schema.py:130(acall) 1 0.000 0.000 5.889 5.889 interface.py:108(__call__)
<pstats.Stats at 0x2b157f310>
总结¶
在本教程中,我们学习了如何使用Python编程语言进行数据分析。我们涵盖了以下主题:
- 数据加载和处理
- 数据可视化
- 统计分析
- 机器学习模型的建立和评估
通过这些内容,我们可以开始在Python中进行数据分析,并利用这些技能来解决实际问题。希望这些知识对你有所帮助!
上述实验的结果如下,其中每种策略都按照在这个示例数据集和管道中的速度从快到慢进行了列出。
- (异步,并行处理):20.3秒
- (异步,无并行处理):20.5秒
- (同步,并行处理):29秒
- (同步,无并行处理):1分钟11秒
我们可以看到,使用并行处理的两种情况优于同步、无并行处理(即.run(num_workers=None)
)。而且,至少对于这种异步任务的情况来说,使用并行处理带来的收益很少。也许对于更大的工作负载和IngestionPipelines来说,使用异步和并行处理可以带来更大的收益。