from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
if TYPE_CHECKING:
import pandas as pd
from telethon.hints import EntityLike
def concatenate_rows(row: dict) -> str:
    """Combine message information into a readable format ready to be used."""
date = row["date"]
sender = row["from"]
text = row["text"]
return f"{sender} on {date}: {text}\n\n"
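# A minimal sketch of the output (the row values below are hypothetical): each
# export row is flattened into one "sender on date: text" line.
#
#   concatenate_rows({"date": "2023-01-01T10:00:00", "from": "Alice", "text": "hi"})
#   # -> 'Alice on 2023-01-01T10:00:00: hi\n\n'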
class TelegramChatFileLoader(BaseLoader):
    """Load from a `Telegram chat` dump."""
    def __init__(self, path: Union[str, Path]):
        """Initialize with a path."""
self.file_path = path
    def load(self) -> List[Document]:
        """Load documents."""
p = Path(self.file_path)
with open(p, encoding="utf8") as f:
d = json.load(f)
text = "".join(
concatenate_rows(message)
for message in d["messages"]
if message["type"] == "message" and isinstance(message["text"], str)
)
metadata = {"source": str(p)}
return [Document(page_content=text, metadata=metadata)]
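# Usage sketch (not part of the module; "result.json" is a hypothetical path to
# a chat export produced by Telegram Desktop's "Export chat history" feature
# with the JSON format selected). The loader returns a single Document holding
# the whole conversation:
#
#   loader = TelegramChatFileLoader("result.json")
#   docs = loader.load()
#   print(docs[0].metadata)  # {'source': 'result.json'}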
def text_to_docs(text: Union[str, List[str]]) -> List[Document]:
    """Convert a string or list of strings to a list of Documents with metadata."""
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
chunk_overlap=20,
)
if isinstance(text, str):
# Take a single string as one page
text = [text]
page_docs = [Document(page_content=page) for page in text]
# Add page numbers as metadata
for i, doc in enumerate(page_docs):
doc.metadata["page"] = i + 1
# Split pages into chunks
doc_chunks = []
for doc in page_docs:
chunks = text_splitter.split_text(doc.page_content)
        for i, chunk in enumerate(chunks):
            # Use a separate name so the page-level `doc` is not shadowed
            chunk_doc = Document(
                page_content=chunk, metadata={"page": doc.metadata["page"], "chunk": i}
            )
            # Add source as metadata
            chunk_doc.metadata["source"] = (
                f"{chunk_doc.metadata['page']}-{chunk_doc.metadata['chunk']}"
            )
            doc_chunks.append(chunk_doc)
return doc_chunks
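# A small sketch of the metadata this produces (chunk boundaries depend on the
# splitter, so the exact chunk counts are illustrative):
#
#   docs = text_to_docs(["first page text ...", "second page text ..."])
#   docs[0].metadata  # {'page': 1, 'chunk': 0, 'source': '1-0'}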
class TelegramChatApiLoader(BaseLoader):
    """Load a `Telegram` chat JSON directory dump."""
    def __init__(
self,
chat_entity: Optional[EntityLike] = None,
api_id: Optional[int] = None,
api_hash: Optional[str] = None,
username: Optional[str] = None,
file_path: str = "telegram_data.json",
):
"""使用API参数进行初始化。
参数:
chat_entity: 用于获取数据的聊天实体。
api_id: API ID。
api_hash: API哈希。
username: 用户名。
file_path: 保存数据的文件路径。默认为 "telegram_data.json"。
"""
self.chat_entity = chat_entity
self.api_id = api_id
self.api_hash = api_hash
self.username = username
self.file_path = file_path
    async def fetch_data_from_telegram(self) -> None:
        """Fetch data from the Telegram API and save it as a JSON file."""
from telethon.sync import TelegramClient
data = []
async with TelegramClient(self.username, self.api_id, self.api_hash) as client:
async for message in client.iter_messages(self.chat_entity):
is_reply = message.reply_to is not None
reply_to_id = message.reply_to.reply_to_msg_id if is_reply else None
data.append(
{
"sender_id": message.sender_id,
"text": message.text,
"date": message.date.isoformat(),
"message.id": message.id,
"is_reply": is_reply,
"reply_to_id": reply_to_id,
}
)
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
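    # Each record written to `file_path` has this shape (values are
    # illustrative; note that "message.id" is a literal flat key, not a
    # nested object):
    #
    #   {
    #       "sender_id": 123456789,
    #       "text": "hello",
    #       "date": "2023-01-01T10:00:00+00:00",
    #       "message.id": 42,
    #       "is_reply": false,
    #       "reply_to_id": null
    #   }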
    def _get_message_threads(self, data: pd.DataFrame) -> dict:
        """Create a dictionary of message threads from the given data.

        Args:
            data (pd.DataFrame): A DataFrame containing the conversation data
                with the following columns:
                - message.sender_id
                - text
                - date
                - message.id
                - is_reply
                - reply_to_id

        Returns:
            dict: A dictionary where the key is the parent message ID and the
                value is a list of message IDs in ascending order.
        """
        def find_replies(parent_id: int, reply_data: pd.DataFrame) -> List[int]:
            """Recursively find all replies to a given parent message ID.

            Args:
                parent_id (int): The parent message ID.
                reply_data (pd.DataFrame): A DataFrame containing reply messages.

            Returns:
                list: A list of message IDs that are replies to the parent
                    message ID.
            """
# Find direct replies to the parent message ID
direct_replies = reply_data[reply_data["reply_to_id"] == parent_id][
"message.id"
].tolist()
# Recursively find replies to the direct replies
all_replies = []
for reply_id in direct_replies:
all_replies += [reply_id] + find_replies(reply_id, reply_data)
return all_replies
# Filter out parent messages
parent_messages = data[~data["is_reply"]]
        # Filter out reply messages and drop rows with NaN in 'reply_to_id';
        # copy so the integer cast below does not trigger a
        # SettingWithCopyWarning on a filtered view
        reply_messages = data[data["is_reply"]].dropna(subset=["reply_to_id"]).copy()
        # Convert 'reply_to_id' to integer
        reply_messages["reply_to_id"] = reply_messages["reply_to_id"].astype(int)
        # Create a dictionary of message threads with parent message IDs as
        # keys and lists of reply message IDs as values
message_threads = {
parent_id: [parent_id] + find_replies(parent_id, reply_messages)
for parent_id in parent_messages["message.id"]
}
return message_threads
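    # Toy example (illustrative): with one parent message 1 and replies
    # 2 -> 1 and 3 -> 2, the method returns {1: [1, 2, 3]}.
    #
    #   df = pd.DataFrame(
    #       {
    #           "message.id": [1, 2, 3],
    #           "is_reply": [False, True, True],
    #           "reply_to_id": [None, 1, 2],
    #       }
    #   )
    #   loader._get_message_threads(df)  # {1: [1, 2, 3]}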
def _combine_message_texts(
self, message_threads: Dict[int, List[int]], data: pd.DataFrame
) -> str:
"""根据消息线程列表,将每个父消息ID的消息文本组合在一起。
参数:
message_threads(dict):一个字典,其中键是父消息ID,值是按升序排列的消息ID列表。
data(pd.DataFrame):包含对话数据的DataFrame:
- message.sender_id
- text
- date
- message.id
- is_reply
- reply_to_id
返回:
str:按日期排序的消息文本组合字符串。
"""
combined_text = ""
# Iterate through sorted parent message IDs
for parent_id, message_ids in message_threads.items():
# Get the message texts for the message IDs and sort them by date
message_texts = (
data[data["message.id"].isin(message_ids)]
.sort_values(by="date")["text"]
.tolist()
)
message_texts = [str(elem) for elem in message_texts]
# Combine the message texts
combined_text += " ".join(message_texts) + ".\n"
return combined_text.strip()
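    # Continuing the toy example above (illustrative): if messages 1, 2, 3
    # carry the texts "hi", "hello", "bye" in date order, and the DataFrame
    # also has "text" and "date" columns, each thread becomes one line:
    #
    #   loader._combine_message_texts({1: [1, 2, 3]}, df)  # "hi hello bye."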
    def load(self) -> List[Document]:
        """Load documents."""
if self.chat_entity is not None:
try:
import nest_asyncio
nest_asyncio.apply()
asyncio.run(self.fetch_data_from_telegram())
except ImportError:
                raise ImportError(
                    "`nest_asyncio` package not found. "
                    "Please install it with `pip install nest_asyncio`."
                )
p = Path(self.file_path)
with open(p, encoding="utf8") as f:
d = json.load(f)
try:
import pandas as pd
except ImportError:
            raise ImportError(
                "`pandas` package not found. "
                "Please install it with `pip install pandas`."
            )
        # json_normalize flattens the records and already returns a DataFrame
        df = pd.json_normalize(d)
message_threads = self._get_message_threads(df)
combined_texts = self._combine_message_texts(message_threads, df)
return text_to_docs(combined_texts)
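    # Usage sketch (credentials and chat name are placeholders; api_id and
    # api_hash are obtained from https://my.telegram.org):
    #
    #   loader = TelegramChatApiLoader(
    #       chat_entity="<CHAT_URL_OR_USERNAME>",
    #       api_id=12345,
    #       api_hash="<API_HASH>",
    #       username="session_name",
    #   )
    #   docs = loader.load()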
# For backwards compatibility
TelegramChatLoader = TelegramChatFileLoader