Source code for langchain_community.document_loaders.telegram

from __future__ import annotations

import asyncio
import json
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader

if TYPE_CHECKING:
    import pandas as pd
    from telethon.hints import EntityLike


def concatenate_rows(row: dict) -> str:
    """Render a single chat message as one human-readable entry.

    Args:
        row: A message record providing ``"date"``, ``"from"`` and ``"text"``.

    Returns:
        ``"<from> on <date>: <text>"`` followed by a blank line, so that
        consecutive entries are separated by an empty line when concatenated.

    Raises:
        KeyError: If one of the three keys is missing (``"date"`` is looked up
            first, then ``"from"``, then ``"text"``).
    """
    date, sender, text = (row[field] for field in ("date", "from", "text"))
    return f"{sender} on {date}: {text}\n\n"
class TelegramChatFileLoader(BaseLoader):
    """Load a `Telegram chat` dump that was exported to a JSON file."""

    def __init__(self, path: Union[str, Path]):
        """Initialize with the location of the exported JSON file.

        Args:
            path: Path to the Telegram chat export.
        """
        self.file_path = path

    def load(self) -> List[Document]:
        """Load the whole chat as a single document.

        Only entries whose ``"type"`` is ``"message"`` and whose ``"text"`` is
        a plain string are kept; each is rendered with ``concatenate_rows`` and
        the results are concatenated in file order.

        Returns:
            A one-element list; the document's ``metadata["source"]`` is the
            file path as a string.
        """
        source = Path(self.file_path)
        with open(source, encoding="utf8") as handle:
            dump = json.load(handle)

        pieces = []
        for message in dump["messages"]:
            # Skip non-message entries before touching "text".
            if message["type"] != "message":
                continue
            # Entries whose text is not a plain string are left out.
            if not isinstance(message["text"], str):
                continue
            pieces.append(concatenate_rows(message))

        return [
            Document(page_content="".join(pieces), metadata={"source": str(source)})
        ]
def text_to_docs(
    text: Union[str, List[str]],
    *,
    chunk_size: int = 800,
    chunk_overlap: int = 20,
) -> List[Document]:
    """Convert a string or a list of strings into chunked documents.

    Each input string is treated as one "page", numbered from 1, and split with
    ``RecursiveCharacterTextSplitter``. Every resulting chunk becomes a
    ``Document`` whose metadata holds ``page``, ``chunk`` (0-based index within
    its page) and ``source`` (``"<page>-<chunk>"``).

    Args:
        text: A single string (treated as one page) or a list of page strings.
        chunk_size: Maximum chunk size passed to the splitter. Defaults to 800.
        chunk_overlap: Overlap passed to the splitter. Defaults to 20.

    Returns:
        The chunk documents, ordered by page and then by chunk index.
    """
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        chunk_overlap=chunk_overlap,
    )
    # A single string is handled as a one-page input.
    pages = [text] if isinstance(text, str) else text

    doc_chunks: List[Document] = []
    for page_number, page in enumerate(pages, start=1):
        # Distinct names for page and chunk so the page number is never read
        # back from a chunk document.
        for chunk_index, chunk in enumerate(text_splitter.split_text(page)):
            doc_chunks.append(
                Document(
                    page_content=chunk,
                    metadata={
                        "page": page_number,
                        "chunk": chunk_index,
                        "source": f"{page_number}-{chunk_index}",
                    },
                )
            )
    return doc_chunks
class TelegramChatApiLoader(BaseLoader):
    """Load a `Telegram` chat via the API (or from a previously saved JSON dump)."""

    def __init__(
        self,
        chat_entity: Optional[EntityLike] = None,
        api_id: Optional[int] = None,
        api_hash: Optional[str] = None,
        username: Optional[str] = None,
        file_path: str = "telegram_data.json",
    ):
        """Initialize with API parameters.

        Args:
            chat_entity: The chat entity to fetch messages from. When ``None``,
                ``load`` skips the API call and only reads ``file_path``.
            api_id: The Telegram API ID.
            api_hash: The Telegram API hash.
            username: Passed as the first argument (the session) of
                ``TelegramClient``.
            file_path: JSON file that fetched messages are written to and read
                from. Defaults to "telegram_data.json".
        """
        self.chat_entity = chat_entity
        self.api_id = api_id
        self.api_hash = api_hash
        self.username = username
        self.file_path = file_path

    async def fetch_data_from_telegram(self) -> None:
        """Fetch messages from the Telegram API and save them as a JSON file.

        Each message becomes a record with the keys ``sender_id``, ``text``,
        ``date`` (ISO-8601 string), ``message.id``, ``is_reply`` and
        ``reply_to_id`` (``None`` for non-replies). The list of records is
        written to ``self.file_path``, overwriting any existing file.
        """
        from telethon.sync import TelegramClient

        data = []
        async with TelegramClient(self.username, self.api_id, self.api_hash) as client:
            async for message in client.iter_messages(self.chat_entity):
                is_reply = message.reply_to is not None
                reply_to_id = message.reply_to.reply_to_msg_id if is_reply else None
                data.append(
                    {
                        "sender_id": message.sender_id,
                        "text": message.text,
                        "date": message.date.isoformat(),
                        "message.id": message.id,
                        "is_reply": is_reply,
                        "reply_to_id": reply_to_id,
                    }
                )

        with open(self.file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    def _get_message_threads(self, data: pd.DataFrame) -> dict:
        """Create a dictionary of message threads from the given data.

        Args:
            data (pd.DataFrame): Conversation data with (at least) the columns
                ``message.id``, ``is_reply`` and ``reply_to_id``.

        Returns:
            dict: Maps each non-reply (parent) message ID to a list that starts
            with that ID, followed by the IDs of every message in its reply
            tree in depth-first (pre-order) order. Replies whose parent is not
            reachable from a parent message are not included.
        """
        # Index replies by the message they answer, once. Walking a thread is
        # then a dict lookup instead of a full DataFrame scan per node, and no
        # column is assigned on a filtered slice (SettingWithCopyWarning).
        reply_messages = data[data["is_reply"]].dropna(subset=["reply_to_id"])
        children: Dict[int, List[int]] = {}
        for reply_id, parent_id in zip(
            reply_messages["message.id"].tolist(),
            reply_messages["reply_to_id"].astype(int).tolist(),
        ):
            children.setdefault(parent_id, []).append(reply_id)

        def find_replies(parent_id: int) -> List[int]:
            """Return all direct and indirect replies to ``parent_id``, pre-order."""
            replies: List[int] = []
            # Explicit stack instead of recursion so that long reply chains
            # cannot exceed the interpreter's recursion limit. Children are
            # pushed reversed so they are visited in their original row order.
            stack = list(reversed(children.get(parent_id, [])))
            while stack:
                reply_id = stack.pop()
                replies.append(reply_id)
                stack.extend(reversed(children.get(reply_id, [])))
            return replies

        parent_messages = data[~data["is_reply"]]
        return {
            parent_id: [parent_id] + find_replies(parent_id)
            for parent_id in parent_messages["message.id"].tolist()
        }

    def _combine_message_texts(
        self, message_threads: Dict[int, List[int]], data: pd.DataFrame
    ) -> str:
        """Combine the message texts of each thread into one string.

        Args:
            message_threads (dict): Maps a parent message ID to the IDs of the
                messages in its thread.
            data (pd.DataFrame): Conversation data with (at least) the columns
                ``message.id``, ``date`` and ``text``.

        Returns:
            str: One line per thread. Each line holds the thread's texts ordered
            by ``date``, joined by spaces and terminated with ``"."``.
            Surrounding whitespace is stripped from the final result.
        """
        thread_lines = []
        for message_ids in message_threads.values():
            texts = (
                data[data["message.id"].isin(message_ids)]
                .sort_values(by="date")["text"]
                # Messages without text (null in the dump) would otherwise be
                # stringified into a literal "None"/"nan".
                .dropna()
                .tolist()
            )
            thread_lines.append(" ".join(str(text) for text in texts) + ".\n")
        return "".join(thread_lines).strip()

    def load(self) -> List[Document]:
        """Load documents.

        If ``chat_entity`` is set, messages are first fetched from the API and
        written to ``file_path``. The JSON at ``file_path`` is then grouped into
        reply threads, and the combined text is split with ``text_to_docs``.

        Returns:
            The chunked documents, or an empty list if the dump has no messages.

        Raises:
            ImportError: If ``pandas`` is missing, or ``nest_asyncio`` is
                missing while ``chat_entity`` is set.
        """
        # Check pandas before any network work so a missing dependency fails fast.
        try:
            import pandas as pd
        except ImportError as err:
            raise ImportError(
                """`pandas` package not found.
                please install with `pip install pandas`
                """
            ) from err

        if self.chat_entity is not None:
            # Only the import is guarded: an ImportError raised while fetching
            # (e.g. a missing telethon) must not be reported as nest_asyncio.
            try:
                import nest_asyncio
            except ImportError as err:
                raise ImportError(
                    """`nest_asyncio` package not found.
                    please install with `pip install nest_asyncio`
                    """
                ) from err
            # nest_asyncio patches asyncio so that asyncio.run() also works when
            # an event loop is already running (e.g. in Jupyter).
            nest_asyncio.apply()
            asyncio.run(self.fetch_data_from_telegram())

        with open(Path(self.file_path), encoding="utf8") as f:
            records = json.load(f)

        df = pd.json_normalize(records)
        # An empty dump yields a frame without columns; there is nothing to
        # thread, and the column lookups below would raise KeyError.
        if df.empty:
            return []

        message_threads = self._get_message_threads(df)
        combined_texts = self._combine_message_texts(message_threads, df)
        return text_to_docs(combined_texts)
# For backwards compatibility: ``TelegramChatLoader`` is an alias of
# ``TelegramChatFileLoader``. The assignment must sit on its own line; on the
# comment line it would itself be commented out and the alias would not exist.
TelegramChatLoader = TelegramChatFileLoader