# Source code for langchain_community.chat_loaders.imessage
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, List, Optional, Union
from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
if TYPE_CHECKING:
import sqlite3
def nanoseconds_from_2001_to_datetime(nanoseconds: int) -> datetime:
    """Convert a nanosecond offset from 2001-01-01 00:00:00 into a datetime.

    The offset is added directly to the reference date with ``timedelta``
    arithmetic rather than by round-tripping through local-time POSIX
    timestamps, so the result does not depend on the machine's timezone or
    on daylight-saving transitions.

    Args:
        nanoseconds: Number of nanoseconds elapsed since 2001-01-01 00:00:00.

    Returns:
        A naive ``datetime`` (no ``tzinfo``) equal to the reference date plus
        the offset, rounded to microsecond resolution.
    """
    # Imported locally because the module only imports ``datetime`` itself.
    from datetime import timedelta

    # NOTE(review): the stored offsets are presumably relative to midnight UTC,
    # which would make the returned naive value a UTC wall-clock time - confirm.
    reference_date = datetime(2001, 1, 1)
    # ``datetime`` only has microsecond resolution; ``timedelta`` rounds the
    # fractional microseconds for us.
    return reference_date + timedelta(microseconds=nanoseconds / 1000)
class IMessageChatLoader(BaseChatLoader):
    """Load chat sessions from the `iMessage` chat.db SQLite file.

    It only works on macOS when iMessage is enabled and the chat.db file is
    available.

    The chat.db file is likely located at ~/Library/Messages/chat.db. However,
    your terminal may not have permission to access this file. To resolve
    this, you can copy the file to a different location, change the
    permissions of the file, or grant full disk access for your terminal
    emulator in System Settings > Security and Privacy > Full Disk Access.
    """

    def __init__(self, path: Optional[Union[str, Path]] = None):
        """Initialize the IMessageChatLoader.

        Args:
            path: Path to the chat.db SQLite file. Defaults to None, in which
                case the default path ~/Library/Messages/chat.db is used.

        Raises:
            FileNotFoundError: If the database file does not exist.
            ImportError: If the ``sqlite3`` module cannot be imported.
        """
        if path is None:
            path = Path.home() / "Library" / "Messages" / "chat.db"
        self.db_path = path if isinstance(path, Path) else Path(path)
        if not self.db_path.exists():
            raise FileNotFoundError(f"File {self.db_path} not found")
        # Fail early, at construction time, if sqlite3 is unavailable.
        try:
            import sqlite3  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "The sqlite3 module is required to load iMessage chats.\n"
                "Please install it with `pip install pysqlite3`"
            ) from e

    def _parse_attributedBody(self, attributedBody: bytes) -> str:
        """Parse the attributedBody field of the message table for the text.

        The attributedBody field is a binary blob that contains the message
        content after the byte string b"NSString":

                              5 bytes      1-3 bytes    `len` bytes
        ... | b"NSString" |   preamble   |   `len`   |    contents    | ...

        The 5 preamble bytes are skipped without being inspected.

        The size of `len` is either 1 byte or 3 bytes:
        - If the first byte of `len` is 0x81 (129), `len` is 3 bytes long and
          the message length is the following 2 bytes in little-endian order.
        - Otherwise `len` is 1 byte and the message length is that byte.

        Args:
            attributedBody: The attributedBody field of the message table.

        Returns:
            The text content of the message, or an empty string when the blob
            has no b"NSString" marker or no payload after the preamble.
        """
        # ``partition`` keeps everything after the FIRST marker, so message text
        # that itself contains "NSString" is not cut short, and a missing
        # marker is reported through an empty separator instead of IndexError.
        _, marker, tail = attributedBody.partition(b"NSString")
        content = tail[5:]
        if not marker or not content:
            return ""
        length, start = content[0], 1
        if content[0] == 129:
            length, start = int.from_bytes(content[1:3], "little"), 3
        return content[start : start + length].decode("utf-8", errors="ignore")

    def _get_session_query(self, use_chat_handle_table: bool) -> str:
        """Build the SQL query that selects the messages of a single chat.

        Args:
            use_chat_handle_table: Whether to resolve the handle through the
                chat_handle_join table instead of message.handle_id.

        Returns:
            A SQL string with one ``?`` placeholder for the chat id.
        """
        # Messages sent pre OSX 12 require a join through the chat_handle_join table
        # However, the table doesn't exist if database created with OSX 12 or above.
        joins_w_chat_handle = """
            JOIN chat_handle_join ON
                 chat_message_join.chat_id = chat_handle_join.chat_id
            JOIN handle ON
                 handle.ROWID = chat_handle_join.handle_id"""

        joins_no_chat_handle = """
            JOIN handle ON message.handle_id = handle.ROWID
        """

        joins = joins_w_chat_handle if use_chat_handle_table else joins_no_chat_handle

        return f"""
            SELECT  message.date,
                    handle.id,
                    message.text,
                    message.is_from_me,
                    message.attributedBody
            FROM message
            JOIN chat_message_join ON
                 message.ROWID = chat_message_join.message_id
            {joins}
            WHERE chat_message_join.chat_id = ?
            ORDER BY message.date ASC;
        """

    def _load_single_chat_session(
        self, cursor: "sqlite3.Cursor", use_chat_handle_table: bool, chat_id: int
    ) -> ChatSession:
        """Load a single chat session from the iMessage chat.db.

        Args:
            cursor: SQLite cursor object.
            use_chat_handle_table: Whether the query should join through the
                chat_handle_join table.
            chat_id: ID of the chat session to load.

        Returns:
            The loaded chat session.
        """
        results: List[HumanMessage] = []

        query = self._get_session_query(use_chat_handle_table)
        cursor.execute(query, (chat_id,))

        for date, sender, text, is_from_me, attributedBody in cursor.fetchall():
            # Prefer the plain text column; fall back to the binary blob.
            if text:
                content = text
            elif attributedBody:
                content = self._parse_attributedBody(attributedBody)
            else:  # Skip messages with no content
                continue
            if not content:
                # The blob could not be parsed into any text; treat it like a
                # message with no content.
                continue

            results.append(
                HumanMessage(  # type: ignore[call-arg]
                    role=sender,
                    content=content,
                    additional_kwargs={
                        "message_time": date,
                        "message_time_as_datetime": nanoseconds_from_2001_to_datetime(
                            date
                        ),
                        "sender": sender,
                        "is_from_me": bool(is_from_me),
                    },
                )
            )

        return ChatSession(messages=results)

    def lazy_load(self) -> Iterator[ChatSession]:
        """Lazily load the chat sessions from the iMessage chat.db.

        Sessions are yielded one per chat, most recently active chat first.

        Yields:
            ChatSession: Loaded chat session.

        Raises:
            ValueError: If the database file cannot be opened.
        """
        import sqlite3

        try:
            conn = sqlite3.connect(self.db_path)
        except sqlite3.OperationalError as e:
            raise ValueError(
                f"Could not open iMessage DB file {self.db_path}.\n"
                "Make sure your terminal emulator has disk access to this file.\n"
                "   You can either copy the DB file to an accessible location"
                " or grant full disk access for your terminal emulator."
                "  You can grant full disk access for your terminal emulator"
                " in System Settings > Security and Privacy > Full Disk Access."
            ) from e

        # ``finally`` guarantees the connection is closed even when the consumer
        # stops iterating early (generator close) or a query raises.
        try:
            cursor = conn.cursor()

            # See if chat_handle_join table exists:
            query = """SELECT name FROM sqlite_master
                       WHERE type='table' AND name='chat_handle_join';"""
            cursor.execute(query)
            # fetchone() returns a row tuple or None; callers expect a bool.
            use_chat_handle_table = cursor.fetchone() is not None

            # Fetch the list of chat IDs sorted by time (most recent first)
            query = """SELECT chat_id
            FROM message
            JOIN chat_message_join ON message.ROWID = chat_message_join.message_id
            GROUP BY chat_id
            ORDER BY MAX(date) DESC;"""
            cursor.execute(query)
            chat_ids = [row[0] for row in cursor.fetchall()]

            for chat_id in chat_ids:
                yield self._load_single_chat_session(
                    cursor, use_chat_handle_table, chat_id
                )
        finally:
            conn.close()