Ingestion Pipeline + Document Management¶
Using the document.doc_id or node.ref_doc_id as a grounding point, the ingestion pipeline will actively look for duplicate documents.
It works by:
- storing a map of doc_id -> document_hash
- if a duplicate doc_id is detected, and the hash has changed, the document will be re-processed
- if the hash is unchanged, the document is skipped in the pipeline
If we do not attach a vector store, we can only check for and remove duplicate inputs.
If a vector store is attached, we can also handle upserts! We have another guide for upserts and vector stores; a minimal sketch of that setup is also shown below.
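As a rough, hedged sketch of the vector-store case (the in-memory SimpleVectorStore here is a stand-in of our choosing, not part of this notebook), attaching a vector store and requesting upserts looks roughly like this:

from llama_index.core.ingestion import DocstoreStrategy, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.vector_stores import SimpleVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# sketch only: with a vector store attached, changed documents can be
# upserted (stale nodes deleted, new nodes inserted) instead of merely skipped
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=SimpleDocumentStore(),
    vector_store=SimpleVectorStore(),  # stand-in; any vector store that supports deletes
    docstore_strategy=DocstoreStrategy.UPSERTS,
)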
Create Seed Data¶
Before exercising the pipeline, we need some seed data to test and validate its behavior. This can be simulated data or a subset of real data.
In this example, we will create some mock seed data for the demonstration.
In [ ]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-storage-docstore-mongodb
%pip install llama-index-embeddings-huggingface
In [ ]:
# generate some test data
!mkdir -p data
!echo "This is a test file: one!" > data/test1.txt
!echo "This is a test file: two!" > data/test2.txt
In [ ]:
from llama_index.core import SimpleDirectoryReader

# load documents with deterministic IDs
documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
/home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages/deeplake/util/check_latest_version.py:32: UserWarning: A newer version of deeplake (3.8.9) is available. It's recommended that you update to the latest version using `pip install -U deeplake`. warnings.warn(
Create Pipeline with Document Store¶
In [ ]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.core.node_parser import SentenceSplitter
# create the pipeline with transformations and an (in-memory) docstore
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=SimpleDocumentStore(),
)
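The RedisDocumentStore and MongoDocumentStore imports above are drop-in alternatives to the in-memory SimpleDocumentStore when you want the dedup state to persist on its own. As a sketch (the host, port, and namespace here are assumptions, not values from this notebook), a Redis-backed docstore could be created like this:

# sketch: swap in a persistent docstore backend (connection details are assumptions)
docstore = RedisDocumentStore.from_host_and_port(
    host="127.0.0.1", port=6379, namespace="document_store"
)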
In [ ]:
nodes = pipeline.run(documents=documents)
Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.
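This message is expected: without a vector store attached, the pipeline cannot delete stale nodes, so it falls back from upserts to only skipping exact duplicates.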
In [ ]:
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
In [ ]:
pipeline.persist("./pipeline_storage")
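Persisting writes the pipeline's cache and docstore to ./pipeline_storage, so the deduplication state can be restored later in a fresh pipeline.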
In [ ]:
# recreate the pipeline with the same transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),  # sentence splitter
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),  # Hugging Face embedding model
    ]
)

# restore the pipeline state
pipeline.load("./pipeline_storage")
In [ ]:
# add a new file and modify an existing one
!echo "This is a test file: three!" > data/test3.txt
!echo "This is a NEW test file: one!" > data/test1.txt
In [ ]:
documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
In [ ]:
nodes = pipeline.run(documents=documents)
Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.
In [ ]:
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
Let's confirm which nodes were ingested:
In [ ]:
for node in nodes:
    print(f"Node: {node.text}")
Node: This is a NEW test file: one!
Node: This is a test file: three!
We can also verify that the docstore is only tracking three documents:
In [ ]:
print(len(pipeline.docstore.docs))
3
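To see which documents those are, we can peek at the same docstore.docs mapping used above and print the tracked doc ids (a small sketch, not a cell from the original notebook):

# list the doc ids the docstore is tracking
for doc_id in pipeline.docstore.docs:
    print(doc_id)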