Source code for langchain_community.chat_loaders.gmail

import base64
import logging
import re
from typing import Any, Iterator

from langchain_core._api.deprecation import deprecated
from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage


# Matches the "On ... wrote:" line that introduces quoted history in a reply.
# Compiled once at import time instead of on every call.
_QUOTED_REPLY_PATTERN = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")


def _extract_email_content(msg: Any) -> HumanMessage:
    """Build a ``HumanMessage`` from the plain-text body of a Gmail message.

    Args:
        msg: A message resource read as a mapping: ``msg["payload"]`` must
            hold a ``headers`` list of ``{"name", "value"}`` entries and the
            body either in ``parts`` (multipart mail) or directly on the
            payload (single-part mail).

    Returns:
        A ``HumanMessage`` whose content is the text that precedes the first
        "On ... wrote:" line (i.e. the newest reply without quoted history)
        and whose ``additional_kwargs["sender"]`` is the ``From`` header value.

    Raises:
        ValueError: If the message has no ``From`` header or no
            ``text/plain`` part.
    """
    payload = msg["payload"]

    from_email = None
    for header in payload["headers"]:
        # Header field names are case-insensitive, so do not rely on the
        # exact "From" capitalisation.
        if header["name"].lower() == "from":
            from_email = header["value"]
    if from_email is None:
        raise ValueError("Email message has no 'From' header.")

    # Non-multipart messages carry their body on the payload itself and have
    # no "parts" entry; treat the payload as the single candidate part.
    parts = payload.get("parts") or [payload]
    for part in parts:
        if part.get("mimeType") != "text/plain":
            continue
        raw_body = part["body"]["data"]
        body = base64.urlsafe_b64decode(raw_body).decode("utf-8")
        # Keep only the text before the quoted "On ... wrote:" history.
        newest_response = _QUOTED_REPLY_PATTERN.split(body)[0]
        return HumanMessage(
            content=newest_response, additional_kwargs={"sender": from_email}
        )
    raise ValueError("Email message has no 'text/plain' part.")


def _get_message_data(service: Any, message: Any) -> ChatSession:
    """Fetch a sent reply and the email it answers as a two-message session.

    Args:
        service: Gmail API client; used via ``users().messages().get`` and
            ``users().threads().get``.
        message: Mapping with an ``"id"`` key identifying the sent message.

    Returns:
        A ``ChatSession`` containing the email that was replied to, followed
        by the sent reply.

    Raises:
        ValueError: If the sent message has no ``In-Reply-To`` header, or if
            no message in its thread has a matching ``Message-ID``.
    """
    msg = service.users().messages().get(userId="me", id=message["id"]).execute()
    message_content = _extract_email_content(msg)

    in_reply_to = None
    for header in msg["payload"]["headers"]:
        # Header field names are case-insensitive; mail clients differ in
        # how they capitalise them.
        if header["name"].lower() == "in-reply-to":
            in_reply_to = header["value"]
    if in_reply_to is None:
        raise ValueError(
            f"Message {message['id']!r} has no 'In-Reply-To' header; "
            "it is not a reply."
        )

    thread_id = msg["threadId"]
    thread = service.users().threads().get(userId="me", id=thread_id).execute()

    # Locate the message in the thread that the reply refers to. A distinct
    # loop variable is used so the ``message`` parameter is not shadowed.
    response_email = None
    for thread_message in thread["messages"]:
        for header in thread_message["payload"]["headers"]:
            if (
                header["name"].lower() == "message-id"
                and header["value"] == in_reply_to
            ):
                response_email = thread_message
    if response_email is None:
        raise ValueError(
            f"No message with Message-ID {in_reply_to!r} found in thread "
            f"{thread_id!r}."
        )

    starter_content = _extract_email_content(response_email)
    return ChatSession(messages=[starter_content, message_content])


logger = logging.getLogger(__name__)


@deprecated(
    since="0.0.32",
    removal="0.3.0",
    alternative_import="langchain_google_community.GMailLoader",
)
class GMailLoader(BaseChatLoader):
    """Load data from `GMail`.

    There are many ways you could want to load data from GMail.
    This loader is currently fairly opinionated in how to do so.
    It first looks for all messages that you have sent.
    It then looks for messages where you are responding to a previous email.
    It then fetches that previous email, and creates a training example
    of that email, followed by your email.

    Note that there are clear limitations here. For example,
    all examples created are only looking at the previous email for context.

    To use:

    - Set up a Google Developer Account:
        Go to the Google Developer Console, create a project,
        and enable the Gmail API for that project.
        This will give you a credentials.json file that you'll need later.
    """

    def __init__(self, creds: Any, n: int = 100, raise_error: bool = False) -> None:
        """Store the loader configuration.

        Args:
            creds: Credentials object passed to the Gmail API client builder.
            n: Value sent as ``maxResults`` when listing sent messages.
            raise_error: If True, re-raise any error hit while building a
                chat session; if False, skip the offending message.
        """
        super().__init__()
        self.creds = creds
        self.n = n
        self.raise_error = raise_error

    def lazy_load(self) -> Iterator[ChatSession]:
        """Yield one chat session per sent message that could be processed.

        Lists messages labelled ``SENT`` and converts each into a
        ``ChatSession`` via ``_get_message_data``.

        Yields:
            A ``ChatSession`` for each message that was converted
            successfully.

        Raises:
            Exception: Whatever ``_get_message_data`` raised, but only when
                ``raise_error`` is True.
        """
        # Imported lazily so the module can be imported without the optional
        # Google API client installed.
        from googleapiclient.discovery import build

        service = build("gmail", "v1", credentials=self.creds)
        results = (
            service.users()
            .messages()
            .list(userId="me", labelIds=["SENT"], maxResults=self.n)
            .execute()
        )
        messages = results.get("messages", [])
        for message in messages:
            # Keep the ``yield`` outside the ``try`` so that only errors from
            # building the session are handled here, not exceptions thrown
            # into the generator by the consumer.
            try:
                session = _get_message_data(service, message)
            except Exception:
                if self.raise_error:
                    raise
                # Best-effort mode: skip the message, but leave a trace
                # instead of dropping the error silently.
                logger.debug("Skipping GMail message %r", message, exc_info=True)
                continue
            yield session