Source code for langchain_community.document_loaders.slack_directory
import json
import zipfile
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class SlackDirectoryLoader(BaseLoader):
    """Load documents from a `Slack` directory dump (zip archive).

    Every message found in a ``.json`` file that sits inside a folder of the
    archive is turned into one ``Document``; the name of that folder is used
    as the channel name.
    """

    def __init__(
        self, zip_path: Union[str, Path], workspace_url: Optional[str] = None
    ):
        """Initialize the SlackDirectoryLoader.

        Args:
            zip_path: Path to the Slack directory dump zip file.
            workspace_url: The Slack workspace URL. When provided, the
                ``source`` metadata of each document is built as a link
                instead of a plain description. Defaults to None.
        """
        self.zip_path = Path(zip_path)
        self.workspace_url = workspace_url
        # Built eagerly: the archive is opened once here to read channels.json.
        self.channel_id_map = self._get_channel_id_map(self.zip_path)

    @staticmethod
    def _get_channel_id_map(zip_path: Path) -> Dict[str, str]:
        """Return a dictionary mapping channel names to their IDs.

        The mapping is read from ``channels.json`` at the root of the archive.
        An empty dictionary is returned when the archive has no such member.

        Args:
            zip_path: Path to the Slack directory dump zip file.

        Returns:
            A ``{channel name: channel id}`` dictionary.
        """
        with zipfile.ZipFile(zip_path, "r") as zip_file:
            try:
                # ZipFile.open raises KeyError when the member is absent.
                with zip_file.open("channels.json", "r") as f:
                    channels = json.load(f)
            except KeyError:
                return {}
        # Kept outside the ``try`` so that one malformed entry only drops that
        # entry instead of silently discarding the whole mapping.
        return {
            channel["name"]: channel["id"]
            for channel in channels
            if isinstance(channel, dict) and "name" in channel and "id" in channel
        }

    def lazy_load(self) -> Iterator[Document]:
        """Load and yield documents from the Slack directory dump.

        Yields:
            One ``Document`` per message, in archive listing order.
        """
        with zipfile.ZipFile(self.zip_path, "r") as zip_file:
            for channel_path in zip_file.namelist():
                channel_name = Path(channel_path).parent.name
                # Members at the archive root have no parent folder name and
                # are therefore not treated as channel message files.
                if not channel_name:
                    continue
                if not channel_path.endswith(".json"):
                    continue
                for message in self._read_json(zip_file, channel_path):
                    yield self._convert_message_to_document(message, channel_name)

    def _read_json(self, zip_file: zipfile.ZipFile, file_path: str) -> List[dict]:
        """Read and parse a JSON member of the zip archive.

        Args:
            zip_file: The opened archive.
            file_path: Name of the member inside the archive.

        Returns:
            The parsed JSON content of the member.
        """
        with zip_file.open(file_path, "r") as f:
            return json.load(f)

    def _convert_message_to_document(
        self, message: dict, channel_name: str
    ) -> Document:
        """Convert a message into a ``Document``.

        Args:
            message: The message as a dictionary.
            channel_name: Name of the channel the message belongs to.

        Returns:
            A ``Document`` whose content is the message's ``"text"`` value
            (empty string when missing) plus the message metadata.
        """
        return Document(
            page_content=message.get("text", ""),
            metadata=self._get_message_metadata(message, channel_name),
        )

    def _get_message_metadata(self, message: dict, channel_name: str) -> dict:
        """Create and return the metadata for a given message and channel.

        Args:
            message: The message as a dictionary; ``"ts"`` and ``"user"`` are
                read and default to an empty string when missing.
            channel_name: Name of the channel the message belongs to.

        Returns:
            A dictionary with the keys ``source``, ``channel``, ``timestamp``
            and ``user``.
        """
        timestamp = message.get("ts", "")
        user = message.get("user", "")
        return {
            "source": self._get_message_source(channel_name, user, timestamp),
            "channel": channel_name,
            "timestamp": timestamp,
            "user": user,
        }

    def _get_message_source(self, channel_name: str, user: str, timestamp: str) -> str:
        """Return the source of a message as a string.

        Args:
            channel_name: Name of the channel the message belongs to.
            user: ID of the user who sent the message.
            timestamp: Timestamp of the message.

        Returns:
            When ``workspace_url`` is set, a link of the form
            ``<workspace_url>/archives/<channel_id>/p<timestamp without dots>``;
            otherwise the string ``"<channel_name> - <user> - <timestamp>"``.
        """
        if not self.workspace_url:
            return f"{channel_name} - {user} - {timestamp}"
        # Channels missing from the map yield an empty ID segment.
        channel_id = self.channel_id_map.get(channel_name, "")
        # Drop trailing slashes so the joined link never contains "//archives".
        base_url = self.workspace_url.rstrip("/")
        return f"{base_url}/archives/{channel_id}/p{timestamp.replace('.', '')}"