# Source code for langchain_community.document_loaders.slack_directory

import json
import zipfile
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class SlackDirectoryLoader(BaseLoader):
    """Load documents from a `Slack` directory dump (a zip archive).

    Every ``.json`` member of the archive that sits inside a directory is
    read as a list of messages; the directory name is used as the channel
    name. Each message becomes one ``Document`` whose ``page_content`` is the
    message ``text`` and whose metadata holds ``source``, ``channel``,
    ``timestamp`` and ``user``.
    """

    def __init__(
        self,
        zip_path: Union[str, Path],
        workspace_url: Optional[str] = None,
    ):
        """Initialize the SlackDirectoryLoader.

        Args:
            zip_path: Path to the Slack directory dump zip file.
            workspace_url: The Slack workspace URL. When given, message
                sources are rendered as links into that workspace instead of
                plain descriptive strings. Defaults to None.
        """
        self.zip_path = Path(zip_path)
        self.workspace_url = workspace_url
        # Read eagerly so link generation does not reopen the archive for
        # every message.
        self.channel_id_map = self._get_channel_id_map(self.zip_path)

    @staticmethod
    def _get_channel_id_map(zip_path: Path) -> Dict[str, str]:
        """Return a dictionary mapping channel names to their IDs.

        The mapping is read from the ``channels.json`` member at the root of
        the archive. If that member does not exist, an empty dictionary is
        returned. Channel entries lacking a ``name`` or ``id`` key are
        skipped so that one malformed entry does not discard the whole map.

        Args:
            zip_path: Path to the Slack directory dump zip file.

        Returns:
            A ``{channel_name: channel_id}`` dictionary (possibly empty).
        """
        with zipfile.ZipFile(zip_path, "r") as zip_file:
            try:
                # ZipFile.open raises KeyError when the member is missing.
                with zip_file.open("channels.json", "r") as f:
                    channels = json.load(f)
            except KeyError:
                return {}

        channel_id_map: Dict[str, str] = {}
        for channel in channels:
            try:
                channel_id_map[channel["name"]] = channel["id"]
            except KeyError:
                # Incomplete channel record: ignore it, keep the rest.
                continue
        return channel_id_map

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load documents from the Slack directory dump.

        Yields:
            One ``Document`` per message found in each ``.json`` file that
            lives inside a directory of the archive.
        """
        with zipfile.ZipFile(self.zip_path, "r") as zip_file:
            for channel_path in zip_file.namelist():
                # The immediate parent directory names the channel. Members
                # at the archive root (and bare directory entries) have an
                # empty parent name and are skipped.
                channel_name = Path(channel_path).parent.name
                if not channel_name:
                    continue
                if not channel_path.endswith(".json"):
                    continue
                messages = self._read_json(zip_file, channel_path)
                for message in messages:
                    yield self._convert_message_to_document(message, channel_name)

    def _read_json(self, zip_file: zipfile.ZipFile, file_path: str) -> List[dict]:
        """Read and parse JSON data from a member of the zip archive.

        Args:
            zip_file: The open archive to read from.
            file_path: Name of the member inside the archive.

        Returns:
            The parsed JSON content of the member.
        """
        with zip_file.open(file_path, "r") as f:
            data = json.load(f)
        return data

    def _convert_message_to_document(
        self, message: dict, channel_name: str
    ) -> Document:
        """Convert a message to a Document object.

        Args:
            message: A message in the form of a dictionary.
            channel_name: The name of the channel the message belongs to.

        Returns:
            A ``Document`` whose content is the message's ``text`` value
            (empty string if absent) together with its metadata.
        """
        text = message.get("text", "")
        metadata = self._get_message_metadata(message, channel_name)
        return Document(
            page_content=text,
            metadata=metadata,
        )

    def _get_message_metadata(self, message: dict, channel_name: str) -> dict:
        """Create and return metadata for a given message and channel.

        Args:
            message: A message in the form of a dictionary.
            channel_name: The name of the channel the message belongs to.

        Returns:
            A dictionary with the keys ``source``, ``channel``,
            ``timestamp`` (the message's ``ts`` value) and ``user``; missing
            message fields default to an empty string.
        """
        timestamp = message.get("ts", "")
        user = message.get("user", "")
        source = self._get_message_source(channel_name, user, timestamp)
        return {
            "source": source,
            "channel": channel_name,
            "timestamp": timestamp,
            "user": user,
        }

    def _get_message_source(self, channel_name: str, user: str, timestamp: str) -> str:
        """Get the message source as a string.

        Args:
            channel_name: The name of the channel the message belongs to.
            user: The user ID who sent the message.
            timestamp: The timestamp of the message.

        Returns:
            If a workspace URL was configured, a link of the form
            ``<workspace_url>/archives/<channel_id>/p<timestamp without dots>``;
            otherwise the string ``"<channel_name> - <user> - <timestamp>"``.
        """
        if self.workspace_url:
            channel_id = self.channel_id_map.get(channel_name, "")
            # Drop trailing slashes so a URL such as "https://x.slack.com/"
            # does not yield a double slash before "archives".
            base_url = self.workspace_url.rstrip("/")
            return (
                f"{base_url}/archives/{channel_id}"
                + f"/p{timestamp.replace('.', '')}"
            )
        else:
            return f"{channel_name} - {user} - {timestamp}"