Source code for langchain_community.chat_loaders.slack

import json
import logging
import re
import zipfile
from pathlib import Path
from typing import Dict, Iterator, List, Union

from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import AIMessage, HumanMessage

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class SlackChatLoader(BaseChatLoader):
    """Load `Slack` conversations from an exported zip dump.

    Every ``.json`` member of the archive is parsed as a list of message
    dictionaries and turned into one ``ChatSession``.
    """

    # Matches Slack's automatic "<@USER_ID> has joined the channel" notices so
    # they can be dropped from the transcript. Slack user IDs are alphanumeric
    # and may start with ``U`` or ``W``, so matching digits alone would let
    # real join notices through. Compiled once here instead of per message.
    _JOIN_NOTICE_PATTERN = re.compile(
        r"<@[UW][A-Z0-9]+> has joined the channel", flags=re.IGNORECASE
    )

    def __init__(
        self,
        path: Union[str, Path],
    ):
        """Initialize the chat loader with the path to a Slack dump zip file.

        :param path: Path to the exported Slack dump zip file.
        :raises FileNotFoundError: If nothing exists at ``path``.
        """
        self.zip_path = path if isinstance(path, Path) else Path(path)
        if not self.zip_path.exists():
            raise FileNotFoundError(f"File {self.zip_path} not found")

    def _load_single_chat_session(self, messages: List[Dict]) -> ChatSession:
        """Convert one list of raw Slack messages into a ``ChatSession``.

        Entries that are not dictionaries, have no ``user`` value, or are
        "has joined the channel" notices are skipped. Consecutive messages
        from the same sender are merged into a single message: their texts
        are joined with a blank line and each timestamp is appended to the
        ``events`` list in ``additional_kwargs``.

        :param messages: Raw message entries as read from one JSON file.
        :return: A chat session holding the merged messages.
        """
        results: List[Union[AIMessage, HumanMessage]] = []
        previous_sender = None
        for message in messages:
            if not isinstance(message, dict):
                continue
            # ``or ""`` also covers an explicit JSON ``null`` for "text",
            # which would otherwise make the regex match raise TypeError.
            text = message.get("text") or ""
            timestamp = message.get("ts", "")
            sender = message.get("user", "")
            if not sender:
                continue
            if self._JOIN_NOTICE_PATTERN.match(text):
                continue
            if sender == previous_sender:
                # Same sender as the last kept message: extend it in place.
                results[-1].content += "\n\n" + text
                results[-1].additional_kwargs["events"].append(
                    {"message_time": timestamp}
                )
            else:
                results.append(
                    HumanMessage(  # type: ignore[call-arg]
                        role=sender,
                        content=text,
                        additional_kwargs={
                            "sender": sender,
                            "events": [{"message_time": timestamp}],
                        },
                    )
                )
            previous_sender = sender
        return ChatSession(messages=results)

    def _read_json(self, zip_file: zipfile.ZipFile, file_path: str) -> List[dict]:
        """Read JSON data from a member file of the zip archive.

        :param zip_file: The open zip archive.
        :param file_path: Name of the member inside the archive.
        :return: The decoded top-level JSON list.
        :raises ValueError: If the top-level JSON value is not a list.
        """
        with zip_file.open(file_path, "r") as f:
            data = json.load(f)
        if not isinstance(data, list):
            raise ValueError(f"Expected list of dictionaries, got {type(data)}")
        return data

    def lazy_load(self) -> Iterator[ChatSession]:
        """Lazily load chat sessions from the Slack dump file.

        One session is yielded per ``.json`` member of the archive, in the
        order returned by ``ZipFile.namelist()``.

        :return: An iterator of chat sessions containing messages.
        """
        with zipfile.ZipFile(str(self.zip_path), "r") as zip_file:
            for file_path in zip_file.namelist():
                if file_path.endswith(".json"):
                    messages = self._read_json(zip_file, file_path)
                    yield self._load_single_chat_session(messages)