Source code for langchain_community.graphs.neptune_graph

import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union


class NeptuneQueryException(Exception):
    """Exception raised when a Neptune query or schema lookup fails.

    Args:
        exception: Either a plain message string, or a dict that may carry
            a ``"message"`` entry and a ``"details"`` entry. Entries that
            are absent default to ``"unknown"``.
    """

    def __init__(self, exception: Union[str, Dict]):
        # Forward the raw payload so ``args`` still holds exactly what the
        # caller passed in.
        super().__init__(exception)
        if isinstance(exception, dict):
            self.message = exception.get("message", "unknown")
            # Some call sites spell the key "detail" (singular); accept it as
            # a fallback so the underlying cause is not silently dropped.
            self.details = exception.get(
                "details", exception.get("detail", "unknown")
            )
        else:
            self.message = exception
            self.details = "unknown"

    def get_message(self) -> str:
        """Return the short error message."""
        return self.message

    def get_details(self) -> Any:
        """Return the error details, or ``"unknown"`` when none were given."""
        return self.details
class BaseNeptuneGraph(ABC):
    """Abstract base class for Neptune graph wrappers.

    Subclasses supply ``query`` and ``_get_summary``; this class uses them to
    build a textual schema description stored in ``self.schema``.
    """

    @property
    def get_schema(self) -> str:
        """Return the schema of the Neptune database."""
        return self.schema

    @abstractmethod
    def query(self, query: str, params: dict = {}) -> dict:
        """Run ``query`` against the database and return its results.

        The default for ``params`` is a shared dict object; implementations
        must not mutate it.
        """
        raise NotImplementedError()

    @abstractmethod
    def _get_summary(self) -> Dict:
        """Return the graph summary; must contain node and edge labels."""
        raise NotImplementedError()

    def _get_labels(self) -> Tuple[List[str], List[str]]:
        """Get node and edge labels from the Neptune statistics summary."""
        summary = self._get_summary()
        n_labels = summary["nodeLabels"]
        e_labels = summary["edgeLabels"]
        return n_labels, e_labels

    def _get_triples(self, e_labels: List[str]) -> List[str]:
        """Return ``(:`A`)-[:`E`]->(:`B`)`` patterns found for each edge label.

        Only the first label of each endpoint node is used in the pattern.
        """
        triple_query = """
        MATCH (a)-[e:`{e_label}`]->(b)
        WITH a,e,b LIMIT 3000
        RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to
        LIMIT 10
        """
        triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)"

        triple_schema = []
        for label in e_labels:
            rows = self.query(triple_query.format(e_label=label))
            for row in rows:
                triple_schema.append(
                    triple_template.format(
                        a=row["from"][0], e=row["edge"], b=row["to"][0]
                    )
                )
        return triple_schema

    @staticmethod
    def _collect_property_types(rows: Any, types: Dict) -> List[Dict[str, Any]]:
        """Return the distinct (property, type) pairs seen in sampled rows.

        Each row is expected to carry a ``"props"`` mapping. A value whose
        Python type name is missing from ``types`` raises ``KeyError``.
        """
        seen = set()
        for row in rows:
            for key, value in row["props"].items():
                seen.add((key, types[type(value).__name__]))
        # Sort so the generated schema is stable across runs; plain set
        # iteration order varies with string hash randomisation.
        return [{"property": key, "type": kind} for key, kind in sorted(seen)]

    def _get_node_properties(self, n_labels: List[str], types: Dict) -> List:
        """Sample up to 100 nodes per label and list their property types."""
        node_properties_query = """
        MATCH (a:`{n_label}`)
        RETURN properties(a) AS props
        LIMIT 100
        """
        node_properties = []
        for label in n_labels:
            rows = self.query(node_properties_query.format(n_label=label))
            node_properties.append(
                {
                    "properties": self._collect_property_types(rows, types),
                    "labels": label,
                }
            )
        return node_properties

    def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List:
        """Sample up to 100 edges per label and list their property types."""
        edge_properties_query = """
        MATCH ()-[e:`{e_label}`]->()
        RETURN properties(e) AS props
        LIMIT 100
        """
        edge_properties = []
        for label in e_labels:
            rows = self.query(edge_properties_query.format(e_label=label))
            edge_properties.append(
                {
                    "type": label,
                    "properties": self._collect_property_types(rows, types),
                }
            )
        return edge_properties

    def _refresh_schema(self) -> None:
        """Refresh the Neptune graph schema information in ``self.schema``."""
        # Maps Python type names of sampled property values to schema types.
        types = {
            "str": "STRING",
            "float": "DOUBLE",
            "int": "INTEGER",
            "list": "LIST",
            "dict": "MAP",
            "bool": "BOOLEAN",
        }
        n_labels, e_labels = self._get_labels()
        triple_schema = self._get_triples(e_labels)
        node_properties = self._get_node_properties(n_labels, types)
        edge_properties = self._get_edge_properties(e_labels, types)

        self.schema = f"""
        Node properties are the following:
        {node_properties}
        Relationship properties are the following:
        {edge_properties}
        The relationships are the following:
        {triple_schema}
        """
class NeptuneAnalyticsGraph(BaseNeptuneGraph):
    """Neptune Analytics wrapper for graph operations.

    Args:
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        graph_identifier: the graph identifier for a Neptune Analytics graph

    Example:
        .. code-block:: python

        graph = NeptuneAnalyticsGraph(
            graph_identifier='<my-graph-id>'
        )

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion,
        mutation of data if appropriately prompted, or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        graph_identifier: str,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance.

        Raises:
            ImportError: if boto3 is missing or does not know the
                ``neptune-graph`` service.
            ValueError: if the boto3 client cannot be created for any other
                reason.
            NeptuneQueryException: if the schema cannot be read.
        """
        # Assign first: ``query`` and ``_get_summary`` read this attribute
        # regardless of whether the client was supplied by the caller.
        self.graph_identifier = graph_identifier

        try:
            if client is not None:
                self.client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                if region_name:
                    self.client = session.client(
                        "neptune-graph", region_name=region_name
                    )
                else:
                    self.client = session.client("neptune-graph")

        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Matched by name so botocore does not have to be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.34.40 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        try:
            self._refresh_schema()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "Could not get schema for Neptune database",
                    "details": str(e),
                }
            ) from e

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query the Neptune database with an openCypher statement.

        Args:
            query: the openCypher query string.
            params: query parameters passed through to the client. The
                default dict is shared between calls and is not mutated here.

        Returns:
            The ``"results"`` entry of the decoded JSON response payload.

        Raises:
            NeptuneQueryException: if the call or the decoding fails.
        """
        try:
            resp = self.client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=params,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Fetch the detailed graph summary for this graph.

        Raises:
            NeptuneQueryException: if the summary call fails or the response
                has no ``"graphSummary"`` entry.
        """
        try:
            response = self.client.get_graph_summary(
                graphIdentifier=self.graph_identifier, mode="detailed"
            )
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": ("Summary API error occurred on Neptune Analytics"),
                    "details": str(e),
                }
            ) from e

        try:
            summary = response["graphSummary"]
        except Exception as e:
            # The response is indexed like a mapping above, so report its
            # string form; it has no ``content`` attribute to decode.
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": str(response),
                }
            ) from e
        else:
            return summary
class NeptuneGraph(BaseNeptuneGraph):
    """Neptune wrapper for graph operations.

    Args:
        host: endpoint for the database instance
        port: port number for the database instance, default is 8182
        use_https: whether to use secure connection, default is True
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        sign: optional, whether to sign the request payload, default is True

    The boto3 service name is fixed at ``neptunedata``; it is not a
    constructor parameter.

    Example:
        .. code-block:: python

        graph = NeptuneGraph(
            host='<my-cluster>',
            port=8182
        )

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion,
        mutation of data if appropriately prompted, or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        sign: bool = True,
    ) -> None:
        """Create a new Neptune graph wrapper instance.

        Raises:
            ImportError: if boto3 is missing or does not know the
                ``neptunedata`` service.
            ValueError: if the boto3 client cannot be created for any other
                reason.
            NeptuneQueryException: if the schema cannot be read.
        """
        try:
            if client is not None:
                self.client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                client_params = {}
                if region_name:
                    client_params["region_name"] = region_name

                protocol = "https" if use_https else "http"
                client_params["endpoint_url"] = f"{protocol}://{host}:{port}"

                if sign:
                    self.client = session.client("neptunedata", **client_params)
                else:
                    from botocore import UNSIGNED
                    from botocore.config import Config

                    self.client = session.client(
                        "neptunedata",
                        **client_params,
                        config=Config(signature_version=UNSIGNED),
                    )

        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Matched by name so botocore does not have to be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.28.38 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        try:
            self._refresh_schema()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "Could not get schema for Neptune database",
                    "details": str(e),
                }
            ) from e

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query the Neptune database with an openCypher statement.

        Args:
            query: the openCypher query string.
            params: optional query parameters. When non-empty they are sent
                JSON-encoded; when empty the request carries the query only.
                The default dict is shared between calls and is not mutated.

        Returns:
            The ``"results"`` entry of the client response.

        Raises:
            NeptuneQueryException: if the call fails.
        """
        try:
            request: Dict[str, Any] = {"openCypherQuery": query}
            if params:
                # The neptunedata API takes the parameters as a JSON string.
                request["parameters"] = json.dumps(params)
            return self.client.execute_open_cypher_query(**request)["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Fetch the property-graph summary for this instance.

        Raises:
            NeptuneQueryException: if the summary call fails or the response
                has no ``["payload"]["graphSummary"]`` entry.
        """
        try:
            response = self.client.get_propertygraph_summary()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": (
                        "Summary API is not available for this instance of Neptune, "
                        "ensure the engine version is >=1.2.1.0"
                    ),
                    "details": str(e),
                }
            ) from e

        try:
            summary = response["payload"]["graphSummary"]
        except Exception as e:
            # The response is indexed like a mapping above, so report its
            # string form; it has no ``content`` attribute to decode.
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": str(response),
                }
            ) from e
        else:
            return summary