# Source code for langchain_community.document_loaders.notiondb
from typing import Any, Dict, List, Optional
import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
# Root of the Notion REST API; every endpoint template below is built on it.
NOTION_BASE_URL = "https://api.notion.com/v1"

# Endpoint templates. The doubled braces keep a literal ``{name}`` placeholder
# in the resulting string so it can be filled in later with ``str.format``.
DATABASE_URL = f"{NOTION_BASE_URL}/databases/{{database_id}}/query"
PAGE_URL = f"{NOTION_BASE_URL}/pages/{{page_id}}"
BLOCK_URL = f"{NOTION_BASE_URL}/blocks/{{block_id}}/children"
class NotionDBLoader(BaseLoader):
    """Load from `Notion DB`.

    Reads content from the pages of a Notion database.

    Args:
        integration_token (str): Notion integration token.
        database_id (str): Notion database id.
        request_timeout_sec (int): Timeout for Notion requests, in seconds.
            Defaults to 10.
        filter_object (Dict[str, Any]): Filter object used to limit the
            returned entries based on the specified criteria.
            E.g.: {
                "timestamp": "last_edited_time",
                "last_edited_time": {
                    "on_or_after": "2024-02-07"
                }
            } -> only returns entries last edited on or after 2024-02-07.
            Notion docs: https://developers.notion.com/reference/post-database-query-filter
            Defaults to None, which returns all entries.
    """

    def __init__(
        self,
        integration_token: str,
        database_id: str,
        request_timeout_sec: Optional[int] = 10,
        *,
        filter_object: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize with parameters.

        Raises:
            ValueError: If ``integration_token`` or ``database_id`` is empty.
        """
        if not integration_token:
            raise ValueError("integration_token must be provided")
        if not database_id:
            raise ValueError("database_id must be provided")

        self.token = integration_token
        self.database_id = database_id
        self.headers = {
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }
        self.request_timeout_sec = request_timeout_sec
        # An empty dict means "no filter"; ``_request`` skips falsy filters.
        self.filter_object = filter_object or {}

    def load(self) -> List[Document]:
        """Load documents from the Notion database.

        Returns:
            List[Document]: One document per page returned by the database
            query.
        """
        page_summaries = self._retrieve_page_summaries()
        return [self.load_page(page_summary) for page_summary in page_summaries]

    def _retrieve_page_summaries(
        self, query_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Get all the pages from the Notion database, or only those
        matching ``self.filter_object``.

        Args:
            query_dict: Base query payload. Defaults to ``{"page_size": 100}``.
                The dict is copied, so the caller's object is never modified.

        Returns:
            The page summaries accumulated over every result page.
        """
        # Work on a private copy: the pagination cursor is written into the
        # query below and must not leak into later calls or the caller's dict.
        query: Dict[str, Any] = (
            {"page_size": 100} if query_dict is None else dict(query_dict)
        )
        pages: List[Dict[str, Any]] = []

        while True:
            data = self._request(
                DATABASE_URL.format(database_id=self.database_id),
                method="POST",
                query_dict=query,
                filter_object=self.filter_object,
            )
            pages.extend(data.get("results") or [])
            if not data.get("has_more"):
                break
            query["start_cursor"] = data.get("next_cursor")

        return pages

    def load_page(self, page_summary: Dict[str, Any]) -> Document:
        """Read a page.

        Args:
            page_summary: Page summary from the Notion API. Its ``"id"`` and
                ``"properties"`` entries are read.

        Returns:
            A document whose content is the text of the page's blocks and
            whose metadata holds the page properties (keys lower-cased) plus
            the page ``"id"``.
        """
        page_id = page_summary["id"]

        # Load properties as metadata.
        metadata: Dict[str, Any] = {
            prop_name.lower(): self._property_value(prop_data)
            for prop_name, prop_data in page_summary["properties"].items()
        }
        # Set last so the page id wins over any property that is named "id".
        metadata["id"] = page_id

        return Document(page_content=self._load_blocks(page_id), metadata=metadata)

    @staticmethod
    def _property_value(prop_data: Dict[str, Any]) -> Any:
        """Convert one Notion property object into a plain metadata value.

        Property types that are not handled here yield ``None``.
        """
        prop_type = prop_data["type"]

        if prop_type in ("rich_text", "title"):
            # The value is a list of text segments; join all of them so that
            # text after the first segment is not dropped.
            segments = prop_data[prop_type]
            if not segments:
                return None
            return "".join(segment["plain_text"] for segment in segments)
        if prop_type == "multi_select":
            return [option["name"] for option in prop_data["multi_select"] or []]
        if prop_type == "people":
            # ``.get`` because a user object may come without a "name" key.
            return [person.get("name") for person in prop_data["people"] or []]
        if prop_type in ("select", "status"):
            option = prop_data[prop_type]
            return option["name"] if option else None
        if prop_type == "unique_id":
            unique_id = prop_data["unique_id"]
            if not unique_id:
                return None
            prefix = unique_id.get("prefix")
            number = unique_id["number"]
            # Without a prefix, avoid rendering the id as "None-<number>".
            return f"{prefix}-{number}" if prefix else str(number)
        if prop_type in ("date", "last_edited_time", "created_time"):
            return prop_data[prop_type] or None
        if prop_type in ("url", "checkbox", "email", "number"):
            return prop_data[prop_type]
        return None

    def _load_blocks(self, block_id: str, num_tabs: int = 0) -> str:
        """Read a block and its children.

        Args:
            block_id: Id of the block (or page) whose children are read.
            num_tabs: Nesting depth; each text line is prefixed with this
                many tab characters.

        Returns:
            The text of all children that carry ``rich_text``, one block per
            line, with nested children indented one tab deeper.
        """
        if not block_id:
            return ""

        indent = "\t" * num_tabs
        url = BLOCK_URL.format(block_id=block_id)
        lines: List[str] = []
        query: Dict[str, Any] = {}

        while True:
            data = self._request(url, query_dict=query)
            for result in data["results"]:
                block_body = result[result["type"]]
                # Blocks without text (and their children) are skipped.
                if "rich_text" not in block_body:
                    continue

                block_lines: List[str] = [
                    indent + rich_text["text"]["content"]
                    for rich_text in block_body["rich_text"]
                    if "text" in rich_text
                ]
                if result["has_children"]:
                    block_lines.append(
                        self._load_blocks(result["id"], num_tabs=num_tabs + 1)
                    )
                lines.append("\n".join(block_lines))

            # ``next_cursor`` is a pagination cursor for THIS block's children,
            # not the id of another block: request the next page of the same
            # block rather than switching to a different block id.
            next_cursor = data.get("next_cursor")
            if not next_cursor:
                break
            query = {"start_cursor": next_cursor}

        return "\n".join(lines)

    def _request(
        self,
        url: str,
        method: str = "GET",
        query_dict: Optional[Dict[str, Any]] = None,
        *,
        filter_object: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Send a request to the Notion API and return the decoded JSON body.

        Args:
            url: Full endpoint URL.
            method: HTTP method. Defaults to "GET".
            query_dict: Request parameters; copied, never modified.
            filter_object: Optional filter, added to the parameters under the
                ``"filter"`` key when truthy.

        Raises:
            requests.HTTPError: If the response status indicates an error.
        """
        payload: Dict[str, Any] = dict(query_dict or {})
        if filter_object:
            payload["filter"] = filter_object

        # GET endpoints take their parameters (e.g. ``start_cursor``) from the
        # query string; every other method sends them as a JSON body.
        is_get = method.upper() == "GET"
        res = requests.request(
            method,
            url,
            headers=self.headers,
            params=payload if is_get else None,
            json=None if is_get else payload,
            timeout=self.request_timeout_sec,
        )
        res.raise_for_status()
        return res.json()