Skip to content

Google

GmailToolSpec #

Bases: BaseToolSpec

GMail工具规范。

给予代理人阅读、起草和发送Gmail消息的能力。

Source code in llama_index/tools/google/gmail/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
class GmailToolSpec(BaseToolSpec):
    """GMail工具规范。

    给予代理人阅读、起草和发送Gmail消息的能力。"""

    spec_functions = [
        "load_data",
        "search_messages",
        "create_draft",
        "update_draft",
        "get_draft",
        "send_draft",
    ]
    query: str = None
    use_iterative_parser: bool = False
    max_results: int = 10
    service: Any = None

    def _cache_service(self) -> None:
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        if not self.service:
            self.service = build("gmail", "v1", credentials=credentials)

    def load_data(self) -> List[Document]:
        """从用户的账户中加载电子邮件。"""
        self._cache_service()

        return self.search_messages()

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        import os

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def search_messages(self, query: str, max_results: Optional[int] = None):
        if not max_results:
            max_results = self.max_results

        self._cache_service()

        messages = (
            self.service.users()
            .messages()
            .list(userId="me", q=query, maxResults=int(max_results))
            .execute()
            .get("messages", [])
        )

        results = []
        try:
            for message in messages:
                message_data = self.get_message_data(message)
                text = message_data.pop("body")
                extra_info = message_data
                results.append(Document(text=text, extra_info=extra_info))
        except Exception as e:
            raise Exception("Can't get message data" + str(e))

        return results

    def get_message_data(self, message):
        message_id = message["id"]
        message_data = (
            self.service.users()
            .messages()
            .get(format="raw", userId="me", id=message_id)
            .execute()
        )
        if self.use_iterative_parser:
            body = self.extract_message_body_iterative(message_data)
        else:
            body = self.extract_message_body(message_data)

        if not body:
            return None

        return {
            "id": message_data["id"],
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "body": body,
        }

    def extract_message_body_iterative(self, message: dict):
        if message["raw"]:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf8"))
            mime_msg = email.message_from_bytes(body)
        else:
            mime_msg = message

        body_text = ""
        if mime_msg.get_content_type() == "text/plain":
            plain_text = mime_msg.get_payload(decode=True)
            charset = mime_msg.get_content_charset("utf-8")
            body_text = plain_text.decode(charset).encode("utf-8").decode("utf-8")

        elif mime_msg.get_content_maintype() == "multipart":
            msg_parts = mime_msg.get_payload()
            for msg_part in msg_parts:
                body_text += self.extract_message_body_iterative(msg_part)

        return body_text

    def extract_message_body(self, message: dict):
        from bs4 import BeautifulSoup

        try:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)

            # If the message body contains HTML, parse it with BeautifulSoup
            if "text/html" in mime_msg:
                soup = BeautifulSoup(body, "html.parser")
                body = soup.get_text()
            return body.decode("utf-8")
        except Exception as e:
            raise Exception("Can't parse message body" + str(e))

    def _build_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        email_message = EmailMessage()

        email_message.set_content(message)

        email_message["To"] = to
        email_message["Subject"] = subject

        encoded_message = base64.urlsafe_b64encode(email_message.as_bytes()).decode()

        return {"message": {"raw": encoded_message}}

    def create_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        """创建并插入草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    to(可选[str]):要发送消息的电子邮件地址
    subject(可选[str]):事件的主题
    message(可选[str]):事件的消息
"""
        self._cache_service()
        service = self.service

        return (
            service.users()
            .drafts()
            .create(userId="me", body=self._build_draft(to, subject, message))
            .execute()
        )

    def update_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
        draft_id: str = None,
    ) -> str:
        """更新草稿邮件。
打印返回的草稿消息和id。
此函数需要传递一个在创建消息时获得的draft_id。
Returns:包括草稿id和消息元数据的草稿对象。

Args:
    to(可选[str]):要发送消息的电子邮件地址
    subject(可选[str]):事件的主题
    message(可选[str]):事件的消息
    draft_id(str):要更新的草稿的id
"""
        self._cache_service()
        service = self.service

        if draft_id is None:
            return (
                "You did not provide a draft id when calling this function. If you"
                " previously created or retrieved the draft, the id is available in"
                " context"
            )

        draft = self.get_draft(draft_id)
        headers = draft["message"]["payload"]["headers"]
        for header in headers:
            if header["name"] == "To" and not to:
                to = header["value"]
            elif header["name"] == "Subject" and not subject:
                subject = header["value"]

        return (
            service.users()
            .drafts()
            .update(
                userId="me", id=draft_id, body=self._build_draft(to, subject, message)
            )
            .execute()
        )

    def get_draft(self, draft_id: str = None) -> str:
        """获取草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    draft_id(str):要更新的草稿的id
"""
        self._cache_service()
        service = self.service
        return service.users().drafts().get(userId="me", id=draft_id).execute()

    def send_draft(self, draft_id: str = None) -> str:
        """发送草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    draft_id (str): 要更新的草稿的id
"""
        self._cache_service()
        service = self.service
        return (
            service.users().drafts().send(userId="me", body={"id": draft_id}).execute()
        )

load_data #

load_data() -> List[Document]

从用户的账户中加载电子邮件。

Source code in llama_index/tools/google/gmail/base.py
42
43
44
45
46
def load_data(self) -> List[Document]:
    """从用户的账户中加载电子邮件。"""
    self._cache_service()

    return self.search_messages()

create_draft #

create_draft(
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
) -> str

创建并插入草稿邮件。 打印返回的草稿邮件的消息和id。 Returns:包括草稿id和消息元数据的草稿对象。

Source code in llama_index/tools/google/gmail/base.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    def create_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        """创建并插入草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    to(可选[str]):要发送消息的电子邮件地址
    subject(可选[str]):事件的主题
    message(可选[str]):事件的消息
"""
        self._cache_service()
        service = self.service

        return (
            service.users()
            .drafts()
            .create(userId="me", body=self._build_draft(to, subject, message))
            .execute()
        )

update_draft #

update_draft(
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
    draft_id: str = None,
) -> str

更新草稿邮件。 打印返回的草稿消息和id。 此函数需要传递一个在创建消息时获得的draft_id。 Returns:包括草稿id和消息元数据的草稿对象。

Source code in llama_index/tools/google/gmail/base.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
    def update_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
        draft_id: str = None,
    ) -> str:
        """更新草稿邮件。
打印返回的草稿消息和id。
此函数需要传递一个在创建消息时获得的draft_id。
Returns:包括草稿id和消息元数据的草稿对象。

Args:
    to(可选[str]):要发送消息的电子邮件地址
    subject(可选[str]):事件的主题
    message(可选[str]):事件的消息
    draft_id(str):要更新的草稿的id
"""
        self._cache_service()
        service = self.service

        if draft_id is None:
            return (
                "You did not provide a draft id when calling this function. If you"
                " previously created or retrieved the draft, the id is available in"
                " context"
            )

        draft = self.get_draft(draft_id)
        headers = draft["message"]["payload"]["headers"]
        for header in headers:
            if header["name"] == "To" and not to:
                to = header["value"]
            elif header["name"] == "Subject" and not subject:
                subject = header["value"]

        return (
            service.users()
            .drafts()
            .update(
                userId="me", id=draft_id, body=self._build_draft(to, subject, message)
            )
            .execute()
        )

get_draft #

get_draft(draft_id: str = None) -> str

获取草稿邮件。 打印返回的草稿邮件的消息和id。 Returns:包括草稿id和消息元数据的草稿对象。

Source code in llama_index/tools/google/gmail/base.py
251
252
253
254
255
256
257
258
259
260
261
    def get_draft(self, draft_id: str = None) -> str:
        """获取草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    draft_id(str):要更新的草稿的id
"""
        self._cache_service()
        service = self.service
        return service.users().drafts().get(userId="me", id=draft_id).execute()

send_draft #

send_draft(draft_id: str = None) -> str

发送草稿邮件。 打印返回的草稿邮件的消息和id。 Returns:包括草稿id和消息元数据的草稿对象。

Parameters:

Name Type Description Default
draft_id str

要更新的草稿的id

None
Source code in llama_index/tools/google/gmail/base.py
263
264
265
266
267
268
269
270
271
272
273
274
275
    def send_draft(self, draft_id: str = None) -> str:
        """发送草稿邮件。
   打印返回的草稿邮件的消息和id。
   Returns:包括草稿id和消息元数据的草稿对象。

Args:
    draft_id (str): 要更新的草稿的id
"""
        self._cache_service()
        service = self.service
        return (
            service.users().drafts().send(userId="me", body={"id": draft_id}).execute()
        )

GoogleCalendarToolSpec #

Bases: BaseToolSpec

谷歌日历工具规范。

目前只是数据加载器的简单封装。 TODO: 添加更多的方法到谷歌日历规范中。

Source code in llama_index/tools/google/calendar/base.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class GoogleCalendarToolSpec(BaseToolSpec):
    """谷歌日历工具规范。

    目前只是数据加载器的简单封装。
    TODO: 添加更多的方法到谷歌日历规范中。"""

    spec_functions = ["load_data", "create_event", "get_date"]

    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """从用户日历加载数据。

Args:
    number_of_results (可选[int]): 要返回的事件数量。默认为100。
    start_date (可选[Union[str, datetime.date]]): 要返回事件的开始日期,使用日期isoformat。默认为今天。
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = events_result.get("items", [])

        if not events:
            return []

        results = []
        for event in events:
            if "dateTime" in event["start"]:
                start_time = event["start"]["dateTime"]
            else:
                start_time = event["start"]["date"]

            if "dateTime" in event["end"]:
                end_time = event["end"]["dateTime"]
            else:
                end_time = event["end"]["date"]

            event_string = f"Status: {event['status']}, "
            event_string += f"Summary: {event['summary']}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({email})"
            else:
                event_string += f"Organizer: {email}"

            results.append(Document(text=event_string))

        return results

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def create_event(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
        location: Optional[str] = None,
        start_datetime: Optional[Union[str, datetime.datetime]] = None,
        end_datetime: Optional[Union[str, datetime.datetime]] = None,
        attendees: Optional[List[str]] = None,
    ) -> str:
        """在用户日历上创建一个事件。

Args:
    title (可选[str]): 事件的标题
    description (可选[str]): 事件的描述
    location (可选[str]): 事件的地点
    start_datetime (可选[Union[str, datetime.datetime]]): 事件的开始时间
    end_datetime (可选[Union[str, datetime.datetime]]): 事件的结束时间
    attendees (可选[List[str]]): 邀请参加事件的电子邮件地址列表
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        attendees_list = (
            [{"email": attendee} for attendee in attendees] if attendees else []
        )

        start_time = (
            datetime.datetime.strptime(start_datetime, "%Y-%m-%dT%H:%M:%S%z")
            .astimezone()
            .strftime("%Y-%m-%dT%H:%M:%S.%f%z")
        )
        end_time = (
            datetime.datetime.strptime(end_datetime, "%Y-%m-%dT%H:%M:%S%z")
            .astimezone()
            .strftime("%Y-%m-%dT%H:%M:%S.%f%z")
        )

        event = {
            "summary": title,
            "location": location,
            "description": description,
            "start": {
                "dateTime": start_time,
            },
            "end": {
                "dateTime": end_time,
            },
            "attendees": attendees_list,
        }
        event = service.events().insert(calendarId="primary", body=event).execute()
        return (
            "Your calendar event has been created successfully! You can move on to the"
            " next step."
        )

    def get_date(self):
        """
        一个用于返回今天日期的函数。如果你不知道日期,调用这个函数之后再调用其他函数。
        """
        return datetime.date.today()

load_data #

load_data(
    number_of_results: Optional[int] = 100,
    start_date: Optional[Union[str, date]] = None,
) -> List[Document]

从用户日历加载数据。

Parameters:

Name Type Description Default
number_of_results 可选[int]

要返回的事件数量。默认为100。

100
start_date 可选[Union[str, date]]

要返回事件的开始日期,使用日期isoformat。默认为今天。

None
Source code in llama_index/tools/google/calendar/base.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """从用户日历加载数据。

Args:
    number_of_results (可选[int]): 要返回的事件数量。默认为100。
    start_date (可选[Union[str, datetime.date]]): 要返回事件的开始日期,使用日期isoformat。默认为今天。
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = events_result.get("items", [])

        if not events:
            return []

        results = []
        for event in events:
            if "dateTime" in event["start"]:
                start_time = event["start"]["dateTime"]
            else:
                start_time = event["start"]["date"]

            if "dateTime" in event["end"]:
                end_time = event["end"]["dateTime"]
            else:
                end_time = event["end"]["date"]

            event_string = f"Status: {event['status']}, "
            event_string += f"Summary: {event['summary']}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({email})"
            else:
                event_string += f"Organizer: {email}"

            results.append(Document(text=event_string))

        return results

create_event #

create_event(
    title: Optional[str] = None,
    description: Optional[str] = None,
    location: Optional[str] = None,
    start_datetime: Optional[Union[str, datetime]] = None,
    end_datetime: Optional[Union[str, datetime]] = None,
    attendees: Optional[List[str]] = None,
) -> str

在用户日历上创建一个事件。

Parameters:

Name Type Description Default
title 可选[str]

事件的标题

None
description 可选[str]

事件的描述

None
location 可选[str]

事件的地点

None
start_datetime 可选[Union[str, datetime]]

事件的开始时间

None
end_datetime 可选[Union[str, datetime]]

事件的结束时间

None
attendees 可选[List[str]]

邀请参加事件的电子邮件地址列表

None
Source code in llama_index/tools/google/calendar/base.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
    def create_event(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
        location: Optional[str] = None,
        start_datetime: Optional[Union[str, datetime.datetime]] = None,
        end_datetime: Optional[Union[str, datetime.datetime]] = None,
        attendees: Optional[List[str]] = None,
    ) -> str:
        """在用户日历上创建一个事件。

Args:
    title (可选[str]): 事件的标题
    description (可选[str]): 事件的描述
    location (可选[str]): 事件的地点
    start_datetime (可选[Union[str, datetime.datetime]]): 事件的开始时间
    end_datetime (可选[Union[str, datetime.datetime]]): 事件的结束时间
    attendees (可选[List[str]]): 邀请参加事件的电子邮件地址列表
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        attendees_list = (
            [{"email": attendee} for attendee in attendees] if attendees else []
        )

        start_time = (
            datetime.datetime.strptime(start_datetime, "%Y-%m-%dT%H:%M:%S%z")
            .astimezone()
            .strftime("%Y-%m-%dT%H:%M:%S.%f%z")
        )
        end_time = (
            datetime.datetime.strptime(end_datetime, "%Y-%m-%dT%H:%M:%S%z")
            .astimezone()
            .strftime("%Y-%m-%dT%H:%M:%S.%f%z")
        )

        event = {
            "summary": title,
            "location": location,
            "description": description,
            "start": {
                "dateTime": start_time,
            },
            "end": {
                "dateTime": end_time,
            },
            "attendees": attendees_list,
        }
        event = service.events().insert(calendarId="primary", body=event).execute()
        return (
            "Your calendar event has been created successfully! You can move on to the"
            " next step."
        )

get_date #

get_date()

一个用于返回今天日期的函数。如果你不知道日期,调用这个函数之后再调用其他函数。

Source code in llama_index/tools/google/calendar/base.py
193
194
195
196
197
def get_date(self):
    """
    一个用于返回今天日期的函数。如果你不知道日期,调用这个函数之后再调用其他函数。
    """
    return datetime.date.today()

GoogleSearchToolSpec #

Bases: BaseToolSpec

谷歌搜索工具规范。

Source code in llama_index/tools/google/search/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class GoogleSearchToolSpec(BaseToolSpec):
    """谷歌搜索工具规范。"""

    spec_functions = ["google_search"]

    def __init__(self, key: str, engine: str, num: Optional[int] = None) -> None:
        """使用参数进行初始化。"""
        self.key = key
        self.engine = engine
        self.num = num

    def google_search(self, query: str):
        """向谷歌搜索引擎发出查询,以接收结果列表。

Args:
    query (str): 要传递给谷歌搜索的查询。
    num (int, 可选): 要返回的搜索结果数量。默认为None。

引发:
    ValueError: 如果'num'不是介于1和10之间的整数。
"""
        url = QUERY_URL_TMPL.format(
            key=self.key, engine=self.engine, query=urllib.parse.quote_plus(query)
        )

        if self.num is not None:
            if not 1 <= self.num <= 10:
                raise ValueError("num should be an integer between 1 and 10, inclusive")
            url += f"&num={self.num}"

        response = requests.get(url)
        return [Document(text=response.text)]
google_search(query: str)

向谷歌搜索引擎发出查询,以接收结果列表。

Parameters:

Name Type Description Default
query str

要传递给谷歌搜索的查询。

required
num (int, 可选)

要返回的搜索结果数量。默认为None。

required

引发: ValueError: 如果'num'不是介于1和10之间的整数。

Source code in llama_index/tools/google/search/base.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    def google_search(self, query: str):
        """向谷歌搜索引擎发出查询,以接收结果列表。

Args:
    query (str): 要传递给谷歌搜索的查询。
    num (int, 可选): 要返回的搜索结果数量。默认为None。

引发:
    ValueError: 如果'num'不是介于1和10之间的整数。
"""
        url = QUERY_URL_TMPL.format(
            key=self.key, engine=self.engine, query=urllib.parse.quote_plus(query)
        )

        if self.num is not None:
            if not 1 <= self.num <= 10:
                raise ValueError("num should be an integer between 1 and 10, inclusive")
            url += f"&num={self.num}"

        response = requests.get(url)
        return [Document(text=response.text)]