Pathway Reader¶
Pathway是一个开放的数据处理框架。它允许您轻松开发数据转换管道和机器学习应用程序,这些应用程序可以使用实时数据源和不断变化的数据。
本笔记本演示了如何设置一个实时数据索引管道。您可以以与常规阅读器相同的方式查询此管道的结果,但在幕后,Pathway在每次数据更改时更新索引,使您始终获得最新的答案。
在本笔记本中,我们首先将llama_index.readers.pathway.PathwayReader
阅读器连接到一个公共演示文档处理管道,该管道可以:
- 监视几个云数据源的数据更改。
- 为数据构建向量索引。
要拥有自己的文档处理管道,请查看托管服务或按照本笔记本中的步骤构建您自己的。
本文档中描述的基本管道允许轻松构建存储在云位置的文件的简单索引。但是,Pathway提供了构建实时数据管道和应用程序所需的一切,包括类似SQL的操作,如分组约简和不同数据源之间的连接,数据的基于时间的分组和窗口处理,以及各种连接器。
有关Pathway数据摄入管道和向量存储的更多详细信息,请访问向量存储管道。
先决条件¶
安装llama-index-readers-pathway
集成
%pip install llama-index-readers-pathway
配置日志记录
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
设置你的OpenAI API密钥。
import getpass
import os
# 如果选择的嵌入器不是OpenAI,则省略
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
创建读取器并连接到公共管道¶
要实例化和配置PathwayReader
,您需要提供文档索引管道的url
或host
和port
。在下面的代码中,我们使用一个公开可用的演示管道,其REST API可以在https://demo-document-indexing.pathway.stream
访问。这个演示从Google Drive和Sharepoint摄取文档,并维护一个用于检索文档的索引。
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(url="https://demo-document-indexing.pathway.stream")
# 让我们使用一些文本进行搜索
reader.load_data(query_text="What is Pathway")
使用llama-index创建一个摘要索引¶
docs = reader.load_data(query_text="What is Pathway", k=2)
from llama_index.core import SummaryIndex
index = SummaryIndex.from_documents(docs)
query_engine = index.as_query_engine()
response = query_engine.query("What does Pathway do?")
print(response)
构建自己的数据处理流程¶
先决条件¶
安装 pathway
包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
定义Pathway跟踪的数据源¶
Pathway可以同时监听许多来源,比如本地文件、S3文件夹、云存储和任何数据流,以便进行数据更改。
更多信息请参见pathway-io。
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./data",
format="binary",
mode="streaming",
with_metadata=True,
) # 这将创建一个`pathway`连接器,用于跟踪./data目录中的所有文件
)
# 这将创建一个用于跟踪Google驱动器中文件的连接器。
# 请按照https://pathway.com/developers/tutorials/connectors/gdrive-connector/上的说明获取凭据
# data_sources.append(
# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
创建文档索引流程¶
让我们创建文档索引流水线。transformations
应该是以 Embedding
转换结束的 TransformComponent
列表。
在这个例子中,让我们首先使用 TokenTextSplitter
分割文本,然后使用 OpenAIEmbedding
进行嵌入。
from pathway.xpacks.llm.vector_store import VectorStoreServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
processing_pipeline = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
# 定义Pathway将要使用的主机和端口
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# `threaded`以分离模式运行pathway,当从终端或容器中运行时,必须将其设置为False
# 有关`with_cache`的更多信息,请访问https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
将读取器连接到自定义流水线¶
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(host=PATHWAY_HOST, port=PATHWAY_PORT)
# 让我们用一些文本进行搜索
reader.load_data(query_text="What is Pathway")