Redis数据导入管道¶
依赖¶
安装并启动redis,设置OpenAI API密钥
In [ ]:
Copied!
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
In [ ]:
Copied!
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b
In [ ]:
Copied!
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# 创建种子数据
在开始编写代码之前,我们需要创建一些种子数据来填充我们的数据库。这些种子数据将用于开发和测试我们的应用程序。
我们将使用Python的字典数据结构来表示我们的种子数据。每个字典将代表一个实体,并包含该实体的属性和值。
下面是一个示例,演示了如何创建一个用户的种子数据:
seed_users = [
{
'username': 'user1',
'email': 'user1@example.com',
'password': 'password1'
},
{
'username': 'user2',
'email': 'user2@example.com',
'password': 'password2'
},
# 更多用户...
]
In [ ]:
Copied!
# 生成一些测试数据!rm -rf test_redis_data # 删除已有的test_redis_data文件夹!mkdir -p test_redis_data # 创建test_redis_data文件夹!echo "这是一个测试文件:第一个!" > test_redis_data/test1.txt # 写入内容到test1.txt!echo "这是一个测试文件:第二个!" > test_redis_data/test2.txt # 写入内容到test2.txt
# 生成一些测试数据!rm -rf test_redis_data # 删除已有的test_redis_data文件夹!mkdir -p test_redis_data # 创建test_redis_data文件夹!echo "这是一个测试文件:第一个!" > test_redis_data/test1.txt # 写入内容到test1.txt!echo "这是一个测试文件:第二个!" > test_redis_data/test2.txt # 写入内容到test2.txt
In [ ]:
Copied!
from llama_index.core import SimpleDirectoryReader# 使用确定性ID加载文档documents = SimpleDirectoryReader( "./test_redis_data", filename_as_id=True).load_data()
from llama_index.core import SimpleDirectoryReader# 使用确定性ID加载文档documents = SimpleDirectoryReader( "./test_redis_data", filename_as_id=True).load_data()
In [ ]:
Copied!
from llama_index.embeddings.huggingface import HuggingFaceEmbeddingfrom llama_index.core.ingestion import ( DocstoreStrategy, IngestionPipeline, IngestionCache,)from llama_index.storage.kvstore.redis import RedisKVStore as RedisCachefrom llama_index.storage.docstore.redis import RedisDocumentStorefrom llama_index.core.node_parser import SentenceSplitterfrom llama_index.vector_stores.redis import RedisVectorStorefrom redisvl.schema import IndexSchemaembed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")custom_schema = IndexSchema.from_dict( { "index": {"name": "redis_vector_store", "prefix": "doc"}, # 自定义索引的字段 "fields": [ # llamaindex所需的必要字段 {"type": "tag", "name": "id"}, {"type": "tag", "name": "doc_id"}, {"type": "text", "name": "text"}, # 用于bge-small-en-v1.5嵌入的自定义向量字段 { "type": "vector", "name": "vector", "attrs": { "dims": 384, "algorithm": "hnsw", "distance_metric": "cosine", }, }, ], })
from llama_index.embeddings.huggingface import HuggingFaceEmbeddingfrom llama_index.core.ingestion import ( DocstoreStrategy, IngestionPipeline, IngestionCache,)from llama_index.storage.kvstore.redis import RedisKVStore as RedisCachefrom llama_index.storage.docstore.redis import RedisDocumentStorefrom llama_index.core.node_parser import SentenceSplitterfrom llama_index.vector_stores.redis import RedisVectorStorefrom redisvl.schema import IndexSchemaembed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")custom_schema = IndexSchema.from_dict( { "index": {"name": "redis_vector_store", "prefix": "doc"}, # 自定义索引的字段 "fields": [ # llamaindex所需的必要字段 {"type": "tag", "name": "id"}, {"type": "tag", "name": "doc_id"}, {"type": "text", "name": "text"}, # 用于bge-small-en-v1.5嵌入的自定义向量字段 { "type": "vector", "name": "vector", "attrs": { "dims": 384, "algorithm": "hnsw", "distance_metric": "cosine", }, }, ], })
In [ ]:
Copied!
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=RedisVectorStore(
schema=custom_schema,
redis_url="redis://localhost:6379",
),
cache=IngestionCache(
cache=RedisCache.from_host_and_port("localhost", 6379),
collection="redis_cache",
),
docstore_strategy=DocstoreStrategy.UPSERTS,
)
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=RedisVectorStore(
schema=custom_schema,
redis_url="redis://localhost:6379",
),
cache=IngestionCache(
cache=RedisCache.from_host_and_port("localhost", 6379),
collection="redis_cache",
),
docstore_strategy=DocstoreStrategy.UPSERTS,
)
In [ ]:
Copied!
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
确认文档已被摄取¶
我们可以使用向量存储创建一个向量索引,并快速查询哪些文档已被摄取。
In [ ]:
Copied!
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
In [ ]:
Copied!
print(
index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
)
print(
index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
)
I see two documents.
添加数据和摄取¶
在这里,我们可以更新现有文件,也可以添加新文件!
In [ ]:
Copied!
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
In [ ]:
Copied!
documents = SimpleDirectoryReader(
"./test_redis_data", filename_as_id=True
).load_data()
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
documents = SimpleDirectoryReader(
"./test_redis_data", filename_as_id=True
).load_data()
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
13:32:07 redisvl.index.index INFO Index already exists, not overwriting. Ingested 2 Nodes
In [ ]:
Copied!
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
response = index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
print(response)
for node in response.source_nodes:
print(node.get_text())
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
response = index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
print(response)
for node in response.source_nodes:
print(node.get_text())
You see three documents: test3.txt, test1.txt, and test2.txt. This is a test file: three! This is a NEW test file: one! This is a test file: two!
正如我们所看到的,数据已经正确地进行了去重和更新操作!即使我们运行了完整的流水线两次,索引中只有三个节点。