# Source code for langchain_community.graphs.gremlin_graph

import hashlib
import sys
from typing import Any, Dict, List, Optional, Union

from langchain_core.utils import get_from_env

from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_community.graphs.graph_store import GraphStore


class GremlinGraph(GraphStore):
    """Gremlin wrapper for graph operations.

    Parameters:
        url (Optional[str]): URL of the Gremlin database server, or the
            environment variable ``GREMLIN_URI`` if not provided.
        username (Optional[str]): Collection identifier like
            ``'/dbs/database/colls/graph'``, or the environment variable
            ``GREMLIN_USERNAME`` if not provided.
        password (Optional[str]): Connection key for database authentication,
            or the environment variable ``GREMLIN_PASSWORD`` if not provided.
        traversal_source (str): Traversal source to use for queries.
            Defaults to ``'g'``.
        message_serializer (Optional[Any]): Message serializer to use for
            requests. Defaults to ``serializer.GraphSONSerializersV2d0()``.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion or
        mutation of data if appropriately prompted, or in reading sensitive
        data if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.

    *Implementation details*:
        The Gremlin queries are designed to work with Azure CosmosDB
        limitations.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        traversal_source: str = "g",
        message_serializer: Optional[Any] = None,
    ) -> None:
        """Create a new Gremlin graph wrapper instance."""
        try:
            from gremlin_python.driver import client, serializer
        except ImportError as e:
            raise ImportError(
                "Please install gremlin-python first: "
                "`pip3 install gremlinpython`"
            ) from e

        if sys.platform == "win32":
            import asyncio

            # NOTE(review): presumably the gremlin-python driver needs the
            # selector event loop on Windows — confirm before removing.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        self.client = client.Client(
            url=self._resolve_setting(url, "url", "GREMLIN_URI"),
            traversal_source=traversal_source,
            username=self._resolve_setting(username, "username", "GREMLIN_USERNAME"),
            password=self._resolve_setting(password, "password", "GREMLIN_PASSWORD"),
            message_serializer=message_serializer
            or serializer.GraphSONSerializersV2d0(),
        )
        # Both schema views are filled lazily by refresh_schema().
        self.schema: str = ""
        self.structured_schema: Dict[str, Any] = {}

    @staticmethod
    def _resolve_setting(value: Optional[str], key: str, env_key: str) -> str:
        """Return ``value`` if given, otherwise defer to ``get_from_env``.

        ``get_from_env`` consults the environment variable before its default,
        so passing the explicit argument as that default would let the
        environment override it. Checking ``value`` first keeps the documented
        "environment variable only if not provided" precedence.
        """
        if value is not None:
            return value
        return get_from_env(key, env_key)

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        """Return the structured schema, loading it from the database on first use."""
        if not self.structured_schema:
            self.refresh_schema()
        return self.structured_schema

    @property
    def get_schema(self) -> str:
        """Return the schema of the Gremlin database, loading it on first use."""
        if not self.schema:
            self.refresh_schema()
        return self.schema

    def refresh_schema(self) -> None:
        """Refresh the Gremlin graph schema information.

        Queries the distinct vertex labels, the distinct edge labels and, per
        vertex label, the distinct property keys. Stores them both as
        ``structured_schema`` (a dict) and as a human-readable ``schema`` text.
        """
        vertex_schema = self.query("g.V().label().dedup()")
        edge_schema = self.query("g.E().label().dedup()")
        # group() emits a single map (label -> list of property keys).
        vertex_properties = self.query(
            "g.V().group().by(label).by(properties().label().dedup().fold())"
        )[0]

        self.structured_schema = {
            "vertex_labels": vertex_schema,
            "edge_labels": edge_schema,
            "vertice_props": vertex_properties,
        }

        self.schema = "\n".join(
            [
                "Vertex labels are the following:",
                ",".join(vertex_schema),
                "Edge labels are the following:",
                ",".join(edge_schema),
                f"Vertices have following properties:\n{vertex_properties}",
            ]
        )

    def query(
        self, query: str, params: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Run a Gremlin query and return all of its results.

        Args:
            query: Gremlin query string.
            params: Optional parameter bindings for the query. When non-empty
                they are forwarded to the driver as ``bindings``. The previous
                version silently ignored them.

        Returns:
            The list of results returned by the server.
        """
        if params:
            result_set = self.client.submit(query, bindings=params)
        else:
            result_set = self.client.submit(query)
        return result_set.all().result()

    def add_graph_documents(
        self, graph_documents: List[GraphDocument], include_source: bool = False
    ) -> None:
        """Build the graph from a list of GraphDocuments.

        Args:
            graph_documents: Documents whose nodes and relationships are
                upserted as vertices and edges.
            include_source: If True, also upsert one ``Document`` vertex per
                source document and link each of that document's nodes to it
                in both directions.
        """
        # Shared across all documents of this call so each node id is
        # submitted at most once.
        node_cache: Dict[Union[str, int], Node] = {}
        for document in graph_documents:
            doc_node: Optional[Node] = None
            if include_source:
                doc_props = {
                    "page_content": document.source.page_content,
                    "metadata": document.source.metadata,
                }
                # MD5 only derives a stable id from the content; not security.
                doc_id = hashlib.md5(
                    document.source.page_content.encode()
                ).hexdigest()
                doc_node = self.add_node(
                    Node(id=doc_id, type="Document", properties=doc_props), node_cache
                )

            # Import nodes as vertices.
            for n in document.nodes:
                node = self.add_node(n, node_cache)
                if doc_node is not None:
                    # Link the node and its source document both ways.
                    self.add_edge(
                        Relationship(
                            type="contains information about",
                            source=doc_node,
                            target=node,
                            properties={},
                        )
                    )
                    self.add_edge(
                        Relationship(
                            type="is extracted from",
                            source=node,
                            target=doc_node,
                            properties={},
                        )
                    )

            # Import relationships as edges, ensuring both endpoints exist.
            for el in document.relationships:
                self.add_node(el.source, node_cache)
                self.add_node(el.target, node_cache)
                self.add_edge(el)

    @staticmethod
    def _escape(value: Any) -> str:
        """Render ``value`` as the body of a single-quoted Gremlin string literal.

        Backslashes and single quotes are escaped. Otherwise values such as
        ``O'Brien``, or a stringified ``dict`` whose repr contains quotes,
        would terminate the literal early and break or alter the query.
        """
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _property_steps(self, properties: Dict[str, Any]) -> str:
        """Return chained ``.property('key', 'value')`` steps for ``properties``."""
        return "".join(
            f".property('{self._escape(key)}', '{self._escape(value)}')"
            for key, value in properties.items()
        )

    def build_vertex_query(self, node: Node) -> str:
        """Build an "upsert vertex" Gremlin query for ``node``.

        The vertex is looked up by its ``id`` property. Only when it is missing
        is it added, with ``id``, ``type`` and every entry of
        ``node.properties``. An existing vertex is returned unchanged.
        """
        node_id = self._escape(node.id)
        node_type = self._escape(node.type)
        return (
            f"g.V().has('id','{node_id}').fold()"
            f".coalesce(unfold(),addV('{node_type}')"
            f".property('id','{node_id}')"
            f".property('type','{node_type}')"
            f"{self._property_steps(node.properties)})"
        )

    def build_edge_query(self, relationship: Relationship) -> str:
        """Build an "upsert edge" Gremlin query for ``relationship``.

        The source and target vertices are looked up by their ``id`` property.
        An existing edge of the same type from source to target is reused;
        otherwise a new one is added. The relationship's properties are then
        set on that edge.
        """
        source_id = self._escape(relationship.source.id)
        target_id = self._escape(relationship.target.id)
        edge_type = self._escape(relationship.type)
        # coalesce() yields the edge in both branches. A
        # choose(..., __.identity(), ...) form would yield the target *vertex*
        # when the edge already exists, sending the .property() steps there.
        return (
            f"g.V().has('id','{source_id}').as('a')"
            f".V().has('id','{target_id}').as('b')"
            ".coalesce("
            f"__.inE('{edge_type}').where(outV().as('a')),"
            f"__.addE('{edge_type}').from('a').to('b'))"
            f"{self._property_steps(relationship.properties)}"
        )

    def add_node(
        self,
        node: Node,
        node_cache: Optional[Dict[Union[str, int], Node]] = None,
    ) -> Node:
        """Upsert ``node`` as a vertex unless its id is already in ``node_cache``.

        If ``node.properties`` has no ``"label"`` key, ``node.type`` is stored
        under it. This mutates the passed-in node.

        Args:
            node: Node to add.
            node_cache: Nodes already added, keyed by node id; updated in
                place. When omitted, a fresh cache is used for this call only.
                The previous shared mutable default leaked entries across
                calls and instances.

        Returns:
            The cached node for ``node.id`` if present, otherwise ``node``.
        """
        if node_cache is None:
            node_cache = {}
        # If properties does not have a label, use the type as the label.
        if "label" not in node.properties:
            node.properties["label"] = node.type
        if node.id in node_cache:
            return node_cache[node.id]
        # [0] fails fast (IndexError) if the server returned no vertex.
        _ = self.query(self.build_vertex_query(node))[0]
        node_cache[node.id] = node
        return node

    def add_edge(self, relationship: Relationship) -> Any:
        """Upsert ``relationship`` as an edge and return the query results."""
        return self.query(self.build_edge_query(relationship))