Google

GmailReader #

Bases: BaseReader, BaseModel

Gmail reader.

Reads emails.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_results | int | Defaults to 10. | required |
| query | str | Gmail query. Defaults to None. | required |
| service | Any | Gmail service. Defaults to None. | required |
| results_per_page | Optional[int] | Maximum number of results per page. Defaults to 10. | required |
| use_iterative_parser | bool | Use the iterative parser. Defaults to False. | required |
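A minimal usage sketch is shown below. It is not part of the documented source: the llama_index.readers.google import path, the example query, and the presence of credentials.json / token.json in the working directory are assumptions.

from llama_index.readers.google import GmailReader

# First run opens a browser window for OAuth and writes token.json next to
# credentials.json (see _get_credentials in the source below).
reader = GmailReader(query="from:me after:2024/01/01", max_results=25)
documents = reader.load_data()
print(f"Loaded {len(documents)} emails")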
Source code in llama_index/readers/google/gmail/base.py
class GmailReader(BaseReader, BaseModel):
    """Gmail阅读器。

    读取邮件

    Args:
        max_results (int): 默认为10。
        query (str): Gmail查询。默认为None。
        service (Any): Gmail服务。默认为None。
        results_per_page (Optional[int]): 每页的最大结果数。默认为10。
        use_iterative_parser (bool): 使用迭代解析器。默认为False。"""

    query: str = None
    use_iterative_parser: bool = False
    max_results: int = 10
    service: Any
    results_per_page: Optional[int]

    def load_data(self) -> List[Document]:
        """从用户的账户中加载电子邮件。"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        if not self.service:
            self.service = build("gmail", "v1", credentials=credentials)

        messages = self.search_messages()

        results = []
        for message in messages:
            text = message.pop("body")
            extra_info = message
            results.append(Document(text=text, extra_info=extra_info or {}))

        return results

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        import os

        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def search_messages(self):
        query = self.query

        max_results = self.max_results
        if self.results_per_page:
            max_results = self.results_per_page

        results = (
            self.service.users()
            .messages()
            .list(userId="me", q=query, maxResults=int(max_results))
            .execute()
        )
        messages = results.get("messages", [])

        if len(messages) < self.max_results:
            # paginate if there are more results
            while "nextPageToken" in results:
                page_token = results["nextPageToken"]
                results = (
                    self.service.users()
                    .messages()
                    .list(
                        userId="me",
                        q=query,
                        pageToken=page_token,
                        maxResults=int(max_results),
                    )
                    .execute()
                )
                messages.extend(results["messages"])
                if len(messages) >= self.max_results:
                    break

        result = []
        try:
            for message in messages:
                message_data = self.get_message_data(message)
                if not message_data:
                    continue
                result.append(message_data)
        except Exception as e:
            raise Exception("Can't get message data" + str(e))

        return result

    def get_message_data(self, message):
        message_id = message["id"]
        message_data = (
            self.service.users()
            .messages()
            .get(format="raw", userId="me", id=message_id)
            .execute()
        )
        if self.use_iterative_parser:
            body = self.extract_message_body_iterative(message_data)
        else:
            body = self.extract_message_body(message_data)

        if not body:
            return None

        # https://developers.google.com/gmail/api/reference/rest/v1/users.messages
        return {
            "id": message_data["id"],
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "internalDate": message_data["internalDate"],
            "body": body,
        }

    def extract_message_body_iterative(self, message: dict):
        if message["raw"]:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)
        else:
            mime_msg = message

        body_text = ""
        if mime_msg.get_content_type() == "text/plain":
            plain_text = mime_msg.get_payload(decode=True)
            charset = mime_msg.get_content_charset("utf-8")
            body_text = plain_text.decode(charset).encode("utf-8").decode("utf-8")

        elif mime_msg.get_content_maintype() == "multipart":
            msg_parts = mime_msg.get_payload()
            for msg_part in msg_parts:
                body_text += self.extract_message_body_iterative(msg_part)

        return body_text

    def extract_message_body(self, message: dict):
        from bs4 import BeautifulSoup

        try:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)

            # If the message body contains HTML, parse it with BeautifulSoup
            if "text/html" in mime_msg:
                soup = BeautifulSoup(body, "html.parser")
                body = soup.get_text()
            return body.decode("utf-8")
        except Exception as e:
            raise Exception("Can't parse message body" + str(e))

load_data #

load_data() -> List[Document]

Load emails from the user's account.
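Each returned Document holds the decoded message body as text, with the remaining Gmail fields collected by get_message_data (id, threadId, snippet, internalDate) stored as metadata. A short consumption sketch, assuming a reader constructed as in the earlier example:

for doc in reader.load_data():
    # Metadata keys mirror the Gmail users.messages resource fields.
    print(doc.metadata["id"], doc.metadata["snippet"][:60])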

Source code in llama_index/readers/google/gmail/base.py
def load_data(self) -> List[Document]:
    """从用户的账户中加载电子邮件。"""
    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    if not self.service:
        self.service = build("gmail", "v1", credentials=credentials)

    messages = self.search_messages()

    results = []
    for message in messages:
        text = message.pop("body")
        extra_info = message
        results.append(Document(text=text, extra_info=extra_info or {}))

    return results

GoogleCalendarReader #

Bases: BaseReader

Google Calendar reader.

Reads events from Google Calendar.
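A minimal usage sketch (not from the documented source; it assumes the llama_index.readers.google import path and that credentials.json is available so the OAuth flow can create token.json):

from llama_index.readers.google import GoogleCalendarReader

reader = GoogleCalendarReader()
events = reader.load_data(number_of_results=50)
for doc in events[:3]:
    print(doc.text)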

Source code in llama_index/readers/google/calendar/base.py
class GoogleCalendarReader(BaseReader):
    """谷歌日历阅读器。

    从谷歌日历中读取事件

    """

    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """从用户日历加载数据。

Args:
    number_of_results (可选[int]): 要返回的事件数量。默认为100。
    start_date (可选[Union[str, datetime.date]]): 要返回事件的开始日期。默认为今天。
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = events_result.get("items", [])

        if not events:
            return []

        results = []
        for event in events:
            if "dateTime" in event["start"]:
                start_time = event["start"]["dateTime"]
            else:
                start_time = event["start"]["date"]

            if "dateTime" in event["end"]:
                end_time = event["end"]["dateTime"]
            else:
                end_time = event["end"]["date"]

            event_string = f"Status: {event['status']}, "
            event_string += f"Summary: {event['summary']}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({email})"
            else:
                event_string += f"Organizer: {email}"

            results.append(Document(text=event_string))

        return results

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

load_data #

load_data(
    number_of_results: Optional[int] = 100,
    start_date: Optional[Union[str, date]] = None,
) -> List[Document]

Load data from the user's calendar.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| number_of_results | Optional[int] | Number of events to return. Defaults to 100. | 100 |
| start_date | Optional[Union[str, date]] | Start date to return events from. Defaults to today. | None |
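Because load_data parses string dates with datetime.date.fromisoformat, start_date can be passed either as an ISO string or as a datetime.date. A brief sketch, with the reader constructed as in the example above:

import datetime

docs = reader.load_data(number_of_results=20, start_date="2024-01-01")
# Equivalent call with a date object:
docs = reader.load_data(number_of_results=20, start_date=datetime.date(2024, 1, 1))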
Source code in llama_index/readers/google/calendar/base.py
    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """从用户日历加载数据。

Args:
    number_of_results (可选[int]): 要返回的事件数量。默认为100。
    start_date (可选[Union[str, datetime.date]]): 要返回事件的开始日期。默认为今天。
"""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = events_result.get("items", [])

        if not events:
            return []

        results = []
        for event in events:
            if "dateTime" in event["start"]:
                start_time = event["start"]["dateTime"]
            else:
                start_time = event["start"]["date"]

            if "dateTime" in event["end"]:
                end_time = event["end"]["dateTime"]
            else:
                end_time = event["end"]["date"]

            event_string = f"Status: {event['status']}, "
            event_string += f"Summary: {event['summary']}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({email})"
            else:
                event_string += f"Organizer: {email}"

            results.append(Document(text=event_string))

        return results

GoogleDocsReader #

Bases: BasePydanticReader

Google Docs reader.

Reads pages from Google Docs.
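A minimal usage sketch (not part of the documented source; the import path and the document id are assumptions, and credentials.json / token.json must be available as described in _get_credentials):

from llama_index.readers.google import GoogleDocsReader

# Split each Google Doc into one Document per level-1 heading.
reader = GoogleDocsReader(split_on_heading_level=1)
docs = reader.load_data(document_ids=["<google-doc-id>"])
for doc in docs:
    print(doc.id_, doc.metadata)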

Source code in llama_index/readers/google/docs/base.py
class GoogleDocsReader(BasePydanticReader):
    """谷歌文档阅读器。

    从谷歌文档中读取页面。"""

    is_remote: bool = True

    split_on_heading_level: Optional[int] = Field(
        default=None,
        description="If set the document will be split on the specified heading level.",
    )

    include_toc: bool = Field(
        default=True, description="Include table of contents elements."
    )

    @classmethod
    def class_name(cls) -> str:
        return "GoogleDocsReader"

    def load_data(self, document_ids: List[str]) -> List[Document]:
        """从输入目录加载数据。

Args:
    document_ids(List[str]):文档id的列表。
"""
        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for document_id in document_ids:
            docs = self._load_doc(document_id)
            results.extend(docs)

        return results

    def _load_doc(self, document_id: str) -> str:
        """从Google Docs加载文档。

Args:
    document_id:文档ID。

Returns:
    文档文本。
"""
        credentials = self._get_credentials()
        docs_service = discovery.build("docs", "v1", credentials=credentials)
        google_doc = docs_service.documents().get(documentId=document_id).execute()
        google_doc_content = google_doc.get("body").get("content")

        doc_metadata = {"document_id": document_id}

        return self._structural_elements_to_docs(google_doc_content, doc_metadata)

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def _read_paragraph_element(self, element: Any) -> Any:
        """返回给定段落元素中的文本。

Args:
    element: 来自Google文档的段落元素。
"""
        text_run = element.get("textRun")
        if not text_run:
            return ""
        return text_run.get("content")

    def _read_structural_elements(self, elements: List[Any]) -> Any:
        """递归遍历结构元素列表。

读取文档的文本,其中文本可能嵌套在元素中。

Args:
    elements: 结构元素的列表。
"""
        text = ""
        for value in elements:
            if "paragraph" in value:
                elements = value.get("paragraph").get("elements")
                for elem in elements:
                    text += self._read_paragraph_element(elem)
            elif "table" in value:
                # The text in table cells are in nested Structural Elements
                # and tables may be nested.
                table = value.get("table")
                for row in table.get("tableRows"):
                    cells = row.get("tableCells")
                    for cell in cells:
                        text += self._read_structural_elements(cell.get("content"))
            elif "tableOfContents" in value:
                # The text in the TOC is also in a Structural Element.
                toc = value.get("tableOfContents")
                text += self._read_structural_elements(toc.get("content"))
        return text

    def _determine_heading_level(self, element):
        """提取文档元素的标题级别、标签和ID。

Args:
    element:结构元素。
"""
        level = None
        heading_key = None
        heading_id = None
        if self.split_on_heading_level and "paragraph" in element:
            style = element.get("paragraph").get("paragraphStyle")
            style_type = style.get("namedStyleType", "")
            heading_id = style.get("headingId", None)
            if style_type == "TITLE":
                level = 0
                heading_key = "title"
            elif style_type.startswith("HEADING_"):
                level = int(style_type.split("_")[1])
                if level > self.split_on_heading_level:
                    return None, None, None

                heading_key = f"Header {level}"

        return level, heading_key, heading_id

    def _generate_doc_id(self, metadata: dict):
        if "heading_id" in metadata:
            heading_id = metadata["heading_id"]
        else:
            heading_id = "".join(
                random.choices(string.ascii_letters + string.digits, k=8)
            )
        return f"{metadata['document_id']}_{heading_id}"

    def _structural_elements_to_docs(
        self, elements: List[Any], doc_metadata: dict
    ) -> Any:
        """递归遍历结构元素列表。

如果设置了split_on_heading_level,则根据标题拆分文档。

Args:
elements: 结构元素列表。
"""
        docs = []

        current_heading_level = self.split_on_heading_level

        metadata = doc_metadata.copy()
        text = ""
        for value in elements:
            element_text = self._read_structural_elements([value])

            level, heading_key, heading_id = self._determine_heading_level(value)

            if level is not None:
                if level == self.split_on_heading_level:
                    if text.strip():
                        docs.append(
                            Document(
                                id_=self._generate_doc_id(metadata),
                                text=text,
                                metadata=metadata.copy(),
                            )
                        )
                        text = ""
                    if "heading_id" in metadata:
                        metadata["heading_id"] = heading_id
                elif level < current_heading_level:
                    metadata = doc_metadata.copy()

                metadata[heading_key] = element_text
                current_heading_level = level
            else:
                text += element_text

        if text:
            if docs:
                id_ = self._generate_doc_id(metadata)
            else:
                id_ = metadata["document_id"]
            docs.append(Document(id_=id_, text=text, metadata=metadata))

        return docs

load_data #

load_data(document_ids: List[str]) -> List[Document]

Load data from the input directory.

Source code in llama_index/readers/google/docs/base.py
    def load_data(self, document_ids: List[str]) -> List[Document]:
        """从输入目录加载数据。

Args:
    document_ids(List[str]):文档id的列表。
"""
        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for document_id in document_ids:
            docs = self._load_doc(document_id)
            results.extend(docs)

        return results

GoogleDriveReader #

Bases: BasePydanticReader

Google Drive reader.

Reads files from Google Drive. Credentials passed directly to the constructor will take precedence over those passed as file paths.
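A minimal usage sketch (not part of the documented source; the folder id is a placeholder, and one of client_config, authorized_user_info, or service_account_key, or their corresponding JSON files, must be available):

from llama_index.readers.google import GoogleDriveReader

reader = GoogleDriveReader(
    folder_id="<drive-folder-id>",
    service_account_key_path="service_account_key.json",
)
docs = reader.load_data()
print([doc.metadata["file name"] for doc in docs])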

Source code in llama_index/readers/google/drive/base.py
class GoogleDriveReader(BasePydanticReader):
    """Google Drive阅读器。

从Google Drive中读取文件。直接传递给构造函数的凭据将优先于传递的文件路径。

Args:
    drive_id(可选[str]):Google Drive中共享驱动器的驱动器ID。
    folder_id(可选[str]):Google Drive中文件夹的文件夹ID。
    file_ids(可选[str]):Google Drive中文件的文件ID。
    query_string:用于过滤文档的更通用的查询字符串,例如“name contains 'test'”。
        它提供了更灵活的过滤文档的方式。更多信息:https://developers.google.com/drive/api/v3/search-files
    is_cloud(可选[bool]):指示阅读器是否在云环境中使用。如果是,则不会将凭据保存到磁盘上。
        默认为False。
    credentials_path(可选[str]):客户端配置文件的路径。
        默认为None。
    token_path(可选[str]):授权用户信息文件的路径。默认为None。
    service_account_key_path(可选[str]):服务帐户密钥文件的路径。默认为None。
    client_config(可选[dict]):包含客户端配置的字典。
        默认为None。
    authorized_user_info(可选[dict]):包含授权用户信息的字典。
        默认为None。
    service_account_key(可选[dict]):包含服务帐户密钥的字典。
        默认为None。
    file_extractor(可选[Dict[str, BaseReader]]):文件扩展名到BaseReader类的映射,指定如何将该文件转换为文本。有关更多详细信息,请参见“SimpleDirectoryReader”。

    """

    drive_id: Optional[str] = None
    folder_id: Optional[str] = None
    file_ids: Optional[List[str]] = None
    query_string: Optional[str] = None
    client_config: Optional[dict] = None
    authorized_user_info: Optional[dict] = None
    service_account_key: Optional[dict] = None
    token_path: Optional[str] = None
    file_extractor: Optional[Dict[str, Union[str, BaseReader]]] = Field(
        default=None, exclude=True
    )

    _is_cloud: bool = PrivateAttr(default=False)
    _creds: Credentials = PrivateAttr()
    _mimetypes: dict = PrivateAttr()

    def __init__(
        self,
        drive_id: Optional[str] = None,
        folder_id: Optional[str] = None,
        file_ids: Optional[List[str]] = None,
        query_string: Optional[str] = None,
        is_cloud: Optional[bool] = False,
        credentials_path: str = "credentials.json",
        token_path: str = "token.json",
        service_account_key_path: str = "service_account_key.json",
        client_config: Optional[dict] = None,
        authorized_user_info: Optional[dict] = None,
        service_account_key: Optional[dict] = None,
        file_extractor: Optional[Dict[str, Union[str, BaseReader]]] = None,
        **kwargs: Any,
    ) -> None:
        """使用参数进行初始化。"""
        self._creds = None
        self._is_cloud = (is_cloud,)
        # Download Google Docs/Slides/Sheets as actual files
        # See https://developers.google.com/drive/v3/web/mime-types
        self._mimetypes = {
            "application/vnd.google-apps.document": {
                "mimetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                "extension": ".docx",
            },
            "application/vnd.google-apps.spreadsheet": {
                "mimetype": (
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                ),
                "extension": ".xlsx",
            },
            "application/vnd.google-apps.presentation": {
                "mimetype": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
                "extension": ".pptx",
            },
        }

        # Read the file contents so they can be serialized and stored.
        if client_config is None and os.path.isfile(credentials_path):
            with open(credentials_path, encoding="utf-8") as json_file:
                client_config = json.load(json_file)

        if authorized_user_info is None and os.path.isfile(token_path):
            with open(token_path, encoding="utf-8") as json_file:
                authorized_user_info = json.load(json_file)

        if service_account_key is None and os.path.isfile(service_account_key_path):
            with open(service_account_key_path, encoding="utf-8") as json_file:
                service_account_key = json.load(json_file)

        if (
            client_config is None
            and service_account_key is None
            and authorized_user_info is None
        ):
            raise ValueError(
                "Must specify `client_config` or `service_account_key` or `authorized_user_info`."
            )

        super().__init__(
            drive_id=drive_id,
            folder_id=folder_id,
            file_ids=file_ids,
            query_string=query_string,
            client_config=client_config,
            authorized_user_info=authorized_user_info,
            service_account_key=service_account_key,
            token_path=token_path,
            file_extractor=file_extractor,
            **kwargs,
        )

    @classmethod
    def class_name(cls) -> str:
        return "GoogleDriveReader"

    def _get_credentials(self) -> Tuple[Credentials]:
        """使用Google进行身份验证并保存凭据。
按照以下说明下载service_account_key.json文件:https://cloud.google.com/iam/docs/keys-create-delete。

重要提示:确保与服务账号共享文件夹/文件。否则将无法读取文档。

返回:
    凭据
"""
        from google_auth_oauthlib.flow import InstalledAppFlow

        # First, we need the Google API credentials for the app
        creds = None

        if self.authorized_user_info is not None:
            creds = Credentials.from_authorized_user_info(
                self.authorized_user_info, SCOPES
            )
        elif self.service_account_key is not None:
            return service_account.Credentials.from_service_account_info(
                self.service_account_key, scopes=SCOPES
            )

        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_config(self.client_config, SCOPES)
                creds = flow.run_local_server(port=0)

            # Save the credentials for the next run
            if not self._is_cloud:
                with open(self.token_path, "w", encoding="utf-8") as token:
                    token.write(creds.to_json())

        return creds

    def _get_fileids_meta(
        self,
        drive_id: Optional[str] = None,
        folder_id: Optional[str] = None,
        file_id: Optional[str] = None,
        mime_types: Optional[List[str]] = None,
        query_string: Optional[str] = None,
    ) -> List[List[str]]:
        """获取文件夹/文件id中存在的文件id
Args:
    drive_id:谷歌云端硬盘中共享驱动器的驱动器id。
    folder_id:谷歌云端硬盘中文件夹的文件夹id。
    file_id:谷歌云端硬盘中文件的文件id。
    mime_types:您想要允许的mime类型,例如:"application/vnd.google-apps.document"。
    query_string:用于过滤文档的更通用的查询字符串,例如"name contains 'test'"。

Returns:
    metadata:文件id的元数据列表。
"""
        from googleapiclient.discovery import build

        try:
            service = build("drive", "v3", credentials=self._creds)
            fileids_meta = []
            if folder_id:
                folder_mime_type = "application/vnd.google-apps.folder"
                query = "('" + folder_id + "' in parents)"

                # Add mimeType filter to query
                if mime_types:
                    if folder_mime_type not in mime_types:
                        mime_types.append(folder_mime_type)  # keep the recursiveness
                    mime_query = " or ".join(
                        [f"mimeType='{mime_type}'" for mime_type in mime_types]
                    )
                    query += f" and ({mime_query})"

                # Add query string filter
                if query_string:
                    # to keep the recursiveness, we need to add folder_mime_type to the mime_types
                    query += (
                        f" and ((mimeType='{folder_mime_type}') or ({query_string}))"
                    )

                items = []
                page_token = None
                # get files taking into account that the results are paginated
                while True:
                    list_kwargs = {
                        "q": query,
                        "includeItemsFromAllDrives": True,
                        "supportsAllDrives": True,
                        "fields": "*",
                    }
                    if drive_id:
                        list_kwargs["driveId"] = drive_id
                        list_kwargs["corpora"] = "drive"
                    if page_token:
                        # pass the token so the next page is fetched instead of
                        # re-requesting the first page indefinitely
                        list_kwargs["pageToken"] = page_token
                    results = service.files().list(**list_kwargs).execute()
                    items.extend(results.get("files", []))
                    page_token = results.get("nextPageToken", None)
                    if page_token is None:
                        break

                for item in items:
                    if item["mimeType"] == folder_mime_type:
                        if drive_id:
                            fileids_meta.extend(
                                self._get_fileids_meta(
                                    drive_id=drive_id,
                                    folder_id=item["id"],
                                    mime_types=mime_types,
                                    query_string=query_string,
                                )
                            )
                        else:
                            fileids_meta.extend(
                                self._get_fileids_meta(
                                    folder_id=item["id"],
                                    mime_types=mime_types,
                                    query_string=query_string,
                                )
                            )
                    else:
                        # Check if file doesn't belong to a Shared Drive. "owners" doesn't exist in a Shared Drive
                        is_shared_drive = "driveId" in item
                        author = (
                            item["owners"][0]["displayName"]
                            if not is_shared_drive
                            else "Shared Drive"
                        )

                        fileids_meta.append(
                            (
                                item["id"],
                                author,
                                item["name"],
                                item["mimeType"],
                                item["createdTime"],
                                item["modifiedTime"],
                            )
                        )
            else:
                # Get the file details
                file = (
                    service.files()
                    .get(fileId=file_id, supportsAllDrives=True, fields="*")
                    .execute()
                )
                # Get metadata of the file
                # Check if file doesn't belong to a Shared Drive. "owners" doesn't exist in a Shared Drive
                is_shared_drive = "driveId" in file
                author = (
                    file["owners"][0]["displayName"]
                    if not is_shared_drive
                    else "Shared Drive"
                )

                fileids_meta.append(
                    (
                        file["id"],
                        author,
                        file["name"],
                        file["mimeType"],
                        file["createdTime"],
                        file["modifiedTime"],
                    )
                )
            return fileids_meta

        except Exception as e:
            logger.error(
                f"An error occurred while getting fileids metadata: {e}", exc_info=True
            )

    def _download_file(self, fileid: str, filename: str) -> str:
        """下载具有文件id和文件名的文件
Args:
    fileid:谷歌云端硬盘中文件的文件id
    filename:将要下载的文件名
Returns:
    下载的文件名,可能具有新的扩展名。
"""
        from io import BytesIO

        from googleapiclient.discovery import build
        from googleapiclient.http import MediaIoBaseDownload

        try:
            # Get file details
            service = build("drive", "v3", credentials=self._creds)
            file = service.files().get(fileId=fileid, supportsAllDrives=True).execute()

            if file["mimeType"] in self._mimetypes:
                download_mimetype = self._mimetypes[file["mimeType"]]["mimetype"]
                download_extension = self._mimetypes[file["mimeType"]]["extension"]
                new_file_name = filename + download_extension

                # Download and convert file
                request = service.files().export_media(
                    fileId=fileid, mimeType=download_mimetype
                )
            else:
                new_file_name = filename

                # Download file without conversion
                request = service.files().get_media(fileId=fileid)

            # Download file data
            file_data = BytesIO()
            downloader = MediaIoBaseDownload(file_data, request)
            done = False

            while not done:
                status, done = downloader.next_chunk()

            # Save the downloaded file
            with open(new_file_name, "wb") as f:
                f.write(file_data.getvalue())

            return new_file_name
        except Exception as e:
            logger.error(
                f"An error occurred while downloading file: {e}", exc_info=True
            )

    def _load_data_fileids_meta(self, fileids_meta: List[List[str]]) -> List[Document]:
        """从文件id元数据中加载数据
Args:
    fileids_meta:谷歌驱动器中文件id的元数据。

Returns:
    Lis[Document]:文件id中存在的数据的文档列表。
"""
        try:
            with tempfile.TemporaryDirectory() as temp_dir:

                def get_metadata(filename):
                    return metadata[filename]

                temp_dir = Path(temp_dir)
                metadata = {}

                for fileid_meta in fileids_meta:
                    # Download files and name them with their fileid
                    fileid = fileid_meta[0]
                    filepath = os.path.join(temp_dir, fileid)
                    final_filepath = self._download_file(fileid, filepath)

                    # Add metadata of the file to metadata dictionary
                    metadata[final_filepath] = {
                        "file id": fileid_meta[0],
                        "author": fileid_meta[1],
                        "file name": fileid_meta[2],
                        "mime type": fileid_meta[3],
                        "created at": fileid_meta[4],
                        "modified at": fileid_meta[5],
                    }
                loader = SimpleDirectoryReader(
                    temp_dir,
                    file_extractor=self.file_extractor,
                    file_metadata=get_metadata,
                )
                documents = loader.load_data()
                for doc in documents:
                    doc.id_ = doc.metadata.get("file id", doc.id_)

            return documents
        except Exception as e:
            logger.error(
                f"An error occurred while loading data from fileids meta: {e}",
                exc_info=True,
            )

    def _load_from_file_ids(
        self,
        drive_id: Optional[str],
        file_ids: List[str],
        mime_types: Optional[List[str]],
        query_string: Optional[str],
    ) -> List[Document]:
        """从文件id加载数据
Args:
    file_ids:谷歌驱动中文件的文件id。
    mime_types:您想要允许的mime类型,例如:"application/vnd.google-apps.document"
    query_string:用于过滤文档的查询字符串列表,例如"name contains 'test'"。

Returns:
    Document:文本文档的列表。
"""
        try:
            fileids_meta = []
            for file_id in file_ids:
                fileids_meta.extend(
                    self._get_fileids_meta(
                        drive_id=drive_id,
                        file_id=file_id,
                        mime_types=mime_types,
                        query_string=query_string,
                    )
                )
            return self._load_data_fileids_meta(fileids_meta)
        except Exception as e:
            logger.error(
                f"An error occurred while loading with fileid: {e}", exc_info=True
            )

    def _load_from_folder(
        self,
        drive_id: Optional[str],
        folder_id: str,
        mime_types: Optional[List[str]],
        query_string: Optional[str],
    ) -> List[Document]:
        """从文件夹ID加载数据。

Args:
    drive_id:谷歌云端硬盘中共享驱动器的驱动器ID。
    folder_id:谷歌云端硬盘中文件夹的文件夹ID。
    mime_types:您想要允许的mime类型,例如:"application/vnd.google-apps.document"。
    query_string:用于过滤文档的更通用的查询字符串,例如"name contains 'test'"。

Returns:
    文档:文本文档的文档列表。
"""
        try:
            fileids_meta = self._get_fileids_meta(
                drive_id=drive_id,
                folder_id=folder_id,
                mime_types=mime_types,
                query_string=query_string,
            )
            return self._load_data_fileids_meta(fileids_meta)
        except Exception as e:
            logger.error(
                f"An error occurred while loading from folder: {e}", exc_info=True
            )

    def load_data(
        self,
        drive_id: Optional[str] = None,
        folder_id: Optional[str] = None,
        file_ids: Optional[List[str]] = None,
        mime_types: Optional[List[str]] = None,  # Deprecated
        query_string: Optional[str] = None,
    ) -> List[Document]:
        """从文件夹ID或文件ID加载数据。

Args:
    drive_id:谷歌云端硬盘中共享驱动器的驱动器ID。
    folder_id:谷歌云端硬盘中文件夹的文件夹ID。
    file_ids:谷歌云端硬盘中文件的文件ID。
    mime_types:您想要允许的mime类型,例如:"application/vnd.google-apps.document"。
    query_string:用于过滤文档的更通用的查询字符串,例如 "name contains 'test'"。
        它提供了更灵活的过滤文档的方式。更多信息:https://developers.google.com/drive/api/v3/search-files

Returns:
    List[Document]:文档列表。
"""
        self._creds = self._get_credentials()

        # If no arguments are provided to load_data, default to the object attributes
        if drive_id is None:
            drive_id = self.drive_id
        if folder_id is None:
            folder_id = self.folder_id
        if file_ids is None:
            file_ids = self.file_ids
        if query_string is None:
            query_string = self.query_string

        if folder_id:
            return self._load_from_folder(drive_id, folder_id, mime_types, query_string)
        elif file_ids:
            return self._load_from_file_ids(
                drive_id, file_ids, mime_types, query_string
            )
        else:
            logger.warning("Either 'folder_id' or 'file_ids' must be provided.")
            return []

load_data #

load_data(
    drive_id: Optional[str] = None,
    folder_id: Optional[str] = None,
    file_ids: Optional[List[str]] = None,
    mime_types: Optional[List[str]] = None,
    query_string: Optional[str] = None,
) -> List[Document]

Load data from a folder id or file ids.

Returns:

| Type | Description |
| --- | --- |
| List[Document] | A list of documents. |
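Arguments passed to load_data override the attributes set in the constructor, so one reader can be reused with different filters. A hedged sketch, with the reader constructed as in the earlier example and a query following the Drive search-files syntax:

docs = reader.load_data(
    folder_id="<drive-folder-id>",
    query_string="name contains 'report'",
)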

Source code in llama_index/readers/google/drive/base.py
    def load_data(
        self,
        drive_id: Optional[str] = None,
        folder_id: Optional[str] = None,
        file_ids: Optional[List[str]] = None,
        mime_types: Optional[List[str]] = None,  # Deprecated
        query_string: Optional[str] = None,
    ) -> List[Document]:
        """从文件夹ID或文件ID加载数据。

Args:
    drive_id:谷歌云端硬盘中共享驱动器的驱动器ID。
    folder_id:谷歌云端硬盘中文件夹的文件夹ID。
    file_ids:谷歌云端硬盘中文件的文件ID。
    mime_types:您想要允许的mime类型,例如:"application/vnd.google-apps.document"。
    query_string:用于过滤文档的更通用的查询字符串,例如 "name contains 'test'"。
        它提供了更灵活的过滤文档的方式。更多信息:https://developers.google.com/drive/api/v3/search-files

Returns:
    List[Document]:文档列表。
"""
        self._creds = self._get_credentials()

        # If no arguments are provided to load_data, default to the object attributes
        if drive_id is None:
            drive_id = self.drive_id
        if folder_id is None:
            folder_id = self.folder_id
        if file_ids is None:
            file_ids = self.file_ids
        if query_string is None:
            query_string = self.query_string

        if folder_id:
            return self._load_from_folder(drive_id, folder_id, mime_types, query_string)
        elif file_ids:
            return self._load_from_file_ids(
                drive_id, file_ids, mime_types, query_string
            )
        else:
            logger.warning("Either 'folder_id' or 'file_ids' must be provided.")
            return []

GoogleKeepReader #

Bases: BaseReader

Google Keep reader.

Reads notes from Google Keep.
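A minimal usage sketch (not part of the documented source; the note id is a placeholder, and _get_keep expects a keep_credentials.json file with "username" and "password" entries):

from llama_index.readers.google import GoogleKeepReader

reader = GoogleKeepReader()
notes = reader.load_data(document_ids=["<note-id>"])
all_notes = reader.load_all_notes()
print(len(all_notes), "notes loaded")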

Source code in llama_index/readers/google/keep/base.py
class GoogleKeepReader(BaseReader):
    """谷歌Keep阅读器。

从谷歌Keep读取笔记

    """

    def load_data(self, document_ids: List[str]) -> List[Document]:
        """从document_ids中加载数据。

Args:
    document_ids (List[str]): 笔记id的列表。
"""
        keep = self._get_keep()

        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for note_id in document_ids:
            note = keep.get(note_id)
            if note is None:
                raise ValueError(f"Note with id {note_id} not found.")
            text = f"Title: {note.title}\nContent: {note.text}"
            results.append(Document(text=text, extra_info={"note_id": note_id}))
        return results

    def load_all_notes(self) -> List[Document]:
        """从Google Keep加载所有的便签。"""
        keep = self._get_keep()

        notes = keep.all()
        results = []
        for note in notes:
            text = f"Title: {note.title}\nContent: {note.text}"
            results.append(Document(text=text, extra_info={"note_id": note.id}))
        return results

    def _get_keep(self) -> Any:
        import gkeepapi

        """Get a Google Keep object with login."""
        # Read username and password from keep_credentials.json
        if os.path.exists("keep_credentials.json"):
            with open("keep_credentials.json") as f:
                credentials = json.load(f)
        else:
            raise RuntimeError("Failed to load keep_credentials.json.")

        keep = gkeepapi.Keep()

        success = keep.login(credentials["username"], credentials["password"])
        if not success:
            raise RuntimeError("Failed to login to Google Keep.")

        return keep

load_data #

load_data(document_ids: List[str]) -> List[Document]

Load data from the document_ids.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| document_ids | List[str] | A list of note ids. | required |
Source code in llama_index/readers/google/keep/base.py
    def load_data(self, document_ids: List[str]) -> List[Document]:
        """从document_ids中加载数据。

Args:
    document_ids (List[str]): 笔记id的列表。
"""
        keep = self._get_keep()

        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for note_id in document_ids:
            note = keep.get(note_id)
            if note is None:
                raise ValueError(f"Note with id {note_id} not found.")
            text = f"Title: {note.title}\nContent: {note.text}"
            results.append(Document(text=text, extra_info={"note_id": note_id}))
        return results

load_all_notes #

load_all_notes() -> List[Document]

Load all notes from Google Keep.

Source code in llama_index/readers/google/keep/base.py
def load_all_notes(self) -> List[Document]:
    """从Google Keep加载所有的便签。"""
    keep = self._get_keep()

    notes = keep.all()
    results = []
    for note in notes:
        text = f"Title: {note.title}\nContent: {note.text}"
        results.append(Document(text=text, extra_info={"note_id": note.id}))
    return results

GoogleSheetsReader #

Bases: BasePydanticReader

Google Sheets reader.

Reads sheets as TSV from Google Sheets.
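A minimal usage sketch (not part of the documented source; the spreadsheet id is a placeholder, and credentials.json / token.json must be available as described in _get_credentials):

from llama_index.readers.google import GoogleSheetsReader

reader = GoogleSheetsReader()
docs = reader.load_data(spreadsheet_ids=["<spreadsheet-id>"])
print(docs[0].text[:200])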

Source code in llama_index/readers/google/sheets/base.py
class GoogleSheetsReader(BasePydanticReader):
    """谷歌表格读取器。

    从谷歌表格中以TSV格式读取表格。"""

    is_remote: bool = True

    def __init__(self) -> None:
        """使用参数进行初始化。"""
        try:
            import google  # noqa
            import google_auth_oauthlib  # noqa
            import googleapiclient  # noqa
        except ImportError:
            raise ImportError(
                "`google_auth_oauthlib`, `googleapiclient` and `google` "
                "must be installed to use the GoogleSheetsReader.\n"
                "Please run `pip install --upgrade google-api-python-client "
                "google-auth-httplib2 google-auth-oauthlib`."
            )

    @classmethod
    def class_name(cls) -> str:
        return "GoogleSheetsReader"

    def load_data(self, spreadsheet_ids: List[str]) -> List[Document]:
        """从输入目录加载数据。

Args:
    spreadsheet_ids(List[str]):文档id的列表。
"""
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            sheet = self._load_sheet(spreadsheet_id)
            results.append(
                Document(
                    id_=spreadsheet_id,
                    text=sheet,
                    metadata={"spreadsheet_id": spreadsheet_id},
                )
            )
        return results

    def load_data_in_pandas(self, spreadsheet_ids: List[str]) -> List[pd.DataFrame]:
        """从输入目录加载数据。

Args:
    spreadsheet_ids(List[str]):文档id的列表。
"""
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            dataframes = self._load_sheet_in_pandas(spreadsheet_id)
            results.extend(dataframes)
        return results

    def _load_sheet(self, spreadsheet_id: str) -> str:
        """从Google Sheets加载一个表格。

Args:
    spreadsheet_id: 表格的id。

Returns:
    表格数据。
"""
        credentials = self._get_credentials()
        sheets_service = discovery.build("sheets", "v4", credentials=credentials)
        spreadsheet_data = (
            sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
        )
        sheets = spreadsheet_data.get("sheets")
        sheet_text = ""

        for sheet in sheets:
            properties = sheet.get("properties")
            title = properties.get("title")
            sheet_text += title + "\n"
            grid_props = properties.get("gridProperties")
            rows = grid_props.get("rowCount")
            cols = grid_props.get("columnCount")
            range_pattern = f"R1C1:R{rows}C{cols}"
            response = (
                sheets_service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_pattern)
                .execute()
            )
            sheet_text += (
                "\n".join("\t".join(row) for row in response.get("values", [])) + "\n"
            )
        return sheet_text

    def _load_sheet_in_pandas(self, spreadsheet_id: str) -> List[pd.DataFrame]:
        """从Google Sheets加载一个表格。

Args:
    spreadsheet_id: 表格的id。
    sheet_name: 表格的名称。

Returns:
    表格数据。
"""
        credentials = self._get_credentials()
        sheets_service = discovery.build("sheets", "v4", credentials=credentials)
        sheet = sheets_service.spreadsheets()
        spreadsheet_data = sheet.get(spreadsheetId=spreadsheet_id).execute()
        sheets = spreadsheet_data.get("sheets")
        dataframes = []
        for sheet in sheets:
            properties = sheet.get("properties")
            title = properties.get("title")
            grid_props = properties.get("gridProperties")
            rows = grid_props.get("rowCount")
            cols = grid_props.get("columnCount")
            range_pattern = f"{title}!R1C1:R{rows}C{cols}"
            response = (
                sheets_service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_pattern)
                .execute()
            )
            values = response.get("values", [])
            if not values:
                print(f"No data found in {title}")
            else:
                df = pd.DataFrame(values[1:], columns=values[0])
                dataframes.append(df)
        return dataframes

    def _get_credentials(self) -> Any:
        """从存储中获取有效的用户凭据。

文件token.json存储用户的访问和刷新令牌,在首次授权流程完成时会自动创建。

返回:
    凭据,获取到的凭据。
"""
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

load_data #

load_data(spreadsheet_ids: List[str]) -> List[Document]

Load data from the input directory.

Source code in llama_index/readers/google/sheets/base.py
    def load_data(self, spreadsheet_ids: List[str]) -> List[Document]:
        """从输入目录加载数据。

Args:
    spreadsheet_ids(List[str]):文档id的列表。
"""
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            sheet = self._load_sheet(spreadsheet_id)
            results.append(
                Document(
                    id_=spreadsheet_id,
                    text=sheet,
                    metadata={"spreadsheet_id": spreadsheet_id},
                )
            )
        return results

load_data_in_pandas #

load_data_in_pandas(
    spreadsheet_ids: List[str],
) -> List[DataFrame]

Load data from the input directory.
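load_data_in_pandas returns one DataFrame per sheet, using the first row of each sheet as the column headers. A brief sketch, assuming a reader constructed as in the earlier example:

frames = reader.load_data_in_pandas(spreadsheet_ids=["<spreadsheet-id>"])
for df in frames:
    print(df.shape, list(df.columns))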

Source code in llama_index/readers/google/sheets/base.py
    def load_data_in_pandas(self, spreadsheet_ids: List[str]) -> List[pd.DataFrame]:
        """从输入目录加载数据。

Args:
    spreadsheet_ids(List[str]):文档id的列表。
"""
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            dataframes = self._load_sheet_in_pandas(spreadsheet_id)
            results.extend(dataframes)
        return results