import json
from types import SimpleNamespace
from typing import Any, Dict, Optional, Sequence
import requests
# SPARQL query that selects every distinct resource typed as
# owl:DatatypeProperty; each solution binds the resource IRI to ?elem.
# NOTE(review): the query uses the `owl:` prefix without a PREFIX declaration —
# presumably the endpoint predefines it; confirm against the target database.
DTPROP_QUERY = """
SELECT DISTINCT ?elem
WHERE {
?elem a owl:DatatypeProperty .
}
"""
# SPARQL query that selects every distinct resource typed as
# owl:ObjectProperty; each solution binds the resource IRI to ?elem.
OPROP_QUERY = """
SELECT DISTINCT ?elem
WHERE {
?elem a owl:ObjectProperty .
}
"""
# Schema element categories, in the order they are processed, mapped to the
# SPARQL query used to discover them. A value of None means no query is defined
# here for that category ("classes" and "rels" are filled from the graph
# summary by NeptuneRdfGraph._refresh_schema instead).
ELEM_TYPES: Dict[str, Optional[str]] = {
"classes": None,
"rels": None,
"dtprops": DTPROP_QUERY,
"oprops": OPROP_QUERY,
}
class NeptuneRdfGraph:
    """Neptune wrapper for RDF graph operations.

    Args:
        host: endpoint for the database instance
        port: port number for the database instance, default is 8182
        use_https: whether to use a secure connection, default is True; applies
            to both the boto3 endpoint and the SPARQL query endpoint
        use_iam_auth: boolean indicating IAM auth is enabled in Neptune cluster
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        service: optional service name, default is neptunedata
        sign: optional, whether to sign the request payload, default is True

    Example:
        .. code-block:: python

            graph = NeptuneRdfGraph(
                host='<SPARQL host>',
                port=<SPARQL port>
            )
            # get_schema is a property, not a method
            schema = graph.get_schema

            # OR

            graph = NeptuneRdfGraph(
                host='<SPARQL host>',
                port=<SPARQL port>
            )
            # get_schema_elements is a property, not a method
            schema_elem = graph.get_schema_elements
            #... change schema_elements ...
            graph.load_schema(schema_elem)

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion or
        mutation of data if appropriately prompted, or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        use_iam_auth: bool = False,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        service: str = "neptunedata",
        sign: bool = True,
    ) -> None:
        """Create the AWS client (unless one is supplied) and load the schema.

        Raises:
            ImportError: if boto3 cannot be imported, or if the installed boto3
                does not know the requested service.
            ValueError: if any other error occurs while creating the AWS
                session or client.
        """
        self.use_iam_auth = use_iam_auth
        self.region_name = region_name

        # Use the same scheme for the SPARQL endpoint as for the boto3 endpoint
        # so that use_https=False is honoured by query() as well.
        protocol = "https" if use_https else "http"
        self.query_endpoint = f"{protocol}://{host}:{port}/sparql"

        # Only needed to build a client or to sign SPARQL requests (IAM auth).
        self.session: Any = None

        try:
            if client is None or use_iam_auth:
                import boto3

                if credentials_profile_name is not None:
                    self.session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    self.session = boto3.Session()

            if client is not None:
                self.client = client
            else:
                client_params: Dict[str, Any] = {}
                if region_name:
                    client_params["region_name"] = region_name
                client_params["endpoint_url"] = f"{protocol}://{host}:{port}"

                if sign:
                    self.client = self.session.client(service, **client_params)
                else:
                    from botocore import UNSIGNED
                    from botocore.config import Config

                    self.client = self.session.client(
                        service,
                        **client_params,
                        config=Config(signature_version=UNSIGNED),
                    )
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Compared by name so botocore does not have to be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.28.38 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                "profile name are valid."
            ) from e

        # Set schema
        self.schema = ""
        self.schema_elements: Dict[str, Any] = {}
        self._refresh_schema()

    @property
    def get_schema(self) -> str:
        """Return the schema text of the graph database (property access)."""
        return self.schema

    @property
    def get_schema_elements(self) -> Dict[str, Any]:
        """Return the schema elements the schema text was built from."""
        return self.schema_elements

    def get_summary(self) -> Dict[str, Any]:
        """Obtain the Neptune statistical summary of classes and predicates."""
        return self.client.get_rdf_graph_summary(mode="detailed")

    def query(
        self,
        query: str,
    ) -> Dict[str, Any]:
        """Run a Neptune SPARQL query.

        The query is POSTed form-encoded to the SPARQL endpoint; when IAM auth
        is enabled the request is SigV4-signed first.

        Args:
            query: the SPARQL query text.

        Returns:
            The response body decoded from JSON.

        Raises:
            ValueError: if IAM auth is enabled but no AWS session or
                credentials are available to sign the request.
        """
        request_data = {"query": query}
        request_hdr: Any = {"Content-Type": "application/x-www-form-urlencoded"}

        if self.use_iam_auth:
            from botocore.auth import SigV4Auth
            from botocore.awsrequest import AWSRequest

            session = getattr(self, "session", None)
            credentials = session.get_credentials() if session is not None else None
            if credentials is None:
                raise ValueError(
                    "IAM authentication is enabled but no AWS credentials "
                    "could be resolved to sign the request."
                )
            frozen = credentials.get_frozen_credentials()
            creds = SimpleNamespace(
                access_key=frozen.access_key,
                secret_key=frozen.secret_key,
                token=frozen.token,
                region=self.region_name,
            )
            request = AWSRequest(
                method="POST", url=self.query_endpoint, data=request_data, params=None
            )
            SigV4Auth(creds, "neptune-db", self.region_name).add_auth(request)
            # Set after signing, as before, so it is not part of the signature.
            request.headers["Content-Type"] = "application/x-www-form-urlencoded"
            request_hdr = request.headers

        queryres = requests.request(
            method="POST",
            url=self.query_endpoint,
            headers=request_hdr,
            data=request_data,
        )
        return json.loads(queryres.text)

    def load_schema(self, schema_elements: Dict[str, Any]) -> None:
        """Generate and set the schema text from ``schema_elements``.

        Helpful in cases where the introspected schema needs pruning.

        Args:
            schema_elements: maps each category in ``ELEM_TYPES`` to a list of
                records with ``"uri"`` and ``"local"`` keys. A category that is
                missing is rendered as empty.
        """
        elem_str: Dict[str, str] = {}
        for elem in ELEM_TYPES:
            elem_str[elem] = ", ".join(
                f"<{rec['uri']}> ({rec['local']})"
                for rec in schema_elements.get(elem, ())
            )

        # Each heading is paired with its own category: object properties with
        # "oprops", data properties with "dtprops".
        self.schema = (
            "In the following, each IRI is followed by the local name and "
            "optionally its description in parentheses. \n"
            "The graph supports the following node types:\n"
            f"{elem_str['classes']}\n"
            "The graph supports the following relationships:\n"
            f"{elem_str['rels']}\n"
            "The graph supports the following OWL object properties:\n"
            f"{elem_str['oprops']}\n"
            "The graph supports the following OWL data properties:\n"
            f"{elem_str['dtprops']}"
        )

    def _get_local_name(self, iri: str) -> Sequence[str]:
        """Split an IRI into ``[prefix, local]`` at its last ``#`` or ``/``.

        ``#`` takes precedence over ``/``; the separator stays on the prefix,
        so ``prefix + local`` reproduces the IRI.

        Raises:
            ValueError: if the IRI contains neither ``#`` nor ``/``.
        """
        for sep in ("#", "/"):
            if sep in iri:
                prefix, _, local = iri.rpartition(sep)
                return [f"{prefix}{sep}", local]
        raise ValueError(f"Unexpected IRI '{iri}', contains neither '#' nor '/'.")

    def _build_elem_records(self, uris: Any) -> Any:
        """Turn IRIs into ``{"uri", "local"}`` records and note their prefixes."""
        prefixes = self.schema_elements["distinct_prefixes"]
        records = []
        for uri in uris:
            prefix, local = self._get_local_name(uri)
            records.append({"uri": uri, "local": local})
            # The dict is used as an ordered set; "y" is only a marker value.
            prefixes.setdefault(prefix, "y")
        return records

    def _refresh_schema(self) -> None:
        """Query Neptune to introspect the schema and rebuild the schema text."""
        self.schema_elements["distinct_prefixes"] = {}

        # get summary and build list of classes and rels
        graph_summary = self.get_summary()["payload"]["graphSummary"]
        self.schema_elements["classes"] = self._build_elem_records(
            graph_summary["classes"]
        )
        # Every item of each "predicates" entry is taken as an IRI (iterating
        # a mapping yields its keys).
        self.schema_elements["rels"] = self._build_elem_records(
            [uri for entry in graph_summary["predicates"] for uri in entry]
        )

        # get dtprops and oprops too
        for elem, sparql in ELEM_TYPES.items():
            if not sparql:
                continue
            bindings = self.query(sparql)["results"]["bindings"]
            self.schema_elements[elem] = self._build_elem_records(
                [binding["elem"]["value"] for binding in bindings]
            )

        self.load_schema(self.schema_elements)