28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271 | class AzureCosmosDBMongoDBVectorSearch(BasePydanticVectorStore):
"""Azure CosmosDB MongoDB vCore 向量存储。
要使用,您应该同时具备以下条件:
- 安装了 ``pymongo`` python 包
- 与 Azure Cosmodb MongoDB vCore 集群关联的连接字符串
示例:
`pip install llama-index-vector-stores-azurecosmosmongo`
```python
import pymongo
from llama_index.vector_stores.azurecosmosmongo import AzureCosmosDBMongoDBVectorSearch
# 使用您的 Azure CosmosDB MongoDB URI 设置连接字符串
connection_string = "YOUR_AZURE_COSMOSDB_MONGODB_URI"
mongodb_client = pymongo.MongoClient(connection_string)
# 创建 AzureCosmosDBMongoDBVectorSearch 的实例
vector_store = AzureCosmosDBMongoDBVectorSearch(
mongodb_client=mongodb_client,
db_name="demo_vectordb",
collection_name="paul_graham_essay",
)
```"""
stores_text: bool = True
flat_metadata: bool = True
_collection: Any = PrivateAttr()
_index_name: str = PrivateAttr()
_embedding_key: str = PrivateAttr()
_id_key: str = PrivateAttr()
_text_key: str = PrivateAttr()
_metadata_key: str = PrivateAttr()
_insert_kwargs: dict = PrivateAttr()
_db_name: str = PrivateAttr()
_collection_name: str = PrivateAttr()
_cosmos_search_kwargs: dict = PrivateAttr()
_mongodb_client: Any = PrivateAttr()
def __init__(
self,
mongodb_client: Optional[Any] = None,
db_name: str = "default_db",
collection_name: str = "default_collection",
index_name: str = "default_vector_search_index",
id_key: str = "id",
embedding_key: str = "content_vector",
text_key: str = "text",
metadata_key: str = "metadata",
cosmos_search_kwargs: Optional[Dict] = None,
insert_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> None:
"""初始化向量存储。
Args:
mongodb_client: Azure CosmoDB MongoDB 客户端(类型:MongoClient,lazy import 时显示为任意)。
db_name: Azure CosmosDB MongoDB 数据库名称。
collection_name: Azure CosmosDB 集合名称。
index_name: Azure CosmosDB MongoDB vCore 向量搜索索引名称。
id_key: 用作 id 的数据字段。
embedding_key: 包含每个文档嵌入的 Azure CosmosDB MongoDB 字段。
text_key: 包含每个文档文本的 Azure CosmosDB MongoDB 字段。
metadata_key: 包含每个文档元数据的 Azure CosmosDB MongoDB 字段。
cosmos_search_kwargs: 包含搜索选项的 Azure CosmosDB MongoDB 字段,如 kind、numLists、similarity 和 dimensions。
insert_kwargs: `insert` 期间使用的 kwargs。
"""
super().__init__()
if mongodb_client is not None:
self._mongodb_client = cast(pymongo.MongoClient, mongodb_client)
else:
if "AZURE_COSMOSDB_MONGODB_URI" not in os.environ:
raise ValueError(
"Must specify Azure cosmodb 'AZURE_COSMOSDB_MONGODB_URI' via env variable "
"if not directly passing in client."
)
self._mongodb_client = pymongo.MongoClient(
os.environ["AZURE_COSMOSDB_MONGODB_URI"]
)
self._collection = self._mongodb_client[db_name][collection_name]
self._index_name = index_name
self._embedding_key = embedding_key
self._id_key = id_key
self._text_key = text_key
self._metadata_key = metadata_key
self._insert_kwargs = insert_kwargs or {}
self._db_name = db_name
self._collection_name = collection_name
self._cosmos_search_kwargs = cosmos_search_kwargs or {}
self._create_vector_search_index()
def _create_vector_search_index(self) -> None:
db = self._mongodb_client[self._db_name]
db.command(
{
"createIndexes": self._collection_name,
"indexes": [
{
"name": self._index_name,
"key": {self._embedding_key: "cosmosSearch"},
"cosmosSearchOptions": {
"kind": self._cosmos_search_kwargs.get(
"kind", "vector-ivf"
),
"numLists": self._cosmos_search_kwargs.get("numLists", 1),
"similarity": self._cosmos_search_kwargs.get(
"similarity", "COS"
),
"dimensions": self._cosmos_search_kwargs.get(
"dimensions", 1536
),
},
}
],
}
)
def add(
self,
nodes: List[BaseNode],
**add_kwargs: Any,
) -> List[str]:
"""将节点添加到索引中。
Args:
节点:List[BaseNode]:带有嵌入的节点列表
Returns:
成功添加节点的id列表。
"""
ids = []
data_to_insert = []
for node in nodes:
metadata = node_to_metadata_dict(
node, remove_text=True, flat_metadata=self.flat_metadata
)
entry = {
self._id_key: node.node_id,
self._embedding_key: node.get_embedding(),
self._text_key: node.get_content(metadata_mode=MetadataMode.NONE) or "",
self._metadata_key: metadata,
}
data_to_insert.append(entry)
ids.append(node.node_id)
logger.debug("Inserting data into MongoDB: %s", data_to_insert)
insert_result = self._collection.insert_many(
data_to_insert, **self._insert_kwargs
)
logger.debug("Result of insert: %s", insert_result)
return ids
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
"""使用ref_doc_id删除节点。
Args:
ref_doc_id(str):要删除的文档的doc_id。
"""
# delete by filtering on the doc_id metadata
self._collection.delete_one(
filter={self._metadata_key + ".ref_doc_id": ref_doc_id}, **delete_kwargs
)
@property
def client(self) -> Any:
"""返回 MongoDB 客户端。"""
return self._mongodb_client
def _query(self, query: VectorStoreQuery) -> VectorStoreQueryResult:
params: Dict[str, Any] = {
"vector": query.query_embedding,
"path": self._embedding_key,
"k": query.similarity_top_k,
}
if query.filters is not None:
raise ValueError(
"Metadata filters not implemented for azure cosmosdb mongodb yet."
)
query_field = {"$search": {"cosmosSearch": params, "returnStoredSource": True}}
pipeline = [
query_field,
{
"$project": {
"similarityScore": {"$meta": "searchScore"},
"document": "$$ROOT",
}
},
]
logger.debug("Running query pipeline: %s", pipeline)
cursor = self._collection.aggregate(pipeline) # type: ignore
top_k_nodes = []
top_k_ids = []
top_k_scores = []
for res in cursor:
text = res["document"].pop(self._text_key)
score = res.pop("similarityScore")
id = res["document"].pop(self._id_key)
metadata_dict = res["document"].pop(self._metadata_key)
try:
node = metadata_dict_to_node(metadata_dict)
node.set_content(text)
except Exception:
# NOTE: deprecated legacy logic for backward compatibility
metadata, node_info, relationships = legacy_metadata_dict_to_node(
metadata_dict
)
node = TextNode(
text=text,
id_=id,
metadata=metadata,
start_char_idx=node_info.get("start", None),
end_char_idx=node_info.get("end", None),
relationships=relationships,
)
top_k_ids.append(id)
top_k_nodes.append(node)
top_k_scores.append(score)
result = VectorStoreQueryResult(
nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
)
logger.debug("Result of query: %s", result)
return result
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
"""查询前k个最相似节点的索引。
Args:
query: 一个VectorStoreQuery对象。
Returns:
包含查询结果的VectorStoreQueryResult。
"""
return self._query(query)
|