Source code for langchain_community.vectorstores.infinispanvs

"""Module providing Infinispan as a VectorStore."""

from __future__ import annotations

import json
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple, Type, cast
from urllib.parse import quote

import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

logger = logging.getLogger(__name__)


class InfinispanVS(VectorStore):
    """`Infinispan` VectorStore interface.

    This class exposes the method to present Infinispan as a VectorStore.
    It relies on the Infinispan class (below) which takes care of the REST
    interface with the server.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            vectorDb = InfinispanVS.from_documents(docs,
                            embedding=RGBEmbeddings(),
                            output_fields=["texture", "color"],
                            lambda_key=lambda text, meta: str(meta["_key"]),
                            lambda_content=lambda item: item["color"])

    or an empty InfinispanVS instance can be created if preliminary setup
    is required before populating the store:

        .. code-block:: python

            from langchain_community.vectorstores import InfinispanVS
            from mymodels import RGBEmbeddings
            ...
            ispnVS = InfinispanVS()
            # configure Infinispan here
            # i.e. create cache and schema

            # then populate the store
            vectorDb = InfinispanVS.from_documents(docs,
                            embedding=RGBEmbeddings(),
                            output_fields=["texture", "color"],
                            lambda_key=lambda text, meta: str(meta["_key"]),
                            lambda_content=lambda item: item["color"])
    """
[docs] def __init__( self, embedding: Optional[Embeddings] = None, ids: Optional[List[str]] = None, **kwargs: Any, ): self.ispn = Infinispan(**kwargs) self._configuration = kwargs self._cache_name = str(self._configuration.get("cache_name", "vector")) self._entity_name = str(self._configuration.get("entity_name", "vector")) self._embedding = embedding self._textfield = self._configuration.get("textfield", "text") self._vectorfield = self._configuration.get("vectorfield", "vector") self._to_content = self._configuration.get( "lambda_content", lambda item: self._default_content(item) ) self._to_metadata = self._configuration.get( "lambda_metadata", lambda item: self._default_metadata(item) ) self._output_fields = self._configuration.get("output_fields") self._ids = ids
def _default_metadata(self, item: dict) -> dict: meta = dict(item) meta.pop(self._vectorfield, None) meta.pop(self._textfield, None) meta.pop("_type", None) return meta def _default_content(self, item: dict[str, Any]) -> Any: return item.get(self._textfield)
[docs] def schema_builder(self, templ: dict, dimension: int) -> str: metadata_proto_tpl = """ /** * @Indexed */ message %s { /** * @Vector(dimension=%d) */ repeated float %s = 1; """ metadata_proto = metadata_proto_tpl % ( self._entity_name, dimension, self._vectorfield, ) idx = 2 for f, v in templ.items(): if isinstance(v, str): metadata_proto += "optional string " + f + " = " + str(idx) + ";\n" elif isinstance(v, int): metadata_proto += "optional int64 " + f + " = " + str(idx) + ";\n" elif isinstance(v, float): metadata_proto += "optional double " + f + " = " + str(idx) + ";\n" elif isinstance(v, bytes): metadata_proto += "optional bytes " + f + " = " + str(idx) + ";\n" elif isinstance(v, bool): metadata_proto += "optional bool " + f + " = " + str(idx) + ";\n" else: raise Exception( "Unable to build proto schema for metadata. " "Unhandled type for field: " + f ) idx += 1 metadata_proto += "}\n" return metadata_proto
[docs] def schema_create(self, proto: str) -> requests.Response: """部署向量数据库的模式 参数: proto(str): protobuf模式 返回: 包含操作结果的http响应 """ return self.ispn.schema_post(self._entity_name + ".proto", proto)
[docs] def schema_delete(self) -> requests.Response: """删除向量数据库的模式 返回: 包含操作结果的http响应 """ return self.ispn.schema_delete(self._entity_name + ".proto")
[docs] def cache_create(self, config: str = "") -> requests.Response: """创建用于向量数据库的缓存 参数: config(str): 缓存的配置 返回: 包含操作结果的http响应 """ if config == "": config = ( ''' { "distributed-cache": { "owners": "2", "mode": "SYNC", "statistics": true, "encoding": { "media-type": "application/x-protostream" }, "indexing": { "enabled": true, "storage": "filesystem", "startup-mode": "AUTO", "indexing-mode": "AUTO", "indexed-entities": [ "''' + self._entity_name + """" ] } } } """ ) return self.ispn.cache_post(self._cache_name, config)
[docs] def cache_delete(self) -> requests.Response: """删除向量数据库的缓存 返回: 包含操作结果的http响应 """ return self.ispn.cache_delete(self._cache_name)
[docs] def cache_clear(self) -> requests.Response: """清除向量数据库的缓存 返回: 包含操作结果的http响应 """ return self.ispn.cache_clear(self._cache_name)
[docs] def cache_exists(self) -> bool: """检查缓存是否存在 返回: 如果存在则为true """ return self.ispn.cache_exists(self._cache_name)
[docs] def cache_index_clear(self) -> requests.Response: """清除向量数据库的索引 返回: 包含操作结果的http响应 """ return self.ispn.index_clear(self._cache_name)
[docs] def cache_index_reindex(self) -> requests.Response: """重新构建用于向量数据库的for循环 返回: 包含操作结果的http响应 """ return self.ispn.index_reindex(self._cache_name)
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, last_vector: Optional[List[float]] = None, **kwargs: Any, ) -> List[str]: result = [] texts_l = list(texts) if last_vector: texts_l.pop() embeds = self._embedding.embed_documents(texts_l) # type: ignore if last_vector: embeds.append(last_vector) if not metadatas: metadatas = [{} for _ in texts] ids = self._ids or [str(uuid.uuid4()) for _ in texts] data_input = list(zip(metadatas, embeds, ids)) for metadata, embed, key in data_input: data = {"_type": self._entity_name, self._vectorfield: embed} data.update(metadata) data_str = json.dumps(data) self.ispn.put(key, data_str, self._cache_name) result.append(key) return result
[docs] def similarity_search_with_score( self, query: str, k: int = 4, **kwargs: Any ) -> List[Tuple[Document, float]]: """对查询字符串执行搜索,并返回带有分数的结果。 参数: query(str):要搜索的文本。 k(int,可选):要返回的结果数量。默认为4。 返回: List[Tuple[Document, float]] """ embed = self._embedding.embed_query(query) # type: ignore documents = self.similarity_search_with_score_by_vector(embedding=embed, k=k) return documents
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, **kwargs: Any ) -> List[Document]: res = self.similarity_search_with_score_by_vector(embedding, k) return [doc for doc, _ in res]
[docs] def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4 ) -> List[Tuple[Document, float]]: """返回与嵌入向量最相似的文档。 参数: embedding:要查找与之相似的文档的嵌入。 k:要返回的文档数量。默认为4。 返回: 与查询向量最相似的文档对(文档,分数)的列表。 """ if self._output_fields is None: query_str = ( "select v, score(v) from " + self._entity_name + " v where v." + self._vectorfield + " <-> " + json.dumps(embedding) + "~" + str(k) ) else: query_proj = "select " for field in self._output_fields[:-1]: query_proj = query_proj + "v." + field + "," query_proj = query_proj + "v." + self._output_fields[-1] query_str = ( query_proj + ", score(v) from " + self._entity_name + " v where v." + self._vectorfield + " <-> " + json.dumps(embedding) + "~" + str(k) ) query_res = self.ispn.req_query(query_str, self._cache_name) result = json.loads(query_res.text) return self._query_result_to_docs(result)
def _query_result_to_docs( self, result: dict[str, Any] ) -> List[Tuple[Document, float]]: documents = [] for row in result["hits"]: hit = row["hit"] or {} if self._output_fields is None: entity = hit["*"] else: entity = {key: hit.get(key) for key in self._output_fields} doc = Document( page_content=self._to_content(entity), metadata=self._to_metadata(entity), ) documents.append((doc, hit["score()"])) return documents
[docs] def configure(self, metadata: dict, dimension: int) -> None: schema = self.schema_builder(metadata, dimension) output = self.schema_create(schema) assert output.ok, "Unable to create schema. Already exists? " "Consider using clear_old=True" assert json.loads(output.text)["error"] is None if not self.cache_exists(): output = self.cache_create() assert output.ok, "Unable to create cache. Already exists? " "Consider using clear_old=True" # Ensure index is clean self.cache_index_clear()
[docs] def config_clear(self) -> None: self.schema_delete() self.cache_delete()
[docs] @classmethod def from_texts( cls: Type[InfinispanVS], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, clear_old: Optional[bool] = True, auto_config: Optional[bool] = True, **kwargs: Any, ) -> InfinispanVS: """返回从文本和嵌入初始化的VectorStore。""" infinispanvs = cls(embedding=embedding, ids=ids, **kwargs) if auto_config and len(metadatas or []) > 0: if clear_old: infinispanvs.config_clear() vec = embedding.embed_query(texts[len(texts) - 1]) metadatas = cast(List[dict], metadatas) infinispanvs.configure(metadatas[0], len(vec)) else: if clear_old: infinispanvs.cache_clear() vec = embedding.embed_query(texts[len(texts) - 1]) if texts: infinispanvs.add_texts(texts, metadatas, vector=vec) return infinispanvs
# Timeout (seconds) applied to every REST call issued by the Infinispan
# helper class below.
REST_TIMEOUT = 10
class Infinispan:
    """Helper class for the `Infinispan` REST interface.

    This class exposes the Infinispan operations needed to create and set up
    a vector DB.

    You need a running Infinispan (15+) server without authentication.
    You can easily start one, see:
    https://github.com/rigazilla/infinispan-vector#run-infinispan
    """
[docs] def __init__(self, **kwargs: Any): self._configuration = kwargs self._schema = str(self._configuration.get("schema", "http")) self._host = str(self._configuration.get("hosts", ["127.0.0.1:11222"])[0]) self._default_node = self._schema + "://" + self._host self._cache_url = str(self._configuration.get("cache_url", "/rest/v2/caches")) self._schema_url = str(self._configuration.get("cache_url", "/rest/v2/schemas")) self._use_post_for_query = str( self._configuration.get("use_post_for_query", True) )
[docs] def req_query( self, query: str, cache_name: str, local: bool = False ) -> requests.Response: """请求一个查询 参数: query(str): 请求的查询 cache_name(str): 目标缓存的名称 local(boolean): 查询是否是本地集群的 返回: 包含结果集或错误的http响应 """ if self._use_post_for_query: return self._query_post(query, cache_name, local) return self._query_get(query, cache_name, local)
def _query_post( self, query_str: str, cache_name: str, local: bool = False ) -> requests.Response: api_url = ( self._default_node + self._cache_url + "/" + cache_name + "?action=search&local=" + str(local) ) data = {"query": query_str} data_json = json.dumps(data) response = requests.post( api_url, data_json, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT, ) return response def _query_get( self, query_str: str, cache_name: str, local: bool = False ) -> requests.Response: api_url = ( self._default_node + self._cache_url + "/" + cache_name + "?action=search&query=" + query_str + "&local=" + str(local) ) response = requests.get(api_url, timeout=REST_TIMEOUT) return response
[docs] def post(self, key: str, data: str, cache_name: str) -> requests.Response: """发布一个条目 参数: key(str): 条目的键 data(str): 以json格式表示的条目内容 cache_name(str): 目标缓存 返回: 包含操作结果的http响应 """ api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key response = requests.post( api_url, data, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT, ) return response
[docs] def put(self, key: str, data: str, cache_name: str) -> requests.Response: """放置一个条目 参数: key(str): 条目的键 data(str): 以json格式的条目内容 cache_name(str): 目标缓存 返回: 包含操作结果的http响应 """ api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key response = requests.put( api_url, data, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT, ) return response
[docs] def get(self, key: str, cache_name: str) -> requests.Response: """获取一个条目 参数: key(str): 条目的键 cache_name(str): 目标缓存 返回: 包含条目或错误的http响应 """ api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key response = requests.get( api_url, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT ) return response
[docs] def schema_post(self, name: str, proto: str) -> requests.Response: """部署模式 参数: name(str): 模式的名称。将用作键 proto(str): protobuf 模式 返回: 包含操作结果的 http 响应 """ api_url = self._default_node + self._schema_url + "/" + name response = requests.post(api_url, proto, timeout=REST_TIMEOUT) return response
[docs] def cache_post(self, name: str, config: str) -> requests.Response: """创建一个缓存 参数: name(str): 缓存的名称。 config(str): 缓存的配置。 返回: 包含操作结果的http响应。 """ api_url = self._default_node + self._cache_url + "/" + name response = requests.post( api_url, config, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT, ) return response
[docs] def schema_delete(self, name: str) -> requests.Response: """删除一个模式 参数: name(str): 模式的名称。 返回: 包含操作结果的http响应。 """ api_url = self._default_node + self._schema_url + "/" + name response = requests.delete(api_url, timeout=REST_TIMEOUT) return response
[docs] def cache_delete(self, name: str) -> requests.Response: """删除缓存 参数: name(str): 缓存的名称。 返回: 包含操作结果的http响应。 """ api_url = self._default_node + self._cache_url + "/" + name response = requests.delete(api_url, timeout=REST_TIMEOUT) return response
[docs] def cache_clear(self, cache_name: str) -> requests.Response: """清除缓存 参数: cache_name(str): 缓存的名称。 返回: 包含操作结果的http响应。 """ api_url = ( self._default_node + self._cache_url + "/" + cache_name + "?action=clear" ) response = requests.post(api_url, timeout=REST_TIMEOUT) return response
[docs] def cache_exists(self, cache_name: str) -> bool: """检查缓存是否存在 参数: cache_name(str): 缓存的名称。 返回: 如果缓存存在则返回True """ api_url = ( self._default_node + self._cache_url + "/" + cache_name + "?action=clear" ) return self.resource_exists(api_url)
[docs] @staticmethod def resource_exists(api_url: str) -> bool: """检查资源是否存在 参数: api_url(str): 资源的url。 返回: 如果资源存在则返回true """ response = requests.head(api_url, timeout=REST_TIMEOUT) return response.ok
[docs] def index_clear(self, cache_name: str) -> requests.Response: """清除缓存中的索引 参数: cache_name(str): 缓存的名称。 返回: 包含操作结果的http响应 """ api_url = ( self._default_node + self._cache_url + "/" + cache_name + "/search/indexes?action=clear" ) return requests.post(api_url, timeout=REST_TIMEOUT)
[docs] def index_reindex(self, cache_name: str) -> requests.Response: """重新构建缓存上的索引 参数: cache_name(str): 缓存的名称。 返回: 包含操作结果的http响应 """ api_url = ( self._default_node + self._cache_url + "/" + cache_name + "/search/indexes?action=reindex" ) return requests.post(api_url, timeout=REST_TIMEOUT)