Skip to content

Firestore

FirestoreReader #

Bases: BaseReader

简单的Firestore阅读器。

Parameters:

Name Type Description Default
project_id str

Google Cloud项目ID。

required
*args Optional[Any]

其他参数。

()
**kwargs Optional[Any]

其他关键字参数。

{}

Returns:

Name Type Description
FirestoreReader

一个FirestoreReader对象。

Source code in llama_index/readers/firestore/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class FirestoreReader(BaseReader):
    """简单的Firestore阅读器。

    Args:
        project_id (str): Google Cloud项目ID。
        *args (Optional[Any]): 其他参数。
        **kwargs (Optional[Any]): 其他关键字参数。

    Returns:
        FirestoreReader: 一个FirestoreReader对象。"""

    def __init__(
        self,
        project_id: str,
        database_id: str = DEFAULT_FIRESTORE_DATABASE,
        *args: Optional[Any],
        **kwargs: Optional[Any],
    ) -> None:
        """使用参数进行初始化。"""
        try:
            from google.cloud import firestore
            from google.cloud.firestore_v1.services.firestore.transports.base import (
                DEFAULT_CLIENT_INFO,
            )
        except ImportError:
            raise ImportError(IMPORT_ERROR_MSG)

        client_info = DEFAULT_CLIENT_INFO
        client_info.user_agent = USER_AGENT
        self.db = firestore.Client(
            project=project_id, database=database_id, client_info=client_info
        )

    def load_data(self, collection: str) -> List[Document]:
        """从Firestore集合中加载数据,返回一个文档列表。

Args:
    collection(str):要读取的Firestore集合的名称。

Returns:
    List[Document]:一个Document对象的列表。
"""
        documents = []
        col_ref = self.db.collection(collection)
        for doc in col_ref.stream():
            doc_str = ", ".join([f"{k}: {v}" for k, v in doc.to_dict().items()])
            documents.append(Document(text=doc_str))
        return documents

    def load_document(self, document_url: str) -> Document:
        """从Firestore中加载单个文档。

Args:
    document_url (str): 要读取的Firestore文档的绝对路径。

Returns:
    Document: 一个Document对象。
"""
        parts = document_url.split("/")
        if len(parts) % 2 != 0:
            raise ValueError(f"Invalid document URL: {document_url}")

        ref = self.db.collection(parts[0])
        for i in range(1, len(parts)):
            if i % 2 == 0:
                ref = ref.collection(parts[i])
            else:
                ref = ref.document(parts[i])

        doc = ref.get()
        if not doc.exists:
            raise ValueError(f"No such document: {document_url}")
        doc_str = ", ".join([f"{k}: {v}" for k, v in doc.to_dict().items()])
        return Document(text=doc_str)

load_data #

load_data(collection: str) -> List[Document]

从Firestore集合中加载数据,返回一个文档列表。

Returns:

Type Description
List[Document]

List[Document]:一个Document对象的列表。

Source code in llama_index/readers/firestore/base.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
    def load_data(self, collection: str) -> List[Document]:
        """从Firestore集合中加载数据,返回一个文档列表。

Args:
    collection(str):要读取的Firestore集合的名称。

Returns:
    List[Document]:一个Document对象的列表。
"""
        documents = []
        col_ref = self.db.collection(collection)
        for doc in col_ref.stream():
            doc_str = ", ".join([f"{k}: {v}" for k, v in doc.to_dict().items()])
            documents.append(Document(text=doc_str))
        return documents

load_document #

load_document(document_url: str) -> Document

从Firestore中加载单个文档。

Parameters:

Name Type Description Default
document_url str

要读取的Firestore文档的绝对路径。

required

Returns:

Name Type Description
Document Document

一个Document对象。

Source code in llama_index/readers/firestore/base.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
    def load_document(self, document_url: str) -> Document:
        """从Firestore中加载单个文档。

Args:
    document_url (str): 要读取的Firestore文档的绝对路径。

Returns:
    Document: 一个Document对象。
"""
        parts = document_url.split("/")
        if len(parts) % 2 != 0:
            raise ValueError(f"Invalid document URL: {document_url}")

        ref = self.db.collection(parts[0])
        for i in range(1, len(parts)):
            if i % 2 == 0:
                ref = ref.collection(parts[i])
            else:
                ref = ref.document(parts[i])

        doc = ref.get()
        if not doc.exists:
            raise ValueError(f"No such document: {document_url}")
        doc_str = ", ".join([f"{k}: {v}" for k, v in doc.to_dict().items()])
        return Document(text=doc_str)