Source code for langchain_community.vectorstores.clickhouse

from __future__ import annotations

import json
import logging
from hashlib import sha1
from threading import Thread
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseSettings
from langchain_core.vectorstores import VectorStore

logger = logging.getLogger()


[docs]def has_mul_sub_str(s: str, *args: Any) -> bool: """检查字符串是否包含多个子字符串。 参数: s:要检查的字符串。 *args:要检查的子字符串。 返回: 如果所有子字符串都在字符串中,则返回True,否则返回False。 """ for a in args: if a not in s: return False return True
[docs]class ClickhouseSettings(BaseSettings): """`ClickHouse`客户端配置。 属性: host (str) : 连接到MyScale后端的URL。 默认为'localhost'。 port (int) : 用HTTP连接的URL端口。默认为8443。 username (str) : 登录的用户名。默认为None。 password (str) : 登录的密码。默认为None。 index_type (str): 索引类型字符串。 index_param (list): 索引构建参数。 index_query_params(dict): 索引查询参数。 database (str) : 查找表的数据库名称。默认为'default'。 table (str) : 操作的表名。 默认为'vector_table'。 metric (str) : 计算距离的度量标准, 支持的有('angular', 'euclidean', 'manhattan', 'hamming', 'dot')。默认为'angular'。 https://github.com/spotify/annoy/blob/main/src/annoymodule.cc#L149-L169 column_map (Dict) : 列类型映射,将列名投影到langchain语义上。 必须具有键: `text`, `id`, `vector`, 必须与列数相同。例如: .. code-block:: python { 'id': 'text_id', 'uuid': 'global_unique_id' 'embedding': 'text_embedding', 'document': 'text_plain', 'metadata': 'metadata_dictionary_in_json', } 默认为身份映射。""" host: str = "localhost" port: int = 8123 username: Optional[str] = None password: Optional[str] = None index_type: Optional[str] = "annoy" # Annoy supports L2Distance and cosineDistance. index_param: Optional[Union[List, Dict]] = ["'L2Distance'", 100] index_query_params: Dict[str, str] = {} column_map: Dict[str, str] = { "id": "id", "uuid": "uuid", "document": "document", "embedding": "embedding", "metadata": "metadata", } database: str = "default" table: str = "langchain" metric: str = "angular" def __getitem__(self, item: str) -> Any: return getattr(self, item) class Config: env_file = ".env" env_prefix = "clickhouse_" env_file_encoding = "utf-8"
[docs]class Clickhouse(VectorStore): """`ClickHouse VectorSearch` 向量存储。 您需要 `clickhouse-connect` python 包和有效的账户来连接到 ClickHouse。 ClickHouse 不仅可以使用简单的向量索引进行搜索, 还支持具有多个条件、约束甚至子查询的复杂查询。 欲了解更多信息,请访问 [ClickHouse 官方网站](https://clickhouse.com/clickhouse)"""
[docs] def __init__( self, embedding: Embeddings, config: Optional[ClickhouseSettings] = None, **kwargs: Any, ) -> None: """ClickHouse包装器到LangChain embedding_function (嵌入函数): config (ClickHouse设置): ClickHouse客户端的配置 其他关键字参数将传递到 [clickhouse-connect](https://docs.clickhouse.com/) """ try: from clickhouse_connect import get_client except ImportError: raise ImportError( "Could not import clickhouse connect python package. " "Please install it with `pip install clickhouse-connect`." ) try: from tqdm import tqdm self.pgbar = tqdm except ImportError: # Just in case if tqdm is not installed self.pgbar = lambda x, **kwargs: x super().__init__() if config is not None: self.config = config else: self.config = ClickhouseSettings() assert self.config assert self.config.host and self.config.port assert ( self.config.column_map and self.config.database and self.config.table and self.config.metric ) for k in ["id", "embedding", "document", "metadata", "uuid"]: assert k in self.config.column_map assert self.config.metric in [ "angular", "euclidean", "manhattan", "hamming", "dot", ] # initialize the schema dim = len(embedding.embed_query("test")) index_params = ( ( ",".join([f"'{k}={v}'" for k, v in self.config.index_param.items()]) if self.config.index_param else "" ) if isinstance(self.config.index_param, Dict) else ( ",".join([str(p) for p in self.config.index_param]) if isinstance(self.config.index_param, List) else self.config.index_param ) ) self.schema = self._schema(dim, index_params) self.dim = dim self.BS = "\\" self.must_escape = ("\\", "'") self.embedding_function = embedding self.dist_order = "ASC" # Only support ConsingDistance and L2Distance # Create a connection to clickhouse self.client = get_client( host=self.config.host, port=self.config.port, username=self.config.username, password=self.config.password, **kwargs, ) # Enable JSON type self.client.command("SET allow_experimental_object_type=1") if self.config.index_type: # Enable index self.client.command( f"SET allow_experimental_{self.config.index_type}_index=1" ) self.client.command(self.schema)
def _schema(self, dim: int, index_params: Optional[str] = "") -> str: """创建表模式 :param dim: 嵌入的维度 :param index_params: 用于索引的参数 此函数根据`self.config.index_type`的值返回一个`CREATE TABLE`语句。 如果指定了索引类型,那么将创建该索引,否则不会创建索引。 如果没有索引,当查询嵌入字段时将执行线性扫描。 """ if self.config.index_type: return f"""\ CREATE TABLE IF NOT EXISTS {self.config.database}.{self.config.table}( {self.config.column_map['id']} Nullable(String), {self.config.column_map['document']} Nullable(String), {self.config.column_map['embedding']} Array(Float32), {self.config.column_map['metadata']} JSON, {self.config.column_map['uuid']} UUID DEFAULT generateUUIDv4(), CONSTRAINT cons_vec_len CHECK length( {self.config.column_map['embedding']}) = {dim}, INDEX vec_idx {self.config.column_map['embedding']} TYPE \ {self.config.index_type}({index_params}) GRANULARITY 1000 ) ENGINE = MergeTree ORDER BY uuid SETTINGS index_granularity = 8192\ """ else: return f"""\ CREATE TABLE IF NOT EXISTS {self.config.database}.{self.config.table}( {self.config.column_map['id']} Nullable(String), {self.config.column_map['document']} Nullable(String), {self.config.column_map['embedding']} Array(Float32), {self.config.column_map['metadata']} JSON, {self.config.column_map['uuid']} UUID DEFAULT generateUUIDv4(), CONSTRAINT cons_vec_len CHECK length({ self.config.column_map['embedding']}) = {dim} ) ENGINE = MergeTree ORDER BY uuid """ @property def embeddings(self) -> Embeddings: """提供对Clickhouse实例使用的嵌入机制的访问。 此属性允许直接访问Clickhouse实例正在使用的嵌入函数或模型,将文本文档转换为嵌入向量,用于向量相似性搜索。 返回: 与此Clickhouse实例关联的`Embeddings`实例。 """ return self.embedding_function
[docs] def escape_str(self, value: str) -> str: """在Clickhouse SQL查询中转义字符串中的特殊字符。 此方法用于内部使用,用于准备字符串以安全地插入SQL查询中,通过转义可能会干扰查询语法的特殊字符。 参数: value:要转义的字符串。 返回: 转义后的字符串,可安全插入SQL查询。 """ return "".join(f"{self.BS}{c}" if c in self.must_escape else c for c in value)
def _build_insert_sql(self, transac: Iterable, column_names: Iterable[str]) -> str: """构建一个用于将数据插入Clickhouse数据库的SQL查询。 该方法使用提供的事务数据和列名格式化和构建SQL `INSERT`查询字符串。在批量将文档及其嵌入插入数据库的过程中,在内部使用该方法。 参数: transac: 元组的可迭代对象,表示要插入的数据行。 column_names: 字符串的可迭代对象,表示要插入数据的列名。 返回: 包含构建的SQL `INSERT`查询的字符串。 """ ks = ",".join(column_names) _data = [] for n in transac: n = ",".join([f"'{self.escape_str(str(_n))}'" for _n in n]) _data.append(f"({n})") i_str = f""" INSERT INTO TABLE {self.config.database}.{self.config.table}({ks}) VALUES {','.join(_data)} """ return i_str def _insert(self, transac: Iterable, column_names: Iterable[str]) -> None: """执行一个SQL查询,将数据插入Clickhouse数据库。 该方法通过执行`_build_insert_sql`构建的SQL查询来实际插入数据到数据库中。这是向向量存储中添加新文档及其关联数据的关键步骤。 参数: transac:元组的可迭代对象,表示要插入的数据行。 column_names: 一个字符串的可迭代对象,表示将插入数据的列的名称。 """ _insert_query = self._build_insert_sql(transac, column_names) self.client.command(_insert_query)
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, batch_size: int = 32, ids: Optional[Iterable[str]] = None, **kwargs: Any, ) -> List[str]: """通过嵌入插入更多文本,并添加到VectorStore中。 参数: texts:要添加到VectorStore中的字符串的可迭代对象。 ids:要与文本关联的可选id列表。 batch_size:插入的批量大小。 metadata:要插入的可选列数据。 返回: 将文本添加到VectorStore中的id列表。 """ # Embed and create the documents ids = ids or [sha1(t.encode("utf-8")).hexdigest() for t in texts] colmap_ = self.config.column_map transac = [] column_names = { colmap_["id"]: ids, colmap_["document"]: texts, colmap_["embedding"]: self.embedding_function.embed_documents(list(texts)), } metadatas = metadatas or [{} for _ in texts] column_names[colmap_["metadata"]] = map(json.dumps, metadatas) assert len(set(colmap_) - set(column_names)) >= 0 keys, values = zip(*column_names.items()) try: t = None for v in self.pgbar( zip(*values), desc="Inserting data...", total=len(metadatas) ): assert ( len(v[keys.index(self.config.column_map["embedding"])]) == self.dim ) transac.append(v) if len(transac) == batch_size: if t: t.join() t = Thread(target=self._insert, args=[transac, keys]) t.start() transac = [] if len(transac) > 0: if t: t.join() self._insert(transac, keys) return [i for i in ids] except Exception as e: logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m") return []
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[Dict[Any, Any]]] = None, config: Optional[ClickhouseSettings] = None, text_ids: Optional[Iterable[str]] = None, batch_size: int = 32, **kwargs: Any, ) -> Clickhouse: """创建具有现有文本的ClickHouse包装器 参数: embedding_function(Embeddings):用于提取文本嵌入的函数 texts(Iterable[str]):要添加的字符串列表或元组 config(ClickHouseSettings,可选):ClickHouse配置 text_ids(Optional[Iterable],可选):文本的ID。 默认为None。 batch_size(int,可选):传输数据到ClickHouse时的批处理大小。 默认为32。 metadata(List[dict],可选):文本的元数据。默认为None。 其他关键字参数将传递到 [clickhouse-connect](https://clickhouse.com/docs/en/integrations/python#clickhouse-connect-driver-api) 返回: ClickHouse索引 """ ctx = cls(embedding, config, **kwargs) ctx.add_texts(texts, ids=text_ids, batch_size=batch_size, metadatas=metadatas) return ctx
def __repr__(self) -> str: """用于ClickHouse Vector Store的文本表示,打印后端、用户名和模式。与`str(ClickHouse())`一起使用非常方便。 返回: repr: 字符串,显示连接信息和数据模式 """ _repr = f"\033[92m\033[1m{self.config.database}.{self.config.table} @ " _repr += f"{self.config.host}:{self.config.port}\033[0m\n\n" _repr += f"\033[1musername: {self.config.username}\033[0m\n\nTable Schema:\n" _repr += "-" * 51 + "\n" for r in self.client.query( f"DESC {self.config.database}.{self.config.table}" ).named_results(): _repr += ( f"|\033[94m{r['name']:24s}\033[0m|\033[96m{r['type']:24s}\033[0m|\n" ) _repr += "-" * 51 + "\n" return _repr def _build_query_sql( self, q_emb: List[float], topk: int, where_str: Optional[str] = None ) -> str: """为执行相似性搜索构建一个SQL查询。 这个内部方法生成一个SQL查询,用于在数据库中找到与给定查询向量最相似的前k个向量。它允许通过WHERE子句应用可选的过滤条件。 参数: q_emb: 作为浮点数列表的查询向量。 topk: 要检索的前k个相似项的数量。 where_str: 表示查询的额外WHERE条件的可选字符串 默认为None。 返回: 包含用于相似性搜索的SQL查询的字符串。 """ q_emb_str = ",".join(map(str, q_emb)) if where_str: where_str = f"PREWHERE {where_str}" else: where_str = "" settings_strs = [] if self.config.index_query_params: for k in self.config.index_query_params: settings_strs.append(f"SETTING {k}={self.config.index_query_params[k]}") q_str = f""" SELECT {self.config.column_map['document']}, {self.config.column_map['metadata']}, dist FROM {self.config.database}.{self.config.table} {where_str} ORDER BY L2Distance({self.config.column_map['embedding']}, [{q_emb_str}]) AS dist {self.dist_order} LIMIT {topk} {' '.join(settings_strs)} """ return q_str
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, where_str: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """使用ClickHouse通过向量执行相似性搜索 参数: query (str): 查询字符串 k (int, optional): 要检索的前K个邻居。默认为4。 where_str (Optional[str], optional): where条件字符串。 默认为None。 注意:请不要让最终用户填写此内容,并始终注意SQL注入问题。 处理元数据时,请记住使用`{self.metadata_column}.attribute`而不是仅使用`attribute`。 其默认名称为`metadata`。 返回: List[Document]: 文档列表 """ q_str = self._build_query_sql(embedding, k, where_str) try: return [ Document( page_content=r[self.config.column_map["document"]], metadata=r[self.config.column_map["metadata"]], ) for r in self.client.query(q_str).named_results() ] except Exception as e: logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m") return []
[docs] def similarity_search_with_relevance_scores( self, query: str, k: int = 4, where_str: Optional[str] = None, **kwargs: Any ) -> List[Tuple[Document, float]]: """使用ClickHouse进行相似性搜索 参数: query (str): 查询字符串 k (int, optional): 要检索的前K个邻居。默认为4。 where_str (Optional[str], optional): where条件字符串。 默认为None。 注意: 请不要让最终用户填写这个内容,并始终注意SQL注入问题。 处理元数据时,请记住使用`{self.metadata_column}.attribute`而不是仅使用`attribute`。 其默认名称为`metadata`。 返回: List[Document]: (Document, 相似度) 列表 """ q_str = self._build_query_sql( self.embedding_function.embed_query(query), k, where_str ) try: return [ ( Document( page_content=r[self.config.column_map["document"]], metadata=r[self.config.column_map["metadata"]], ), r["dist"], ) for r in self.client.query(q_str).named_results() ] except Exception as e: logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m") return []
[docs] def drop(self) -> None: """ 辅助函数:丢弃数据 """ self.client.command( f"DROP TABLE IF EXISTS {self.config.database}.{self.config.table}" )
@property def metadata_column(self) -> str: return self.config.column_map["metadata"]