Source code for langchain_community.chat_loaders.imessage

from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, List, Optional, Union

from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage

if TYPE_CHECKING:
    import sqlite3


def nanoseconds_from_2001_to_datetime(nanoseconds: int) -> datetime:
    """Convert an iMessage timestamp to a naive ``datetime`` expressed in UTC.

    The ``message.date`` column counts nanoseconds elapsed since
    2001-01-01 00:00:00 UTC (the Apple reference date).

    Args:
        nanoseconds: Nanoseconds elapsed since 2001-01-01 00:00:00 UTC.

    Returns:
        A naive ``datetime`` holding the corresponding UTC wall-clock time.
    """
    from datetime import timedelta

    # Plain timedelta arithmetic on the naive epoch keeps the result independent
    # of the host timezone. Going through ``timestamp()`` / ``fromtimestamp()``
    # would interpret the epoch as local time and skew the result by an hour
    # whenever the local DST status differs from that of January 2001.
    reference_date = datetime(2001, 1, 1)
    return reference_date + timedelta(seconds=nanoseconds / 1e9)
class IMessageChatLoader(BaseChatLoader):
    """Load chat sessions from the `iMessage` chat.db SQLite file.

    It only works on macOS when you have iMessage enabled and have the chat.db
    file.

    The chat.db file is likely located at ~/Library/Messages/chat.db. However,
    your terminal may not have permission to access this file. To resolve this,
    you can copy the file to a different location, change the permissions of
    the file, or grant full disk access for your terminal emulator in
    System Settings > Security and Privacy > Full Disk Access.
    """

    def __init__(self, path: Optional[Union[str, Path]] = None):
        """Initialize the IMessageChatLoader.

        Args:
            path: Path to the chat.db SQLite file. Defaults to None, in which
                case the default path ~/Library/Messages/chat.db is used.

        Raises:
            FileNotFoundError: If the database file does not exist.
            ImportError: If the ``sqlite3`` module cannot be imported.
        """
        if path is None:
            path = Path.home() / "Library" / "Messages" / "chat.db"
        self.db_path = path if isinstance(path, Path) else Path(path)
        if not self.db_path.exists():
            raise FileNotFoundError(f"File {self.db_path} not found")
        try:
            import sqlite3  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "The sqlite3 module is required to load iMessage chats.\n"
                "Please install it with `pip install pysqlite3`"
            ) from e

    def _parse_attributedBody(self, attributedBody: bytes) -> str:
        """Extract the message text from a ``message.attributedBody`` blob.

        The blob is binary data that carries the message content after the
        byte string b"NSString":

                              5 bytes      1-3 bytes    `len` bytes
            ... | b"NSString" | preamble |   `len`   |   contents   | ...

        `len` is either 1 byte or 3 bytes long:
        - If its first byte is 129 (0x81), `len` is 3 bytes long and the
          message length is the following 2 bytes, little-endian.
        - Otherwise `len` is a single byte holding the message length.

        Args:
            attributedBody: The attributedBody field of the message table.

        Returns:
            The text content of the message, or an empty string when the blob
            has no b"NSString" marker or is truncated before the length field.
        """
        # partition() splits on the FIRST marker only, so a message whose own
        # text contains "NSString" is not cut short (split()[1] would be).
        _, marker, tail = attributedBody.partition(b"NSString")
        content = tail[5:]  # skip the 5-byte preamble
        if not marker or not content:
            return ""
        length, start = content[0], 1
        if content[0] == 129:
            length, start = int.from_bytes(content[1:3], "little"), 3
        return content[start : start + length].decode("utf-8", errors="ignore")

    def _get_session_query(self, use_chat_handle_table: bool) -> str:
        """Build the SQL that selects every message of one chat, oldest first.

        Args:
            use_chat_handle_table: Whether to resolve the sender through the
                chat_handle_join table instead of message.handle_id.

        Returns:
            A query string with a single ``?`` placeholder for the chat id.
        """
        # Messages sent pre OSX 12 require a join through the chat_handle_join table
        # However, the table doesn't exist if database created with OSX 12 or above.
        joins_w_chat_handle = """
            JOIN chat_handle_join ON
                 chat_message_join.chat_id = chat_handle_join.chat_id
            JOIN handle ON
                 handle.ROWID = chat_handle_join.handle_id"""

        joins_no_chat_handle = """
            JOIN handle ON message.handle_id = handle.ROWID
        """

        joins = joins_w_chat_handle if use_chat_handle_table else joins_no_chat_handle

        return f"""
            SELECT  message.date,
                    handle.id,
                    message.text,
                    message.is_from_me,
                    message.attributedBody
            FROM message
            JOIN chat_message_join ON
                 message.ROWID = chat_message_join.message_id
            {joins}
            WHERE chat_message_join.chat_id = ?
            ORDER BY message.date ASC;
        """

    def _load_single_chat_session(
        self, cursor: "sqlite3.Cursor", use_chat_handle_table: bool, chat_id: int
    ) -> ChatSession:
        """Load a single chat session from the iMessage chat.db.

        Args:
            cursor: SQLite cursor object.
            use_chat_handle_table: Whether the chat_handle_join table should be
                used to resolve senders.
            chat_id: ID of the chat session to load.

        Returns:
            ChatSession: Loaded chat session.
        """
        results: List[HumanMessage] = []

        query = self._get_session_query(use_chat_handle_table)
        cursor.execute(query, (chat_id,))
        messages = cursor.fetchall()

        for date, sender, text, is_from_me, attributedBody in messages:
            if text:
                content = text
            elif attributedBody:
                content = self._parse_attributedBody(attributedBody)
            else:
                content = ""
            if not content:
                # Skip messages with no (recoverable) content
                continue

            results.append(
                HumanMessage(  # type: ignore[call-arg]
                    role=sender,
                    content=content,
                    additional_kwargs={
                        "message_time": date,
                        "message_time_as_datetime": nanoseconds_from_2001_to_datetime(
                            date
                        ),
                        "sender": sender,
                        "is_from_me": bool(is_from_me),
                    },
                )
            )

        return ChatSession(messages=results)

    def lazy_load(self) -> Iterator[ChatSession]:
        """Lazily load chat sessions from the iMessage chat.db.

        Yields:
            ChatSession: Loaded chat session, most recently active chat first.

        Raises:
            ValueError: If the database file cannot be opened.
        """
        import sqlite3

        try:
            conn = sqlite3.connect(self.db_path)
        except sqlite3.OperationalError as e:
            raise ValueError(
                f"Could not open iMessage DB file {self.db_path}.\n"
                "Make sure your terminal emulator has disk access to this file.\n"
                "   You can either copy the DB file to an accessible location"
                " or grant full disk access for your terminal emulator."
                "  You can grant full disk access for your terminal emulator"
                " in System Settings > Security and Privacy > Full Disk Access."
            ) from e

        # try/finally guarantees the connection is closed even when the caller
        # stops iterating early or a query raises part-way through.
        try:
            cursor = conn.cursor()

            # See if chat_handle_join table exists:
            query = """SELECT name FROM sqlite_master
                       WHERE type='table' AND name='chat_handle_join';"""
            cursor.execute(query)
            use_chat_handle_table = cursor.fetchone() is not None

            # Fetch the list of chat IDs sorted by time (most recent first)
            query = """SELECT chat_id
            FROM message
            JOIN chat_message_join ON message.ROWID = chat_message_join.message_id
            GROUP BY chat_id
            ORDER BY MAX(date) DESC;"""
            cursor.execute(query)
            chat_ids = [row[0] for row in cursor.fetchall()]

            for chat_id in chat_ids:
                yield self._load_single_chat_session(
                    cursor, use_chat_handle_table, chat_id
                )
        finally:
            conn.close()