# Source code for langchain_community.graphs.neptune_rdf_graph

import json
from types import SimpleNamespace
from typing import Any, Dict, Optional, Sequence

import requests

# Query to find OWL datatype properties.
# Returns one binding per resource typed owl:DatatypeProperty, under ?elem.
# NOTE(review): the `owl:` prefix is not declared with a PREFIX clause, so this
# relies on the SPARQL endpoint predefining it -- confirm for the target Neptune.
DTPROP_QUERY = """
SELECT DISTINCT ?elem 
WHERE { 
 ?elem a owl:DatatypeProperty . 
}
"""

# Query to find OWL object properties.
# Returns one binding per resource typed owl:ObjectProperty, under ?elem.
# NOTE(review): same undeclared `owl:` prefix assumption as DTPROP_QUERY.
OPROP_QUERY = """
SELECT DISTINCT ?elem 
WHERE { 
 ?elem a owl:ObjectProperty . 
}
"""

# Schema element categories, mapped to the SPARQL query used to collect them.
# A value of None means no SPARQL query is run for that category; "classes"
# and "rels" are filled from the Neptune graph summary instead. The keys also
# define which categories load_schema renders into the textual schema.
ELEM_TYPES = {
    "classes": None,
    "rels": None,
    "dtprops": DTPROP_QUERY,
    "oprops": OPROP_QUERY,
}


class NeptuneRdfGraph:
    """Neptune wrapper for RDF graph operations.

    Args:
        host: endpoint for the database instance
        port: port number for the database instance, default is 8182
        use_iam_auth: boolean indicating IAM auth is enabled in Neptune cluster;
            when True, SPARQL requests sent by ``query`` are SigV4-signed
        use_https: whether to use a secure connection, default is True; applies
            to both the boto3 client endpoint and the SPARQL endpoint
        client: optional boto3 Neptune client
        credentials_profile_name: optional AWS profile name
        region_name: optional AWS region, e.g., us-west-2
        service: optional service name, default is neptunedata
        sign: optional, whether the boto3 client signs its requests, default
            is True; when False the client is created with an unsigned config

    Example:
        .. code-block:: python

            graph = NeptuneRdfGraph(
                host='<SPARQL host>',
                port=<SPARQL port>
            )
            schema = graph.get_schema

            OR
            graph = NeptuneRdfGraph(
                host='<SPARQL host>',
                port=<SPARQL port>
            )
            schema_elem = graph.get_schema_elements
            #... change schema_elements ...
            graph.load_schema(schema_elem)

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the calling
        code may attempt commands that would result in deletion, mutation
        of data if appropriately prompted or reading sensitive data if such
        data is present in the database.
        The best way to guard against such negative outcomes is to (as appropriate)
        limit the permissions granted to the credentials used with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        use_iam_auth: bool = False,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        service: str = "neptunedata",
        sign: bool = True,
    ) -> None:
        """Set up the Neptune client (unless one is supplied) and load the schema.

        Raises:
            ImportError: if boto3 is not installed, or is too old to know the
                Neptune data service.
            ValueError: if the boto3 session or client cannot be created
                (e.g. invalid credentials in the given profile).
        """
        self.use_iam_auth = use_iam_auth
        self.region_name = region_name
        protocol = "https" if use_https else "http"
        # Same scheme as the boto3 client endpoint, so use_https also governs
        # the raw SPARQL requests made by query().
        self.query_endpoint = f"{protocol}://{host}:{port}/sparql"
        # Only populated when this class builds its own boto3 client; query()
        # creates a default session lazily if IAM signing needs one.
        self.session: Any = None

        try:
            if client is not None:
                self.client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    self.session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    self.session = boto3.Session()

                client_params: Dict[str, Any] = {}
                if region_name:
                    client_params["region_name"] = region_name
                client_params["endpoint_url"] = f"{protocol}://{host}:{port}"

                if sign:
                    self.client = self.session.client(service, **client_params)
                else:
                    from botocore import UNSIGNED
                    from botocore.config import Config

                    self.client = self.session.client(
                        service,
                        **client_params,
                        config=Config(signature_version=UNSIGNED),
                    )

        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # boto3 raises UnknownServiceError when it predates the
            # neptunedata service; report that as a version problem.
            if type(e).__name__ == "UnknownServiceError":
                raise ImportError(
                    "NeptuneGraph requires a boto3 version 1.28.38 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                "profile name are valid."
            ) from e

        # Set schema
        self.schema = ""
        self.schema_elements: Dict[str, Any] = {}
        self._refresh_schema()

    @property
    def get_schema(self) -> str:
        """Return the textual schema of the graph, as built by ``load_schema``."""
        return self.schema

    @property
    def get_schema_elements(self) -> Dict[str, Any]:
        """Return the structured schema elements the textual schema is built from.

        After ``_refresh_schema`` this holds the keys "distinct_prefixes",
        "classes", "rels", "dtprops" and "oprops".
        """
        return self.schema_elements

    def get_summary(self) -> Dict[str, Any]:
        """Return Neptune's detailed RDF graph summary of classes and predicates."""
        return self.client.get_rdf_graph_summary(mode="detailed")

    def query(
        self,
        query: str,
    ) -> Dict[str, Any]:
        """Run a SPARQL query against the Neptune ``/sparql`` endpoint.

        The query is POSTed form-encoded with ``requests``. When
        ``use_iam_auth`` is set, the request headers are SigV4-signed for
        the ``neptune-db`` service first.

        Args:
            query: SPARQL query text.

        Returns:
            The response body decoded from JSON.
        """
        data = {"query": query}
        if self.use_iam_auth:
            request_hdr = self._sigv4_headers(data)
        else:
            request_hdr = {"Content-Type": "application/x-www-form-urlencoded"}

        queryres = requests.request(
            method="POST", url=self.query_endpoint, headers=request_hdr, data=data
        )
        return json.loads(queryres.text)

    def _sigv4_headers(self, data: Dict[str, Any]) -> Any:
        """Return SigV4-signed headers for POSTing *data* to the SPARQL endpoint.

        Raises:
            ValueError: if no AWS credentials can be resolved.
        """
        from botocore.auth import SigV4Auth
        from botocore.awsrequest import AWSRequest

        session = self.session
        if session is None:
            # A caller-supplied client leaves no session behind; use the
            # default boto3 credential chain rather than failing.
            import boto3

            session = self.session = boto3.Session()

        raw_credentials = session.get_credentials()
        if raw_credentials is None:
            raise ValueError(
                "Could not find AWS credentials for Neptune IAM authentication."
            )
        credentials = raw_credentials.get_frozen_credentials()
        # Fall back to the session's configured region when none was given.
        region = self.region_name or session.region_name
        creds = SimpleNamespace(
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            token=credentials.token,
            region=region,
        )

        request = AWSRequest(
            method="POST", url=self.query_endpoint, data=data, params=None
        )
        SigV4Auth(creds, "neptune-db", region).add_auth(request)
        # Added after signing, as in the original flow.
        request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        return request.headers

    def load_schema(self, schema_elements: Dict[str, Any]) -> None:
        """Generate and set the textual schema from ``schema_elements``.

        Helpful when the introspected schema needs pruning: edit the dict from
        ``get_schema_elements`` and pass it back here.

        Args:
            schema_elements: must contain every key of ``ELEM_TYPES``
                ("classes", "rels", "dtprops", "oprops"), each a list of
                ``{"uri": ..., "local": ...}`` records.
        """
        elem_str = {
            elem: ", ".join(
                f"<{rec['uri']}> ({rec['local']})" for rec in schema_elements[elem]
            )
            for elem in ELEM_TYPES
        }

        self.schema = (
            "In the following, each IRI is followed by the local name and "
            "optionally its description in parentheses. \n"
            "The graph supports the following node types:\n"
            f"{elem_str['classes']}\n"
            "The graph supports the following relationships:\n"
            f"{elem_str['rels']}\n"
            "The graph supports the following OWL object properties:\n"
            f"{elem_str['oprops']}\n"
            "The graph supports the following OWL data properties:\n"
            f"{elem_str['dtprops']}"
        )

    def _get_local_name(self, iri: str) -> Sequence[str]:
        """Split an IRI into ``[prefix, local_name]``.

        The split is after the last '#' if present, otherwise after the last
        '/'. The separator stays on the prefix.

        Raises:
            ValueError: if the IRI contains neither '#' nor '/'.
        """
        for sep in ("#", "/"):
            if sep in iri:
                prefix, _, local = iri.rpartition(sep)
                return [f"{prefix}{sep}", local]
        raise ValueError(f"Unexpected IRI '{iri}', contains neither '#' nor '/'.")

    def _make_elem_record(self, uri: str) -> Dict[str, str]:
        """Build a ``{"uri", "local"}`` record and note the IRI's prefix."""
        prefix, local = self._get_local_name(uri)
        self.schema_elements["distinct_prefixes"].setdefault(prefix, "y")
        return {"uri": uri, "local": local}

    def _refresh_schema(self) -> None:
        """Query Neptune to introspect the schema and rebuild ``self.schema``.

        Classes and predicates come from the detailed graph summary. OWL
        datatype and object properties come from the SPARQL queries in
        ``ELEM_TYPES``.
        """
        self.schema_elements["distinct_prefixes"] = {}

        # get summary and build list of classes and rels
        graph_summary = self.get_summary()["payload"]["graphSummary"]
        self.schema_elements["classes"] = [
            self._make_elem_record(uri) for uri in graph_summary["classes"]
        ]
        # Each predicates entry is iterated for its members; presumably a
        # mapping keyed by predicate IRI -- verify against the summary format.
        self.schema_elements["rels"] = [
            self._make_elem_record(uri)
            for entry in graph_summary["predicates"]
            for uri in entry
        ]

        # get dtprops and oprops too
        for elem, sparql in ELEM_TYPES.items():
            if not sparql:
                continue
            bindings = self.query(sparql)["results"]["bindings"]
            self.schema_elements[elem] = [
                self._make_elem_record(row["elem"]["value"]) for row in bindings
            ]

        self.load_schema(self.schema_elements)