跳至内容

谷歌

Gmail工具规范 #

基类: BaseToolSpec

Gmail工具规范。

赋予智能体读取、起草和发送Gmail邮件的能力

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
class GmailToolSpec(BaseToolSpec):
    """
    GMail tool spec.

    Gives the agent the ability to read, draft and send gmail messages.

    """

    # Method names this spec advertises through ``spec_functions``.
    spec_functions = [
        "load_data",
        "search_messages",
        "create_draft",
        "update_draft",
        "get_draft",
        "send_draft",
    ]
    # Not read by any method defined in this class.
    query: str = None
    # Selects extract_message_body_iterative over extract_message_body.
    use_iterative_parser: bool = False
    # Result count used by search_messages when the caller passes none.
    max_results: int = 10
    # Gmail API client; created on first use by _cache_service.
    service: Any = None

    def _cache_service(self) -> None:
        """Build the Gmail API client once and store it on ``self.service``."""
        # Skip the credential flow (file reads, token refresh, possibly a
        # browser login) when a client has already been built.
        if self.service:
            return

        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        self.service = build("gmail", "v1", credentials=credentials)

    def load_data(self) -> List[Document]:
        """Load emails from the user's account."""
        self._cache_service()

        return self.search_messages(query="")

    def _get_credentials(self) -> Any:
        """
        Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.

        """
        import os

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def search_messages(self, query: str, max_results: Optional[int] = None):
        """
        Searches email messages given a query string and the maximum number
        of results requested by the user
           Returns: List of relevant message objects up to the maximum number of results.

        Messages for which ``get_message_data`` returns ``None`` (empty body)
        are skipped.

        Args:
            query (str): The user's query
            max_results (Optional[int]): The maximum number of search results
            to return.

        Raises:
            Exception: If fetching or parsing any listed message fails.

        """
        if not max_results:
            max_results = self.max_results

        self._cache_service()

        messages = (
            self.service.users()
            .messages()
            .list(userId="me", q=query or None, maxResults=int(max_results))
            .execute()
            .get("messages", [])
        )

        results = []
        try:
            for message in messages:
                message_data = self.get_message_data(message)
                # get_message_data returns None when the body is empty;
                # calling .pop on it used to abort the whole search.
                if message_data is None:
                    continue
                text = message_data.pop("body")
                results.append(Document(text=text, metadata=message_data))
        except Exception as e:
            raise Exception("Can't get message data" + str(e)) from e

        return results

    def get_message_data(self, message):
        """
        Fetch one message in raw format and extract its body.

        Args:
            message: Mapping with at least an ``"id"`` key, as returned by the
                message list call.

        Returns:
            A dict with ``id``, ``threadId``, ``snippet`` and ``body``, or
            ``None`` when the extracted body is empty.

        """
        message_id = message["id"]
        message_data = (
            self.service.users()
            .messages()
            .get(format="raw", userId="me", id=message_id)
            .execute()
        )
        if self.use_iterative_parser:
            body = self.extract_message_body_iterative(message_data)
        else:
            body = self.extract_message_body(message_data)

        if not body:
            return None

        return {
            "id": message_data["id"],
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "body": body,
        }

    def extract_message_body_iterative(self, message: dict):
        """
        Collect the text/plain content of a message by walking its MIME parts.

        Args:
            message: Either the API response dict holding a base64url ``raw``
                payload, or (on recursive calls) an already parsed MIME part.

        Returns:
            The concatenated text of every ``text/plain`` part; ``""`` if
            there is none.

        """
        if message["raw"]:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf8"))
            mime_msg = email.message_from_bytes(body)
        else:
            # Parsed MIME parts have no "raw" header, so the lookup is falsy.
            mime_msg = message

        if mime_msg.get_content_type() == "text/plain":
            plain_text = mime_msg.get_payload(decode=True)
            charset = mime_msg.get_content_charset("utf-8")
            return plain_text.decode(charset)

        if mime_msg.get_content_maintype() == "multipart":
            return "".join(
                self.extract_message_body_iterative(msg_part)
                for msg_part in mime_msg.get_payload()
            )

        return ""

    def extract_message_body(self, message: dict):
        """
        Decode the raw message and return it as text.

        When any MIME part is ``text/html`` the raw bytes are passed through
        BeautifulSoup and the resulting text is returned; otherwise the raw
        bytes are decoded as UTF-8.

        Args:
            message: API response dict holding a base64url ``raw`` payload.

        Raises:
            Exception: If decoding or parsing fails.

        """
        from bs4 import BeautifulSoup

        try:
            raw_bytes = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(raw_bytes)

            # `"text/html" in mime_msg` only looks for a *header* with that
            # name, so inspect the content type of every part instead.
            has_html = any(
                part.get_content_type() == "text/html" for part in mime_msg.walk()
            )
            if has_html:
                # get_text() already returns str; it must not be decoded again.
                return BeautifulSoup(raw_bytes, "html.parser").get_text()
            return raw_bytes.decode("utf-8")
        except Exception as e:
            raise Exception("Can't parse message body" + str(e)) from e

    def _build_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        """
        Build the request body for the drafts create/update calls.

        Args:
            to: Recipient addresses (a list, or a single header string).
            subject: Subject line.
            message: Plain-text body.

        Returns:
            ``{"message": {"raw": <base64url encoded message>}}`` (a dict,
            despite the declared return annotation).

        """
        email_message = EmailMessage()

        email_message.set_content(message)

        # Omit headers the caller did not supply instead of assigning None.
        if to:
            email_message["To"] = to if isinstance(to, str) else ", ".join(to)
        if subject is not None:
            email_message["Subject"] = subject

        encoded_message = base64.urlsafe_b64encode(email_message.as_bytes()).decode()

        return {"message": {"raw": encoded_message}}

    def create_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
    ) -> str:
        """
        Create and insert a draft email.
           Print the returned draft's message and id.
           Returns: Draft object, including draft id and message meta data.

        Args:
            to (Optional[List[str]]): The email addresses to send the message to
            subject (Optional[str]): The subject of the email
            message (Optional[str]): The body of the email

        """
        self._cache_service()
        service = self.service

        return (
            service.users()
            .drafts()
            .create(userId="me", body=self._build_draft(to, subject, message))
            .execute()
        )

    def update_draft(
        self,
        to: Optional[List[str]] = None,
        subject: Optional[str] = None,
        message: Optional[str] = None,
        draft_id: str = None,
    ) -> str:
        """
        Update a draft email.
           Print the returned draft's message and id.
           This function is required to be passed a draft_id that is obtained when creating messages
           Returns: Draft object, including draft id and message meta data.

        Recipients and subject that are not supplied are copied from the
        existing draft's headers.

        Args:
            to (Optional[List[str]]): The email addresses to send the message to
            subject (Optional[str]): The subject of the email
            message (Optional[str]): The body of the email
            draft_id (str): the id of the draft to be updated

        """
        # Reject a missing id before any authentication or network work.
        if draft_id is None:
            return (
                "You did not provide a draft id when calling this function. If you"
                " previously created or retrieved the draft, the id is available in"
                " context"
            )

        self._cache_service()
        service = self.service

        draft = self.get_draft(draft_id)
        headers = draft["message"]["payload"]["headers"]
        for header in headers:
            if header["name"] == "To" and not to:
                to = header["value"]
            elif header["name"] == "Subject" and not subject:
                subject = header["value"]

        return (
            service.users()
            .drafts()
            .update(
                userId="me", id=draft_id, body=self._build_draft(to, subject, message)
            )
            .execute()
        )

    def get_draft(self, draft_id: str = None) -> str:
        """
        Get a draft email.
           Print the returned draft's message and id.
           Returns: Draft object, including draft id and message meta data.

        Args:
            draft_id (str): the id of the draft to fetch

        """
        self._cache_service()
        service = self.service
        return service.users().drafts().get(userId="me", id=draft_id).execute()

    def send_draft(self, draft_id: str = None) -> str:
        """
        Sends a draft email.
           Print the returned draft's message and id.
           Returns: The API response of the send call.

        Args:
            draft_id (str): the id of the draft to send

        """
        self._cache_service()
        service = self.service
        return (
            service.users().drafts().send(userId="me", body={"id": draft_id}).execute()
        )

加载数据 #

load_data() -> List[Document]

从用户账户加载电子邮件。

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
45
46
47
48
49
def load_data(self) -> List[Document]:
    """Return the account's emails by running a search with an empty query."""
    self._cache_service()
    documents = self.search_messages(query="")
    return documents

search_messages #

search_messages(query: str, max_results: Optional[int] = None)

根据用户提供的查询字符串和请求的最大结果数量搜索电子邮件消息。返回:最多不超过最大结果数量的相关消息对象列表。

参数:

名称 类型 描述 默认值
query str

用户的查询

required
max_results Optional[int]

最大搜索结果数量

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def search_messages(self, query: str, max_results: Optional[int] = None):
    """
    Searches email messages given a query string and the maximum number
    of results requested by the user
       Returns: List of relevant message objects up to the maximum number of results.

    Messages for which ``get_message_data`` returns ``None`` (empty body)
    are skipped.

    Args:
        query (str): The user's query
        max_results (Optional[int]): The maximum number of search results
        to return.

    Raises:
        Exception: If fetching or parsing any listed message fails.

    """
    if not max_results:
        max_results = self.max_results

    self._cache_service()

    messages = (
        self.service.users()
        .messages()
        .list(userId="me", q=query or None, maxResults=int(max_results))
        .execute()
        .get("messages", [])
    )

    results = []
    try:
        for message in messages:
            message_data = self.get_message_data(message)
            # get_message_data returns None when the body is empty;
            # calling .pop on it used to abort the whole search.
            if message_data is None:
                continue
            text = message_data.pop("body")
            results.append(Document(text=text, metadata=message_data))
    except Exception as e:
        raise Exception("Can't get message data" + str(e)) from e

    return results

create_draft #

create_draft(to: Optional[List[str]] = None, subject: Optional[str] = None, message: Optional[str] = None) -> str

创建并插入一封草稿邮件。 打印返回草稿的消息和ID。 返回:草稿对象,包括草稿ID和消息元数据。

参数:

名称 类型 描述 默认值
to Optional[List[str]]

要发送消息的电子邮件地址

None
subject Optional[str]

邮件主题

None
message Optional[str]

邮件正文

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def create_draft(
    self,
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
) -> str:
    """
    Create a new draft email and return the API response.

    The response is the draft object, including the draft id and message
    meta data.

    Args:
        to (Optional[List[str]]): The email addresses to send the message to
        subject (Optional[str]): The subject of the email
        message (Optional[str]): The body of the email

    """
    self._cache_service()

    drafts_api = self.service.users().drafts()
    payload = self._build_draft(to, subject, message)
    request = drafts_api.create(userId="me", body=payload)
    return request.execute()

更新草稿 #

update_draft(to: Optional[List[str]] = None, subject: Optional[str] = None, message: Optional[str] = None, draft_id: str = None) -> str

更新草稿邮件。 打印返回的草稿消息和ID。 此函数需要传入创建消息时获取的draft_id。 返回:草稿对象,包括草稿ID和消息元数据。

参数:

名称 类型 描述 默认值
to Optional[List[str]]

要发送消息的电子邮件地址

None
subject Optional[str]

邮件主题

None
message Optional[str]

邮件正文

None
draft_id str

待更新的草稿ID

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def update_draft(
    self,
    to: Optional[List[str]] = None,
    subject: Optional[str] = None,
    message: Optional[str] = None,
    draft_id: str = None,
) -> str:
    """
    Update a draft email.
       Print the returned draft's message and id.
       This function is required to be passed a draft_id that is obtained when creating messages
       Returns: Draft object, including draft id and message meta data.

    Recipients and subject that are not supplied are copied from the
    existing draft's headers. When ``draft_id`` is missing, an explanatory
    string is returned and no API call is made.

    Args:
        to (Optional[List[str]]): The email addresses to send the message to
        subject (Optional[str]): The subject of the email
        message (Optional[str]): The body of the email
        draft_id (str): the id of the draft to be updated

    """
    # Reject a missing id before any authentication or network work.
    if draft_id is None:
        return (
            "You did not provide a draft id when calling this function. If you"
            " previously created or retrieved the draft, the id is available in"
            " context"
        )

    self._cache_service()
    service = self.service

    draft = self.get_draft(draft_id)
    headers = draft["message"]["payload"]["headers"]
    for header in headers:
        if header["name"] == "To" and not to:
            to = header["value"]
        elif header["name"] == "Subject" and not subject:
            subject = header["value"]

    return (
        service.users()
        .drafts()
        .update(
            userId="me", id=draft_id, body=self._build_draft(to, subject, message)
        )
        .execute()
    )

获取草稿 #

get_draft(draft_id: str = None) -> str

获取草稿邮件。 打印返回草稿的消息和ID。 返回:草稿对象,包括草稿ID和消息元数据。

参数:

名称 类型 描述 默认值
draft_id str

待获取的草稿ID

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_draft(self, draft_id: str = None) -> str:
    """
    Fetch a draft email and return the API response.

    The response is the draft object, including the draft id and message
    meta data.

    Args:
        draft_id (str): the id of the draft to fetch

    """
    self._cache_service()
    request = self.service.users().drafts().get(userId="me", id=draft_id)
    return request.execute()

发送草稿 #

send_draft(draft_id: str = None) -> str

发送一封草稿邮件。 打印返回的草稿消息和ID。 返回:草稿对象,包括草稿ID和消息元数据。

参数:

名称 类型 描述 默认值
draft_id str

待发送的草稿ID

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/gmail/base.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def send_draft(self, draft_id: str = None) -> str:
    """
    Send a draft email and return the API response.

    Args:
        draft_id (str): the id of the draft to send

    """
    self._cache_service()
    send_request = self.service.users().drafts().send(
        userId="me", body={"id": draft_id}
    )
    return send_request.execute()

Google日历工具规范 #

基类: BaseToolSpec

Google日历工具规范。

目前仅是对数据加载器的简单封装。 待办事项:为Google日历规范添加更多方法。

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class GoogleCalendarToolSpec(BaseToolSpec):
    """
    Google Calendar tool spec.

    Currently a simple wrapper around the data loader.
    TODO: add more methods to the Google Calendar spec.

    """

    # Method names this spec advertises through ``spec_functions``.
    spec_functions = ["load_data", "create_event", "get_date"]

    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """
        Load data from user's calendar.

        Each event of the primary calendar becomes one Document whose text
        lists status, summary, start/end time and organizer. Fields the API
        response does not contain are rendered as ``N/A``.

        Args:
            number_of_results (Optional[int]): the number of events to return. Defaults to 100.
            start_date (Optional[Union[str, datetime.date]]): the start date to return events from in date isoformat. Defaults to today.

        """
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        # NOTE(review): this is a naive midnight stamped with a literal "Z";
        # no conversion to UTC takes place here.
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        results = []
        for event in events_result.get("items", []):
            # Timed events carry "dateTime"; otherwise fall back to "date".
            start = event["start"]
            end = event["end"]
            start_time = start["dateTime"] if "dateTime" in start else start["date"]
            end_time = end["dateTime"] if "dateTime" in end else end["date"]

            # .get() keeps one event without a title/status from aborting
            # the whole load with a KeyError.
            event_string = f"Status: {event.get('status', 'N/A')}, "
            event_string += f"Summary: {event.get('summary', 'N/A')}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            organizer_email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({organizer_email})"
            else:
                event_string += f"Organizer: {organizer_email}"

            results.append(Document(text=event_string))

        return results

    def _get_credentials(self) -> Any:
        """
        Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.

        """
        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def create_event(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
        location: Optional[str] = None,
        start_datetime: Optional[Union[str, datetime.datetime]] = None,
        end_datetime: Optional[Union[str, datetime.datetime]] = None,
        attendees: Optional[List[str]] = None,
    ) -> str:
        """
        Create an event on the user's primary calendar.

        Args:
            title (Optional[str]): The title for the event
            description (Optional[str]): The description for the event
            location (Optional[str]): The location for the event
            start_datetime (Optional[Union[str, datetime.datetime]]): The start
                of the event, as a datetime or a string in
                ``%Y-%m-%dT%H:%M:%S%z`` format
            end_datetime (Optional[Union[str, datetime.datetime]]): The end of
                the event, in the same forms as ``start_datetime``
            attendees (Optional[List[str]]): A list of email addresses to
                invite to the event

        Returns:
            A fixed confirmation message.

        Raises:
            ValueError: If a datetime string does not match the expected format.
            TypeError: If a datetime argument is neither a string nor a datetime.

        """

        def _to_api_time(value):
            # Accept datetime objects as the annotation promises; strings
            # are parsed exactly as before.
            if isinstance(value, datetime.datetime):
                parsed = value
            else:
                parsed = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
            return parsed.astimezone().strftime("%Y-%m-%dT%H:%M:%S.%f%z")

        # Validate the inputs before any authentication or network work.
        start_time = _to_api_time(start_datetime)
        end_time = _to_api_time(end_datetime)

        attendees_list = (
            [{"email": attendee} for attendee in attendees] if attendees else []
        )

        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        event = {
            "summary": title,
            "location": location,
            "description": description,
            "start": {
                "dateTime": start_time,
            },
            "end": {
                "dateTime": end_time,
            },
            "attendees": attendees_list,
        }
        service.events().insert(calendarId="primary", body=event).execute()
        return (
            "Your calendar event has been created successfully! You can move on to the"
            " next step."
        )

    def get_date(self):
        """
        A function to return todays date. Call this before any other functions if you are unaware of the date.
        """
        return datetime.date.today()

加载数据 #

load_data(number_of_results: Optional[int] = 100, start_date: Optional[Union[str, date]] = None) -> List[Document]

从用户的日历加载数据。

参数:

名称 类型 描述 默认值
number_of_results Optional[int]

返回的事件数量。默认为100。

100
start_date Optional[Union[str, date]]

返回事件的起始日期,采用ISO格式。默认为当天。

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def load_data(
    self,
    number_of_results: Optional[int] = 100,
    start_date: Optional[Union[str, datetime.date]] = None,
) -> List[Document]:
    """
    Load data from user's calendar.

    Each event of the primary calendar becomes one Document whose text
    lists status, summary, start/end time and organizer. Fields the API
    response does not contain are rendered as ``N/A``.

    Args:
        number_of_results (Optional[int]): the number of events to return. Defaults to 100.
        start_date (Optional[Union[str, datetime.date]]): the start date to return events from in date isoformat. Defaults to today.

    """
    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    service = build("calendar", "v3", credentials=credentials)

    if start_date is None:
        start_date = datetime.date.today()
    elif isinstance(start_date, str):
        start_date = datetime.date.fromisoformat(start_date)

    start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
    # NOTE(review): this is a naive midnight stamped with a literal "Z";
    # no conversion to UTC takes place here.
    start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    events_result = (
        service.events()
        .list(
            calendarId="primary",
            timeMin=start_datetime_utc,
            maxResults=number_of_results,
            singleEvents=True,
            orderBy="startTime",
        )
        .execute()
    )

    results = []
    for event in events_result.get("items", []):
        # Timed events carry "dateTime"; otherwise fall back to "date".
        start = event["start"]
        end = event["end"]
        start_time = start["dateTime"] if "dateTime" in start else start["date"]
        end_time = end["dateTime"] if "dateTime" in end else end["date"]

        # .get() keeps one event without a title/status from aborting
        # the whole load with a KeyError.
        event_string = f"Status: {event.get('status', 'N/A')}, "
        event_string += f"Summary: {event.get('summary', 'N/A')}, "
        event_string += f"Start time: {start_time}, "
        event_string += f"End time: {end_time}, "

        organizer = event.get("organizer", {})
        display_name = organizer.get("displayName", "N/A")
        organizer_email = organizer.get("email", "N/A")
        if display_name != "N/A":
            event_string += f"Organizer: {display_name} ({organizer_email})"
        else:
            event_string += f"Organizer: {organizer_email}"

        results.append(Document(text=event_string))

    return results

create_event #

create_event(title: Optional[str] = None, description: Optional[str] = None, location: Optional[str] = None, start_datetime: Optional[Union[str, datetime]] = None, end_datetime: Optional[Union[str, datetime]] = None, attendees: Optional[List[str]] = None) -> str
在用户的日历上创建一个事件。

参数:

名称 类型 描述 默认值
title Optional[str]

活动标题

None
description Optional[str]

事件的描述

None
location Optional[str]

活动地点

None
start_datetime Optional[Union[str, datetime]]

活动的开始日期时间

None
end_datetime Optional[Union[str, datetime]]

事件的结束日期时间

None
attendees Optional[List[str]]

邀请参加活动的电子邮件地址列表

None
Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def create_event(
    self,
    title: Optional[str] = None,
    description: Optional[str] = None,
    location: Optional[str] = None,
    start_datetime: Optional[Union[str, datetime.datetime]] = None,
    end_datetime: Optional[Union[str, datetime.datetime]] = None,
    attendees: Optional[List[str]] = None,
) -> str:
    """
    Create an event on the user's primary calendar.

    Args:
        title (Optional[str]): The title for the event
        description (Optional[str]): The description for the event
        location (Optional[str]): The location for the event
        start_datetime (Optional[Union[str, datetime.datetime]]): The start
            of the event, as a datetime or a string in
            ``%Y-%m-%dT%H:%M:%S%z`` format
        end_datetime (Optional[Union[str, datetime.datetime]]): The end of
            the event, in the same forms as ``start_datetime``
        attendees (Optional[List[str]]): A list of email addresses to
            invite to the event

    Returns:
        A fixed confirmation message.

    Raises:
        ValueError: If a datetime string does not match the expected format.
        TypeError: If a datetime argument is neither a string nor a datetime.

    """

    def _to_api_time(value):
        # Accept datetime objects as the annotation promises; strings
        # are parsed exactly as before.
        if isinstance(value, datetime.datetime):
            parsed = value
        else:
            parsed = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
        return parsed.astimezone().strftime("%Y-%m-%dT%H:%M:%S.%f%z")

    # Validate the inputs before any authentication or network work.
    start_time = _to_api_time(start_datetime)
    end_time = _to_api_time(end_datetime)

    attendees_list = (
        [{"email": attendee} for attendee in attendees] if attendees else []
    )

    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    service = build("calendar", "v3", credentials=credentials)

    event = {
        "summary": title,
        "location": location,
        "description": description,
        "start": {
            "dateTime": start_time,
        },
        "end": {
            "dateTime": end_time,
        },
        "attendees": attendees_list,
    }
    service.events().insert(calendarId="primary", body=event).execute()
    return (
        "Your calendar event has been created successfully! You can move on to the"
        " next step."
    )

get_date #

get_date()

一个返回今天日期的函数。如果不清楚当前日期,请在调用其他函数前先调用此函数。

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/calendar/base.py
204
205
206
207
208
def get_date(self):
    """
    Return today's date as a ``datetime.date``.

    Call this before any other functions if you are unaware of the date.
    """
    today = datetime.date.today()
    return today

Google搜索工具规范 #

基类: BaseToolSpec

Google搜索工具规范。

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/search/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class GoogleSearchToolSpec(BaseToolSpec):
    """Google Search tool spec."""

    # Method names this spec advertises through ``spec_functions``.
    spec_functions = ["google_search"]

    def __init__(self, key: str, engine: str, num: Optional[int] = None) -> None:
        """
        Store the search settings.

        Args:
            key (str): Value substituted for ``key`` in the query URL template.
            engine (str): Value substituted for ``engine`` in the query URL template.
            num (Optional[int]): Number of results to request; validated on
                each search. Defaults to None.

        """
        self.key, self.engine, self.num = key, engine, num

    def google_search(self, query: str):
        """
        Make a query to the Google search engine to receive a list of results.

        The number of results is taken from ``self.num`` (set at
        construction), not from an argument of this method.

        Args:
            query (str): The query to be passed to Google search.

        Returns:
            A single-element list holding a Document with the raw response text.

        Raises:
            ValueError: If ``self.num`` is set and is not between 1 and 10.

        """
        template_values = {
            "key": self.key,
            "engine": self.engine,
            "query": urllib.parse.quote_plus(query),
        }
        url = QUERY_URL_TMPL.format(**template_values)

        requested = self.num
        if requested is not None:
            if not (1 <= requested <= 10):
                raise ValueError("num should be an integer between 1 and 10, inclusive")
            url = url + f"&num={requested}"

        reply = requests.get(url)
        return [Document(text=reply.text)]
google_search(query: str)

向谷歌搜索引擎发起查询以获取结果列表。

参数:

名称 类型 描述 默认值
query str

要传递给Google搜索的查询。

required
num int

返回的搜索结果数量。该值在构造 GoogleSearchToolSpec 时通过 num 设置,并非本方法的参数。默认为None。

None

抛出异常:

类型 描述
ValueError

如果'num'不是1到10之间的整数。

Source code in llama-index-integrations/tools/llama-index-tools-google/llama_index/tools/google/search/base.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def google_search(self, query: str):
    """
    Make a query to the Google search engine to receive a list of results.

    The number of results is taken from ``self.num``, not from an argument
    of this method.

    Args:
        query (str): The query to be passed to Google search.

    Returns:
        A single-element list holding a Document with the raw response text.

    Raises:
        ValueError: If ``self.num`` is set and is not between 1 and 10.

    """
    template_values = {
        "key": self.key,
        "engine": self.engine,
        "query": urllib.parse.quote_plus(query),
    }
    url = QUERY_URL_TMPL.format(**template_values)

    requested = self.num
    if requested is not None:
        if not (1 <= requested <= 10):
            raise ValueError("num should be an integer between 1 and 10, inclusive")
        url = url + f"&num={requested}"

    reply = requests.get(url)
    return [Document(text=reply.text)]