Skip to content

Zilliz

ZillizCloudPipelineIndex #

Bases: BaseManagedIndex

Zilliz Cloud Pipeline的索引。

Zilliz Cloud Pipeline的索引实现了一个使用Zilliz Cloud Pipeline作为后端的托管索引。

Parameters:

Name Type Description Default
pipeline_ids dict

INGESTION、SEARCH、DELETION的管道id字典。

required
api_key str

Zilliz Cloud的API密钥。

None
cloud_region str='gcp-us-west1'

Zilliz Cloud集群的地区。默认为'gcp-us-west1'。

'gcp-us-west1'
show_progress bool

是否显示tqdm进度条。默认为False。

False
Source code in llama_index/indices/managed/zilliz/base.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
class ZillizCloudPipelineIndex(BaseManagedIndex):
    """An index built with Zilliz Cloud Pipeline.

    Implements a managed index that uses Zilliz Cloud Pipeline as the backend.

    Args:
        pipeline_ids (dict): A dictionary of pipeline ids for INGESTION,
            SEARCH and DELETION.
        api_key (str): Zilliz Cloud's API key. Defaults to None.
        cloud_region (str): The region of Zilliz Cloud's cluster.
            Defaults to 'gcp-us-west1'.
        show_progress (bool): Whether to show tqdm progress bars.
            Defaults to False.
    """

    def __init__(
        self,
        pipeline_ids: Dict,
        api_key: Optional[str] = None,
        cloud_region: str = "gcp-us-west1",
        show_progress: bool = False,
        **kwargs: Any,
    ) -> None:
        self.token = api_key
        self.cloud_region = cloud_region
        # Every pipeline run is POSTed to the regional controller endpoint.
        self.domain = (
            f"https://controller.api.{cloud_region}.zillizcloud.com/v1/pipelines"
        )
        self.headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        self.pipeline_ids = pipeline_ids or {}

        if len(self.pipeline_ids) == 0:
            print(
                "Pipeline ids are required. You can use the classmethod `ZillizCloudPipelineIndex.create_pipelines` to create pipelines and get pipeline ids."
            )
        else:
            assert set(PIPELINE_TYPES).issubset(
                set(self.pipeline_ids.keys())
            ), f"Missing pipeline(s): {set(PIPELINE_TYPES) - set(self.pipeline_ids.keys())}"

        # BUGFIX: join on self.pipeline_ids (never None) — joining on the raw
        # `pipeline_ids` argument raised AttributeError when it was omitted.
        index_struct = ZillizCloudPipelineIndexStruct(
            index_id="-".join(self.pipeline_ids.values()),
            summary="Zilliz Cloud Pipeline Index",
        )

        super().__init__(
            show_progress=show_progress, index_struct=index_struct, **kwargs
        )

    @staticmethod
    def _call_pipeline_api(url: str, headers: Dict, params: Dict) -> Any:
        """POST ``params`` to a pipeline endpoint and return the JSON ``data`` payload.

        Raises:
            RuntimeError: If the HTTP status code or the ``code`` field of the
                JSON body is not 200.
        """
        response = requests.post(url, headers=headers, json=params)
        if response.status_code != 200:
            raise RuntimeError(response.text)
        response_dict = response.json()
        if response_dict["code"] != 200:
            raise RuntimeError(response_dict)
        return response_dict["data"]

    def _run_pipeline(self, pipeline_type: str, data: Dict) -> Any:
        """Run this index's pipeline of ``pipeline_type`` with ``data`` and return the result."""
        pipe_id = self.pipeline_ids.get(pipeline_type)
        url = f"{self.domain}/{pipe_id}/run"
        return self._call_pipeline_api(url, self.headers, {"data": data})

    def _insert_doc_url(self, url: str, metadata: Optional[Dict] = None) -> Any:
        """Insert a document from a url with an initialized doc pipeline."""
        data = {"doc_url": url}
        data.update(metadata or {})
        return self._run_pipeline("INGESTION", data)

    def _insert(
        self, nodes: Sequence[BaseNode], metadata: Optional[Dict] = None
    ) -> Any:
        """Insert text nodes into the initialized index with a text pipeline."""
        data = {"text_list": [n.get_content() for n in nodes]}
        data.update(metadata or {})
        return self._run_pipeline("INGESTION", data)

    def delete_by_expression(self, expression: str) -> Any:
        """Delete data by a Milvus boolean expression with the corresponding deletion pipeline."""
        return self._run_pipeline("DELETION", {"expression": expression})

    def delete_by_doc_name(self, doc_name: str) -> Any:
        """Delete data by doc name with the corresponding deletion pipeline."""
        return self._run_pipeline("DELETION", {"doc_name": doc_name})

    def delete_ref_doc(
        self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
    ) -> None:
        """Not supported by Zilliz Cloud Pipeline."""
        raise NotImplementedError(
            "Deleting a reference document is not yet supported with Zilliz Cloud Pipeline."
        )

    def update_ref_doc(self, document: Document, **update_kwargs: Any) -> None:
        """Not supported by Zilliz Cloud Pipeline."""
        raise NotImplementedError(
            "Updating referenced document is not yet supported with Zilliz Cloud Pipeline."
        )

    def as_retriever(self, **kwargs: Any) -> BaseRetriever:
        """Return a retriever backed by this index."""
        # NOTE(review): imported locally, presumably to avoid a circular
        # import with the retriever module — confirm.
        from llama_index.indices.managed.zilliz.retriever import (
            ZillizCloudPipelineRetriever,
        )

        return ZillizCloudPipelineRetriever(self, **kwargs)

    @staticmethod
    def create_pipelines(
        project_id: str,
        cluster_id: str,
        cloud_region: str = "gcp-us-west1",
        api_key: Optional[str] = None,
        collection_name: str = "zcp_llamalection",
        data_type: str = "text",
        metadata_schema: Optional[Dict] = None,
        **kwargs: Any,
    ) -> dict:
        """Create INGESTION, SEARCH, DELETION pipelines using the given collection name.

        Args:
            project_id (str): Zilliz Cloud's project ID.
            cluster_id (str): Zilliz Cloud's cluster ID.
            cloud_region (str='gcp-us-west1'): The region of Zilliz Cloud's
                cluster. Defaults to 'gcp-us-west1'.
            api_key (str=None): Zilliz Cloud's API key. Defaults to None.
            collection_name (str='zcp_llamalection'): A collection name,
                defaults to 'zcp_llamalection'.
            data_type (str='text'): The data type of the pipelines, defaults to
                'text'. Currently only "text" or "doc" are supported.
            metadata_schema (Dict=None): A dictionary of metadata schema,
                defaults to None. Use metadata name as key and the
                corresponding data type as value: {'field_name': 'field_type'}.
                Only the following values are supported as field types:
                'Bool', 'Int8', 'Int16', 'Int32', 'Int64', 'Float', 'Double', 'VarChar'.
            kwargs: Optional parameters forwarded to the ingestion index function.
                - language: The language of documents. Available options: "ENGLISH", "CHINESE".
                - embedding: The embedding service used in both INGESTION & SEARCH pipelines.
                - reranker: The reranker service used in the search function.
                - chunkSize: The chunk size to split a document. Only for doc data.
                - splitBy: The separators to chunk a document. Only for doc data.

        Returns:
            The pipeline ids of the created pipelines, keyed by pipeline type.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id='YOUR_ZILLIZ_CLOUD_PROJECT_ID',
            ...     cluster_id='YOUR_ZILLIZ_CLOUD_CLUSTER_ID',
            ...     api_key='YOUR_ZILLIZ_CLOUD_API_KEY',
            ...     metadata_schema={'year': 'Int32', 'author': 'VarChar'},  # optional
            ... )
        """
        if data_type == "text":
            ingest_action = "INDEX_TEXT"
            search_action = "SEARCH_TEXT"
        elif data_type == "doc":
            ingest_action = "INDEX_DOC"
            search_action = "SEARCH_DOC_CHUNK"
        else:
            raise Exception("Only text or doc is supported as the data type.")

        additional_params = kwargs or {}
        language = additional_params.pop("language", "ENGLISH")
        embedding = additional_params.pop("embedding", "zilliz/bge-base-en-v1.5")
        reranker = additional_params.pop("reranker", None)

        # Ingestion uses one index function (remaining kwargs such as
        # chunkSize/splitBy are forwarded to it verbatim) plus one PRESERVE
        # function per metadata field.
        index_func = {
            "name": "llamaindex_index",
            "action": ingest_action,
            "language": language,
            "embedding": embedding,
            **additional_params,
        }
        ingest_functions = [index_func]
        for field_name, field_type in (metadata_schema or {}).items():
            ingest_functions.append(
                {
                    "name": f"keep_{field_name}",
                    "action": "PRESERVE",
                    "inputField": field_name,
                    "outputField": field_name,
                    "fieldType": field_type,
                }
            )

        search_function = {
            "name": "llamaindex_search",
            "action": search_action,
            "clusterId": cluster_id,
            "collectionName": collection_name,
            "embedding": embedding,
        }
        if reranker:
            search_function["reranker"] = reranker

        params_dict = {
            "INGESTION": {
                "name": f"{collection_name}_ingestion",
                "projectId": project_id,
                "clusterId": cluster_id,
                "collectionName": collection_name,
                "type": "INGESTION",
                "functions": ingest_functions,
            },
            "SEARCH": {
                "name": f"{collection_name}_search",
                "projectId": project_id,
                "type": "SEARCH",
                "functions": [search_function],
            },
            "DELETION": {
                "name": f"{collection_name}_deletion",
                "type": "DELETION",
                "functions": [
                    {
                        "name": "purge_by_expression",
                        "action": "PURGE_BY_EXPRESSION",
                    }
                ],
                "projectId": project_id,
                "clusterId": cluster_id,
                "collectionName": collection_name,
            },
        }

        domain = f"https://controller.api.{cloud_region}.zillizcloud.com/v1/pipelines"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        return {
            pipeline_type: ZillizCloudPipelineIndex._call_pipeline_api(
                domain, headers, creation_params
            )["pipelineId"]
            for pipeline_type, creation_params in params_dict.items()
        }

    @classmethod
    def from_document_url(
        cls,
        url: str,
        pipeline_ids: Optional[Dict] = None,
        api_key: Optional[str] = None,
        metadata: Optional[Dict] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> BaseManagedIndex:
        """Load a document from a signed url via Zilliz Cloud Pipeline and auto-index it.

        Args:
            url: A gcs or s3 signed url.
            pipeline_ids (dict=None): A dictionary of pipeline ids for
                INGESTION, SEARCH, DELETION. Defaults to None.
            api_key (str): Zilliz Cloud's API key.
            metadata (Dict=None): A dictionary of metadata. Defaults to None.
                The key must be string and the value must be a string, float,
                integer, or boolean.
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.

        Returns:
            An initialized ZillizCloudPipelineIndex.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
            ...     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
            ...     api_key=api_key,
            ...     data_type="doc",
            ... )
            >>> ZillizCloudPipelineIndex.from_document_url(
            ...     url='https://oss_bucket.test_doc.ext',
            ...     pipeline_ids=pipeline_ids,
            ...     api_key=api_key,
            ... )
        """
        metadata = metadata or {}
        index = cls(
            pipeline_ids=pipeline_ids,
            api_key=api_key,
            show_progress=show_progress,
            **kwargs,
        )

        # Best-effort ingestion: log failures and still return the index object.
        try:
            index._insert_doc_url(url=url, metadata=metadata)
        except Exception as e:
            logger.error(
                "Failed to build managed index given document url (%s):\n%s", url, e
            )
        return index

    @classmethod
    def from_documents(
        cls: Type[IndexType],
        documents: Sequence[Document],
        pipeline_ids: Optional[Dict] = None,
        api_key: Optional[str] = None,
        show_progress: bool = False,
        metadata: Optional[Dict] = None,
        **kwargs: Any,
    ) -> IndexType:
        """Build a Zilliz Cloud Pipeline index from a sequence of documents.

        Args:
            documents: A sequence of llamaindex documents.
            pipeline_ids (dict=None): A dictionary of pipeline ids for
                INGESTION, SEARCH, DELETION. Defaults to None.
            api_key (str): Zilliz Cloud's API key.
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.
            metadata (Dict=None): A dictionary of metadata. Defaults to None.
                The key must be string and the value must be a string, float,
                integer, or boolean.

        Returns:
            An initialized ZillizCloudPipelineIndex.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
            ...     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
            ...     api_key=api_key,
            ...     data_type="text",
            ... )
            >>> ZillizCloudPipelineIndex.from_documents(
            ...     documents=my_documents,
            ...     pipeline_ids=pipeline_ids,
            ...     api_key=api_key,
            ... )
        """
        metadata = metadata or {}
        index = cls(
            pipeline_ids=pipeline_ids,
            api_key=api_key,
            show_progress=show_progress,
            **kwargs,
        )

        # Best-effort ingestion: log failures and still return the index object.
        try:
            index._insert(nodes=documents, metadata=metadata)
        except Exception as e:
            logger.error("Failed to build managed index given documents:\n%s", e)
        return index

    def _build_index_from_nodes(self, nodes: Sequence[BaseNode]) -> IndexDict:
        """Not supported by Zilliz Cloud Pipeline."""
        raise NotImplementedError(
            "Building index from nodes is not yet supported with Zilliz Cloud Pipeline."
        )

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        """Not supported by Zilliz Cloud Pipeline."""
        raise NotImplementedError(
            "Deleting nodes is not yet supported with Zilliz Cloud Pipeline."
        )

delete_by_expression #

delete_by_expression(expression: str)

使用相应的删除管道,通过Milvus布尔表达式删除数据。

Source code in llama_index/indices/managed/zilliz/base.py
129
130
131
132
133
134
135
136
137
138
139
140
141
def delete_by_expression(self, expression: str):
    """Delete data by a Milvus boolean expression with the corresponding deletion pipeline.

    Returns the ``data`` payload of the pipeline-run response.
    """
    deletion_pipe_id = self.pipeline_ids.get("DELETION")
    deletion_url = f"{self.domain}/{deletion_pipe_id}/run"

    params = {"data": {"expression": expression}}
    response = requests.post(deletion_url, headers=self.headers, json=params)
    # Both a non-200 HTTP status and a non-200 `code` field in the JSON
    # body signal failure.
    if response.status_code != 200:
        raise RuntimeError(response.text)
    response_dict = response.json()
    if response_dict["code"] != 200:
        raise RuntimeError(response_dict)
    return response_dict["data"]

delete_by_doc_name #

delete_by_doc_name(doc_name: str)

使用相应的删除管道,按文档名称删除数据。

Source code in llama_index/indices/managed/zilliz/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
def delete_by_doc_name(self, doc_name: str):
    """Delete data by doc name with the corresponding deletion pipeline.

    Returns the ``data`` payload of the pipeline-run response.
    """
    deletion_pipe_id = self.pipeline_ids.get("DELETION")
    deletion_url = f"{self.domain}/{deletion_pipe_id}/run"

    params = {"data": {"doc_name": doc_name}}
    response = requests.post(deletion_url, headers=self.headers, json=params)
    # Both a non-200 HTTP status and a non-200 `code` field in the JSON
    # body signal failure.
    if response.status_code != 200:
        raise RuntimeError(response.text)
    response_dict = response.json()
    if response_dict["code"] != 200:
        raise RuntimeError(response_dict)
    return response_dict["data"]

as_retriever #

as_retriever(**kwargs: Any) -> BaseRetriever

返回一个检索器。

Source code in llama_index/indices/managed/zilliz/base.py
169
170
171
172
173
174
175
def as_retriever(self, **kwargs: Any) -> BaseRetriever:
    """Return a ZillizCloudPipelineRetriever built on this index."""
    # NOTE(review): imported locally, presumably to avoid a circular import
    # with the retriever module — confirm.
    from llama_index.indices.managed.zilliz.retriever import (
        ZillizCloudPipelineRetriever,
    )

    return ZillizCloudPipelineRetriever(self, **kwargs)

create_pipelines staticmethod #

create_pipelines(
    project_id: str,
    cluster_id: str,
    cloud_region: str = "gcp-us-west1",
    api_key: str = None,
    collection_name: str = "zcp_llamalection",
    data_type: str = "text",
    metadata_schema: Optional[Dict] = None,
    **kwargs: Any
) -> dict

创建使用self.collection_name的INGESTION、SEARCH、DELETION管道。

Parameters:

Name Type Description Default
project_id str

Zilliz Cloud的项目ID。

required
cluster_id str

Zilliz Cloud的集群ID。

required
api_key str=None

Zilliz Cloud的API密钥。默认为None。

None
cloud_region str='gcp-us-west1'

Zilliz Cloud的集群所在地区。默认为'gcp-us-west1'。

'gcp-us-west1'
collection_name str="zcp_llamalection"

集合名称,默认为'zcp_llamalection'。

'zcp_llamalection'
data_type str="text"

管道的数据类型,默认为"text"。目前仅支持"text"或"doc"。

'text'
metadata_schema Dict=None

元数据模式的字典,默认为None。使用元数据名称作为键,相应的数据类型作为值:{'field_name': 'field_type'}。 仅支持以下值作为字段类型:'Bool'、'Int8'、'Int16'、'Int32'、'Int64'、'Float'、'Double'、'VarChar'。

None
kwargs Any

创建INGESTION和SEARCH管道的可选函数参数。 - language: 文档的语言。可用选项:"ENGLISH"、"CHINESE"。 - embedding: 用于INGESTION和SEARCH管道的嵌入服务。 - reranker: 用于搜索功能的重新排序服务。 - chunkSize: 拆分文档的块大小。仅适用于doc数据。 - splitBy: 用于拆分文档的分隔符。仅适用于doc数据。

{}

Returns:

Type Description
dict

创建管道的管道ID。

示例:

    >>> from llama_index.indices import ZillizCloudPipelineIndex
    >>> index = ZillizCloudPipelineIndex(
    >>>     project_id='YOUR_ZILLIZ_CLOUD_PROJECT_ID',
    >>>     cluster_id='YOUR_ZILLIZ_CLOUD_CLUSTER_ID',
    >>>     token='YOUR_ZILLIZ_CLOUD_API_KEY',
    >>>     collection_name='your_new_collection_name'
    >>> )
    >>> pipeline_ids = index.create_pipelines(
    >>>     metadata_schema={'year': 'Int32', 'author': 'VarChar'}  # 可选,默认为None
    >>> )

Source code in llama_index/indices/managed/zilliz/base.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
    @staticmethod
    def create_pipelines(
        project_id: str,
        cluster_id: str,
        cloud_region: str = "gcp-us-west1",
        api_key: Optional[str] = None,
        collection_name: str = "zcp_llamalection",
        data_type: str = "text",
        metadata_schema: Optional[Dict] = None,
        **kwargs: Any,
    ) -> dict:
        """Create INGESTION, SEARCH, DELETION pipelines using the given collection name.

        Args:
            project_id (str): Zilliz Cloud's project ID.
            cluster_id (str): Zilliz Cloud's cluster ID.
            cloud_region (str='gcp-us-west1'): The region of Zilliz Cloud's
                cluster. Defaults to 'gcp-us-west1'.
            api_key (str=None): Zilliz Cloud's API key. Defaults to None.
            collection_name (str='zcp_llamalection'): A collection name,
                defaults to 'zcp_llamalection'.
            data_type (str='text'): The data type of the pipelines, defaults to
                'text'. Currently only "text" or "doc" are supported.
            metadata_schema (Dict=None): A dictionary of metadata schema,
                defaults to None. Use metadata name as key and the
                corresponding data type as value: {'field_name': 'field_type'}.
                Only the following values are supported as field types:
                'Bool', 'Int8', 'Int16', 'Int32', 'Int64', 'Float', 'Double', 'VarChar'.
            kwargs: Optional parameters to create the INGESTION & SEARCH pipelines.
                - language: The language of documents. Available options: "ENGLISH", "CHINESE".
                - embedding: The embedding service used in both INGESTION & SEARCH pipelines.
                - reranker: The reranker service used in the search function.
                - chunkSize: The chunk size to split a document. Only for doc data.
                - splitBy: The separators to chunk a document. Only for doc data.

        Returns:
            The pipeline ids of the created pipelines, keyed by pipeline type.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id='YOUR_ZILLIZ_CLOUD_PROJECT_ID',
            ...     cluster_id='YOUR_ZILLIZ_CLOUD_CLUSTER_ID',
            ...     api_key='YOUR_ZILLIZ_CLOUD_API_KEY',
            ...     metadata_schema={'year': 'Int32', 'author': 'VarChar'},  # optional
            ... )
        """
        if data_type == "text":
            ingest_action = "INDEX_TEXT"
            search_action = "SEARCH_TEXT"
        elif data_type == "doc":
            ingest_action = "INDEX_DOC"
            search_action = "SEARCH_DOC_CHUNK"
        else:
            raise Exception("Only text or doc is supported as the data type.")

        params_dict = {}
        additional_params = kwargs or {}

        language = additional_params.pop("language", "ENGLISH")
        embedding = additional_params.pop("embedding", "zilliz/bge-base-en-v1.5")
        reranker = additional_params.pop("reranker", None)
        # Base ingestion index function; any remaining kwargs (e.g. chunkSize,
        # splitBy) are forwarded to it verbatim via the update() below.
        index_func = {
            "name": "llamaindex_index",
            "action": ingest_action,
            "language": language,
            "embedding": embedding,
        }
        index_func.update(additional_params)
        ingest_functions = [index_func]
        if metadata_schema:
            # One PRESERVE function per metadata field keeps that field
            # alongside the indexed data.
            for k, v in metadata_schema.items():
                preserve_func = {
                    "name": f"keep_{k}",
                    "action": "PRESERVE",
                    "inputField": k,
                    "outputField": k,
                    "fieldType": v,
                }
                ingest_functions.append(preserve_func)
        params_dict["INGESTION"] = {
            "name": f"{collection_name}_ingestion",
            "projectId": project_id,
            "clusterId": cluster_id,
            "collectionName": collection_name,
            "type": "INGESTION",
            "functions": ingest_functions,
        }

        search_function = {
            "name": "llamaindex_search",
            "action": search_action,
            "clusterId": cluster_id,
            "collectionName": collection_name,
            "embedding": embedding,
        }
        if reranker:
            search_function["reranker"] = reranker
        params_dict["SEARCH"] = {
            "name": f"{collection_name}_search",
            "projectId": project_id,
            "type": "SEARCH",
            "functions": [search_function],
        }

        params_dict["DELETION"] = {
            "name": f"{collection_name}_deletion",
            "type": "DELETION",
            "functions": [
                {
                    "name": "purge_by_expression",
                    "action": "PURGE_BY_EXPRESSION",
                }
            ],
            "projectId": project_id,
            "clusterId": cluster_id,
            "collectionName": collection_name,
        }

        # One creation request per pipeline type against the regional controller.
        domain = f"https://controller.api.{cloud_region}.zillizcloud.com/v1/pipelines"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        pipeline_ids = {}

        for k, v in params_dict.items():
            response = requests.post(domain, headers=headers, json=v)
            if response.status_code != 200:
                raise RuntimeError(response.text)
            response_dict = response.json()
            if response_dict["code"] != 200:
                raise RuntimeError(response_dict)
            pipeline_ids[k] = response_dict["data"]["pipelineId"]

        return pipeline_ids

from_document_url classmethod #

from_document_url(
    url: str,
    pipeline_ids: Optional[Dict] = None,
    api_key: Optional[str] = None,
    metadata: Optional[Dict] = None,
    show_progress: bool = False,
    **kwargs: Any
) -> BaseManagedIndex

Zilliz Cloud Pipeline从已签名的URL加载文档,然后为其构建自动索引。

Parameters:

Name Type Description Default
url str

gcs或s3签名的URL。

required
pipeline_ids dict=None

INGESTION、SEARCH、DELETION的管道ID字典。默认为None。

None
api_key str

Zilliz Cloud的API密钥。

None
metadata Dict=None

元数据字典。默认为None。键必须是字符串,值必须是字符串、浮点数、整数或布尔值。

None
show_progress bool

是否显示tqdm进度条。默认为False。

False

Returns:

Type Description
BaseManagedIndex

初始化的ZillizCloudPipelineIndex

示例:

    >>> from llama_index.indices import ZillizCloudPipelineIndex
    >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
    >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
    >>>     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
    >>>     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
    >>>     api_key=api_key,
    >>>     data_type="doc"
    >>> )
    >>> ZillizCloudPipelineIndex.from_document_url(
    >>>     url='https://oss_bucket.test_doc.ext',
    >>>     pipeline_ids=pipeline_ids,
    >>>     api_key=api_key
    >>> )

Source code in llama_index/indices/managed/zilliz/base.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
    @classmethod
    def from_document_url(
        cls,
        url: str,
        pipeline_ids: Optional[Dict] = None,
        api_key: Optional[str] = None,
        metadata: Optional[Dict] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> BaseManagedIndex:
        """Load a document from a signed url via Zilliz Cloud Pipeline and auto-index it.

        Args:
            url: A gcs or s3 signed url.
            pipeline_ids (dict=None): A dictionary of pipeline ids for
                INGESTION, SEARCH, DELETION. Defaults to None.
            api_key (str): Zilliz Cloud's API key.
            metadata (Dict=None): A dictionary of metadata. Defaults to None.
                The key must be string and the value must be a string, float,
                integer, or boolean.
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.

        Returns:
            An initialized ZillizCloudPipelineIndex.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
            ...     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
            ...     api_key=api_key,
            ...     data_type="doc",
            ... )
            >>> ZillizCloudPipelineIndex.from_document_url(
            ...     url='https://oss_bucket.test_doc.ext',
            ...     pipeline_ids=pipeline_ids,
            ...     api_key=api_key,
            ... )
        """
        metadata = metadata or {}
        index = cls(
            pipeline_ids=pipeline_ids,
            api_key=api_key,
            show_progress=show_progress,
            **kwargs,
        )

        # Best-effort ingestion: failures are logged and the (possibly empty)
        # index object is still returned.
        try:
            index._insert_doc_url(url=url, metadata=metadata)
        except Exception as e:
            logger.error(
                "Failed to build managed index given document url (%s):\n%s", url, e
            )
        return index

from_documents classmethod #

from_documents(
    documents: Sequence[Document],
    pipeline_ids: Optional[Dict] = None,
    api_key: Optional[str] = None,
    show_progress: bool = False,
    metadata: Optional[Dict] = None,
    **kwargs: Any
) -> IndexType

从一系列文档构建 Zilliz Cloud Pipeline 索引。

Parameters:

Name Type Description Default
documents Sequence[Document]

一系列 llamaindex 文档。

required
pipeline_ids dict=None

INGESTION、SEARCH、DELETION 的管道 id 字典。默认为 None。

None
api_key str

Zilliz Cloud 的 API 密钥。

None
metadata Dict=None

元数据字典。默认为 None。键必须为字符串,值必须为字符串、浮点数、整数或布尔值。

None
show_progress bool

是否显示 tqdm 进度条。默认为 False。

False

Returns:

Type Description
IndexType

一个初始化的 ZillizCloudPipelineIndex

示例:

    >>> from llama_index.indices import ZillizCloudPipelineIndex
    >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
    >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
    >>>     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
    >>>     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
    >>>     api_key=api_key,
    >>>     data_type="text"
    >>> )
    >>> ZillizCloudPipelineIndex.from_documents(
    >>>     documents=my_documents,
    >>>     pipeline_ids=pipeline_ids,
    >>>     api_key=api_key
    >>> )

Source code in llama_index/indices/managed/zilliz/base.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
    @classmethod
    def from_documents(
        cls: Type[IndexType],
        documents: Sequence[Document],
        pipeline_ids: Optional[Dict] = None,
        api_key: Optional[str] = None,
        show_progress: bool = False,
        metadata: Optional[Dict] = None,
        **kwargs: Any,
    ) -> IndexType:
        """Build a Zilliz Cloud Pipeline index from a sequence of documents.

        Args:
            documents: A sequence of llamaindex documents.
            pipeline_ids (dict=None): A dictionary of pipeline ids for
                INGESTION, SEARCH, DELETION. Defaults to None.
            api_key (str): Zilliz Cloud's API key.
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.
            metadata (Dict=None): A dictionary of metadata. Defaults to None.
                The key must be string and the value must be a string, float,
                integer, or boolean.

        Returns:
            An initialized ZillizCloudPipelineIndex.

        Example:
            >>> from llama_index.indices import ZillizCloudPipelineIndex
            >>> api_key = "{YOUR_ZILLIZ_CLOUD_API_KEY}"
            >>> pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
            ...     project_id="{YOUR_ZILLIZ_PROJECT_ID}",
            ...     cluster_id="{YOUR_ZILLIZ_CLUSTER_ID}",
            ...     api_key=api_key,
            ...     data_type="text",
            ... )
            >>> ZillizCloudPipelineIndex.from_documents(
            ...     documents=my_documents,
            ...     pipeline_ids=pipeline_ids,
            ...     api_key=api_key,
            ... )
        """
        metadata = metadata or {}
        index = cls(
            pipeline_ids=pipeline_ids,
            api_key=api_key,
            show_progress=show_progress,
            **kwargs,
        )

        # Best-effort ingestion: failures are logged and the (possibly empty)
        # index object is still returned.
        try:
            index._insert(nodes=documents, metadata=metadata)
        except Exception as e:
            logger.error("Failed to build managed index given documents:\n%s", e)
        return index