57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
class MilvusVectorStore(BasePydanticVectorStore):
    """Milvus vector store.

    Stores each node's text, embedding and metadata in a Milvus collection.
    An existing collection can be reused; a new one is created when the
    collection does not exist or when ``overwrite`` is True.

    Args:
        uri (str, optional): Connection URI, e.g. "https://address:port" for a
            Milvus server or Zilliz Cloud, or "path/to/local/milvus.db" for a
            local Milvus Lite database. Defaults to "./milvus_llamaindex.db".
        token (str, optional): Login token. Leave empty when RBAC is not
            used; with RBAC it is usually "username:password".
        collection_name (str, optional): Name of the collection that holds
            the data. Defaults to "llamacollection".
        dim (int, optional): Dimension of the embedding vectors. Required
            when a new collection has to be created.
        embedding_field (str, optional): Name of the embedding field.
            Defaults to DEFAULT_EMBEDDING_KEY.
        doc_id_field (str, optional): Name of the doc_id field.
            Defaults to DEFAULT_DOC_ID_KEY.
        similarity_metric (str, optional): Similarity metric. "IP", "L2",
            "euclidean" (mapped to "L2") and "cosine" are recognized
            case-insensitively; any other value falls back to "L2".
            Defaults to "IP".
        consistency_level (str, optional): Consistency level used for a
            newly created collection. Defaults to "Strong".
        overwrite (bool, optional): Whether to drop an existing collection
            with the same name first. Defaults to False.
        text_key (str, optional): Key under which the text is stored when
            using a collection that was not created by this class.
            Defaults to None.
        output_fields (List[str], optional): Fields returned by queries when
            the query itself does not specify any; also the metadata keys
            copied into result nodes when ``text_key`` is set.
        index_config (dict, optional): Configuration used to build the Milvus
            index. Defaults to None.
        search_config (dict, optional): Configuration used when searching the
            Milvus index. It must be compatible with the index type given in
            ``index_config``. Defaults to None.
        batch_size (int): Number of documents sent to Milvus per insert call.
            Defaults to DEFAULT_BATCH_SIZE.
        enable_sparse (bool): Whether sparse embeddings are stored to support
            hybrid retrieval. Defaults to False.
        sparse_embedding_function (BaseSparseEmbeddingFunction, optional):
            Converts text to sparse embeddings when ``enable_sparse`` is True.
            A default function is used when none is given.
        hybrid_ranker (str): Ranker used for hybrid search queries. Only
            "RRFRanker" and "WeightedRanker" are supported.
            Defaults to "RRFRanker".
        hybrid_ranker_params (dict, optional): Parameters of the hybrid
            ranker; the expected keys depend on the ranker:
            - "RRFRanker": ``k`` (int), the Reciprocal Rank Fusion parameter.
              60 is used when no parameters are given.
            - "WeightedRanker": ``weights`` (list of float) holding the
              weight of the dense component followed by the weight of the
              sparse component. [1.0, 1.0] is used when no parameters are
              given.
        **kwargs: Extra arguments forwarded to ``MilvusClient`` (for example
            ``server_pem_path``).

    Raises:
        ValueError: A new collection has to be created but ``dim`` is None.
        NotImplementedError: ``enable_sparse`` is True but the installed
            pymilvus has no ``DataType.SPARSE_FLOAT_VECTOR``.

    Example:
        `pip install llama-index-vector-stores-milvus`

        ```python
        from llama_index.vector_stores.milvus import MilvusVectorStore

        # Set up the MilvusVectorStore
        vector_store = MilvusVectorStore(
            dim=1536,
            collection_name="your_collection_name",
            uri="http://milvus_address:port",
            token="your_milvus_token_here",
            overwrite=True
        )
        ```
    """

    stores_text: bool = True
    stores_node: bool = True

    uri: str = "./milvus_llamaindex.db"
    token: str = ""
    collection_name: str = "llamacollection"
    dim: Optional[int]
    embedding_field: str = DEFAULT_EMBEDDING_KEY
    doc_id_field: str = DEFAULT_DOC_ID_KEY
    similarity_metric: str = "IP"
    consistency_level: str = "Strong"
    overwrite: bool = False
    text_key: Optional[str]
    output_fields: List[str] = Field(default_factory=list)
    index_config: Optional[dict]
    search_config: Optional[dict]
    batch_size: int = DEFAULT_BATCH_SIZE
    enable_sparse: bool = False
    sparse_embedding_field: str = "sparse_embedding"
    sparse_embedding_function: Any
    hybrid_ranker: str
    # A factory keeps the default from being a single shared dict object.
    hybrid_ranker_params: dict = Field(default_factory=dict)

    _milvusclient: MilvusClient = PrivateAttr()
    _collection: Any = PrivateAttr()

    def __init__(
        self,
        uri: str = "./milvus_llamaindex.db",
        token: str = "",
        collection_name: str = "llamacollection",
        dim: Optional[int] = None,
        embedding_field: str = DEFAULT_EMBEDDING_KEY,
        doc_id_field: str = DEFAULT_DOC_ID_KEY,
        similarity_metric: str = "IP",
        consistency_level: str = "Strong",
        overwrite: bool = False,
        text_key: Optional[str] = None,
        output_fields: Optional[List[str]] = None,
        index_config: Optional[dict] = None,
        search_config: Optional[dict] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        enable_sparse: bool = False,
        sparse_embedding_function: Optional[BaseSparseEmbeddingFunction] = None,
        hybrid_ranker: str = "RRFRanker",
        hybrid_ranker_params: Optional[dict] = None,
        **kwargs: Any,
    ) -> None:
        """Connect to Milvus and prepare the collection and its indexes.

        See the class docstring for the meaning of each argument.

        Raises:
            ValueError: The collection must be created but ``dim`` is None.
            NotImplementedError: Sparse vectors are requested but the
                installed pymilvus lacks ``DataType.SPARSE_FLOAT_VECTOR``.
        """
        # NOTE(review): ``uri``, ``token`` and ``similarity_metric`` are not
        # forwarded here; the metric is assigned right below and the
        # connection uses the ``uri``/``token`` arguments directly.
        super().__init__(
            collection_name=collection_name,
            dim=dim,
            embedding_field=embedding_field,
            doc_id_field=doc_id_field,
            consistency_level=consistency_level,
            overwrite=overwrite,
            text_key=text_key,
            output_fields=output_fields or [],
            index_config=index_config if index_config else {},
            search_config=search_config if search_config else {},
            batch_size=batch_size,
            enable_sparse=enable_sparse,
            sparse_embedding_function=sparse_embedding_function,
            hybrid_ranker=hybrid_ranker,
            # ``None`` default avoids sharing one dict between instances.
            hybrid_ranker_params=hybrid_ranker_params or {},
        )

        # Select the similarity metric; unrecognized names fall back to L2.
        similarity_metrics_map = {
            "ip": "IP",
            "l2": "L2",
            "euclidean": "L2",
            "cosine": "COSINE",
        }
        self.similarity_metric = similarity_metrics_map.get(
            similarity_metric.lower(), "L2"
        )

        # Connect to Milvus instance
        self._milvusclient = MilvusClient(
            uri=uri,
            token=token,
            **kwargs,  # pass additional arguments such as server_pem_path
        )

        # Delete previous collection if overwriting
        if overwrite and collection_name in self.client.list_collections():
            self._milvusclient.drop_collection(collection_name)

        # Create the collection if it does not exist
        if collection_name not in self.client.list_collections():
            if dim is None:
                raise ValueError("Dim argument required for collection creation.")
            if self.enable_sparse is False:
                self._milvusclient.create_collection(
                    collection_name=collection_name,
                    dimension=dim,
                    primary_field_name=MILVUS_ID_FIELD,
                    vector_field_name=embedding_field,
                    id_type="string",
                    metric_type=self.similarity_metric,
                    max_length=65_535,
                    consistency_level=consistency_level,
                )
            else:
                # Probe for sparse-vector support before building the schema.
                try:
                    _ = DataType.SPARSE_FLOAT_VECTOR
                except AttributeError as e:
                    logger.error(
                        "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                    )
                    raise NotImplementedError(
                        "Hybrid retrieval requires Milvus 2.4.0 or later."
                    ) from e
                self._create_hybrid_index(collection_name)

        self._collection = Collection(collection_name, using=self._milvusclient._using)
        self._create_index_if_required()

        self.enable_sparse = enable_sparse
        if self.enable_sparse is True:
            if sparse_embedding_function is None:
                logger.warning(
                    "Sparse embedding function is not provided, using default."
                )
                self.sparse_embedding_function = (
                    get_defualt_sparse_embedding_function()
                )
            else:
                self.sparse_embedding_function = sparse_embedding_function

        # NOTE(review): this message is emitted unconditionally, i.e. also
        # when an already existing collection was reused.
        logger.debug(f"Successfully created a new collection: {self.collection_name}")

    @property
    def client(self) -> Any:
        """Return the underlying ``MilvusClient``."""
        return self._milvusclient

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        """Insert nodes and their embeddings into Milvus.

        Args:
            nodes (List[BaseNode]): Nodes, with embeddings, to insert.
            **add_kwargs: ``force_flush=True`` flushes the collection after
                the inserts.

        Raises:
            ValueError: ``enable_sparse`` is True but no sparse embedding
                function is set.

        Returns:
            List[str]: Ids of the inserted nodes.
        """
        if self.enable_sparse is True and self.sparse_embedding_function is None:
            message = "sparse_embedding_function is None when enable_sparse is True."
            logger.fatal(message)
            # Fail with a clear error instead of an AttributeError on None
            # further down.
            raise ValueError(message)

        insert_list = []
        insert_ids = []

        # Process that data we are going to insert
        for node in nodes:
            entry = node_to_metadata_dict(node)
            entry[MILVUS_ID_FIELD] = node.node_id
            entry[self.embedding_field] = node.embedding

            if self.enable_sparse is True:
                entry[
                    self.sparse_embedding_field
                ] = self.sparse_embedding_function.encode_documents([node.text])[0]

            insert_ids.append(node.node_id)
            insert_list.append(entry)

        # Insert the data into milvus
        for insert_batch in iter_batch(insert_list, self.batch_size):
            self._collection.insert(insert_batch)
        if add_kwargs.get("force_flush", False):
            self._collection.flush()
        self._create_index_if_required()
        logger.debug(
            f"Successfully inserted embeddings into: {self.collection_name} "
            f"Num Inserted: {len(insert_list)}"
        )
        return insert_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """Delete the nodes that belong to a document.

        Args:
            ref_doc_id (str): The doc_id of the document to delete. A list of
                doc_ids is also accepted.
        """
        # Adds ability for multiple doc delete in future.
        doc_ids: List[str]
        if isinstance(ref_doc_id, list):
            doc_ids = ref_doc_id  # type: ignore
        else:
            doc_ids = [ref_doc_id]

        # Begin by querying for the primary keys to delete
        # NOTE(review): ids are quoted but not escaped; an id containing a
        # double quote would produce a malformed filter expression.
        doc_ids = ['"' + entry + '"' for entry in doc_ids]
        entries = self._milvusclient.query(
            collection_name=self.collection_name,
            filter=f"{self.doc_id_field} in [{','.join(doc_ids)}]",
        )
        if len(entries) > 0:
            # Read the primary key through the same constant that `add`
            # writes it with.
            ids = [entry[MILVUS_ID_FIELD] for entry in entries]
            self._milvusclient.delete(collection_name=self.collection_name, pks=ids)
            logger.debug(f"Successfully deleted embedding with doc_id: {doc_ids}")

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query the collection for the top-k most similar nodes.

        Args:
            query (VectorStoreQuery): The query. The fields read here are
                ``mode``, ``query_embedding``, ``query_str`` (hybrid mode
                only), ``similarity_top_k``, ``filters``, ``doc_ids``,
                ``node_ids`` and ``output_fields``.

        Raises:
            ValueError: The query mode is neither DEFAULT nor HYBRID, HYBRID
                is requested while ``enable_sparse`` is False, the hybrid
                rankers are unavailable, the configured ranker is not
                supported, or ``text_key`` is missing from a result.

        Returns:
            VectorStoreQueryResult: The matching nodes with their
            similarities and ids.
        """
        if query.mode == VectorStoreQueryMode.HYBRID:
            if self.enable_sparse is False:
                raise ValueError("QueryMode is HYBRID, but enable_sparse is False.")
        elif query.mode != VectorStoreQueryMode.DEFAULT:
            raise ValueError(f"Milvus does not support {query.mode} yet.")

        expr: List[str] = []
        output_fields = ["*"]

        # Parse the filter
        if query.filters is not None:
            expr.append(_to_milvus_filter(query.filters))

        # Parse any docs we are filtering on
        if query.doc_ids is not None and len(query.doc_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.doc_ids]
            expr.append(f"{self.doc_id_field} in [{','.join(expr_list)}]")

        # Parse any nodes we are filtering on
        if query.node_ids is not None and len(query.node_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.node_ids]
            expr.append(f"{MILVUS_ID_FIELD} in [{','.join(expr_list)}]")

        # Limit output fields
        if query.output_fields is not None:
            output_fields = query.output_fields
        elif len(self.output_fields) > 0:
            output_fields = self.output_fields

        # Convert to string expression (empty when there is nothing to filter)
        string_expr = " and ".join(expr)

        nodes = []
        similarities = []
        ids = []

        if query.mode == VectorStoreQueryMode.DEFAULT:
            # Perform default search
            res = self._milvusclient.search(
                collection_name=self.collection_name,
                data=[query.query_embedding],
                filter=string_expr,
                limit=query.similarity_top_k,
                output_fields=output_fields,
                search_params=self.search_config,
                anns_field=self.embedding_field,
            )
            logger.debug(
                f"Successfully searched embedding in collection: {self.collection_name}"
                f" Num Results: {len(res[0])}"
            )

            # Parse the results (hits are accessed by key here)
            for hit in res[0]:
                nodes.append(self._entity_to_node(hit["entity"]))
                similarities.append(hit["distance"])
                ids.append(hit["id"])
        else:
            # Perform hybrid search
            if WeightedRanker is None or RRFRanker is None:
                logger.error(
                    "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                )
                raise ValueError(
                    "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                )

            # The filter must be attached to each request, otherwise
            # metadata / doc_id / node_id restrictions are ignored in
            # hybrid mode.
            request_expr = string_expr or None

            sparse_emb = self.sparse_embedding_function.encode_queries(
                [query.query_str]
            )[0]
            sparse_search_params = {"metric_type": "IP"}
            sparse_req = AnnSearchRequest(
                [sparse_emb],
                self.sparse_embedding_field,
                sparse_search_params,
                limit=query.similarity_top_k,
                expr=request_expr,
            )

            dense_search_params = {
                "metric_type": self.similarity_metric,
                "params": self.search_config,
            }
            dense_req = AnnSearchRequest(
                [query.query_embedding],
                self.embedding_field,
                dense_search_params,
                limit=query.similarity_top_k,
                expr=request_expr,
            )

            if self.hybrid_ranker == "WeightedRanker":
                if self.hybrid_ranker_params == {}:
                    self.hybrid_ranker_params = {"weights": [1.0, 1.0]}
                ranker = WeightedRanker(*self.hybrid_ranker_params["weights"])
            elif self.hybrid_ranker == "RRFRanker":
                if self.hybrid_ranker_params == {}:
                    self.hybrid_ranker_params = {"k": 60}
                ranker = RRFRanker(self.hybrid_ranker_params["k"])
            else:
                raise ValueError(f"Unsupported ranker: {self.hybrid_ranker}")

            res = self._collection.hybrid_search(
                [dense_req, sparse_req],
                rerank=ranker,
                limit=query.similarity_top_k,
                output_fields=output_fields,
            )
            logger.debug(
                f"Successfully searched embedding in collection: {self.collection_name}"
                f" Num Results: {len(res[0])}"
            )

            # Parse the results (hits are accessed by attribute here)
            for hit in res[0]:
                nodes.append(self._entity_to_node(hit.entity))
                similarities.append(hit.distance)
                ids.append(hit.id)

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    def _entity_to_node(self, entity: Any) -> BaseNode:
        """Build a node from a search hit's entity (anything with ``.get``).

        Without ``text_key`` the node is rebuilt from the stored
        ``_node_content``/``_node_type``; with ``text_key`` a ``TextNode`` is
        created from that field plus the configured ``output_fields``.

        Raises:
            ValueError: ``text_key`` is set but absent from the entity.
        """
        if not self.text_key:
            return metadata_dict_to_node(
                {
                    "_node_content": entity.get("_node_content"),
                    "_node_type": entity.get("_node_type"),
                }
            )

        missing_text_key = (
            "The passed in text_key value does not exist "
            "in the retrieved entity."
        )
        try:
            text = entity.get(self.text_key)
        except Exception as err:
            raise ValueError(missing_text_key) from err
        # ``.get`` yields None for an absent key instead of raising, so the
        # missing-key case has to be detected explicitly.
        if text is None:
            raise ValueError(missing_text_key)

        metadata = {key: entity.get(key) for key in self.output_fields}
        return TextNode(text=text, metadata=metadata)

    def _dense_index_params(self) -> Dict[str, Any]:
        """Return the dense-vector index parameters derived from ``index_config``."""
        base_params: Dict[str, Any] = self.index_config.copy()
        index_type: str = base_params.pop("index_type", "FLAT")
        return {
            "params": base_params,
            "metric_type": self.similarity_metric,
            "index_type": index_type,
        }

    def _create_index_if_required(self, force: bool = False) -> None:
        """(Re)create the index when needed, then load the collection.

        The index is rebuilt when ``force`` is True, or when an index already
        exists and ``self.overwrite`` is True. The collection is loaded in
        every case.
        """
        # This helper method is introduced to allow the index to be created
        # both in the constructor and in the `add` method. The `force` flag is
        # provided to ensure that the index is created even if self.overwrite
        # is false.
        if self.enable_sparse is False:
            if (self._collection.has_index() and self.overwrite) or force:
                self._collection.release()
                self._collection.drop_index()
                self._collection.create_index(
                    self.embedding_field, index_params=self._dense_index_params()
                )
        else:
            if (
                self._collection.has_index(index_name=self.embedding_field)
                and self.overwrite
            ) or force:
                if self._collection.has_index(index_name=self.embedding_field) is True:
                    self._collection.release()
                    self._collection.drop_index(index_name=self.embedding_field)
                if (
                    self._collection.has_index(index_name=self.sparse_embedding_field)
                    is True
                ):
                    self._collection.drop_index(index_name=self.sparse_embedding_field)
                self._create_hybrid_index(self.collection_name)
        self._collection.load()

    def _create_hybrid_index(self, collection_name: str) -> None:
        """Bind a dense+sparse collection and create both of its indexes.

        Builds a schema with a VARCHAR primary key, a dense float vector
        field and a sparse float vector field, binds ``self._collection`` to
        it and creates the sparse and dense indexes.
        """
        schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)
        schema.add_field(
            # Same constant that `add` uses when writing the primary key.
            field_name=MILVUS_ID_FIELD,
            datatype=DataType.VARCHAR,
            max_length=65535,
            is_primary=True,
        )
        schema.add_field(
            field_name=self.embedding_field,
            datatype=DataType.FLOAT_VECTOR,
            dim=self.dim,
        )
        schema.add_field(
            field_name=self.sparse_embedding_field,
            datatype=DataType.SPARSE_FLOAT_VECTOR,
        )
        self._collection = Collection(
            collection_name, schema=schema, using=self._milvusclient._using
        )

        sparse_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
        self._collection.create_index(self.sparse_embedding_field, sparse_index)

        self._collection.create_index(self.embedding_field, self._dense_index_params())
|