Skip to content

数据摄取管道#

IngestionPipeline 使用了所谓的 Transformations 概念,这些 Transformations 被应用于输入数据。这些 Transformations 被应用于您的输入数据,生成的节点要么被返回,要么被插入到向量数据库中(如果有的话)。每个节点+转换对都被缓存,这样在后续运行时(如果缓存被持久化),相同的节点+转换组合可以使用缓存结果,为您节省时间。

要查看 IngestionPipeline 被使用的交互式示例,请查看RAG CLI

使用模式#

最简单的用法是这样实例化一个 IngestionPipeline

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# 创建带有转换的管道
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# 运行管道
nodes = pipeline.run(documents=[Document.example()])

请注意,在实际场景中,您可以从 SimpleDirectoryReader 或 Llama Hub 的其他读取器中获取您的文档。

连接到向量数据库#

在运行数据摄取管道时,您还可以选择自动将生成的节点插入到远程向量存储中。

然后,您可以稍后从该向量存储构建索引。

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    vector_store=vector_store,
)

# 直接将数据摄取到向量数据库
pipeline.run(documents=[Document.example()])

# 创建您的索引
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

在管道中计算嵌入#

请注意,在上面的示例中,嵌入是作为管道的一部分计算的。如果您将管道连接到向量存储,则嵌入必须是管道的一个阶段,否则您稍后实例化索引将失败。

如果您不连接到向量存储,即只是生成节点列表,则可以从管道中省略嵌入。

缓存#

IngestionPipeline 中,每个节点+转换组合都被哈希和缓存。这样可以节省使用相同数据的后续运行时间。

以下各节描述了缓存周围的一些基本用法。

本地缓存管理#

一旦您有了一个管道,您可能希望存储和加载缓存。

# 保存
pipeline.persist("./pipeline_storage")

# 加载和恢复状态
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# 由于缓存的存在,将立即运行
nodes = pipeline.run(documents=[Document.example()])

如果缓存变得太大,您可以清除它

# 删除所有缓存内容
cache.clear()

远程缓存管理#

我们支持多个远程存储后端用于缓存

  • RedisCache
  • MongoDBCache
  • FirestoreCache

这里是一个使用 RedisCache 的示例:

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache


pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    cache=IngestionCache(
        cache=RedisCache(
            redis_uri="redis://127.0.0.1:6379", collection="test_cache"
        )
    ),
)

# 直接将数据摄取到向量数据库
nodes = pipeline.run(documents=[Document.example()])

在这里,不需要进行持久化步骤,因为一切都会在指定的远程集合中随着进行缓存。

异步支持#

IngestionPipeline 也支持异步操作

nodes = await pipeline.arun(documents=documents)

文档管理#

docstore 连接到摄取管道将启用文档管理。

使用 document.doc_idnode.ref_doc_id 作为基准点,摄取管道将积极寻找重复文档。

它的工作原理是:

  • 存储 doc_id -> document_hash 的映射
  • 如果连接了向量存储:
  • 如果检测到重复的 doc_id,并且哈希值已更改,则会重新处理并更新文档
  • 如果检测到重复的 doc_id,并且哈希值未更改,则跳过该节点
  • 如果只有向量存储未连接:
  • 检查每个节点的所有现有哈希值
  • 如果找到重复项,则跳过该节点
  • 否则,处理该节点

注意: 如果我们不连接向量存储,我们只能检查并删除重复的输入。

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...], docstore=SimpleDocumentStore()
)

完整的演示可在我们的演示笔记本中找到。

还可以查看另一个使用Redis 作为整个摄取堆栈的指南。

并行处理#

IngestionPipelinerun 方法可以使用并行进程执行。 它通过利用 multiprocessing.Pool 将节点的批次分发到各个处理器上来实现。

要使用并行处理执行,将 num_workers 设置为要使用的进程数:

from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)

模块#