from __future__ import annotations

import enum
import logging
import os
import re
from hashlib import md5
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
)

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

from langchain_community.graphs import Neo4jGraph
from langchain_community.vectorstores.utils import DistanceStrategy
# Similarity-function names passed to Neo4j when creating a vector index,
# keyed by the LangChain distance strategy. These are the only two
# strategies Neo4jVector accepts.
DISTANCE_MAPPING = {
    DistanceStrategy.EUCLIDEAN_DISTANCE: "euclidean",
    DistanceStrategy.COSINE: "cosine",
}

# Strategy used when the caller does not choose one explicitly.
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
# Metadata-filter comparison operators and the Cypher symbol each maps to.
COMPARISONS_TO_NATIVE = {
    "$eq": "=",
    "$ne": "<>",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

# Operators whose Cypher translation needs bespoke handling.
SPECIAL_CASED_OPERATORS = {"$in", "$nin", "$between"}

# Substring-matching operators (case-sensitive / case-insensitive).
TEXT_OPERATORS = {"$like", "$ilike"}

# Operators that combine several sub-filters.
LOGICAL_OPERATORS = {"$and", "$or"}

# Every operator the metadata filter recognises.
SUPPORTED_OPERATORS = (
    set(COMPARISONS_TO_NATIVE)
    | TEXT_OPERATORS
    | LOGICAL_OPERATORS
    | SPECIAL_CASED_OPERATORS
)
class SearchType(str, enum.Enum):
    """Enumerator of the supported search types."""

    # Similarity search against the vector index only.
    VECTOR = "vector"
    # Vector index search combined with a full-text (keyword) index search.
    HYBRID = "hybrid"


# Search type used when the caller does not choose one explicitly.
DEFAULT_SEARCH_TYPE = SearchType.VECTOR
class IndexType(str, enum.Enum):
    """Enumerator of index types: which graph entity a vector index covers."""

    # Index defined on node properties.
    NODE = "NODE"
    # Index defined on relationship properties.
    RELATIONSHIP = "RELATIONSHIP"


DEFAULT_INDEX_TYPE = IndexType.NODE  # node indexes unless told otherwise
def _get_search_index_query(
    search_type: SearchType, index_type: IndexType = DEFAULT_INDEX_TYPE
) -> str:
    """Return the Cypher prefix that queries the vector (and keyword) index.

    The returned fragment binds ``score`` together with ``node`` (node
    indexes) or ``relationship`` (relationship indexes); the caller appends
    its own retrieval ``RETURN`` clause.

    Args:
        search_type: Vector-only or hybrid search. Only consulted for node
            indexes; a value with no query raises ``KeyError``.
        index_type: Entity type the vector index covers. Anything other than
            ``IndexType.NODE`` is treated as a relationship index.
    """
    if index_type != IndexType.NODE:
        # Relationship indexes only get the plain vector query.
        return (
            "CALL db.index.vector.queryRelationships($index, $k, $embedding) "
            "YIELD relationship, score "
        )

    vector_call = (
        "CALL db.index.vector.queryNodes($index, $k, $embedding) "
        "YIELD node, score "
    )
    keyword_call = (
        "CALL db.index.fulltext.queryNodes($keyword_index, $query, "
        "{limit: $k}) YIELD node, score "
    )
    # Divide each branch's scores by that branch's maximum (0 is used as the
    # minimum) so vector and keyword scores are comparable.
    normalize = (
        "WITH collect({node:node, score:score}) AS nodes, max(score) AS max "
        "UNWIND nodes AS n "
        "RETURN n.node AS node, (n.score / max) AS score "
    )
    # Keep the best score per node (dedup across branches), then top-k.
    dedup = "WITH node, max(score) AS score ORDER BY score DESC LIMIT $k "
    hybrid_call = (
        "CALL { "
        + vector_call
        + normalize
        + "UNION "
        + keyword_call
        + normalize
        + "} "
        + dedup
    )

    queries = {SearchType.VECTOR: vector_call, SearchType.HYBRID: hybrid_call}
    return queries[search_type]
def check_if_not_null(props: List[str], values: List[Any]) -> None:
    """Ensure every value is truthy, rejecting ``None`` and empty strings.

    Args:
        props: Parameter names, used only in the error message.
        values: Values paired positionally with ``props``.

    Raises:
        ValueError: For the first falsy value, naming its parameter.
    """
    no_offender = object()
    offender = next(
        (name for name, candidate in zip(props, values) if not candidate),
        no_offender,
    )
    if offender is not no_offender:
        raise ValueError(f"Parameter `{offender}` must not be None or empty string")
def sort_by_index_name(
    lst: List[Dict[str, Any]], index_name: str
) -> List[Dict[str, Any]]:
    """Return a new list with entries named ``index_name`` moved to the front.

    The sort is stable, so the relative order inside the matching and the
    non-matching groups is preserved.
    """

    def _is_other(entry: Dict[str, Any]) -> bool:
        # False sorts before True, so matching entries come first.
        return entry.get("name") != index_name

    return sorted(lst, key=_is_other)
def remove_lucene_chars(text: str) -> str:
    """Replace Lucene query-syntax special characters with spaces.

    Each special character becomes one space, then leading and trailing
    whitespace is stripped from the result.
    """
    lucene_specials = '+-&|!(){}[]^"~*?:\\'
    to_space = str.maketrans({symbol: " " for symbol in lucene_specials})
    return text.translate(to_space).strip()
def dict_to_yaml_str(input_dict: Dict, indent: int = 0) -> str:
    """Render a dictionary as a YAML-like string without external libraries.

    Args:
        input_dict: Dictionary to render. Nested dictionaries are rendered
            recursively one level deeper. List values are rendered as
            ``- item`` lines at the same padding as their key.
        indent: Current nesting level; each level adds one space of padding.

    Returns:
        The YAML-like text, one newline-terminated line per entry.
    """
    padding = " " * indent
    lines: list = []
    for key, value in input_dict.items():
        if isinstance(value, dict):
            lines.append(f"{padding}{key}:\n" + dict_to_yaml_str(value, indent + 1))
        elif isinstance(value, list):
            lines.append(f"{padding}{key}:\n")
            lines.extend(f"{padding}- {item}\n" for item in value)
        else:
            lines.append(f"{padding}{key}: {value}\n")
    return "".join(lines)
def combine_queries(
    input_queries: List[Tuple[str, Dict[str, Any]]], operator: str
) -> Tuple[str, Dict[str, Any]]:
    """Combine several Cypher filter fragments with a logical operator.

    Every parameter of every fragment is renamed to ``<name>_<n>``, where
    ``n`` counts how often that name has been seen so far. Fragments that
    reuse parameter names (e.g. ``param_1``) therefore do not collide in the
    merged parameter dictionary.

    Args:
        input_queries: ``(cypher_fragment, params)`` pairs. Each fragment
            refers to its parameters as ``$<name>``.
        operator: Cypher operator placed between fragments, e.g. ``"AND"``.

    Returns:
        The combined fragment, with each input wrapped in parentheses, and the
        merged, renamed parameter dictionary. Empty input yields ``("", {})``.
    """
    fragments: List[str] = []
    combined_params: Dict[str, Any] = {}
    param_counter: Dict[str, int] = {}
    for query, params in input_queries:
        renames: Dict[str, str] = {}
        for param, value in params.items():
            param_counter[param] = param_counter.get(param, 0) + 1
            new_param_name = f"{param}_{param_counter[param]}"
            renames[param] = new_param_name
            combined_params[new_param_name] = value

        new_query = query
        if renames:
            # Rename all references in one pass, and only whole names. A
            # sequential str.replace of "$param_1" would also corrupt
            # "$param_10", and could re-rename text produced by an earlier
            # replacement (e.g. "$p" -> "$p_1" clashing with param "p_1").
            alternatives = "|".join(
                re.escape(name) for name in sorted(renames, key=len, reverse=True)
            )
            pattern = re.compile(r"\$(" + alternatives + r")(?!\w)")
            new_query = pattern.sub(lambda m: "$" + renames[m.group(1)], query)

        fragments.append(f"({new_query})")

    # Join the fragments with the requested operator (not necessarily AND).
    return f" {operator} ".join(fragments), combined_params
def collect_params(
    input_data: List[Tuple[str, Dict[str, str]]],
) -> Tuple[List[str], Dict[str, Any]]:
    """Split ``(query_part, params)`` pairs into fragments and merged params.

    Args:
        input_data: Pairs of a Cypher fragment and its parameter dictionary.

    Returns:
        A tuple of the fragments in input order, and one dictionary holding
        every parameter (later pairs win on duplicate keys).
    """
    fragments: List[str] = []
    merged: Dict[str, Any] = {}
    for fragment, fragment_params in input_data:
        fragments.append(fragment)
        merged.update(fragment_params)
    return fragments, merged
def _handle_field_filter(
    field: str, value: Any, param_number: int = 1
) -> Tuple[str, Dict]:
    """Create a Cypher filter fragment for a single metadata field.

    Args:
        field: Name of the node property to filter on. Must be a valid
            identifier and must not start with ``$``.
        value: Value to filter on. A non-dict value means equality. A dict
            must hold exactly one ``{operator: operand}`` entry, where the
            operator is one of ``SUPPORTED_OPERATORS``.
        param_number: Sequence number used to name the Cypher parameters so
            the fragment and the parameter dictionary line up.

    Returns:
        A tuple of the Cypher filter fragment (over the node variable ``n``)
        and the dictionary of parameters it references.

    Raises:
        ValueError: If the field name or the filter specification is invalid.
        NotImplementedError: For ``$in``/``$nin`` elements that are not
            ``str``/``int``/``float``, and for operators that cannot apply to
            a single field (``$and``/``$or``).
    """
    if not isinstance(field, str):
        raise ValueError(
            f"field should be a string but got: {type(field)} with value: {field}"
        )

    if field.startswith("$"):
        raise ValueError(
            f"Invalid filter condition. Expected a field but got an operator: "
            f"{field}"
        )

    # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters
    if not field.isidentifier():
        raise ValueError(f"Invalid field name: {field}. Expected a valid identifier.")

    if isinstance(value, dict):
        # This is a filter specification: {operator: operand}
        if len(value) != 1:
            raise ValueError(
                "Invalid filter condition. Expected a value which "
                "is a dictionary with a single key that corresponds to an operator "
                f"but got a dictionary with {len(value)} keys. The first few "
                f"keys are: {list(value.keys())[:3]}"
            )
        operator, filter_value = next(iter(value.items()))
        if operator not in SUPPORTED_OPERATORS:
            raise ValueError(
                f"Invalid operator: {operator}. "
                f"Expected one of {SUPPORTED_OPERATORS}"
            )
    else:  # A bare value means equality
        operator = "$eq"
        filter_value = value

    param_name = f"param_{param_number}"

    if operator in COMPARISONS_TO_NATIVE:
        # native is trusted input (it comes from our own mapping)
        native = COMPARISONS_TO_NATIVE[operator]
        return f"n.`{field}` {native} ${param_name}", {param_name: filter_value}

    if operator == "$between":
        low, high = filter_value
        return (
            f"${param_name}_low <= n.`{field}` <= ${param_name}_high",
            {f"{param_name}_low": low, f"{param_name}_high": high},
        )

    if operator in ("$in", "$nin"):
        for val in filter_value:
            if not isinstance(val, (str, int, float)):
                raise NotImplementedError(
                    f"Unsupported type: {type(val)} for value: {val}"
                )
        keyword = "IN" if operator == "$in" else "NOT IN"
        return f"n.`{field}` {keyword} ${param_name}", {param_name: filter_value}

    if operator == "$like":
        # `%` is the LIKE wildcard; CONTAINS already matches anywhere, so drop
        # wildcards at both ends (not only the trailing one).
        return (
            f"n.`{field}` CONTAINS ${param_name}",
            {param_name: filter_value.strip("%")},
        )

    if operator == "$ilike":
        # The property is lower-cased in Cypher, so the search value must be
        # lower-cased too, or mixed-case values could never match.
        return (
            f"toLower(n.`{field}`) CONTAINS ${param_name}",
            {param_name: filter_value.strip("%").lower()},
        )

    # Logical operators ($and/$or) are supported operators overall, but they
    # cannot be applied to a single field.
    raise NotImplementedError(
        f"Operator {operator} cannot be used as a field filter: {field}"
    )
class Neo4jVector(VectorStore):
    """`Neo4j` vector index.

    To use, you should have the ``neo4j`` python package installed.

    Args:
        url: Neo4j connection url
        username: Neo4j username.
        password: Neo4j password
        database: Optionally provide Neo4j database.
            Defaults to "neo4j"
        embedding: Any embedding function implementing
            `langchain.embeddings.base.Embeddings` interface.
        search_type: ``SearchType.VECTOR`` (default) or ``SearchType.HYBRID``,
            which additionally queries a full-text (keyword) index.
        keyword_index_name: Name of the full-text index used for hybrid search.
        index_name: Name of the vector index.
        node_label: Label of the nodes that hold the documents.
        embedding_node_property: Property holding the embedding vector.
        text_node_property: Property holding the document text.
        distance_strategy: The distance strategy to use. Only COSINE and
            EUCLIDEAN_DISTANCE are accepted. (default: COSINE)
        logger: Logger to use; defaults to this module's logger.
        pre_delete_collection: If True, will delete existing data if it exists.
            (default: False). Useful for testing.
        retrieval_query: Optional Cypher ``RETURN`` clause appended to the
            index query; it must return ``text``, ``score`` and ``metadata``.
        relevance_score_fn: Optional override for the relevance score function.
        index_type: Whether the vector index is defined on nodes or
            relationships.
        graph: Optional ``Neo4jGraph`` whose driver and database are reused.
            When given, it takes precedence over the connection parameters.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores.neo4j_vector import Neo4jVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            url="bolt://localhost:7687"
            username="neo4j"
            password="pleaseletmein"
            embeddings = OpenAIEmbeddings()
            vectorestore = Neo4jVector.from_documents(
                embedding=embeddings,
                documents=docs,
                url=url,
                username=username,
                password=password,
            )
    """

    def __init__(
        self,
        embedding: Embeddings,
        *,
        search_type: SearchType = SearchType.VECTOR,
        username: Optional[str] = None,
        password: Optional[str] = None,
        url: Optional[str] = None,
        keyword_index_name: Optional[str] = "keyword",
        database: Optional[str] = None,
        index_name: str = "vector",
        node_label: str = "Chunk",
        embedding_node_property: str = "embedding",
        text_node_property: str = "text",
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        logger: Optional[logging.Logger] = None,
        pre_delete_collection: bool = False,
        retrieval_query: str = "",
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        index_type: IndexType = DEFAULT_INDEX_TYPE,
        graph: Optional[Neo4jGraph] = None,
    ) -> None:
        try:
            import neo4j
        except ImportError:
            raise ImportError(
                "Could not import neo4j python package. "
                "Please install it with `pip install neo4j`."
            )
        # Allow only cosine and euclidean distance strategies
        if distance_strategy not in [
            DistanceStrategy.EUCLIDEAN_DISTANCE,
            DistanceStrategy.COSINE,
        ]:
            raise ValueError(
                "distance_strategy must be either 'EUCLIDEAN_DISTANCE' or 'COSINE'"
            )

        # Graph object takes precedence over env or input params
        if graph:
            self._driver = graph._driver
            self._database = graph._database
        else:
            # Credentials may come from environment variables; NEO4J_URL is
            # still honoured for backwards compatibility.
            if not url:
                url = os.environ.get("NEO4J_URL")

            url = get_from_dict_or_env({"url": url}, "url", "NEO4J_URI")
            username = get_from_dict_or_env(
                {"username": username}, "username", "NEO4J_USERNAME"
            )
            password = get_from_dict_or_env(
                {"password": password}, "password", "NEO4J_PASSWORD"
            )
            database = get_from_dict_or_env(
                {"database": database}, "database", "NEO4J_DATABASE", "neo4j"
            )

            self._driver = neo4j.GraphDatabase.driver(url, auth=(username, password))
            self._database = database
            # Fail fast with a readable message if the connection is broken.
            try:
                self._driver.verify_connectivity()
            except neo4j.exceptions.ServiceUnavailable:
                raise ValueError(
                    "Could not connect to Neo4j database. "
                    "Please ensure that the url is correct"
                )
            except neo4j.exceptions.AuthError:
                raise ValueError(
                    "Could not connect to Neo4j database. "
                    "Please ensure that the username and password are correct"
                )

        self.schema = ""
        # Check that the server version supports vector indexes; this also
        # sets the metadata-filter and enterprise flags.
        self._is_enterprise = False
        self.verify_version()

        # Verify that required values are not null
        check_if_not_null(
            [
                "index_name",
                "node_label",
                "embedding_node_property",
                "text_node_property",
            ],
            [index_name, node_label, embedding_node_property, text_node_property],
        )

        self.embedding = embedding
        self._distance_strategy = distance_strategy
        self.index_name = index_name
        self.keyword_index_name = keyword_index_name
        self.node_label = node_label
        self.embedding_node_property = embedding_node_property
        self.text_node_property = text_node_property
        self.logger = logger or logging.getLogger(__name__)
        self.override_relevance_score_fn = relevance_score_fn
        self.retrieval_query = retrieval_query
        self.search_type = search_type
        self._index_type = index_type
        # Determine the embedding dimension by embedding a probe string.
        self.embedding_dimension = len(embedding.embed_query("foo"))

        # Delete existing data if flagged
        if pre_delete_collection:
            from neo4j.exceptions import DatabaseError

            self.query(
                f"MATCH (n:`{self.node_label}`) "
                "CALL { WITH n DETACH DELETE n } "
                "IN TRANSACTIONS OF 10000 ROWS;"
            )
            # Delete index
            try:
                self.query(f"DROP INDEX {self.index_name}")
            except DatabaseError:  # Index didn't exist yet
                pass

    def query(
        self, query: str, *, params: Optional[dict] = None
    ) -> List[Dict[str, Any]]:
        """Run a Cypher query against the connected database.

        Args:
            query: The Cypher query to execute.
            params: Query parameters. Defaults to an empty dictionary.

        Returns:
            One dictionary per result record.

        Raises:
            ValueError: If the server rejects the statement as a syntax error.
        """
        from neo4j.exceptions import CypherSyntaxError

        params = params or {}
        with self._driver.session(database=self._database) as session:
            try:
                data = session.run(query, params)
                return [r.data() for r in data]
            except CypherSyntaxError as e:
                raise ValueError(f"Cypher Statement is not valid\n{e}") from e

    def verify_version(self) -> None:
        """Check that the connected Neo4j version supports vector indexes.

        Reads the version from ``dbms.components()`` and requires 5.11.0 or
        later. Also sets ``support_metadata_filter`` (5.18.0 or later) and
        ``_is_enterprise`` from the reported edition.

        Raises:
            ValueError: If the version is older than 5.11.
        """
        db_data = self.query("CALL dbms.components()")
        version = db_data[0]["versions"][0]
        # Ignore any "-<suffix>" (e.g. "-aura") when parsing the numbers.
        numeric_version = tuple(map(int, version.split("-")[0].split(".")))
        if "aura" in version:
            version_tuple = numeric_version + (0,)
        else:
            version_tuple = numeric_version

        target_version = (5, 11, 0)

        if version_tuple < target_version:
            raise ValueError(
                "Version index is only supported in Neo4j version 5.11 or greater"
            )

        # Flag for metadata filtering
        metadata_target_version = (5, 18, 0)
        self.support_metadata_filter = version_tuple >= metadata_target_version

        # Flag for enterprise
        self._is_enterprise = db_data[0]["edition"] == "enterprise"

    def retrieve_existing_index(self) -> Tuple[Optional[int], Optional[str]]:
        """Look up an existing vector index and return its configuration.

        Matches a vector index by name, or by the configured node label and
        embedding property, preferring an exact name match. When found, the
        store's index name, label, embedding property and index type are
        updated from the index.

        Returns:
            ``(embedding_dimension, entity_type)`` of the index, or
            ``(None, None)`` if no matching index exists.
        """
        index_information = self.query(
            "SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, "
            "properties, options WHERE type = 'VECTOR' AND (name = $index_name "
            "OR (labelsOrTypes[0] = $node_label AND "
            "properties[0] = $embedding_node_property)) "
            "RETURN name, entityType, labelsOrTypes, properties, options ",
            params={
                "index_name": self.index_name,
                "node_label": self.node_label,
                "embedding_node_property": self.embedding_node_property,
            },
        )
        # Prefer the index whose name matches exactly
        index_information = sort_by_index_name(index_information, self.index_name)
        try:
            self.index_name = index_information[0]["name"]
            self.node_label = index_information[0]["labelsOrTypes"][0]
            self.embedding_node_property = index_information[0]["properties"][0]
            self._index_type = index_information[0]["entityType"]
            embedding_dimension = index_information[0]["options"]["indexConfig"][
                "vector.dimensions"
            ]

            return embedding_dimension, index_information[0]["entityType"]
        except IndexError:
            return None, None

    def retrieve_existing_fts_index(
        self, text_node_properties: Optional[List[str]] = None
    ) -> Optional[str]:
        """Look up an existing full-text (keyword) index.

        Matches by keyword index name, or by node label and text properties,
        preferring an exact name match. When found, the store's keyword index
        name and text property are updated from the index.

        Args:
            text_node_properties: Properties the index should cover; defaults
                to ``[self.text_node_property]``.

        Returns:
            The node label of the matching index, or ``None`` if none exists.
        """
        index_information = self.query(
            "SHOW INDEXES YIELD name, type, labelsOrTypes, properties, options "
            "WHERE type = 'FULLTEXT' AND (name = $keyword_index_name "
            "OR (labelsOrTypes = [$node_label] AND "
            "properties = $text_node_property)) "
            "RETURN name, labelsOrTypes, properties, options ",
            params={
                "keyword_index_name": self.keyword_index_name,
                "node_label": self.node_label,
                "text_node_property": text_node_properties or [self.text_node_property],
            },
        )
        # Prefer the index whose name matches the *keyword* index name.
        index_information = sort_by_index_name(
            index_information, self.keyword_index_name
        )
        try:
            self.keyword_index_name = index_information[0]["name"]
            self.text_node_property = index_information[0]["properties"][0]
            node_label = index_information[0]["labelsOrTypes"][0]
            return node_label
        except IndexError:
            return None

    def create_new_index(self) -> None:
        """Create the node vector index described by this store's settings."""
        index_query = (
            "CALL db.index.vector.createNodeIndex("
            "$index_name,"
            "$node_label,"
            "$embedding_node_property,"
            "toInteger($embedding_dimension),"
            "$similarity_metric )"
        )

        parameters = {
            "index_name": self.index_name,
            "node_label": self.node_label,
            "embedding_node_property": self.embedding_node_property,
            "embedding_dimension": self.embedding_dimension,
            "similarity_metric": DISTANCE_MAPPING[self._distance_strategy],
        }
        self.query(index_query, params=parameters)

    def create_new_keyword_index(
        self, text_node_properties: Optional[List[str]] = None
    ) -> None:
        """Create a full-text index over the given node properties.

        Args:
            text_node_properties: Properties to index; defaults to
                ``[self.text_node_property]``.
        """
        node_props = text_node_properties or [self.text_node_property]
        fts_index_query = (
            f"CREATE FULLTEXT INDEX {self.keyword_index_name} "
            f"FOR (n:`{self.node_label}`) ON EACH "
            f"[{', '.join(['n.`' + el + '`' for el in node_props])}]"
        )
        self.query(fts_index_query)

    @property
    def embeddings(self) -> Embeddings:
        """The embedding model used by this store."""
        return self.embedding

    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        create_id_index: bool = True,
        search_type: SearchType = SearchType.VECTOR,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Build a store, ensure its indexes exist, and ingest the data.

        Ids default to the MD5 hex digest of each text.
        """
        if ids is None:
            ids = [md5(text.encode("utf-8")).hexdigest() for text in texts]

        if not metadatas:
            metadatas = [{} for _ in texts]

        store = cls(
            embedding=embedding,
            search_type=search_type,
            **kwargs,
        )
        # Check if the vector index already exists
        embedding_dimension, index_type = store.retrieve_existing_index()

        # Raise error if relationship index type
        if index_type == "RELATIONSHIP":
            raise ValueError(
                "Data ingestion is not supported with relationship vector index."
            )

        # If the vector index doesn't exist yet
        if not embedding_dimension:
            store.create_new_index()
        # If the index already exists, check if embedding dimensions match
        elif not store.embedding_dimension == embedding_dimension:
            raise ValueError(
                f"Index with name {store.index_name} already exists. "
                "The provided embedding function and vector index "
                "dimensions do not match.\n"
                f"Embedding function dimension: {store.embedding_dimension}\n"
                f"Vector index dimension: {embedding_dimension}"
            )

        if search_type == SearchType.HYBRID:
            fts_node_label = store.retrieve_existing_fts_index()
            # If the FTS index doesn't exist yet
            if not fts_node_label:
                store.create_new_keyword_index()
            else:  # Validate that FTS and Vector index use the same information
                if not fts_node_label == store.node_label:
                    raise ValueError(
                        "Vector and keyword index don't index the same node label"
                    )

        # Create unique constraint for faster import
        if create_id_index:
            store.query(
                "CREATE CONSTRAINT IF NOT EXISTS "
                f"FOR (n:`{store.node_label}`) REQUIRE n.id IS UNIQUE;"
            )

        store.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

        return store

    def add_embeddings(
        self,
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add precomputed embeddings to the vector store.

        Nodes are merged on ``id``; text, embedding and metadata properties
        are then set on them.

        Args:
            texts: Texts to add to the vector store.
            embeddings: One embedding vector per text.
            metadatas: Optional metadata dictionaries, one per text.
            ids: Optional ids; default to the MD5 hex digest of each text.
            kwargs: Vector store specific parameters (unused here).

        Returns:
            The ids of the added texts.
        """
        # Materialise once: `texts` may be a one-shot iterator and is
        # iterated several times below.
        texts = list(texts)
        if ids is None:
            ids = [md5(text.encode("utf-8")).hexdigest() for text in texts]

        if not metadatas:
            metadatas = [{} for _ in texts]

        import_query = (
            "UNWIND $data AS row "
            "CALL { WITH row "
            f"MERGE (c:`{self.node_label}` {{id: row.id}}) "
            "WITH c, row "
            f"CALL db.create.setVectorProperty(c, "
            f"'{self.embedding_node_property}', row.embedding) "
            "YIELD node "
            f"SET c.`{self.text_node_property}` = row.text "
            "SET c += row.metadata } IN TRANSACTIONS OF 1000 ROWS"
        )

        parameters = {
            "data": [
                {"text": text, "metadata": metadata, "embedding": vector, "id": id_}
                for text, metadata, vector, id_ in zip(
                    texts, metadatas, embeddings, ids
                )
            ]
        }

        self.query(import_query, params=parameters)

        return ids

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed texts and add them to the vector store.

        Args:
            texts: Texts to add to the vector store.
            metadatas: Optional metadata dictionaries, one per text.
            ids: Optional ids for the texts.
            kwargs: Vector store specific parameters.

        Returns:
            The ids of the added texts.
        """
        # Materialise once so a generator is not exhausted by the embedding
        # call before the texts are stored.
        texts = list(texts)
        embeddings = self.embedding.embed_documents(texts)
        return self.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        params: Optional[Dict[str, Any]] = None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run a similarity search for ``query``.

        Args:
            query: The query text to search for.
            k: Number of results to return. Defaults to 4.
            params: Extra Cypher parameters, e.g. for a custom retrieval query.
            filter: Optional metadata filter (Neo4j 5.18+, vector search only).

        Returns:
            The documents most similar to the query.
        """
        embedding = self.embedding.embed_query(text=query)
        return self.similarity_search_by_vector(
            embedding=embedding,
            k=k,
            query=query,
            params=params,
            filter=filter,
            **kwargs,
        )

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        params: Optional[Dict[str, Any]] = None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return the documents most similar to ``query`` with their scores.

        Args:
            query: Text to find similar documents for.
            k: Number of documents to return. Defaults to 4.
            params: Extra Cypher parameters, e.g. for a custom retrieval query.
            filter: Optional metadata filter (Neo4j 5.18+, vector search only).

        Returns:
            ``(document, score)`` pairs for the most similar documents.
        """
        embedding = self.embedding.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding=embedding,
            k=k,
            query=query,
            params=params,
            filter=filter,
            **kwargs,
        )
        return docs

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return the top-k documents for an embedding, with their scores.

        Without a filter, the vector index (plus the keyword index for hybrid
        search) is queried. With a metadata filter, matching nodes are scanned
        and scored with the store's similarity function.

        Args:
            embedding: The query embedding.
            k: Number of documents to retrieve. Defaults to 4.
            filter: Optional metadata filter (Neo4j 5.18+, vector search only).
            params: Extra Cypher parameters, e.g. for a custom retrieval query.
            kwargs: ``query`` carries the raw query text; it is required for
                hybrid search and optional otherwise.

        Returns:
            ``(document, score)`` pairs.

        Raises:
            ValueError: On unsupported filter usage, a missing hybrid query,
                or results without text.
        """
        if filter:
            # Verify that 5.18 or later is used
            if not self.support_metadata_filter:
                raise ValueError(
                    "Metadata filtering is only supported in "
                    "Neo4j version 5.18 or greater"
                )
            # Metadata filtering and hybrid doesn't work
            if self.search_type == SearchType.HYBRID:
                raise ValueError(
                    "Metadata filtering can't be use in combination with "
                    "a hybrid search approach"
                )
            parallel_query = (
                "CYPHER runtime = parallel parallelRuntimeSupport=all "
                if self._is_enterprise
                else ""
            )
            base_index_query = parallel_query + (
                f"MATCH (n:`{self.node_label}`) WHERE "
                f"n.`{self.embedding_node_property}` IS NOT NULL AND "
                f"size(n.`{self.embedding_node_property}`) = "
                f"toInteger({self.embedding_dimension}) AND "
            )
            # Score with the store's own distance strategy, like the index.
            similarity_function = DISTANCE_MAPPING[self._distance_strategy]
            base_similarity_query = (
                f" WITH n as node, vector.similarity.{similarity_function}("
                f"n.`{self.embedding_node_property}`, "
                "$embedding) AS score ORDER BY score DESC LIMIT toInteger($k) "
            )
            filter_snippets, filter_params = construct_metadata_filter(filter)
            index_query = base_index_query + filter_snippets + base_similarity_query
        else:
            index_query = _get_search_index_query(self.search_type, self._index_type)
            filter_params = {}

        if self._index_type == IndexType.RELATIONSHIP:
            default_retrieval = (
                f"RETURN relationship.`{self.text_node_property}` AS text, score, "
                f"relationship {{.*, `{self.text_node_property}`: Null, "
                f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata"
            )
        else:
            default_retrieval = (
                f"RETURN node.`{self.text_node_property}` AS text, score, "
                f"node {{.*, `{self.text_node_property}`: Null, "
                f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata"
            )

        retrieval_query = (
            self.retrieval_query if self.retrieval_query else default_retrieval
        )

        read_query = index_query + retrieval_query

        # `query` is optional for pure vector search (e.g. when called via
        # similarity_search_by_vector) but the keyword index needs it.
        query_text = kwargs.get("query")
        if query_text is None and self.search_type == SearchType.HYBRID:
            raise ValueError(
                "The `query` text is required for hybrid search; "
                "pass `query=...` or use `similarity_search`."
            )

        parameters = {
            "index": self.index_name,
            "k": k,
            "embedding": embedding,
            "keyword_index": self.keyword_index_name,
            "query": remove_lucene_chars(query_text or ""),
            **(params or {}),
            **filter_params,
        }

        results = self.query(read_query, params=parameters)

        if any(result["text"] is None for result in results):
            if not self.retrieval_query:
                raise ValueError(
                    f"Make sure that none of the `{self.text_node_property}` "
                    f"properties on nodes with label `{self.node_label}` "
                    "are missing or empty"
                )
            else:
                raise ValueError(
                    "Inspect the `retrieval_query` and ensure it doesn't "
                    "return None for the `text` column"
                )

        docs = [
            (
                Document(
                    page_content=dict_to_yaml_str(result["text"])
                    if isinstance(result["text"], dict)
                    else result["text"],
                    # Null-valued properties were blanked out by the
                    # retrieval query; drop them from the metadata.
                    metadata={
                        key: val
                        for key, val in result["metadata"].items()
                        if val is not None
                    },
                ),
                result["score"],
            )
            for result in results
        ]
        return docs

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to an embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of documents to return. Defaults to 4.
            filter: Optional metadata filter.

        Returns:
            The documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

    @classmethod
    def from_texts(
        cls: Type[Neo4jVector],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Return a Neo4jVector initialized from texts and an embedding model.

        Neo4j credentials are required, as `url`, `username` and `password`,
        with an optional `database` parameter.
        """
        embeddings = embedding.embed_documents(list(texts))

        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            distance_strategy=distance_strategy,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Construct a Neo4jVector from raw texts and precomputed embeddings.

        Neo4j credentials are required, as `url`, `username` and `password`,
        with an optional `database` parameter.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores.neo4j_vector import Neo4jVector
                from langchain_community.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                vectorstore = Neo4jVector.from_embeddings(
                    text_embedding_pairs, embeddings)
        """
        texts = [t[0] for t in text_embeddings]
        embeddings = [t[1] for t in text_embeddings]

        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    def from_existing_index(
        cls: Type[Neo4jVector],
        embedding: Embeddings,
        index_name: str,
        search_type: SearchType = DEFAULT_SEARCH_TYPE,
        keyword_index_name: Optional[str] = None,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Connect to an existing Neo4j node vector index without ingesting.

        Neo4j credentials are required, as `url`, `username` and `password`,
        with an optional `database` parameter, plus `index_name`. Hybrid
        search also requires `keyword_index_name`.
        """
        if search_type == SearchType.HYBRID and not keyword_index_name:
            raise ValueError(
                "keyword_index name has to be specified "
                "when using hybrid search option"
            )

        store = cls(
            embedding=embedding,
            index_name=index_name,
            keyword_index_name=keyword_index_name,
            search_type=search_type,
            **kwargs,
        )

        embedding_dimension, index_type = store.retrieve_existing_index()

        # Raise error if relationship index type
        if index_type == "RELATIONSHIP":
            raise ValueError(
                "Relationship vector index is not supported with "
                "`from_existing_index` method. Please use the "
                "`from_existing_relationship_index` method."
            )

        if not embedding_dimension:
            raise ValueError(
                "The specified vector index name does not exist. "
                "Make sure to check if you spelled it correctly"
            )

        # Check if embedding function and vector index dimensions match
        if not store.embedding_dimension == embedding_dimension:
            raise ValueError(
                "The provided embedding function and vector index "
                "dimensions do not match.\n"
                f"Embedding function dimension: {store.embedding_dimension}\n"
                f"Vector index dimension: {embedding_dimension}"
            )

        if search_type == SearchType.HYBRID:
            fts_node_label = store.retrieve_existing_fts_index()
            # If the FTS index doesn't exist yet
            if not fts_node_label:
                raise ValueError(
                    "The specified keyword index name does not exist. "
                    "Make sure to check if you spelled it correctly"
                )
            else:  # Validate that FTS and Vector index use the same information
                if not fts_node_label == store.node_label:
                    raise ValueError(
                        "Vector and keyword index don't index the same node label"
                    )

        return store

    @classmethod
    def from_existing_relationship_index(
        cls: Type[Neo4jVector],
        embedding: Embeddings,
        index_name: str,
        search_type: SearchType = DEFAULT_SEARCH_TYPE,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Connect to an existing Neo4j relationship vector index.

        No new embeddings are ingested. Neo4j credentials are required, as
        `url`, `username` and `password`, with an optional `database`
        parameter, plus `index_name`. Hybrid search is not supported.
        """
        if search_type == SearchType.HYBRID:
            raise ValueError(
                "Hybrid search is not supported in combination "
                "with relationship vector index"
            )

        store = cls(
            embedding=embedding,
            index_name=index_name,
            **kwargs,
        )

        embedding_dimension, index_type = store.retrieve_existing_index()

        if not embedding_dimension:
            raise ValueError(
                "The specified vector index name does not exist. "
                "Make sure to check if you spelled it correctly"
            )
        # Raise error if node index type
        if index_type == "NODE":
            raise ValueError(
                "Node vector index is not supported with "
                "`from_existing_relationship_index` method. Please use the "
                "`from_existing_index` method."
            )

        # Check if embedding function and vector index dimensions match
        if not store.embedding_dimension == embedding_dimension:
            raise ValueError(
                "The provided embedding function and vector index "
                "dimensions do not match.\n"
                f"Embedding function dimension: {store.embedding_dimension}\n"
                f"Vector index dimension: {embedding_dimension}"
            )

        return store

    @classmethod
    def from_documents(
        cls: Type[Neo4jVector],
        documents: List[Document],
        embedding: Embeddings,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Neo4jVector:
        """Return a Neo4jVector initialized from documents and embeddings.

        Neo4j credentials are required, as `url`, `username` and `password`,
        with an optional `database` parameter.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]

        return cls.from_texts(
            texts=texts,
            embedding=embedding,
            distance_strategy=distance_strategy,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )

    @classmethod
    def from_existing_graph(
        cls: Type[Neo4jVector],
        embedding: Embeddings,
        node_label: str,
        embedding_node_property: str,
        text_node_properties: List[str],
        *,
        keyword_index_name: Optional[str] = "keyword",
        index_name: str = "vector",
        search_type: SearchType = DEFAULT_SEARCH_TYPE,
        retrieval_query: str = "",
        **kwargs: Any,
    ) -> Neo4jVector:
        """Build a Neo4jVector on top of nodes already present in the graph.

        Ensures the vector index (and, for hybrid search, the keyword index)
        exist, creating them if needed. It then embeds every node of
        `node_label` that has at least one of `text_node_properties` set but
        no `embedding_node_property` yet, in batches of 1000.

        Returns:
            Neo4jVector: The store bound to the existing graph.

        Example:
            >>> neo4j_vector = Neo4jVector.from_existing_graph(
            ...     embedding=my_embedding,
            ...     node_label="Document",
            ...     embedding_node_property="embedding",
            ...     text_node_properties=["title", "content"]
            ... )

        Note:
            Neo4j credentials are required, as `url`, `username` and
            `password`, with an optional `database` keyword argument.
        """
        # Validate the list is not empty
        if not text_node_properties:
            raise ValueError(
                "Parameter `text_node_properties` must not be an empty list"
            )
        # Prefer retrieval query from params, otherwise construct it
        if not retrieval_query:
            retrieval_query = (
                f"RETURN reduce(str='', k IN {text_node_properties} |"
                " str + '\\n' + k + ': ' + coalesce(node[k], '')) AS text, "
                "node {.*, `"
                + embedding_node_property
                + "`: Null, id: Null, "
                + ", ".join([f"`{prop}`: Null" for prop in text_node_properties])
                + "} AS metadata, score"
            )
        store = cls(
            embedding=embedding,
            index_name=index_name,
            keyword_index_name=keyword_index_name,
            search_type=search_type,
            retrieval_query=retrieval_query,
            node_label=node_label,
            embedding_node_property=embedding_node_property,
            **kwargs,
        )

        # Check if the vector index already exists
        embedding_dimension, index_type = store.retrieve_existing_index()

        # Raise error if relationship index type
        if index_type == "RELATIONSHIP":
            raise ValueError(
                "`from_existing_graph` method does not support "
                "existing relationship vector index. "
                "Please use `from_existing_relationship_index` method"
            )

        # If the vector index doesn't exist yet
        if not embedding_dimension:
            store.create_new_index()
        # If the index already exists, check if embedding dimensions match
        elif not store.embedding_dimension == embedding_dimension:
            raise ValueError(
                f"Index with name {store.index_name} already exists. "
                "The provided embedding function and vector index "
                "dimensions do not match.\n"
                f"Embedding function dimension: {store.embedding_dimension}\n"
                f"Vector index dimension: {embedding_dimension}"
            )
        # FTS index for Hybrid search
        if search_type == SearchType.HYBRID:
            fts_node_label = store.retrieve_existing_fts_index(text_node_properties)
            # If the FTS index doesn't exist yet
            if not fts_node_label:
                store.create_new_keyword_index(text_node_properties)
            else:  # Validate that FTS and Vector index use the same information
                if not fts_node_label == store.node_label:
                    raise ValueError(
                        "Vector and keyword index don't index the same node label"
                    )

        # Populate embeddings for nodes that have text but no embedding yet.
        fetch_query = (
            f"MATCH (n:`{node_label}`) "
            f"WHERE n.`{embedding_node_property}` IS null "
            "AND any(k in $props WHERE n[k] IS NOT null) "
            f"RETURN elementId(n) AS id, reduce(str='',"
            "k IN $props | str + '\\n' + k + ':' + coalesce(n[k], '')) AS text "
            "LIMIT 1000"
        )
        store_query = (
            "UNWIND $data AS row "
            f"MATCH (n:`{node_label}`) "
            "WHERE elementId(n) = row.id "
            f"CALL db.create.setVectorProperty(n, "
            f"'{embedding_node_property}', row.embedding) "
            "YIELD node RETURN count(*)"
        )
        while True:
            data = store.query(fetch_query, params={"props": text_node_properties})
            # Nothing left to embed: skip the (possibly failing) empty call.
            if not data:
                break
            text_embeddings = embedding.embed_documents([el["text"] for el in data])
            params = {
                "data": [
                    {"id": el["id"], "embedding": vector}
                    for el, vector in zip(data, text_embeddings)
                ]
            }
            store.query(store_query, params=params)
            # A short batch means every remaining node has been embedded.
            if len(data) < 1000:
                break
        return store

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Return the function that maps raw scores to relevance scores.

        The "correct" relevance function may differ depending on the
        distance/similarity metric, the scale of the embeddings (OpenAI's are
        unit normalized, many others are not), the embedding dimensionality,
        etc. A user-supplied ``relevance_score_fn`` always wins; otherwise
        scores are returned unchanged for both supported strategies.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided
        # in vectorstore constructor
        if self._distance_strategy == DistanceStrategy.COSINE:
            return lambda x: x
        elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            return lambda x: x
        else:
            raise ValueError(
                "No supported normalization function"
                f" for distance_strategy of {self._distance_strategy}. "
                "Consider providing relevance_score_fn to Neo4jVector constructor."
            )