跳到主要内容

将AnalyticDB用作OpenAI嵌入向量数据库

nbviewer

本笔记本将逐步指导您如何将AnalyticDB用作OpenAI嵌入向量数据库。

本笔记本展示了以下端到端流程: 1. 使用OpenAI API创建的预先计算的嵌入向量。 2. 将嵌入向量存储在AnalyticDB的云实例中。 3. 将原始文本查询转换为嵌入向量使用OpenAI API。 4. 使用AnalyticDB在创建的集合中执行最近邻搜索。

什么是AnalyticDB

AnalyticDB 是一个高性能的分布式向量数据库。与PostgreSQL语法完全兼容,您可以轻松地利用它。AnalyticDB是阿里云管理的云原生数据库,具有强大的向量计算引擎。绝对的开箱即用体验可以扩展到处理数十亿数据向量,具有丰富的功能,包括索引算法、结构化和非结构化数据特性、实时更新、距离度量、标量过滤、时间旅行搜索等。还配备了完整的OLAP数据库功能和生产使用承诺的SLA承诺;

部署选项

先决条件

为了完成这个练习,我们需要准备一些东西:

  1. AnalyticDB 云服务器实例。
  2. 用于与向量数据库交互的 ‘psycopg2’ 库。任何其他的 postgresql 客户端库也可以。
  3. 一个OpenAI API密钥

我们可以通过运行一个简单的curl命令来验证服务器是否成功启动:

安装要求

这个笔记本显然需要openaipsycopg2包,但我们还会使用一些其他附加库。以下命令会安装它们全部:

! pip install openai psycopg2 pandas wget

准备您的OpenAI API密钥

OpenAI API密钥用于对文档和查询进行向量化。

如果您没有OpenAI API密钥,可以从https://beta.openai.com/account/api-keys获取。

获取到密钥后,请将其添加到您的环境变量中,命名为OPENAI_API_KEY

# 确保你的OpenAI API密钥已正确设置为环境变量。
# 注意:如果您在本地运行此笔记本,您需要重新加载终端和笔记本,以使环境变量生效。
import os

# 注意:或者,您也可以像这样设置一个临时的环境变量:
# os.environ["OPENAI_API_KEY"] = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

if os.getenv("OPENAI_API_KEY") is not None:
print("OPENAI_API_KEY is ready")
else:
print("OPENAI_API_KEY environment variable not found")

OPENAI_API_KEY is ready

连接到AnalyticDB

首先将其添加到您的环境变量中。或者您可以直接更改下面的”psycopg2.connect”参数。

使用官方Python库连接到正在运行的AnalyticDB服务器非常简单:

import os
import psycopg2

# 注意:或者,您也可以像这样设置一个临时的环境变量:
# os.environ["PGHOST"] = "your_host"
# os.environ["PGPORT"] "5432"),
# os.environ["PGDATABASE"] "postgres"),
# os.environ["PGUSER"] "user"),
# os.environ["PGPASSWORD"] "password"),

connection = psycopg2.connect(
host=os.environ.get("PGHOST", "localhost"),
port=os.environ.get("PGPORT", "5432"),
database=os.environ.get("PGDATABASE", "postgres"),
user=os.environ.get("PGUSER", "user"),
password=os.environ.get("PGPASSWORD", "password")
)

# 创建一个新的游标对象
cursor = connection.cursor()

我们可以通过运行任何可用的方法来测试连接:


# 执行一个简单的查询以测试连接
cursor.execute("SELECT 1;")
result = cursor.fetchone()

# 检查查询结果
if result == (1,):
print("Connection successful!")
else:
print("Connection failed.")

Connection successful!
import wget

embeddings_url = "https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip"

# 文件大小约为700MB,因此需要一些时间来完成。
wget.download(embeddings_url)

100% [......................................................................] 698933052 / 698933052
'vector_database_wikipedia_articles_embedded.zip'

下载的文件必须被解压。

import zipfile
import os
import re
import tempfile

current_directory = os.getcwd()
zip_file_path = os.path.join(current_directory, "vector_database_wikipedia_articles_embedded.zip")
output_directory = os.path.join(current_directory, "../../data")

with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(output_directory)


# 检查CSV文件是否存在
file_name = "vector_database_wikipedia_articles_embedded.csv"
data_directory = os.path.join(current_directory, "../../data")
file_path = os.path.join(data_directory, file_name)


if os.path.exists(file_path):
print(f"The file {file_name} exists in the data directory.")
else:
print(f"The file {file_name} does not exist in the data directory.")


The file vector_database_wikipedia_articles_embedded.csv exists in the data directory.

索引数据

AnalyticDB将数据存储在关系中,其中每个对象至少由一个向量描述。我们的关系将被称为articles,每个对象将由titlecontent向量描述。

我们将从创建一个关系开始,在titlecontent上创建一个向量索引,然后我们将用预先计算的嵌入填充它。

create_table_sql = '''
CREATE TABLE IF NOT EXISTS public.articles (
id INTEGER NOT NULL,
url TEXT,
title TEXT,
content TEXT,
title_vector REAL[],
content_vector REAL[],
vector_id INTEGER
);

ALTER TABLE public.articles ADD PRIMARY KEY (id);
'''

# 创建索引的SQL语句
create_indexes_sql = '''
CREATE INDEX ON public.articles USING ann (content_vector) WITH (distancemeasure = l2, dim = '1536', pq_segments = '64', hnsw_m = '100', pq_centers = '2048');

CREATE INDEX ON public.articles USING ann (title_vector) WITH (distancemeasure = l2, dim = '1536', pq_segments = '64', hnsw_m = '100', pq_centers = '2048');
'''

# 执行SQL语句
cursor.execute(create_table_sql)
cursor.execute(create_indexes_sql)

# 提交更改
connection.commit()

加载数据

在本节中,我们将加载在本次会话之前准备好的数据,这样您就不必使用自己的学分重新计算维基百科文章的嵌入。

import io

# 本地CSV文件的路径
csv_file_path = '../../data/vector_database_wikipedia_articles_embedded.csv'

# 定义一个生成器函数,逐行处理文件
def process_file(file_path):
with open(file_path, 'r') as file:
for line in file:
# Replace '[' with '{' and ']' with '}'
modified_line = line.replace('[', '{').replace(']', '}')
yield modified_line

# 创建一个 StringIO 对象以存储修改后的行
modified_lines = io.StringIO(''.join(list(process_file(csv_file_path))))

# 创建用于 copy_expert 方法的 COPY 命令
copy_command = '''
复制 public.articles 表中的数据(包括 id、url、title、content、title_vector、content_vector、vector_id 字段)
从标准输入读取,使用 CSV 格式,包含表头,分隔符为逗号。
'''

# 使用copy_expert方法执行COPY命令
cursor.copy_expert(copy_command, modified_lines)

# 提交更改
connection.commit()

# 检查集合大小,确保所有点都已存储。
count_sql = """从public.articles表中选择计数(*);"""
cursor.execute(count_sql)
result = cursor.fetchone()
print(f"Count:{result[0]}")


Count:25000

搜索数据

一旦数据被放入Qdrant中,我们将开始查询集合中最接近的向量。我们可以提供一个额外的参数vector_name,以从基于标题切换到基于内容的搜索。由于预先计算的嵌入是使用text-embedding-3-small OpenAI模型创建的,因此在搜索过程中我们也必须使用它。

def query_analyticdb(query, collection_name, vector_name="title_vector", top_k=20):

# 从用户查询生成嵌入向量
embedded_query = openai.Embedding.create(
input=query,
model="text-embedding-3-small",
)["data"][0]["embedding"]

# 将嵌入式查询转换为与PostgreSQL兼容的格式
embedded_query_pg = "{" + ",".join(map(str, embedded_query)) + "}"

# 创建SQL查询
query_sql = f"""
SELECT id, url, title, l2_distance({vector_name},'{embedded_query_pg}'::real[]) AS similarity
FROM {collection_name}
ORDER BY {vector_name} <-> '{embedded_query_pg}'::real[]
LIMIT {top_k};
"""
# 执行查询
cursor.execute(query_sql)
results = cursor.fetchall()

return results

import openai

query_results = query_analyticdb("modern art in Europe", "Articles")
for i, result in enumerate(query_results):
print(f"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})")

1. Museum of Modern Art (Score: 0.75)
2. Western Europe (Score: 0.735)
3. Renaissance art (Score: 0.728)
4. Pop art (Score: 0.721)
5. Northern Europe (Score: 0.71)
6. Hellenistic art (Score: 0.706)
7. Modernist literature (Score: 0.694)
8. Art film (Score: 0.687)
9. Central Europe (Score: 0.685)
10. European (Score: 0.683)
11. Art (Score: 0.683)
12. Byzantine art (Score: 0.682)
13. Postmodernism (Score: 0.68)
14. Eastern Europe (Score: 0.679)
15. Europe (Score: 0.678)
16. Cubism (Score: 0.678)
17. Impressionism (Score: 0.677)
18. Bauhaus (Score: 0.676)
19. Surrealism (Score: 0.674)
20. Expressionism (Score: 0.674)
# 这次我们将使用内容向量进行查询。
query_results = query_analyticdb("Famous battles in Scottish history", "Articles", "content_vector")
for i, result in enumerate(query_results):
print(f"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})")

1. Battle of Bannockburn (Score: 0.739)
2. Wars of Scottish Independence (Score: 0.723)
3. 1651 (Score: 0.705)
4. First War of Scottish Independence (Score: 0.699)
5. Robert I of Scotland (Score: 0.692)
6. 841 (Score: 0.688)
7. 1716 (Score: 0.688)
8. 1314 (Score: 0.674)
9. 1263 (Score: 0.673)
10. William Wallace (Score: 0.671)
11. Stirling (Score: 0.663)
12. 1306 (Score: 0.662)
13. 1746 (Score: 0.661)
14. 1040s (Score: 0.656)
15. 1106 (Score: 0.654)
16. 1304 (Score: 0.653)
17. David II of Scotland (Score: 0.65)
18. Braveheart (Score: 0.649)
19. 1124 (Score: 0.648)
20. July 27 (Score: 0.646)