"""Module providing Infinispan as a VectorStore."""
from __future__ import annotations
import json
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple, Type, cast
import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class InfinispanVS(VectorStore):
    """`Infinispan` VectorStore interface.

    This class exposes the methods needed to present Infinispan as a
    VectorStore. It relies on the ``Infinispan`` helper class defined in this
    module, which performs the REST calls against the server.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            vectorDb = InfinispanVS.from_documents(
                docs,
                embedding=RGBEmbeddings(),
                output_fields=["texture", "color"],
                lambda_content=lambda item: item["color"],
            )

    Alternatively, when some preliminary setup is required before populating
    the store, an empty ``InfinispanVS`` instance can be created first.

        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            ispnVS = InfinispanVS()
            # configure Infinispan here,
            # e.g. create the cache and the schema

            # then populate the store
            vectorDb = InfinispanVS.from_documents(
                docs,
                embedding=RGBEmbeddings(),
                output_fields=["texture", "color"],
                lambda_content=lambda item: item["color"],
            )
    """

    # Python type -> protobuf scalar type used by ``schema_builder``.
    # ``bool`` must be tested before ``int`` because ``bool`` is a subclass of
    # ``int``; otherwise booleans would be declared as ``int64``.
    _PROTO_SCALARS = (
        (bool, "bool"),
        (str, "string"),
        (int, "int64"),
        (float, "double"),
        (bytes, "bytes"),
    )

    def __init__(
        self,
        embedding: Optional[Embeddings] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ):
        """Create the store and its underlying REST helper.

        Args:
            embedding: Embedding function used to embed texts and queries.
            ids: Keys used by ``add_texts``. When omitted, random UUIDs are
                generated for every added text.
            **kwargs: Configuration. It is forwarded unchanged to
                ``Infinispan``. Keys read by this class:
                ``cache_name`` (default ``"vector"``),
                ``entity_name`` (default ``"vector"``),
                ``textfield`` (default ``"text"``),
                ``vectorfield`` (default ``"vector"``),
                ``lambda_content`` / ``lambda_metadata`` (callables mapping a
                query hit to page content / metadata) and ``output_fields``
                (fields projected by similarity queries).
        """
        self.ispn = Infinispan(**kwargs)
        self._configuration = kwargs
        self._cache_name = str(self._configuration.get("cache_name", "vector"))
        self._entity_name = str(self._configuration.get("entity_name", "vector"))
        self._embedding = embedding
        self._textfield = self._configuration.get("textfield", "text")
        self._vectorfield = self._configuration.get("vectorfield", "vector")
        self._to_content = self._configuration.get(
            "lambda_content", self._default_content
        )
        self._to_metadata = self._configuration.get(
            "lambda_metadata", self._default_metadata
        )
        # An empty list is treated like "not set": projecting zero fields
        # cannot produce a valid query.
        self._output_fields = self._configuration.get("output_fields") or None
        self._ids = ids

    def _require_embedding(self) -> Embeddings:
        """Return the configured embedding or raise if none was provided."""
        if self._embedding is None:
            raise ValueError(
                "An embedding function is required for this operation."
            )
        return self._embedding

    def _default_metadata(self, item: dict) -> dict:
        """Return a copy of ``item`` without the vector, text and ``_type``."""
        meta = dict(item)
        for key in (self._vectorfield, self._textfield, "_type"):
            meta.pop(key, None)
        return meta

    def _default_content(self, item: dict[str, Any]) -> Any:
        """Return the value stored under the configured text field, if any."""
        return item.get(self._textfield)

    def schema_builder(self, templ: dict, dimension: int) -> str:
        """Build the protobuf schema for the indexed entity.

        The vector field always gets field number 1; every key of ``templ``
        becomes an ``optional`` field numbered from 2 in iteration order.

        Args:
            templ: Sample metadata; the Python type of each value selects the
                protobuf type (bool, str, int, float or bytes).
            dimension: Dimension of the stored vectors.

        Returns:
            The protobuf schema as a string.

        Raises:
            TypeError: If a value in ``templ`` has an unsupported type.
        """
        header = (
            "\n"
            "/**\n"
            "* @Indexed\n"
            "*/\n"
            "message %s {\n"
            "/**\n"
            "* @Vector(dimension=%d)\n"
            "*/\n"
            "repeated float %s = 1;\n"
        ) % (self._entity_name, dimension, self._vectorfield)
        parts = [header]
        for idx, (field, value) in enumerate(templ.items(), start=2):
            for py_type, proto_type in self._PROTO_SCALARS:
                if isinstance(value, py_type):
                    parts.append(f"optional {proto_type} {field} = {idx};\n")
                    break
            else:
                raise TypeError(
                    "Unable to build proto schema for metadata. "
                    "Unhandled type for field: " + field
                )
        parts.append("}\n")
        return "".join(parts)

    def schema_create(self, proto: str) -> requests.Response:
        """Deploy the schema for the vector db.

        Args:
            proto(str): protobuf schema
        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.schema_post(self._entity_name + ".proto", proto)

    def schema_delete(self) -> requests.Response:
        """Delete the schema for the vector db.

        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.schema_delete(self._entity_name + ".proto")

    def cache_create(self, config: str = "") -> requests.Response:
        """Create the cache for the vector db.

        Args:
            config(str): JSON configuration for the cache. When empty, a
                default distributed, indexed cache configuration that indexes
                the configured entity is used.
        Returns:
            An http Response containing the result of the operation
        """
        if not config:
            # json.dumps guarantees valid JSON even if the entity name
            # contains characters that need escaping.
            config = json.dumps(
                {
                    "distributed-cache": {
                        "owners": "2",
                        "mode": "SYNC",
                        "statistics": True,
                        "encoding": {"media-type": "application/x-protostream"},
                        "indexing": {
                            "enabled": True,
                            "storage": "filesystem",
                            "startup-mode": "AUTO",
                            "indexing-mode": "AUTO",
                            "indexed-entities": [self._entity_name],
                        },
                    }
                }
            )
        return self.ispn.cache_post(self._cache_name, config)

    def cache_delete(self) -> requests.Response:
        """Delete the cache for the vector db.

        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.cache_delete(self._cache_name)

    def cache_clear(self) -> requests.Response:
        """Clear the cache for the vector db.

        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.cache_clear(self._cache_name)

    def cache_exists(self) -> bool:
        """Check whether the cache exists.

        Returns:
            True if it exists
        """
        return self.ispn.cache_exists(self._cache_name)

    def cache_index_clear(self) -> requests.Response:
        """Clear the index for the vector db.

        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.index_clear(self._cache_name)

    def cache_index_reindex(self) -> requests.Response:
        """Rebuild the index for the vector db.

        Returns:
            An http Response containing the result of the operation
        """
        return self.ispn.index_reindex(self._cache_name)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        last_vector: Optional[List[float]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed ``texts`` and store one entry per text.

        Each stored entry contains ``_type``, the vector and the matching
        metadata dict. The text itself is not written by this method, so it
        must be part of the metadata if it has to be retrievable.

        Args:
            texts: Texts to embed.
            metadatas: One metadata dict per text. Defaults to empty dicts.
            last_vector: Pre-computed vector for the last text; when given,
                the last text is not embedded again.

        Returns:
            The keys under which the entries were stored.

        Raises:
            ValueError: If no embedding function was configured.
        """
        texts_l = list(texts)
        # Remember the size now: ``texts`` may be a one-shot iterator and
        # cannot be iterated a second time.
        count = len(texts_l)
        if count == 0:
            return []
        if last_vector:
            texts_l.pop()
        # Copy so the list returned by the embedder is never mutated.
        embeds = list(self._require_embedding().embed_documents(texts_l))
        if last_vector:
            embeds.append(last_vector)
        if not metadatas:
            metadatas = [{} for _ in range(count)]
        ids = self._ids or [str(uuid.uuid4()) for _ in range(count)]
        result = []
        # zip stops at the shortest sequence, so surplus items are ignored.
        for metadata, embed, key in zip(metadatas, embeds, ids):
            data = {"_type": self._entity_name, self._vectorfield: embed}
            data.update(metadata)
            response = self.ispn.put(key, json.dumps(data), self._cache_name)
            if not response.ok:
                logger.warning(
                    "Infinispan rejected entry %s (HTTP status %s)",
                    key,
                    response.status_code,
                )
            result.append(key)
        return result

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return the documents most similar to the query."""
        return [
            doc for doc, _ in self.similarity_search_with_score(query=query, k=k)
        ]

    def similarity_search_with_score(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Perform a search on a query string and return results with score.

        Args:
            query (str): The text being searched.
            k (int, optional): The amount of results to return. Defaults to 4.

        Returns:
            List[Tuple[Document, float]]
        """
        embed = self._require_embedding().embed_query(query)
        return self.similarity_search_with_score_by_vector(embedding=embed, k=k)

    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return the documents most similar to the embedding vector."""
        return [
            doc
            for doc, _ in self.similarity_search_with_score_by_vector(embedding, k)
        ]

    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4
    ) -> List[Tuple[Document, float]]:
        """Return the documents most similar to the embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of pairs (Document, score) most similar to the query vector.
        """
        if self._output_fields:
            projection = ",".join("v." + field for field in self._output_fields)
        else:
            # No projection configured: select the whole entity.
            projection = "v"
        query_str = (
            "select "
            + projection
            + ", score(v) from "
            + self._entity_name
            + " v where v."
            + self._vectorfield
            + " <-> "
            + json.dumps(embedding)
            + "~"
            + str(k)
        )
        query_res = self.ispn.req_query(query_str, self._cache_name)
        return self._query_result_to_docs(json.loads(query_res.text))

    def _query_result_to_docs(
        self, result: dict[str, Any]
    ) -> List[Tuple[Document, float]]:
        """Convert a parsed query response into ``(Document, score)`` pairs."""
        documents = []
        for row in result["hits"]:
            hit = row["hit"] or {}
            if self._output_fields:
                entity = {key: hit.get(key) for key in self._output_fields}
            else:
                # Whole-entity queries return the entity under the "*" key.
                entity = hit["*"]
            doc = Document(
                page_content=self._to_content(entity),
                metadata=self._to_metadata(entity),
            )
            documents.append((doc, hit["score()"]))
        return documents

    def configure(self, metadata: dict, dimension: int) -> None:
        """Deploy the schema and make sure the cache exists.

        Args:
            metadata: Sample metadata used as template for the schema.
            dimension: Dimension of the stored vectors.

        Raises:
            RuntimeError: If the server rejects the schema or the cache
                creation request.
        """
        response = self.schema_create(self.schema_builder(metadata, dimension))
        if not response.ok:
            raise RuntimeError(
                "Unable to create schema. Already exists? "
                "Consider using clear_old=True"
            )
        if not self.cache_exists():
            response = self.cache_create()
            if not response.ok:
                raise RuntimeError(
                    "Unable to create cache. Already exists? "
                    "Consider using clear_old=True"
                )

    def config_clear(self) -> None:
        """Remove both the schema and the cache of the vector db."""
        self.schema_delete()
        self.cache_delete()

    @classmethod
    def from_texts(
        cls: Type[InfinispanVS],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        clear_old: Optional[bool] = True,
        auto_config: Optional[bool] = True,
        **kwargs: Any,
    ) -> InfinispanVS:
        """Return VectorStore initialized from texts and embeddings.

        Args:
            texts: Texts to store.
            embedding: Embedding function.
            metadatas: One metadata dict per text.
            ids: Keys for the stored entries.
            clear_old: Remove existing data (and, with ``auto_config``, the
                existing schema and cache) before populating.
            auto_config: When metadata is available, create schema and cache
                from the first metadata dict and the embedding dimension.
            **kwargs: Forwarded to the ``InfinispanVS`` constructor.
        """
        infinispanvs = cls(embedding=embedding, ids=ids, **kwargs)
        configure_store = bool(auto_config) and bool(metadatas)
        if clear_old:
            if configure_store:
                infinispanvs.config_clear()
            else:
                infinispanvs.cache_clear()
        if not texts:
            # Nothing to embed or store; avoids indexing into an empty list.
            return infinispanvs
        if configure_store:
            # One probe embedding is needed to learn the vector dimension.
            vec = embedding.embed_query(texts[-1])
            infinispanvs.configure(cast(List[dict], metadatas)[0], len(vec))
        # The probe is a *query* embedding, so it is deliberately not reused
        # as the stored vector: every text is embedded via embed_documents.
        infinispanvs.add_texts(texts, metadatas)
        return infinispanvs
# Value passed as ``timeout=`` to every ``requests`` call in this module.
REST_TIMEOUT = 10
class Infinispan:
    """Helper class for the `Infinispan` REST interface.

    This class exposes the Infinispan operations needed to create and set up
    a vector db.

    You need a running Infinispan (15+) server without authentication.
    You can easily start one, see:
    https://github.com/rigazilla/infinispan-vector#run-infinispan
    """

    # Header sent with every request that carries or expects a JSON body.
    _JSON_HEADERS = {"Content-Type": "application/json"}

    def __init__(self, **kwargs: Any):
        """Read the connection settings from ``kwargs``.

        Recognised keys (unknown keys are ignored):
            schema: URL scheme, default ``"http"``.
            hosts: List of ``host:port`` strings; only the first entry is
                used. Default ``["127.0.0.1:11222"]``.
            cache_url: Path of the caches endpoint, default
                ``"/rest/v2/caches"``.
            schema_url: Path of the schemas endpoint, default
                ``"/rest/v2/schemas"``.
            use_post_for_query: Send queries with POST (default ``True``)
                instead of GET.
        """
        self._configuration = kwargs
        self._schema = str(self._configuration.get("schema", "http"))
        self._host = str(self._configuration.get("hosts", ["127.0.0.1:11222"])[0])
        self._default_node = self._schema + "://" + self._host
        self._cache_url = str(self._configuration.get("cache_url", "/rest/v2/caches"))
        # Read from its own key; reusing "cache_url" here would point schema
        # operations at the caches endpoint whenever cache_url is customised.
        self._schema_url = str(
            self._configuration.get("schema_url", "/rest/v2/schemas")
        )
        # Kept as a real boolean: str(False) is the truthy string "False".
        self._use_post_for_query = bool(
            self._configuration.get("use_post_for_query", True)
        )

    def _cache_endpoint(self, cache_name: str) -> str:
        """Return the base URL of the given cache."""
        return self._default_node + self._cache_url + "/" + cache_name

    def req_query(
        self, query: str, cache_name: str, local: bool = False
    ) -> requests.Response:
        """Request a query.

        Args:
            query(str): query requested
            cache_name(str): name of the target cache
            local(boolean): whether the query is local to the cluster
        Returns:
            An http Response containing the result set or errors
        """
        if self._use_post_for_query:
            return self._query_post(query, cache_name, local)
        return self._query_get(query, cache_name, local)

    def _query_post(
        self, query_str: str, cache_name: str, local: bool = False
    ) -> requests.Response:
        """Run the query with POST, sending it as a JSON body."""
        api_url = (
            self._cache_endpoint(cache_name) + "?action=search&local=" + str(local)
        )
        return requests.post(
            api_url,
            json.dumps({"query": query_str}),
            headers=self._JSON_HEADERS,
            timeout=REST_TIMEOUT,
        )

    def _query_get(
        self, query_str: str, cache_name: str, local: bool = False
    ) -> requests.Response:
        """Run the query with GET, sending it as URL parameters."""
        # Passing ``params`` lets requests URL-encode the query text, which
        # may contain characters such as '&', '#' or '+'.
        return requests.get(
            self._cache_endpoint(cache_name),
            params={"action": "search", "query": query_str, "local": str(local)},
            timeout=REST_TIMEOUT,
        )

    def post(self, key: str, data: str, cache_name: str) -> requests.Response:
        """Post an entry.

        Args:
            key(str): key of the entry
            data(str): content of the entry in json format
            cache_name(str): target cache
        Returns:
            An http Response containing the result of the operation
        """
        return requests.post(
            self._cache_endpoint(cache_name) + "/" + key,
            data,
            headers=self._JSON_HEADERS,
            timeout=REST_TIMEOUT,
        )

    def put(self, key: str, data: str, cache_name: str) -> requests.Response:
        """Put an entry.

        Args:
            key(str): key of the entry
            data(str): content of the entry in json format
            cache_name(str): target cache
        Returns:
            An http Response containing the result of the operation
        """
        return requests.put(
            self._cache_endpoint(cache_name) + "/" + key,
            data,
            headers=self._JSON_HEADERS,
            timeout=REST_TIMEOUT,
        )

    def get(self, key: str, cache_name: str) -> requests.Response:
        """Get an entry.

        Args:
            key(str): key of the entry
            cache_name(str): target cache
        Returns:
            An http Response containing the entry or errors
        """
        return requests.get(
            self._cache_endpoint(cache_name) + "/" + key,
            headers=self._JSON_HEADERS,
            timeout=REST_TIMEOUT,
        )

    def schema_post(self, name: str, proto: str) -> requests.Response:
        """Deploy a schema.

        Args:
            name(str): name of the schema. Will be used as a key
            proto(str): protobuf schema
        Returns:
            An http Response containing the result of the operation
        """
        api_url = self._default_node + self._schema_url + "/" + name
        return requests.post(api_url, proto, timeout=REST_TIMEOUT)

    def cache_post(self, name: str, config: str) -> requests.Response:
        """Create a cache.

        Args:
            name(str): name of the cache.
            config(str): configuration of the cache.
        Returns:
            An http Response containing the result of the operation
        """
        return requests.post(
            self._cache_endpoint(name),
            config,
            headers=self._JSON_HEADERS,
            timeout=REST_TIMEOUT,
        )

    def schema_delete(self, name: str) -> requests.Response:
        """Delete a schema.

        Args:
            name(str): name of the schema.
        Returns:
            An http Response containing the result of the operation
        """
        api_url = self._default_node + self._schema_url + "/" + name
        return requests.delete(api_url, timeout=REST_TIMEOUT)

    def cache_delete(self, name: str) -> requests.Response:
        """Delete a cache.

        Args:
            name(str): name of the cache.
        Returns:
            An http Response containing the result of the operation
        """
        return requests.delete(self._cache_endpoint(name), timeout=REST_TIMEOUT)

    def cache_clear(self, cache_name: str) -> requests.Response:
        """Clear a cache.

        Args:
            cache_name(str): name of the cache.
        Returns:
            An http Response containing the result of the operation
        """
        return requests.post(
            self._cache_endpoint(cache_name) + "?action=clear",
            timeout=REST_TIMEOUT,
        )

    def cache_exists(self, cache_name: str) -> bool:
        """Check whether a cache exists.

        Args:
            cache_name(str): name of the cache.
        Returns:
            True if the cache exists
        """
        # Existence is probed on the plain cache URL; the "clear" action does
        # not belong in an existence check.
        return self.resource_exists(self._cache_endpoint(cache_name))

    @staticmethod
    def resource_exists(api_url: str) -> bool:
        """Check whether a resource exists.

        Args:
            api_url(str): url of the resource.
        Returns:
            True if the HEAD request on the url succeeds
        """
        return requests.head(api_url, timeout=REST_TIMEOUT).ok

    def index_clear(self, cache_name: str) -> requests.Response:
        """Clear an index on a cache.

        Args:
            cache_name(str): name of the cache.
        Returns:
            An http Response containing the result of the operation
        """
        return requests.post(
            self._cache_endpoint(cache_name) + "/search/indexes?action=clear",
            timeout=REST_TIMEOUT,
        )

    def index_reindex(self, cache_name: str) -> requests.Response:
        """Rebuild the index on a cache.

        Args:
            cache_name(str): name of the cache.
        Returns:
            An http Response containing the result of the operation
        """
        return requests.post(
            self._cache_endpoint(cache_name) + "/search/indexes?action=reindex",
            timeout=REST_TIMEOUT,
        )