数据摄取管道#
IngestionPipeline
使用了所谓的 Transformations
概念,这些 Transformations
被应用于输入数据。这些 Transformations
被应用于您的输入数据,生成的节点要么被返回,要么被插入到向量数据库中(如果有的话)。每个节点+转换对都被缓存,这样在后续运行时(如果缓存被持久化),相同的节点+转换组合可以使用缓存结果,为您节省时间。
要查看 IngestionPipeline
被使用的交互式示例,请查看RAG CLI。
使用模式#
最简单的用法是这样实例化一个 IngestionPipeline
:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
# 创建带有转换的管道
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
]
)
# 运行管道
nodes = pipeline.run(documents=[Document.example()])
请注意,在实际场景中,您可以从 SimpleDirectoryReader
或 Llama Hub 的其他读取器中获取您的文档。
连接到向量数据库#
在运行数据摄取管道时,您还可以选择自动将生成的节点插入到远程向量存储中。
然后,您可以稍后从该向量存储构建索引。
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore
import qdrant_client
client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
vector_store=vector_store,
)
# 直接将数据摄取到向量数据库
pipeline.run(documents=[Document.example()])
# 创建您的索引
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store)
在管道中计算嵌入#
请注意,在上面的示例中,嵌入是作为管道的一部分计算的。如果您将管道连接到向量存储,则嵌入必须是管道的一个阶段,否则您稍后实例化索引将失败。
如果您不连接到向量存储,即只是生成节点列表,则可以从管道中省略嵌入。
缓存#
在 IngestionPipeline
中,每个节点+转换组合都被哈希和缓存。这样可以节省使用相同数据的后续运行时间。
以下各节描述了缓存周围的一些基本用法。
本地缓存管理#
一旦您有了一个管道,您可能希望存储和加载缓存。
# 保存
pipeline.persist("./pipeline_storage")
# 加载和恢复状态
new_pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
],
)
new_pipeline.load("./pipeline_storage")
# 由于缓存的存在,将立即运行
nodes = pipeline.run(documents=[Document.example()])
如果缓存变得太大,您可以清除它
# 删除所有缓存内容
cache.clear()
远程缓存管理#
我们支持多个远程存储后端用于缓存
RedisCache
MongoDBCache
FirestoreCache
这里是一个使用 RedisCache
的示例:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
cache=IngestionCache(
cache=RedisCache(
redis_uri="redis://127.0.0.1:6379", collection="test_cache"
)
),
)
# 直接将数据摄取到向量数据库
nodes = pipeline.run(documents=[Document.example()])
在这里,不需要进行持久化步骤,因为一切都会在指定的远程集合中随着进行缓存。
异步支持#
IngestionPipeline
也支持异步操作
nodes = await pipeline.arun(documents=documents)
文档管理#
将 docstore
连接到摄取管道将启用文档管理。
使用 document.doc_id
或 node.ref_doc_id
作为基准点,摄取管道将积极寻找重复文档。
它的工作原理是:
- 存储
doc_id
->document_hash
的映射 - 如果连接了向量存储:
- 如果检测到重复的
doc_id
,并且哈希值已更改,则会重新处理并更新文档 - 如果检测到重复的
doc_id
,并且哈希值未更改,则跳过该节点 - 如果只有向量存储未连接:
- 检查每个节点的所有现有哈希值
- 如果找到重复项,则跳过该节点
- 否则,处理该节点
注意: 如果我们不连接向量存储,我们只能检查并删除重复的输入。
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore
pipeline = IngestionPipeline(
transformations=[...], docstore=SimpleDocumentStore()
)
完整的演示可在我们的演示笔记本中找到。
还可以查看另一个使用Redis 作为整个摄取堆栈的指南。
并行处理#
IngestionPipeline
的 run
方法可以使用并行进程执行。
它通过利用 multiprocessing.Pool
将节点的批次分发到各个处理器上来实现。
要使用并行处理执行,将 num_workers
设置为要使用的进程数:
from llama_index.core.ingestion import IngestionPipeline
pipeline = IngestionPipeline(
transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)