class VectaraIndex(BaseManagedIndex):
    """
    Vectara Index.

    The Vectara index implements a managed index that uses Vectara as the
    backend. Vectara performs a lot of the functions of a traditional index
    in the backend:
    - breaks down a document into chunks (nodes)
    - creates the embedding for each chunk (node)
    - performs the search for the top-k most similar nodes to a query
    - optionally can perform summarization of the top-k nodes

    Args:
        show_progress (bool): Whether to show tqdm progress bars.
            Defaults to False.

    """
def __init__(
self,
show_progress: bool = False,
nodes: Optional[Sequence[BaseNode]] = None,
vectara_customer_id: Optional[str] = None,
vectara_corpus_id: Optional[str] = None,
vectara_api_key: Optional[str] = None,
use_core_api: bool = False,
parallelize_ingest: bool = False,
**kwargs: Any,
) -> None:
"""初始化Vectara API。"""
self.parallelize_ingest = parallelize_ingest
index_struct = VectaraIndexStruct(
index_id=str(vectara_corpus_id),
summary="Vectara Index",
)
super().__init__(
show_progress=show_progress,
index_struct=index_struct,
**kwargs,
)
self._vectara_customer_id = vectara_customer_id or os.environ.get(
"VECTARA_CUSTOMER_ID"
)
self._vectara_corpus_id = vectara_corpus_id or str(
os.environ.get("VECTARA_CORPUS_ID")
)
self._vectara_api_key = vectara_api_key or os.environ.get("VECTARA_API_KEY")
if (
self._vectara_customer_id is None
or self._vectara_corpus_id is None
or self._vectara_api_key is None
):
_logger.warning(
"Can't find Vectara credentials, customer_id or corpus_id in "
"environment."
)
raise ValueError("Missing Vectara credentials")
else:
_logger.debug(f"Using corpus id {self._vectara_corpus_id}")
# setup requests session with max 3 retries and 90s timeout
# for calling Vectara API
self._session = requests.Session() # to reuse connections
adapter = requests.adapters.HTTPAdapter(max_retries=3)
self._session.mount("https://", adapter)
self.vectara_api_timeout = 90
self.use_core_api = use_core_api
self.doc_ids: List[str] = []
# if nodes is specified, consider each node as a single document
# and use _build_index_from_nodes() to add them to the index
if nodes is not None:
self._build_index_from_nodes(nodes, use_core_api)

    def _build_index_from_nodes(
self, nodes: Sequence[BaseNode], use_core_api: bool = False
) -> IndexDict:
docs = [
Document(
text=node.get_content(metadata_mode=MetadataMode.NONE),
metadata=node.metadata, # type: ignore
id_=node.id_, # type: ignore
)
for node in nodes
]
        self.add_documents(docs, use_core_api=use_core_api)
return self.index_struct

    def _get_corpus_id(self, corpus_id: Optional[str]) -> str:
        """
        Get the corpus ID to use for indexing.

        If corpus_id is provided, check that it is a valid corpus ID.
        If not, use the first corpus ID in the list.
        """
        if corpus_id is not None:
            if corpus_id in self._vectara_corpus_id.split(","):
                return corpus_id
        return self._vectara_corpus_id.split(",")[0]

    def _get_post_headers(self) -> dict:
        """Return headers that should be attached to each POST request."""
return {
"x-api-key": self._vectara_api_key,
"customer-id": self._vectara_customer_id,
"Content-Type": "application/json",
"X-Source": "llama_index",
}

    def _delete_doc(self, doc_id: str, corpus_id: Optional[str] = None) -> bool:
        """
        Delete a document from the Vectara corpus.

        Args:
            doc_id (str): ID of the document to delete.
            corpus_id (str): ID of the corpus from which to delete the document.

        Returns:
            bool: True if the delete succeeded, False otherwise.

        """
valid_corpus_id = self._get_corpus_id(corpus_id)
body = {
"customerId": self._vectara_customer_id,
"corpusId": valid_corpus_id,
"documentId": doc_id,
}
response = self._session.post(
"https://api.vectara.io/v1/delete-doc",
data=json.dumps(body),
verify=True,
headers=self._get_post_headers(),
timeout=self.vectara_api_timeout,
)
if response.status_code != 200:
_logger.error(
f"Delete request failed for doc_id = {doc_id} with status code "
f"{response.status_code}, reason {response.reason}, text "
f"{response.text}"
)
return False
return True

    def _index_doc(self, doc: dict, corpus_id: str) -> str:
request: Dict[str, Any] = {}
request["customerId"] = self._vectara_customer_id
request["corpusId"] = corpus_id
request["document"] = doc
if "parts" in doc:
api_url = "https://api.vectara.io/v1/core/index"
else:
api_url = "https://api.vectara.io/v1/index"
response = self._session.post(
headers=self._get_post_headers(),
url=api_url,
data=json.dumps(request),
timeout=self.vectara_api_timeout,
verify=True,
)
status_code = response.status_code
result = response.json()
status_str = result["status"]["code"] if "status" in result else None
if status_code == 409 and status_str and (status_str == "ALREADY_EXISTS"):
return "E_ALREADY_EXISTS"
elif status_code == 200 and status_str and (status_str == "INVALID_ARGUMENT"):
return "E_INVALID_ARGUMENT"
elif status_str and (status_str == "FORBIDDEN"):
return "E_NO_PERMISSIONS"
else:
return "E_SUCCEEDED"

    def _insert(
self,
nodes: Sequence[BaseNode],
corpus_id: Optional[str] = None,
use_core_api: bool = False,
**insert_kwargs: Any,
) -> None:
"""插入一组文档(每个文档都是一个节点)。"""
def gen_hash(s: str) -> str:
hash_object = blake2b(digest_size=32)
hash_object.update(s.encode("utf-8"))
return hash_object.hexdigest()
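
        # The blake2b digest of the text doubles as the document ID, so
        # re-ingesting identical text maps to the same Vectara document
        # (the API reports ALREADY_EXISTS rather than storing a duplicate).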
docs = []
for node in nodes:
metadata = node.metadata.copy()
metadata["framework"] = "llama_index"
section_key = "parts" if use_core_api else "section"
text = node.get_content(metadata_mode=MetadataMode.NONE)
doc_id = gen_hash(text)
doc = {
"documentId": doc_id,
"metadataJson": json.dumps(node.metadata),
section_key: [{"text": text}],
}
docs.append(doc)
valid_corpus_id = self._get_corpus_id(corpus_id)
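        # Index either in parallel (one task per document) or serially,
        # logging any per-document error code returned by the indexing API.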
if self.parallelize_ingest:
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self._index_doc, doc, valid_corpus_id)
for doc in docs
]
for future in futures:
ecode = future.result()
if ecode != "E_SUCCEEDED":
_logger.error(
f"Error indexing document in Vectara with error code {ecode}"
)
self.doc_ids.extend([doc["documentId"] for doc in docs])
else:
for doc in docs:
ecode = self._index_doc(doc, valid_corpus_id)
if ecode != "E_SUCCEEDED":
_logger.error(
f"Error indexing document in Vectara with error code {ecode}"
)
self.doc_ids.append(doc["documentId"])

    def add_documents(
        self,
        docs: Sequence[Document],
        corpus_id: Optional[str] = None,
        use_core_api: bool = False,
        allow_update: bool = True,
    ) -> None:
nodes = [
TextNode(text=doc.get_content(), metadata=doc.metadata) for doc in docs # type: ignore
]
self._insert(nodes, corpus_id, use_core_api)

    def insert_file(
self,
file_path: str,
metadata: Optional[dict] = None,
corpus_id: Optional[str] = None,
**insert_kwargs: Any,
) -> Optional[str]:
"""Vectara提供了一种通过我们的API直接添加文件(二进制或文本)的方式,在其中进行预处理和分块处理,以最佳方式内部进行。
这种方法提供了一种在Llama_index中使用该API的方式。
# ruff: noqa: E501
完整的API文档:https://docs.vectara.com/docs/api-reference/indexing-apis/
file-upload/file-upload-filetypes
Args:
file_path:本地文件路径
文件可以是文本、HTML、PDF、markdown、doc/docx、ppt/pptx等。
请参阅API文档以获取完整列表
metadata:与文件关联的可选元数据列表
Returns:
与每个索引文件相关联的ID列表
"""
if not os.path.exists(file_path):
_logger.error(f"File {file_path} does not exist")
return None
metadata = metadata or {}
metadata["framework"] = "llama_index"
        # Open the file inside a context manager so the handle is closed even
        # if the upload request fails. The JSON Content-Type header is removed
        # so that requests can set the proper multipart/form-data boundary.
        with open(file_path, "rb") as f:
            files: dict = {
                "file": (file_path, f),
                "doc_metadata": json.dumps(metadata),
            }
            headers = self._get_post_headers()
            headers.pop("Content-Type")
            valid_corpus_id = self._get_corpus_id(corpus_id)
            response = self._session.post(
                f"https://api.vectara.io/upload?c={self._vectara_customer_id}&o={valid_corpus_id}&d=True",
                files=files,
                verify=True,
                headers=headers,
                timeout=self.vectara_api_timeout,
            )
res = response.json()
if response.status_code == 409:
_logger.info(
f"File {file_path} already exists on Vectara, skipping indexing"
)
return None
elif response.status_code == 200:
quota = res["response"]["quotaConsumed"]["numChars"]
if quota == 0:
_logger.warning(
f"File Upload for {file_path} returned 0 quota consumed, please check your Vectara account quota"
)
doc_id = res["document"]["documentId"]
self.doc_ids.append(doc_id)
return doc_id
else:
_logger.info(f"Error indexing file {file_path}: {res}")
return None
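
    # A minimal usage sketch for direct file upload; the file name and
    # metadata values are illustrative assumptions, not part of the API:
    #
    #     index = VectaraIndex()
    #     doc_id = index.insert_file("report.pdf", metadata={"author": "me"})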

    def delete_ref_doc(
self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
) -> None:
raise NotImplementedError(
"Vectara does not support deleting a reference document"
)

    def update_ref_doc(self, document: Document, **update_kwargs: Any) -> None:
raise NotImplementedError(
"Vectara does not support updating a reference document"
)

    def as_retriever(self, **kwargs: Any) -> BaseRetriever:
        """Return a Retriever for this managed index."""
from llama_index.indices.managed.vectara.retriever import (
VectaraRetriever,
)
return VectaraRetriever(self, **kwargs)

    def as_chat_engine(self, **kwargs: Any) -> BaseChatEngine:
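        # Chat requires Vectara's generative summarization, so force it on for
        # retrieval, then drop the flag before building the chat engine.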
kwargs["summary_enabled"] = True
retriever = self.as_retriever(**kwargs)
kwargs.pop("summary_enabled")
from llama_index.indices.managed.vectara.query import (
VectaraChatEngine,
)
return VectaraChatEngine.from_args(retriever, **kwargs) # type: ignore

    def as_query_engine(
self, llm: Optional[LLMType] = None, **kwargs: Any
) -> BaseQueryEngine:
if kwargs.get("summary_enabled", True):
from llama_index.indices.managed.vectara.query import (
VectaraQueryEngine,
)
kwargs["summary_enabled"] = True
retriever = self.as_retriever(**kwargs)
return VectaraQueryEngine.from_args(retriever=retriever, **kwargs) # type: ignore
else:
from llama_index.core.query_engine.retriever_query_engine import (
RetrieverQueryEngine,
)
llm = (
resolve_llm(llm, callback_manager=self._callback_manager)
or Settings.llm
)
retriever = self.as_retriever(**kwargs)
response_synthesizer = get_response_synthesizer(
response_mode=ResponseMode.COMPACT,
llm=llm,
)
return RetrieverQueryEngine.from_args(
retriever=retriever,
response_synthesizer=response_synthesizer,
**kwargs,
)
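
    # A minimal query sketch; summary_enabled and the question text are
    # illustrative:
    #
    #     query_engine = index.as_query_engine(summary_enabled=True)
    #     response = query_engine.query("What does the corpus say about X?")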

    @classmethod
def from_documents(
cls: Type[IndexType],
documents: Sequence[Document],
show_progress: bool = False,
callback_manager: Optional[CallbackManager] = None,
transformations: Optional[List[TransformComponent]] = None,
**kwargs: Any,
) -> IndexType:
"""从一系列文档中构建 Vectara 索引。"""
nodes = [
TextNode(text=document.get_content(), metadata=document.metadata) # type: ignore
for document in documents
]
return cls(
nodes=nodes,
show_progress=show_progress,
**kwargs,
)
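

# Example usage (a minimal sketch; assumes the VECTARA_CUSTOMER_ID,
# VECTARA_CORPUS_ID, and VECTARA_API_KEY environment variables are set,
# that the target corpus already exists, and that Document is importable
# from llama_index.core.schema; similarity_top_k is assumed to be a
# supported VectaraRetriever parameter):
#
#     from llama_index.core.schema import Document
#
#     index = VectaraIndex.from_documents(
#         [Document(text="Maui is the second largest island in Hawaii.")]
#     )
#     retriever = index.as_retriever(similarity_top_k=3)
#     results = retriever.retrieve("Which Hawaiian island is second largest?")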