from hashlib import md5
from typing import Any, Dict, List, Optional
from langchain_core.utils import get_from_dict_or_env
from langchain_community.graphs.graph_document import GraphDocument
from langchain_community.graphs.graph_store import GraphStore
# Label merged onto every imported node when `baseEntityLabel` is requested
# (see the import-query builders below).
BASE_ENTITY_LABEL = "__Entity__"
# Node labels / relationship types passed as $EXCLUDED_LABELS to the schema
# queries so they are left out of the reported schema.
EXCLUDED_LABELS = ["_Bloom_Perspective_", "_Bloom_Scene_"]
EXCLUDED_RELS = ["_Bloom_HAS_SCENE_"]
# Labels/types with fewer instances than this are scanned exhaustively when
# building the enhanced schema; larger ones are only sampled.
EXHAUSTIVE_SEARCH_LIMIT = 10000
# Lists with this many elements or more are dropped by `value_sanitize`, and
# LIST properties whose minimum size exceeds it are skipped in the schema text.
LIST_LIMIT = 128
# Threshold for returning all available prop values in graph schema
DISTINCT_VALUE_LIMIT = 10
# Cypher: one row per node label with its non-relationship properties and types.
node_properties_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE NOT type = "RELATIONSHIP" AND elementType = "node"
AND NOT label IN $EXCLUDED_LABELS
WITH label AS nodeLabels, collect({property:property, type:type}) AS properties
RETURN {labels: nodeLabels, properties: properties} AS output
"""
# Cypher: one row per relationship type with its properties and types.
rel_properties_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE NOT type = "RELATIONSHIP" AND elementType = "relationship"
AND NOT label in $EXCLUDED_LABELS
WITH label AS nodeLabels, collect({property:property, type:type}) AS properties
RETURN {type: nodeLabels, properties: properties} AS output
"""
# Cypher: one row per (start label, relationship type, end label) triple,
# skipping triples whose start or end label is excluded.
rel_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE type = "RELATIONSHIP" AND elementType = "node"
UNWIND other AS other_node
WITH * WHERE NOT label IN $EXCLUDED_LABELS
AND NOT other_node IN $EXCLUDED_LABELS
RETURN {start: label, type: property, end: toString(other_node)} AS output
"""
# Cypher prefix that merges the source `Document` node (keyed on metadata.id),
# stores its text and metadata, and leaves it bound as `d` for later clauses.
include_docs_query = (
"MERGE (d:Document {id:$document.metadata.id}) "
"SET d.text = $document.page_content "
"SET d += $document.metadata "
"WITH d "
)
def clean_string_values(text: str) -> str:
    """Return ``text`` with each newline and carriage return turned into a space.

    Every ``"\\n"`` and ``"\\r"`` character is replaced individually, so a
    ``"\\r\\n"`` pair becomes two spaces.
    """
    for line_break in ("\n", "\r"):
        text = text.replace(line_break, " ")
    return text
def value_sanitize(d: Any) -> Any:
    """Sanitize a dictionary or list before it is placed in an LLM context.

    Oversized lists (``LIST_LIMIT`` elements or more), which are typically
    embedding-like values, are removed. Such values are mostly irrelevant
    when generating an answer, take up a lot of context space and add noise
    and cost.

    Args:
        d: The value to sanitize. Dictionaries and lists are processed
            recursively; any other value is returned unchanged.

    Returns:
        * for a dict: a new dict without the keys whose value is an
          oversized list (nested dicts/lists are sanitized recursively);
        * for a list: ``None`` if the list itself is oversized, otherwise a
          new list of sanitized items with every item that sanitizes to
          ``None`` (oversized nested lists and ``None`` entries) removed;
        * for anything else: the value itself.
    """
    if isinstance(d, dict):
        cleaned = {}
        for key, value in d.items():
            if isinstance(value, (dict, list)):
                sanitized = value_sanitize(value)
                # ``None`` signals an oversized list: drop the key entirely.
                if sanitized is not None:
                    cleaned[key] = sanitized
            else:
                cleaned[key] = value
        return cleaned
    if isinstance(d, list):
        if len(d) >= LIST_LIMIT:
            return None
        # Sanitize each item exactly once; calling the function both in the
        # filter and in the result expression doubles the work at every
        # nesting level.
        sanitized_items = (value_sanitize(item) for item in d)
        return [item for item in sanitized_items if item is not None]
    return d
def _get_node_import_query(baseEntityLabel: bool, include_source: bool) -> str:
if baseEntityLabel:
return (
f"{include_docs_query if include_source else ''}"
"UNWIND $data AS row "
f"MERGE (source:`{BASE_ENTITY_LABEL}` {{id: row.id}}) "
"SET source += row.properties "
f"{'MERGE (d)-[:MENTIONS]->(source) ' if include_source else ''}"
"WITH source, row "
"CALL apoc.create.addLabels( source, [row.type] ) YIELD node "
"RETURN distinct 'done' AS result"
)
else:
return (
f"{include_docs_query if include_source else ''}"
"UNWIND $data AS row "
"CALL apoc.merge.node([row.type], {id: row.id}, "
"row.properties, {}) YIELD node "
f"{'MERGE (d)-[:MENTIONS]->(node) ' if include_source else ''}"
"RETURN distinct 'done' AS result"
)
def _get_rel_import_query(baseEntityLabel: bool) -> str:
if baseEntityLabel:
return (
"UNWIND $data AS row "
f"MERGE (source:`{BASE_ENTITY_LABEL}` {{id: row.source}}) "
f"MERGE (target:`{BASE_ENTITY_LABEL}` {{id: row.target}}) "
"WITH source, target, row "
"CALL apoc.merge.relationship(source, row.type, "
"{}, row.properties, target) YIELD rel "
"RETURN distinct 'done'"
)
else:
return (
"UNWIND $data AS row "
"CALL apoc.merge.node([row.source_label], {id: row.source},"
"{}, {}) YIELD node as source "
"CALL apoc.merge.node([row.target_label], {id: row.target},"
"{}, {}) YIELD node as target "
"CALL apoc.merge.relationship(source, row.type, "
"{}, row.properties, target) YIELD rel "
"RETURN distinct 'done'"
)
def _format_schema(schema: Dict, is_enhanced: bool) -> str:
formatted_node_props = []
formatted_rel_props = []
if is_enhanced:
# Enhanced formatting for nodes
for node_type, properties in schema["node_props"].items():
formatted_node_props.append(f"- **{node_type}**")
for prop in properties:
example = ""
if prop["type"] == "STRING":
if prop.get("distinct_count", 11) > DISTINCT_VALUE_LIMIT:
example = (
f'Example: "{clean_string_values(prop["values"][0])}"'
if prop["values"]
else ""
)
else: # If less than 10 possible values return all
example = (
(
"Available options: "
f'{[clean_string_values(el) for el in prop["values"]]}'
)
if prop["values"]
else ""
)
elif prop["type"] in [
"INTEGER",
"FLOAT",
"DATE",
"DATE_TIME",
"LOCAL_DATE_TIME",
]:
if prop.get("min") is not None:
example = f'Min: {prop["min"]}, Max: {prop["max"]}'
else:
example = (
f'Example: "{prop["values"][0]}"'
if prop.get("values")
else ""
)
elif prop["type"] == "LIST":
# Skip embeddings
if not prop.get("min_size") or prop["min_size"] > LIST_LIMIT:
continue
example = (
f'Min Size: {prop["min_size"]}, Max Size: {prop["max_size"]}'
)
formatted_node_props.append(
f" - `{prop['property']}`: {prop['type']} {example}"
)
# Enhanced formatting for relationships
for rel_type, properties in schema["rel_props"].items():
formatted_rel_props.append(f"- **{rel_type}**")
for prop in properties:
example = ""
if prop["type"] == "STRING":
if prop.get("distinct_count", 11) > DISTINCT_VALUE_LIMIT:
example = (
f'Example: "{clean_string_values(prop["values"][0])}"'
if prop["values"]
else ""
)
else: # If less than 10 possible values return all
example = (
(
"Available options: "
f'{[clean_string_values(el) for el in prop["values"]]}'
)
if prop["values"]
else ""
)
elif prop["type"] in [
"INTEGER",
"FLOAT",
"DATE",
"DATE_TIME",
"LOCAL_DATE_TIME",
]:
if prop.get("min"): # If we have min/max
example = f'Min: {prop["min"]}, Max: {prop["max"]}'
else: # return a single value
example = (
f'Example: "{prop["values"][0]}"' if prop["values"] else ""
)
elif prop["type"] == "LIST":
# Skip embeddings
if prop["min_size"] > LIST_LIMIT:
continue
example = (
f'Min Size: {prop["min_size"]}, Max Size: {prop["max_size"]}'
)
formatted_rel_props.append(
f" - `{prop['property']}: {prop['type']}` {example}"
)
else:
# Format node properties
for label, props in schema["node_props"].items():
props_str = ", ".join(
[f"{prop['property']}: {prop['type']}" for prop in props]
)
formatted_node_props.append(f"{label} {{{props_str}}}")
# Format relationship properties using structured_schema
for type, props in schema["rel_props"].items():
props_str = ", ".join(
[f"{prop['property']}: {prop['type']}" for prop in props]
)
formatted_rel_props.append(f"{type} {{{props_str}}}")
# Format relationships
formatted_rels = [
f"(:{el['start']})-[:{el['type']}]->(:{el['end']})"
for el in schema["relationships"]
]
return "\n".join(
[
"Node properties:",
"\n".join(formatted_node_props),
"Relationship properties:",
"\n".join(formatted_rel_props),
"The relationships:",
"\n".join(formatted_rels),
]
)
class Neo4jGraph(GraphStore):
    """Neo4j database wrapper for various graph operations.

    Parameters:
        url (Optional[str]): The URL of the Neo4j database server.
        username (Optional[str]): The username for database authentication.
        password (Optional[str]): The password for database authentication.
        database (str): The name of the database to connect to.
            Defaults to 'neo4j'.
        timeout (Optional[float]): The transaction timeout in seconds.
            Useful for terminating long-running queries.
            By default no timeout is set.
        sanitize (bool): Whether to remove oversized lists (``LIST_LIMIT``
            elements or more) from query results. Useful for stripping
            embedding-like properties from database responses.
            Defaults to False.
        refresh_schema (bool): Whether to refresh the schema information
            at initialization. Defaults to True.
        enhanced_schema (bool): Whether to scan the database for example
            values and use them in the graph schema. Defaults to False.
        driver_config (Dict): Configuration passed to the Neo4j driver.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly scoped to only include the necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would delete or mutate data
        if appropriately prompted, or read sensitive data if such data is
        present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        timeout: Optional[float] = None,
        sanitize: bool = False,
        refresh_schema: bool = True,
        *,
        driver_config: Optional[Dict] = None,
        enhanced_schema: bool = False,
    ) -> None:
        """Create a new Neo4j graph wrapper instance.

        Connection settings that are not passed explicitly are read from the
        ``NEO4J_URI``, ``NEO4J_USERNAME``, ``NEO4J_PASSWORD`` and
        ``NEO4J_DATABASE`` environment variables.

        Raises:
            ImportError: If the ``neo4j`` package is not installed.
            ValueError: If the database is unreachable, the credentials are
                rejected, or the APOC procedures needed for the schema
                refresh are not available.
        """
        try:
            import neo4j
        except ImportError as e:
            raise ImportError(
                "Could not import neo4j python package. "
                "Please install it with `pip install neo4j`."
            ) from e

        url = get_from_dict_or_env({"url": url}, "url", "NEO4J_URI")
        username = get_from_dict_or_env(
            {"username": username}, "username", "NEO4J_USERNAME"
        )
        password = get_from_dict_or_env(
            {"password": password}, "password", "NEO4J_PASSWORD"
        )
        database = get_from_dict_or_env(
            {"database": database}, "database", "NEO4J_DATABASE", "neo4j"
        )

        self._driver = neo4j.GraphDatabase.driver(
            url, auth=(username, password), **(driver_config or {})
        )
        self._database = database
        self.timeout = timeout
        self.sanitize = sanitize
        self._enhanced_schema = enhanced_schema
        self.schema: str = ""
        self.structured_schema: Dict[str, Any] = {}
        # Verify connection
        try:
            self._driver.verify_connectivity()
        except neo4j.exceptions.ServiceUnavailable:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the url is correct"
            )
        except neo4j.exceptions.AuthError:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the username and password are correct"
            )
        # Set schema
        if refresh_schema:
            try:
                self.refresh_schema()
            except neo4j.exceptions.ClientError as e:
                if e.code == "Neo.ClientError.Procedure.ProcedureNotFound":
                    raise ValueError(
                        "Could not use APOC procedures. "
                        "Please ensure the APOC plugin is installed in Neo4j and that "
                        "'apoc.meta.data()' is allowed in Neo4j configuration "
                    )
                raise e

    @property
    def get_schema(self) -> str:
        """Return the schema of the graph as text."""
        return self.schema

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        """Return the structured schema of the graph."""
        return self.structured_schema

    def query(self, query: str, params: Optional[dict] = None) -> List[Dict[str, Any]]:
        """Query the Neo4j database.

        Args:
            query: The Cypher statement to run.
            params: Parameters of the statement. ``None`` (the default)
                means no parameters.

        Returns:
            One dictionary per result record; sanitized with
            ``value_sanitize`` when ``self.sanitize`` is set.

        Raises:
            ValueError: If the statement is not valid Cypher.
        """
        from neo4j import Query
        from neo4j.exceptions import CypherSyntaxError

        with self._driver.session(database=self._database) as session:
            try:
                # A fresh dict per call; a shared mutable default argument
                # would be visible across calls.
                data = session.run(
                    Query(text=query, timeout=self.timeout), params or {}
                )
                json_data = [r.data() for r in data]
                if self.sanitize:
                    json_data = [value_sanitize(el) for el in json_data]
                return json_data
            except CypherSyntaxError as e:
                raise ValueError(
                    f"Generated Cypher Statement is not valid\n{e}"
                ) from e

    def refresh_schema(self) -> None:
        """Refresh the Neo4j graph schema information.

        Updates ``self.structured_schema`` and ``self.schema``. When the
        enhanced schema is enabled, every property is additionally enriched
        with example values or statistics read from the database.
        """
        from neo4j.exceptions import ClientError

        node_properties = [
            el["output"]
            for el in self.query(
                node_properties_query,
                params={"EXCLUDED_LABELS": EXCLUDED_LABELS + [BASE_ENTITY_LABEL]},
            )
        ]
        rel_properties = [
            el["output"]
            for el in self.query(
                rel_properties_query, params={"EXCLUDED_LABELS": EXCLUDED_RELS}
            )
        ]
        relationships = [
            el["output"]
            for el in self.query(
                rel_query,
                params={"EXCLUDED_LABELS": EXCLUDED_LABELS + [BASE_ENTITY_LABEL]},
            )
        ]

        # Get constraints & indexes
        try:
            constraint = self.query("SHOW CONSTRAINTS")
            index = self.query(
                "CALL apoc.schema.nodes() YIELD label, properties, type, size, "
                "valuesSelectivity WHERE type = 'RANGE' RETURN *, "
                "size * valuesSelectivity as distinctValues"
            )
        except (
            ClientError
        ):  # Read-only user might not have access to schema information
            constraint = []
            index = []

        self.structured_schema = {
            "node_props": {el["labels"]: el["properties"] for el in node_properties},
            "rel_props": {el["type"]: el["properties"] for el in rel_properties},
            "relationships": relationships,
            "metadata": {"constraint": constraint, "index": index},
        }
        if self._enhanced_schema:
            schema_counts = self.query(
                "CALL apoc.meta.graphSample() YIELD nodes, relationships "
                "RETURN nodes, [rel in relationships | {name:apoc.any.property"
                "(rel, 'type'), count: apoc.any.property(rel, 'count')}]"
                " AS relationships"
            )
            # Nodes and relationships are enriched the same way; only the
            # exclusion list, the schema section and the MATCH shape differ.
            targets = (
                (schema_counts[0]["nodes"], EXCLUDED_LABELS, "node_props", False),
                (
                    schema_counts[0]["relationships"],
                    EXCLUDED_RELS,
                    "rel_props",
                    True,
                ),
            )
            for entries, excluded, section, is_relationship in targets:
                for entry in entries:
                    # Skip bloom labels
                    if entry["name"] in excluded:
                        continue
                    props = self.structured_schema[section].get(entry["name"])
                    if not props:  # The node/relationship has no properties
                        continue
                    enhanced_cypher = self._enhanced_schema_cypher(
                        entry["name"],
                        props,
                        entry["count"] < EXHAUSTIVE_SEARCH_LIMIT,
                        is_relationship=is_relationship,
                    )
                    enhanced_info = self.query(enhanced_cypher)[0]["output"]
                    for prop in props:
                        if prop["property"] in enhanced_info:
                            prop.update(enhanced_info[prop["property"]])

        schema = _format_schema(self.structured_schema, self._enhanced_schema)
        self.schema = schema

    def add_graph_documents(
        self,
        graph_documents: List[GraphDocument],
        include_source: bool = False,
        baseEntityLabel: bool = False,
    ) -> None:
        """Construct nodes and relationships in the graph from graph documents.

        Parameters:
        - graph_documents (List[GraphDocument]): The GraphDocument objects
          that contain the nodes and relationships to add to the graph. Each
          GraphDocument should encapsulate the structure of part of the
          graph, including nodes, relationships and the source document.
        - include_source (bool, optional): If True, stores the source
          document and links it to the nodes in the graph using the MENTIONS
          relationship. This is useful for tracing back the origin of data.
          The source document is merged on the `id` property of its metadata
          if available; otherwise the MD5 hash of `page_content` is used for
          the merge. Defaults to False.
        - baseEntityLabel (bool, optional): If True, each newly created node
          gets a secondary __Entity__ label, which is indexed and improves
          import speed and performance. Defaults to False.
        """
        if baseEntityLabel:  # Check if constraint already exists
            # The metadata is missing when the schema has never been
            # refreshed; treat that as "no known constraints".
            known_constraints = (
                self.structured_schema.get("metadata", {}).get("constraint") or []
            )
            constraint_exists = any(
                el["labelsOrTypes"] == [BASE_ENTITY_LABEL]
                and el["properties"] == ["id"]
                for el in known_constraints
            )
            if not constraint_exists:
                # Create constraint
                self.query(
                    f"CREATE CONSTRAINT IF NOT EXISTS FOR (b:{BASE_ENTITY_LABEL}) "
                    "REQUIRE b.id IS UNIQUE;"
                )
                self.refresh_schema()  # Refresh constraint information

        node_import_query = _get_node_import_query(baseEntityLabel, include_source)
        rel_import_query = _get_rel_import_query(baseEntityLabel)
        for document in graph_documents:
            # Fall back to a content hash so the source document can be merged.
            if not document.source.metadata.get("id"):
                document.source.metadata["id"] = md5(
                    document.source.page_content.encode("utf-8")
                ).hexdigest()

            # Import nodes
            self.query(
                node_import_query,
                {
                    "data": [el.__dict__ for el in document.nodes],
                    "document": document.source.__dict__,
                },
            )
            # Import relationships
            self.query(
                rel_import_query,
                {
                    "data": [
                        {
                            "source": el.source.id,
                            "source_label": el.source.type,
                            "target": el.target.id,
                            "target_label": el.target.type,
                            "type": el.type.replace(" ", "_").upper(),
                            "properties": el.properties,
                        }
                        for el in document.relationships
                    ]
                },
            )

    def _enhanced_schema_cypher(
        self,
        label_or_type: str,
        properties: List[Dict[str, Any]],
        exhaustive: bool,
        is_relationship: bool = False,
    ) -> str:
        """Build the Cypher query that collects enhanced info for one label/type.

        Args:
            label_or_type: Node label or relationship type to inspect.
            properties: Property dicts (``"property"`` and ``"type"`` keys)
                of that label/type.
            exhaustive: When true all instances are scanned; otherwise only
                five instances are sampled, except for RANGE-indexed
                properties.
            is_relationship: Whether ``label_or_type`` is a relationship type.

        Returns:
            A Cypher query returning a single ``output`` map keyed by
            property name. Property types without a known aggregation are
            left out of the map.
        """
        numeric_or_temporal = (
            "INTEGER",
            "FLOAT",
            "DATE",
            "DATE_TIME",
            "LOCAL_DATE_TIME",
        )
        if is_relationship:
            match_clause = f"MATCH ()-[n:`{label_or_type}`]->()"
        else:
            match_clause = f"MATCH (n:`{label_or_type}`)"
        if not exhaustive:
            # Just sample 5 random nodes
            match_clause += " WITH n LIMIT 5"

        with_clauses: List[str] = []
        output_dict: Dict[str, str] = {}
        for prop in properties:
            prop_name = prop["property"]
            prop_type = prop["type"]
            # Check if indexed property, we can still do exhaustive
            prop_index = (
                []
                if exhaustive
                else [
                    el
                    for el in self.structured_schema["metadata"]["index"]
                    if el["label"] == label_or_type
                    and el["properties"] == [prop_name]
                    and el["type"] == "RANGE"
                ]
            )
            if prop_type == "STRING":
                index_size = prop_index[0].get("size") if prop_index else None
                index_distinct = (
                    prop_index[0].get("distinctValues") if prop_index else None
                )
                if exhaustive:
                    with_clauses.append(
                        f"collect(distinct substring(n.`{prop_name}`, 0, 50)) "
                        f"AS `{prop_name}_values`"
                    )
                    clause = (
                        f"values:`{prop_name}_values`[..{DISTINCT_VALUE_LIMIT}],"
                        f" distinct_count: size(`{prop_name}_values`)"
                    )
                elif (
                    index_size is not None
                    and index_size > 0
                    and index_distinct is not None
                    and index_distinct <= DISTINCT_VALUE_LIMIT
                ):
                    # Few distinct values in the index: list all of them.
                    distinct_values = self.query(
                        f"CALL apoc.schema.properties.distinct("
                        f"'{label_or_type}', '{prop_name}') YIELD value"
                    )[0]["value"]
                    clause = (
                        f"values: {distinct_values},"
                        f" distinct_count: {len(distinct_values)}"
                    )
                else:
                    with_clauses.append(
                        f"collect(distinct substring(n.`{prop_name}`, 0, 50)) "
                        f"AS `{prop_name}_values`"
                    )
                    clause = f"values: `{prop_name}_values`"
            elif prop_type in numeric_or_temporal:
                if exhaustive or prop_index:
                    with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")
                    with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")
                    with_clauses.append(
                        f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`"
                    )
                    clause = (
                        f"min: toString(`{prop_name}_min`), "
                        f"max: toString(`{prop_name}_max`), "
                        f"distinct_count: `{prop_name}_distinct`"
                    )
                else:
                    with_clauses.append(
                        f"collect(distinct toString(n.`{prop_name}`)) "
                        f"AS `{prop_name}_values`"
                    )
                    clause = f"values: `{prop_name}_values`"
            elif prop_type == "LIST":
                with_clauses.append(
                    f"min(size(n.`{prop_name}`)) AS `{prop_name}_size_min`, "
                    f"max(size(n.`{prop_name}`)) AS `{prop_name}_size_max`"
                )
                clause = (
                    f"min_size: `{prop_name}_size_min`, "
                    f"max_size: `{prop_name}_size_max`"
                )
            else:
                # BOOLEAN, POINT, DURATION and any other type without a
                # meaningful aggregation are left out of the output map.
                continue
            output_dict[prop_name] = "{" + clause + "}"

        return_clause = (
            "RETURN {"
            + ", ".join(f"`{k}`: {v}" for k, v in output_dict.items())
            + "} AS output"
        )
        if not with_clauses:
            # Nothing to aggregate: every output entry (if any) is a literal,
            # and an empty WITH would be invalid Cypher, so return the map
            # on its own.
            return return_clause
        with_clause = "WITH " + ",\n ".join(with_clauses)
        # Combine all parts of the Cypher query
        cypher_query = "\n".join([match_clause, with_clause, return_clause])
        return cypher_query