langchain_google_community.gmail.get_message 的源代码

import base64
import email
from email.message import Message
from typing import Dict, Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from pydantic import BaseModel, Field

from langchain_google_community.gmail.base import GmailBaseTool
from langchain_google_community.gmail.utils import clean_email_body


class SearchArgsSchema(BaseModel):
    """Input for GetMessageTool."""

    # Required field (``...`` means no default): the ID of the message to
    # fetch. GmailGetMessage passes it as the ``id`` argument of the Gmail
    # ``users().messages().get`` call.
    message_id: str = Field(
        ...,
        description="The unique ID of the email message, retrieved from a search.",
    )
def _decode_text_payload(part: Message) -> str:
    """Return the transfer-decoded payload of ``part`` as text.

    The bytes are decoded with the charset declared in the part's
    ``Content-Type`` header, defaulting to UTF-8 when none is declared.
    If the declared charset is unknown to Python or does not match the
    actual bytes, decoding falls back to UTF-8 with replacement characters
    so that one badly labelled message cannot make the whole tool call fail.

    Args:
        part: A message, or one sub-part of a multipart message.

    Returns:
        The decoded text, or ``""`` when the part has no bytes payload.
    """
    payload = part.get_payload(decode=True)
    if not isinstance(payload, bytes):
        # ``get_payload(decode=True)`` returns None for multipart containers;
        # there is nothing to decode in that case.
        return ""

    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset)
    except (LookupError, UnicodeDecodeError):
        # LookupError: charset name Python does not know.
        # UnicodeDecodeError: header charset does not match the real bytes.
        return payload.decode("utf-8", errors="replace")


class GmailGetMessage(GmailBaseTool):
    """Tool that gets a message by ID from Gmail."""

    name: str = "get_gmail_message"
    description: str = (
        "Use this tool to fetch an email by message ID."
        " Returns the thread ID, snippet, body, subject, and sender."
    )
    args_schema: Type[SearchArgsSchema] = SearchArgsSchema

    def _run(
        self,
        message_id: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> Dict:
        """Fetch one Gmail message and return its main fields.

        The message is requested in ``raw`` format, base64url-decoded and
        parsed with the standard ``email`` package. For multipart messages
        the body is taken from the first ``text/plain`` part that is not an
        attachment; if there is none, the body is empty.

        Args:
            message_id: ID of the message to fetch.
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            A dict with the keys ``id``, ``threadId``, ``snippet``, ``body``,
            ``subject`` and ``sender``.
        """
        # ``api_resource`` is provided by GmailBaseTool; it is used here as a
        # Gmail API client exposing ``users().messages().get(...)``.
        query = (
            self.api_resource.users()
            .messages()
            .get(userId="me", format="raw", id=message_id)
        )
        message_data = query.execute()
        raw_message = base64.urlsafe_b64decode(message_data["raw"])

        email_msg = email.message_from_bytes(raw_message)

        subject = email_msg["Subject"]
        sender = email_msg["From"]

        message_body = ""
        if email_msg.is_multipart():
            for part in email_msg.walk():
                ctype = part.get_content_type()
                # ``str(None)`` is "None", so a missing header never matches
                # "attachment".
                cdispo = str(part.get("Content-Disposition"))
                if ctype == "text/plain" and "attachment" not in cdispo:
                    message_body = _decode_text_payload(part)
                    break
        else:
            message_body = _decode_text_payload(email_msg)

        # NOTE(review): clean_email_body is a project helper not shown here;
        # presumably it normalises the text -- confirm against gmail.utils.
        body = clean_email_body(message_body)

        return {
            "id": message_id,
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "body": body,
            "subject": subject,
            "sender": sender,
        }