import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union
class NeptuneQueryException(Exception):
    """Exception raised when a Neptune query or schema lookup fails.

    Args:
        exception: Either a plain message string, or a dict that may carry
            ``"message"`` and ``"details"`` keys. A missing key, or the
            details of a plain string, is reported as ``"unknown"``.
    """

    def __init__(self, exception: Union[str, Dict]):
        # Forward the raw payload so ``args`` and ``str(exc)`` hold exactly
        # what the caller passed in.
        super().__init__(exception)
        if isinstance(exception, dict):
            self.message = exception.get("message", "unknown")
            self.details = exception.get("details", "unknown")
        else:
            self.message = exception
            self.details = "unknown"

    def get_message(self) -> str:
        """Return the stored message."""
        return self.message

    def get_details(self) -> Any:
        """Return the stored details (``"unknown"`` when none were given)."""
        return self.details
class BaseNeptuneGraph(ABC):
    """Abstract base class for Neptune graph wrappers.

    Subclasses provide ``query`` and ``_get_summary``; this class uses them
    to build a textual schema description stored on ``self.schema``.
    """

    @property
    def get_schema(self) -> str:
        """Return the schema of the Neptune database.

        ``self.schema`` is assigned by ``_refresh_schema``.
        """
        return self.schema

    @abstractmethod
    def query(self, query: str, params: dict = {}) -> dict:
        """Run ``query`` against the graph and return its results.

        The schema helpers in this class iterate the returned value as a
        sequence of row dicts.
        """
        raise NotImplementedError()

    @abstractmethod
    def _get_summary(self) -> Dict:
        """Return a summary dict with ``nodeLabels`` and ``edgeLabels`` keys."""
        raise NotImplementedError()

    def _get_labels(self) -> Tuple[List[str], List[str]]:
        """Get node and edge labels from the Neptune statistics summary."""
        summary = self._get_summary()
        return summary["nodeLabels"], summary["edgeLabels"]

    def _get_triples(self, e_labels: List[str]) -> List[str]:
        """Return ``(:`A`)-[:`E`]->(:`B`)`` patterns for each edge label.

        Only the first label of each endpoint is used. Identical patterns are
        reported once, in first-seen order.
        """
        triple_query = (
            "\n"
            "MATCH (a)-[e:`{e_label}`]->(b)\n"
            "WITH a,e,b LIMIT 3000\n"
            "RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to\n"
            "LIMIT 10\n"
        )
        triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)"

        # Dict keys give an order-preserving de-duplication: rows that differ
        # only in their secondary labels collapse to the same pattern.
        triples: Dict[str, None] = {}
        for label in e_labels:
            for row in self.query(triple_query.format(e_label=label)):
                triple = triple_template.format(
                    a=row["from"][0], e=row["edge"], b=row["to"][0]
                )
                triples[triple] = None
        return list(triples)

    def _sample_property_types(
        self, query: str, types: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Run ``query`` and list the distinct (property, type) pairs it shows.

        Each row must expose a ``props`` mapping. Values are classified by
        their Python type name looked up in ``types``; a type name missing
        from ``types`` raises ``KeyError``. The result is sorted so the
        generated schema text is stable from run to run.
        """
        seen = set()
        for row in self.query(query):
            for name, value in row["props"].items():
                seen.add((name, types[type(value).__name__]))
        ordered = sorted(seen, key=lambda pair: (pair[0], str(pair[1])))
        return [{"property": name, "type": kind} for name, kind in ordered]

    def _get_node_properties(self, n_labels: List[str], types: Dict) -> List:
        """Describe the properties seen on sampled nodes of each label.

        Args:
            n_labels: Node labels to inspect.
            types: Maps Python type names to schema type names.

        Returns:
            One ``{"properties": [...], "labels": label}`` dict per label.
        """
        node_properties_query = (
            "\n"
            "MATCH (a:`{n_label}`)\n"
            "RETURN properties(a) AS props\n"
            "LIMIT 100\n"
        )
        return [
            {
                "properties": self._sample_property_types(
                    node_properties_query.format(n_label=label), types
                ),
                "labels": label,
            }
            for label in n_labels
        ]

    def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List:
        """Describe the properties seen on sampled edges of each label.

        Args:
            e_labels: Edge labels to inspect.
            types: Maps Python type names to schema type names.

        Returns:
            One ``{"type": label, "properties": [...]}`` dict per label.
        """
        edge_properties_query = (
            "\n"
            "MATCH ()-[e:`{e_label}`]->()\n"
            "RETURN properties(e) AS props\n"
            "LIMIT 100\n"
        )
        return [
            {
                "type": label,
                "properties": self._sample_property_types(
                    edge_properties_query.format(e_label=label), types
                ),
            }
            for label in e_labels
        ]

    def _refresh_schema(self) -> None:
        """Refresh the Neptune graph schema information.

        Rebuilds ``self.schema`` from the summary labels and from sampled
        node and edge properties.
        """
        # Keys are Python type names as produced by ``type(v).__name__``.
        types = {
            "str": "STRING",
            "float": "DOUBLE",
            "int": "INTEGER",
            "list": "LIST",
            "dict": "MAP",
            "bool": "BOOLEAN",
        }
        n_labels, e_labels = self._get_labels()
        triple_schema = self._get_triples(e_labels)
        node_properties = self._get_node_properties(n_labels, types)
        edge_properties = self._get_edge_properties(e_labels, types)

        self.schema = (
            "\n"
            "Node properties are the following:\n"
            f"{node_properties}\n"
            "Relationship properties are the following:\n"
            f"{edge_properties}\n"
            "The relationships are the following:\n"
            f"{triple_schema}\n"
        )
class NeptuneAnalyticsGraph(BaseNeptuneGraph):
    """Neptune Analytics wrapper for graph operations.

    Args:
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        graph_identifier: the graph identifier for a Neptune Analytics graph

    Example:
        .. code-block:: python

            graph = NeptuneAnalyticsGraph(
                graph_identifier='<my-graph-id>'
            )

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion,
        mutation of data if appropriately prompted or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        graph_identifier: str,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance.

        Raises:
            ImportError: boto3 is missing, or does not know the
                ``neptune-graph`` service.
            ValueError: Any other failure while creating the boto3 client.
            NeptuneQueryException: The initial schema refresh failed.
        """
        # ``query`` and ``_get_summary`` read this whether the client was
        # supplied by the caller or built below, so set it unconditionally.
        self.graph_identifier = graph_identifier

        try:
            if client is not None:
                self.client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                if region_name:
                    self.client = session.client(
                        "neptune-graph", region_name=region_name
                    )
                else:
                    self.client = session.client("neptune-graph")
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Matched by class name so botocore need not be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.34.40 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        try:
            self._refresh_schema()
        except Exception as e:
            # The key must be "details": that is what NeptuneQueryException
            # reads, so any other spelling discards the cause.
            raise NeptuneQueryException(
                {
                    "message": "Could not get schema for Neptune database",
                    "details": str(e),
                }
            ) from e

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query the Neptune database.

        Args:
            query: openCypher query text.
            params: Query parameters forwarded to the client unchanged.

        Returns:
            The ``results`` entry of the decoded JSON payload.

        Raises:
            NeptuneQueryException: Any failure while executing the query or
                decoding its payload.
        """
        try:
            resp = self.client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=params,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Fetch the detailed graph summary and return its ``graphSummary``.

        Raises:
            NeptuneQueryException: The summary call failed, or its response
                has no ``graphSummary`` entry.
        """
        try:
            response = self.client.get_graph_summary(
                graphIdentifier=self.graph_identifier, mode="detailed"
            )
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": ("Summary API error occurred on Neptune Analytics"),
                    "details": str(e),
                }
            ) from e

        try:
            return response["graphSummary"]
        except Exception as e:
            # A response without a ``content`` attribute (e.g. a plain dict)
            # must not raise a second error here and hide the real problem.
            try:
                details = response.content.decode()
            except AttributeError:
                details = str(response)
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": details,
                }
            ) from e
class NeptuneGraph(BaseNeptuneGraph):
    """Neptune wrapper for graph operations.

    Args:
        host: endpoint for the database instance
        port: port number for the database instance, default is 8182
        use_https: whether to use secure connection, default is True
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        sign: optional, whether to sign the request payload, default is True

    Example:
        .. code-block:: python

            graph = NeptuneGraph(
                host='<my-cluster>',
                port=8182
            )

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion,
        mutation of data if appropriately prompted or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        sign: bool = True,
    ) -> None:
        """Create a new Neptune graph wrapper instance.

        When ``client`` is given it is used as-is and the connection
        arguments (``host``, ``port``, ``use_https``, ``sign``, profile and
        region) are not consulted.

        Raises:
            ImportError: boto3 is missing, or does not know the
                ``neptunedata`` service.
            ValueError: Any other failure while creating the boto3 client.
            NeptuneQueryException: The initial schema refresh failed.
        """
        try:
            if client is not None:
                self.client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                client_params: Dict[str, Any] = {}
                if region_name:
                    client_params["region_name"] = region_name

                protocol = "https" if use_https else "http"
                client_params["endpoint_url"] = f"{protocol}://{host}:{port}"

                if sign:
                    self.client = session.client("neptunedata", **client_params)
                else:
                    from botocore import UNSIGNED
                    from botocore.config import Config

                    self.client = session.client(
                        "neptunedata",
                        **client_params,
                        config=Config(signature_version=UNSIGNED),
                    )
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Matched by class name so botocore need not be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.28.38 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        try:
            self._refresh_schema()
        except Exception as e:
            # The key must be "details": that is what NeptuneQueryException
            # reads, so any other spelling discards the cause.
            raise NeptuneQueryException(
                {
                    "message": "Could not get schema for Neptune database",
                    "details": str(e),
                }
            ) from e

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query the Neptune database.

        Args:
            query: openCypher query text.
            params: Query parameters. When non-empty they are sent to the
                client JSON-encoded as ``parameters``; when empty the request
                carries only the query text.

        Returns:
            The ``results`` entry of the client response.

        Raises:
            NeptuneQueryException: Any failure while encoding the parameters
                or executing the query.
        """
        try:
            request: Dict[str, Any] = {"openCypherQuery": query}
            if params:
                # NOTE(review): the neptunedata API takes parameters as a JSON
                # string rather than a dict - confirm against the boto3 docs.
                request["parameters"] = json.dumps(params)
            return self.client.execute_open_cypher_query(**request)["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Fetch the property-graph summary and return its ``graphSummary``.

        Raises:
            NeptuneQueryException: The summary call failed, or its response
                has no ``payload.graphSummary`` entry.
        """
        try:
            response = self.client.get_propertygraph_summary()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": (
                        "Summary API is not available for this instance of Neptune, "
                        "ensure the engine version is >=1.2.1.0"
                    ),
                    "details": str(e),
                }
            ) from e

        try:
            return response["payload"]["graphSummary"]
        except Exception as e:
            # A response without a ``content`` attribute (e.g. a plain dict)
            # must not raise a second error here and hide the real problem.
            try:
                details = response.content.decode()
            except AttributeError:
                details = str(response)
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": details,
                }
            ) from e