Skip to content

S3

S3DBKVStore #

Bases: BaseKVStore

S3键值存储。 在S3存储桶中存储键值对。可以选择性地指定存储KV数据的文件夹路径。 KV数据进一步分为集合,这些集合是路径中的子文件夹。 每个键值对都存储为一个JSON文件。

Parameters:

Name Type Description Default
bucket Any

boto3 S3 Bucket实例

required
path Optional[str]

存储KV数据的S3存储桶中文件夹的路径

'./'
Source code in llama_index/storage/kvstore/s3/base.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class S3DBKVStore(BaseKVStore):
    """Key-value store backed by an S3 bucket.

    Each key-value pair is stored as one JSON object in the bucket. Pairs are
    grouped into collections, which are sub-folders of an optional base folder
    path inside the bucket.

    Args:
        bucket (Any): boto3 S3 Bucket instance
        path (Optional[str]): path to the folder in the S3 bucket where KV
            data is stored; defaults to "./"
    """

    def __init__(
        self,
        bucket: Any,
        path: Optional[str] = "./",
    ) -> None:
        """Initialize a S3DBKVStore."""
        self._bucket = bucket
        # from_s3_location forwards path=None; fall back to the default folder.
        self._path = path or "./"

    @classmethod
    def from_s3_location(
        cls,
        bucket_name: str,
        path: Optional[str] = None,
    ) -> "S3DBKVStore":
        """Create a S3DBKVStore from a bucket name and an optional folder path.

        Args:
            bucket_name (str): S3 bucket name
            path (Optional[str]): path to folder in S3 bucket where KV data is stored

        Returns:
            S3DBKVStore: store bound to the named bucket
        """
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(bucket_name)
        return cls(
            bucket,
            path=path,
        )

    def _get_object_key(self, collection: str, key: str) -> str:
        """Build the object key ``<path>/<collection>/<key>.json``.

        The path is normalized through PurePath, so redundant separators and
        a leading "./" are collapsed.
        NOTE(review): PurePath uses the host platform's path flavour; confirm
        the intended separator if this ever runs on Windows.
        """
        return str(PurePath(f"{self._path}/{collection}/{key}.json"))

    def _find_object(self, obj_key: str) -> Optional[Any]:
        """Return the bucket object whose key is exactly ``obj_key``, or None.

        ``Prefix`` filtering also matches longer keys (looking up "a.json"
        would match "a.json.json"), so the listed object is only accepted
        when its key equals the requested one. S3 lists keys in lexicographic
        order, so an exact match, when present, is the first result.
        """
        for obj in self._bucket.objects.filter(Prefix=obj_key).limit(1):
            if obj.key == obj_key:
                return obj
        return None

    def put(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Put a key-value pair into the store.

        Args:
            key (str): key
            val (dict): value, serialized with json.dumps
            collection (str): collection name
        """
        obj_key = self._get_object_key(collection, key)
        self._bucket.put_object(
            Key=obj_key,
            Body=json.dumps(val),
        )

    async def aput(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Put a key-value pair into the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name
        """
        raise NotImplementedError

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """Get a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            Optional[dict]: the decoded JSON value, or None when no object
            with exactly this key exists.
        """
        obj = self._find_object(self._get_object_key(collection, key))
        if obj is None:
            return None
        body = obj.get()["Body"].read()
        return json.loads(body)

    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """Get a value from the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            collection (str): collection name
        """
        raise NotImplementedError

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from a collection.

        Args:
            collection (str): collection name

        Returns:
            Dict[str, dict]: mapping of key (object file name without its
            extension) to the decoded JSON value.
        """
        collection_root = PurePath(f"{self._path}/{collection}/")
        root_parts = collection_root.parts
        depth = len(root_parts)
        collection_kv_dict: Dict[str, dict] = {}
        # PurePath drops the trailing slash, so the prefix alone would also
        # match sibling collections such as "<collection>_old". Comparing path
        # parts keeps only objects located inside this collection's folder
        # (and skips an object that is the folder itself).
        for obj in self._bucket.objects.filter(Prefix=str(collection_root)):
            key_parts = PurePath(obj.key).parts
            if len(key_parts) <= depth or key_parts[:depth] != root_parts:
                continue
            body = obj.get()["Body"].read()
            json_filename = os.path.split(obj.key)[-1]
            key = os.path.splitext(json_filename)[0]
            collection_kv_dict[key] = json.loads(body)
        return collection_kv_dict

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from a collection (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            collection (str): collection name
        """
        raise NotImplementedError

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            bool: True when an object with exactly this key was deleted,
            False when no such object exists.
        """
        obj = self._find_object(self._get_object_key(collection, key))
        if obj is None:
            return False
        obj.delete()
        return True

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            collection (str): collection name
        """
        raise NotImplementedError

from_s3_location classmethod #

from_s3_location(
    bucket_name: str, path: Optional[str] = None
) -> S3DBKVStore

Load an S3DBKVStore from an S3 bucket name and an optional folder path.

Parameters:

Name Type Description Default
bucket_name str

S3 bucket name

required
path Optional[str]

path to folder in S3 bucket where KV data is stored

None
Source code in llama_index/storage/kvstore/s3/base.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@classmethod
def from_s3_location(
    cls,
    bucket_name: str,
    path: Optional[str] = None,
) -> "S3DBKVStore":
    """Create a S3DBKVStore bound to the named S3 bucket.

    Args:
        bucket_name (str): S3 bucket name
        path (Optional[str]): path to folder in S3 bucket where KV data is stored

    Returns:
        S3DBKVStore: store wrapping the boto3 Bucket resource for ``bucket_name``
    """
    # Resolve the bucket through the boto3 resource API, then hand it to the
    # regular constructor together with the (possibly None) folder path.
    target_bucket = boto3.resource("s3").Bucket(bucket_name)
    return cls(target_bucket, path=path)

put #

put(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/s3/base.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    def put(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Store ``val`` under ``key`` as a JSON object in the bucket.

        Args:
            key (str): key
            val (dict): value, serialized with json.dumps
            collection (str): collection name
        """
        target_key = self._get_object_key(collection, key)
        payload = json.dumps(val)
        self._bucket.put_object(Key=target_key, Body=payload)

aput async #

aput(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/s3/base.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
    async def aput(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Put a key-value pair into the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name
        """
        raise NotImplementedError

get #

get(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

从存储中获取一个值。

Source code in llama_index/storage/kvstore/s3/base.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """Get a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            Optional[dict]: the decoded JSON value, or None when no object
            with exactly this key exists.
        """
        obj_key = self._get_object_key(collection, key)
        # Prefix filtering also matches longer keys (looking up "a.json" would
        # match "a.json.json"), so accept the listed object only when its key
        # is exactly the requested one. S3 lists keys in lexicographic order,
        # so an exact match, when present, is the first result.
        for obj in self._bucket.objects.filter(Prefix=obj_key).limit(1):
            if obj.key == obj_key:
                body = obj.get()["Body"].read()
                return json.loads(body)
        return None

aget async #

aget(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

从存储中获取一个值。

Source code in llama_index/storage/kvstore/s3/base.py
100
101
102
103
104
105
106
107
108
109
    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """Get a value from the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            collection (str): collection name
        """
        raise NotImplementedError

get_all #

get_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从存储中获取所有的值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/s3/base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from a collection.

        Args:
            collection (str): collection name

        Returns:
            Dict[str, dict]: mapping of key (object file name without its
            extension) to the decoded JSON value.
        """
        collection_root = PurePath(f"{self._path}/{collection}/")
        root_parts = collection_root.parts
        depth = len(root_parts)
        collection_kv_dict: Dict[str, dict] = {}
        # PurePath drops the trailing slash, so the prefix alone would also
        # match sibling collections such as "<collection>_old". Comparing path
        # parts keeps only objects located inside this collection's folder
        # (and skips an object that is the folder itself).
        for obj in self._bucket.objects.filter(Prefix=str(collection_root)):
            key_parts = PurePath(obj.key).parts
            if len(key_parts) <= depth or key_parts[:depth] != root_parts:
                continue
            body = obj.get()["Body"].read()
            json_filename = os.path.split(obj.key)[-1]
            key = os.path.splitext(json_filename)[0]
            collection_kv_dict[key] = json.loads(body)
        return collection_kv_dict

aget_all async #

aget_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从存储中获取所有的值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/s3/base.py
127
128
129
130
131
132
133
    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all values from a collection (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            collection (str): collection name
        """
        raise NotImplementedError

delete #

delete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

键

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/s3/base.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            bool: True when an object with exactly this key was deleted,
            False when no such object exists.
        """
        obj_key = self._get_object_key(collection, key)
        # Prefix filtering also matches longer keys (deleting "a.json" could
        # otherwise remove "a.json.json"), so delete only on an exact key
        # match. S3 lists keys in lexicographic order, so an exact match,
        # when present, is the first result.
        for obj in self._bucket.objects.filter(Prefix=obj_key).limit(1):
            if obj.key == obj_key:
                obj.delete()
                return True
        return False

adelete async #

adelete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

键

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/s3/base.py
150
151
152
153
154
155
156
157
    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store (async variant).

        Not implemented for this store; always raises NotImplementedError.

        Args:
            key (str): key
            collection (str): collection name
        """
        raise NotImplementedError