import hashlib
import sys
from typing import Any, Dict, List, Optional, Union
from langchain_core.utils import get_from_env
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_community.graphs.graph_store import GraphStore
class GremlinGraph(GraphStore):
    """Gremlin wrapper for graph operations.

    Parameters:
        url (Optional[str]): The URL of the Gremlin database server, or the
            environment variable GREMLIN_URI if not provided.
        username (Optional[str]): The collection identifier, like
            '/dbs/database/colls/graph', or the environment variable
            GREMLIN_USERNAME if not provided.
        password (Optional[str]): The connection key for database
            authentication, or the environment variable GREMLIN_PASSWORD
            if not provided.
        traversal_source (str): The traversal source to use for queries.
            Defaults to 'g'.
        message_serializer (Optional[Any]): The message serializer to use for
            requests. Defaults to serializer.GraphSONSerializersV2d0().

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion or
        mutation of data if appropriately prompted, or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.

    *Implementation details*:
        The Gremlin queries are designed to work within the Azure CosmosDB
        limits.
    """

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        """Return the structured schema, loading it on first access.

        The dictionary holds the keys ``vertex_labels``, ``edge_labels`` and
        ``vertice_props`` as produced by :meth:`refresh_schema`.
        """
        if not self.structured_schema:
            self.refresh_schema()
        return self.structured_schema

    def __init__(
        self,
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        traversal_source: str = "g",
        message_serializer: Optional[Any] = None,
    ) -> None:
        """Create a new Gremlin graph wrapper instance.

        Args:
            url: Server URL; falls back to the GREMLIN_URI environment
                variable when ``None``.
            username: User / collection identifier; falls back to
                GREMLIN_USERNAME when ``None``.
            password: Connection key; falls back to GREMLIN_PASSWORD when
                ``None``.
            traversal_source: Traversal source handed to the driver client.
            message_serializer: Serializer handed to the driver client;
                ``serializer.GraphSONSerializersV2d0()`` is used when falsy.

        Raises:
            ImportError: If the ``gremlin_python`` driver cannot be imported.
        """
        # Only the optional third-party import is guarded, so unrelated
        # failures are not reported as a missing dependency.
        try:
            from gremlin_python.driver import client, serializer
        except ImportError as exc:
            raise ImportError(
                "Please install gremlin-python first: `pip3 install gremlinpython`"
            ) from exc

        if sys.platform == "win32":
            import asyncio

            # NOTE(review): presumably the driver's transport needs a
            # selector-based event loop on Windows - confirm.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        # An explicit argument wins; the environment variable is consulted
        # only when the argument is None, as the class docstring promises.
        # NOTE(review): get_from_env is expected to raise when the variable
        # is missing too - confirm against langchain_core.
        self.client = client.Client(
            url=url if url is not None else get_from_env("url", "GREMLIN_URI"),
            traversal_source=traversal_source,
            username=(
                username
                if username is not None
                else get_from_env("username", "GREMLIN_USERNAME")
            ),
            password=(
                password
                if password is not None
                else get_from_env("password", "GREMLIN_PASSWORD")
            ),
            message_serializer=message_serializer
            or serializer.GraphSONSerializersV2d0(),
        )
        # Both schema views start empty and are filled lazily by
        # refresh_schema() through the get_schema / get_structured_schema
        # properties.
        self.schema: str = ""
        self.structured_schema: Dict[str, Any] = {}

    @property
    def get_schema(self) -> str:
        """Return the textual schema of the Gremlin database.

        The schema is fetched from the server on first access.
        """
        if not self.schema:
            self.refresh_schema()
        return self.schema

    def _submit(self, query: str, bindings: Optional[dict] = None) -> Any:
        """Send *query* to the server and block until all results arrive."""
        if bindings:
            result_set = self.client.submit(query, bindings=bindings)
        else:
            result_set = self.client.submit(query)
        return result_set.all().result()

    @staticmethod
    def _quote(value: Any) -> str:
        """Render *value* as a single-quoted Gremlin string literal.

        Backslashes and single quotes are backslash-escaped so that values
        such as document text or dict reprs cannot terminate the literal.
        NOTE(review): confirm the target server accepts backslash escapes.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    def refresh_schema(self) -> None:
        """Refresh the Gremlin graph schema information.

        Queries the distinct vertex labels, the distinct edge labels and the
        property names grouped by vertex label, then stores them both as a
        dictionary (``self.structured_schema``) and as text (``self.schema``).
        """
        vertex_schema = self._submit("g.V().label().dedup()")
        edge_schema = self._submit("g.E().label().dedup()")
        grouped = self._submit(
            "g.V().group().by(label).by(properties().label().dedup().fold())"
        )
        # Guard against an empty result list instead of raising IndexError.
        vertex_properties = grouped[0] if grouped else {}

        self.structured_schema = {
            "vertex_labels": vertex_schema,
            "edge_labels": edge_schema,
            "vertice_props": vertex_properties,
        }
        self.schema = "\n".join(
            [
                "Vertex labels are the following:",
                ",".join(vertex_schema),
                "Edge labels are the following:",
                ",".join(edge_schema),
                f"Vertices have following properties:\n{vertex_properties}",
            ]
        )

    def query(self, query: str, params: Optional[dict] = None) -> List[Dict[str, Any]]:
        """Execute a Gremlin query and return all of its results.

        Args:
            query: The Gremlin script to run.
            params: Optional bindings for the script. They are forwarded to
                the driver only when non-empty.

        Returns:
            The list of results produced by the server.
        """
        return self._submit(query, params)

    def add_graph_documents(
        self, graph_documents: List[GraphDocument], include_source: bool = False
    ) -> None:
        """Take GraphDocument objects as input and use them to build the graph.

        Args:
            graph_documents: Documents whose nodes and relationships are
                written to the database.
            include_source: When true, also create a ``Document`` vertex per
                source document (keyed by the MD5 of its page content) and
                link every node to it in both directions.
        """
        # One cache per call, so each vertex is upserted at most once here.
        node_cache: Dict[Union[str, int], Node] = {}

        for document in graph_documents:
            doc_node: Optional[Node] = None
            if include_source:
                # Create the document vertex.
                source = document.source
                doc_id = hashlib.md5(source.page_content.encode()).hexdigest()
                doc_node = self.add_node(
                    Node(
                        id=doc_id,
                        type="Document",
                        properties={
                            "page_content": source.page_content,
                            "metadata": source.metadata,
                        },
                    ),
                    node_cache,
                )

            # Import nodes as vertices.
            for n in document.nodes:
                node = self.add_node(n, node_cache)
                if doc_node is None:
                    continue
                # Link the document and the node in both directions.
                self.add_edge(
                    Relationship(
                        type="contains information about",
                        source=doc_node,
                        target=node,
                        properties={},
                    )
                )
                self.add_edge(
                    Relationship(
                        type="is extracted from",
                        source=node,
                        target=doc_node,
                        properties={},
                    )
                )

            # Import relationships as edges, creating endpoints when needed.
            for el in document.relationships:
                self.add_node(el.source, node_cache)
                self.add_node(el.target, node_cache)
                self.add_edge(el)

    def build_vertex_query(self, node: Node) -> str:
        """Build the Gremlin script that creates *node* unless it exists.

        The script looks up a vertex by its ``id`` property and, if none is
        found, adds one labelled with ``node.type`` carrying ``id``, ``type``
        and every entry of ``node.properties``. All interpolated values are
        escaped as string literals.
        """
        quote = self._quote
        node_id = quote(node.id)
        node_type = quote(node.type)
        parts = [
            f"g.V().has('id',{node_id}).fold()",
            f".coalesce(unfold(),addV({node_type})",
            f".property('id',{node_id})",
            f".property('type',{node_type})",
        ]
        parts.extend(
            f".property({quote(key)}, {quote(value)})"
            for key, value in node.properties.items()
        )
        parts.append(")")
        return "".join(parts)

    def build_edge_query(self, relationship: Relationship) -> str:
        """Build the Gremlin script that creates an edge unless it exists.

        The script selects the source and target vertices by ``id`` and adds
        an edge labelled ``relationship.type`` from source to target only
        when the target has no such incoming edge from the source. Entries of
        ``relationship.properties`` are appended as ``property`` steps. All
        interpolated values are escaped as string literals.
        """
        quote = self._quote
        edge_label = quote(relationship.type)
        parts = [
            f"g.V().has('id',{quote(relationship.source.id)}).as('a')",
            f".V().has('id',{quote(relationship.target.id)}).as('b')",
            ".choose(",
            f"__.inE({edge_label}).where(outV().as('a')),",
            "__.identity(),",
            f"__.addE({edge_label}).from('a').to('b')",
            ")",
        ]
        parts.extend(
            f".property({quote(key)}, {quote(value)})"
            for key, value in relationship.properties.items()
        )
        return "".join(parts)

    def add_node(self, node: Node, node_cache: Optional[dict] = None) -> Node:
        """Upsert *node* as a vertex and return the node that was used.

        Args:
            node: The node to write. If its properties lack a ``label``
                entry, ``node.type`` is stored under that key (this mutates
                ``node.properties``).
            node_cache: Optional mapping of node id to node. A node whose id
                is already present is returned from the cache without
                contacting the server; newly written nodes are added to it.
                Without a cache every call reaches the server.

        Returns:
            The cached node for this id when there is one, otherwise *node*.
        """
        # If the properties do not carry a label, use the type as the label.
        if "label" not in node.properties:
            node.properties["label"] = node.type

        if node_cache is not None and node.id in node_cache:
            return node_cache[node.id]

        self._submit(self.build_vertex_query(node))
        if node_cache is not None:
            node_cache[node.id] = node
        return node

    def add_edge(self, relationship: Relationship) -> Any:
        """Upsert *relationship* as an edge and return the server's results."""
        return self._submit(self.build_edge_query(relationship))