Skip to content

Dynamodb

DynamoDBKVStore #

Bases: BaseKVStore

DynamoDB键值存储。 在DynamoDB表中存储键值对。 DynamoDB表必须同时具有哈希键和范围键,并且它们的类型必须为字符串。

您可以通过设置DYNAMODB_URL环境变量来指定DynamoDB的自定义URL。如果您在开发或测试中使用本地实例的DynamoDB,则这很有用。如果未设置DYNAMODB_URL,应用程序将使用默认的AWS DynamoDB服务。

Source code in llama_index/storage/kvstore/dynamodb/base.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class DynamoDBKVStore(BaseKVStore):
    """DynamoDB键值存储。
    在DynamoDB表中存储键值对。
    DynamoDB表必须同时具有哈希键和范围键,并且它们的类型必须为字符串。

    您可以通过设置`DYNAMODB_URL`环境变量来指定DynamoDB的自定义URL。如果您在开发或测试中使用本地实例的DynamoDB,则这很有用。如果未设置`DYNAMODB_URL`,应用程序将使用默认的AWS DynamoDB服务。

    Args:
        table(任意):DynamoDB表服务资源"""

    def __init__(self, table: Any):
        """初始化一个DynamoDBKVStore。"""
        self._table = table
        self._boto3_key = Key
        self._key_hash, self._key_range = parse_schema(table)

    @classmethod
    def from_table_name(cls, table_name: str) -> DynamoDBKVStore:
        """从DynamoDB表名加载一个DynamoDBKVStore。

Args:
    table_name (str): DynamoDB表名
"""
        # Get the DynamoDB URL from environment variable
        dynamodb_url = os.getenv("DYNAMODB_URL")

        # Create a session
        session = boto3.Session()

        # If the DynamoDB URL is set, use it as the endpoint URL
        if dynamodb_url:
            ddb = session.resource("dynamodb", endpoint_url=dynamodb_url)
        else:
            # Otherwise, let boto3 use its default configuration
            ddb = session.resource("dynamodb")
        return cls(table=ddb.Table(table_name))

    def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        item = {k: convert_float_to_decimal(v) for k, v in val.items()}
        item[self._key_hash] = collection
        item[self._key_range] = key
        self._table.put_item(Item=item)

    async def aput(
        self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
    ) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        raise NotImplementedError

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> dict | None:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        resp = self._table.get_item(
            Key={self._key_hash: collection, self._key_range: key}
        )
        if (item := resp.get("Item")) is None:
            return None
        else:
            return {
                k: convert_decimal_to_int_or_float(v)
                for k, v in item.items()
                if k not in {self._key_hash, self._key_range}
            }

    async def aget(self, key: str, collection: str = DEFAULT_COLLECTION) -> dict | None:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        raise NotImplementedError

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店中获取所有的数值。

Args:
    collection (str): 集合名称
"""
        result = {}
        last_evaluated_key = None
        is_first = True
        while last_evaluated_key is not None or is_first:
            if is_first:
                is_first = False
            option = {
                "KeyConditionExpression": self._boto3_key(self._key_hash).eq(collection)
            }
            if last_evaluated_key is not None:
                option["ExclusiveStartKey"] = last_evaluated_key
            resp = self._table.query(**option)
            for item in resp.get("Items", []):
                item.pop(self._key_hash)
                key = item.pop(self._key_range)
                result[key] = {
                    k: convert_decimal_to_int_or_float(v) for k, v in item.items()
                }
            last_evaluated_key = resp.get("LastEvaluatedKey")
        return result

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店中获取所有的数值。

Args:
    collection (str): 集合名称
"""
        raise NotImplementedError

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        resp = self._table.delete_item(
            Key={self._key_hash: collection, self._key_range: key},
            ReturnValues="ALL_OLD",
        )

        if (item := resp.get("Attributes")) is None:
            return False
        else:
            return len(item) > 0

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        raise NotImplementedError

from_table_name classmethod #

from_table_name(table_name: str) -> DynamoDBKVStore

从DynamoDB表名加载一个DynamoDBKVStore。

Parameters:

Name Type Description Default
table_name str

DynamoDB表名

required
Source code in llama_index/storage/kvstore/dynamodb/base.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    @classmethod
    def from_table_name(cls, table_name: str) -> DynamoDBKVStore:
        """从DynamoDB表名加载一个DynamoDBKVStore。

Args:
    table_name (str): DynamoDB表名
"""
        # Get the DynamoDB URL from environment variable
        dynamodb_url = os.getenv("DYNAMODB_URL")

        # Create a session
        session = boto3.Session()

        # If the DynamoDB URL is set, use it as the endpoint URL
        if dynamodb_url:
            ddb = session.resource("dynamodb", endpoint_url=dynamodb_url)
        else:
            # Otherwise, let boto3 use its default configuration
            ddb = session.resource("dynamodb")
        return cls(table=ddb.Table(table_name))

put #

put(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/dynamodb/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
    def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        item = {k: convert_float_to_decimal(v) for k, v in val.items()}
        item[self._key_hash] = collection
        item[self._key_range] = key
        self._table.put_item(Item=item)

aput async #

aput(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/dynamodb/base.py
104
105
106
107
108
109
110
111
112
113
114
    async def aput(
        self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
    ) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        raise NotImplementedError

get #

get(
    key: str, collection: str = DEFAULT_COLLECTION
) -> dict | None

从存储中获取一个值。

Source code in llama_index/storage/kvstore/dynamodb/base.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> dict | None:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        resp = self._table.get_item(
            Key={self._key_hash: collection, self._key_range: key}
        )
        if (item := resp.get("Item")) is None:
            return None
        else:
            return {
                k: convert_decimal_to_int_or_float(v)
                for k, v in item.items()
                if k not in {self._key_hash, self._key_range}
            }

aget async #

aget(
    key: str, collection: str = DEFAULT_COLLECTION
) -> dict | None

从存储中获取一个值。

Source code in llama_index/storage/kvstore/dynamodb/base.py
135
136
137
138
139
140
141
142
    async def aget(self, key: str, collection: str = DEFAULT_COLLECTION) -> dict | None:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        raise NotImplementedError

get_all #

get_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从商店中获取所有的数值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/dynamodb/base.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店中获取所有的数值。

Args:
    collection (str): 集合名称
"""
        result = {}
        last_evaluated_key = None
        is_first = True
        while last_evaluated_key is not None or is_first:
            if is_first:
                is_first = False
            option = {
                "KeyConditionExpression": self._boto3_key(self._key_hash).eq(collection)
            }
            if last_evaluated_key is not None:
                option["ExclusiveStartKey"] = last_evaluated_key
            resp = self._table.query(**option)
            for item in resp.get("Items", []):
                item.pop(self._key_hash)
                key = item.pop(self._key_range)
                result[key] = {
                    k: convert_decimal_to_int_or_float(v) for k, v in item.items()
                }
            last_evaluated_key = resp.get("LastEvaluatedKey")
        return result

aget_all async #

aget_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从商店中获取所有的数值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/dynamodb/base.py
171
172
173
174
175
176
177
    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店中获取所有的数值。

Args:
    collection (str): 集合名称
"""
        raise NotImplementedError

delete #

delete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/dynamodb/base.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        resp = self._table.delete_item(
            Key={self._key_hash: collection, self._key_range: key},
            ReturnValues="ALL_OLD",
        )

        if (item := resp.get("Attributes")) is None:
            return False
        else:
            return len(item) > 0

adelete async #

adelete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/dynamodb/base.py
196
197
198
199
200
201
202
203
    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        raise NotImplementedError