from typing import (
Any,
AsyncIterator,
Callable,
Iterator,
List,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
from langchain_core.stores import BaseStore
# Caller-facing key type; keys of this type are converted to ``str`` by the
# ``key_encoder`` callable before they reach the wrapped store.
K = TypeVar("K")
# Caller-facing value type; values of this type pass through the
# ``value_serializer`` / ``value_deserializer`` callables.
V = TypeVar("V")
class EncoderBackedStore(BaseStore[K, V]):
    """Wrap a store with key and value encoders/decoders.

    Every key is passed through ``key_encoder`` and every value through
    ``value_serializer`` before being handed to the wrapped store; values
    read back are passed through ``value_deserializer``.

    Example using JSON for encoding/decoding:

    .. code-block:: python

        import json

        def key_encoder(key: int) -> str:
            return json.dumps(key)

        def value_serializer(value: float) -> bytes:
            return json.dumps(value).encode("utf-8")

        def value_deserializer(serialized_value: bytes) -> float:
            return json.loads(serialized_value)

        # Create an instance of the abstract store
        abstract_store = MyCustomStore()

        # Create an instance of the encoder-backed store
        store = EncoderBackedStore(
            store=abstract_store,
            key_encoder=key_encoder,
            value_serializer=value_serializer,
            value_deserializer=value_deserializer,
        )

        # Use the encoder-backed store methods
        store.mset([(1, 3.14), (2, 2.718)])
        values = store.mget([1, 2])  # Retrieves [3.14, 2.718]
        store.mdelete([1, 2])  # Deletes keys 1 and 2
    """

    def __init__(
        self,
        store: BaseStore[str, Any],
        key_encoder: Callable[[K], str],
        value_serializer: Callable[[V], bytes],
        value_deserializer: Callable[[Any], V],
    ) -> None:
        """Initialize an EncoderBackedStore.

        Args:
            store: The underlying store that receives encoded keys and
                serialized values.
            key_encoder: Converts a caller-facing key into the string key
                used by ``store``.
            value_serializer: Converts a caller-facing value into the form
                written to ``store``.
            value_deserializer: Converts a value read from ``store`` back
                into the caller-facing value.
        """
        self.store = store
        self.key_encoder = key_encoder
        self.value_serializer = value_serializer
        self.value_deserializer = value_deserializer

    def mget(self, keys: Sequence[K]) -> List[Optional[V]]:
        """Get the values associated with the given keys.

        Args:
            keys: Caller-facing keys; each is encoded before lookup.

        Returns:
            One entry per key, in the order produced by the wrapped store.
            ``None`` entries returned by the wrapped store are passed
            through without being deserialized.
        """
        encoded_keys: List[str] = [self.key_encoder(key) for key in keys]
        values = self.store.mget(encoded_keys)
        return [
            self.value_deserializer(value) if value is not None else None
            for value in values
        ]

    async def amget(self, keys: Sequence[K]) -> List[Optional[V]]:
        """Asynchronously get the values associated with the given keys.

        Args:
            keys: Caller-facing keys; each is encoded before lookup.

        Returns:
            One entry per key, in the order produced by the wrapped store.
            ``None`` entries returned by the wrapped store are passed
            through without being deserialized.
        """
        encoded_keys: List[str] = [self.key_encoder(key) for key in keys]
        values = await self.store.amget(encoded_keys)
        return [
            self.value_deserializer(value) if value is not None else None
            for value in values
        ]

    def mset(self, key_value_pairs: Sequence[Tuple[K, V]]) -> None:
        """Set the values for the given keys.

        Args:
            key_value_pairs: ``(key, value)`` pairs; keys are encoded and
                values serialized before being written to the wrapped store.
        """
        encoded_pairs = [
            (self.key_encoder(key), self.value_serializer(value))
            for key, value in key_value_pairs
        ]
        self.store.mset(encoded_pairs)

    async def amset(self, key_value_pairs: Sequence[Tuple[K, V]]) -> None:
        """Asynchronously set the values for the given keys.

        Args:
            key_value_pairs: ``(key, value)`` pairs; keys are encoded and
                values serialized before being written to the wrapped store.
        """
        encoded_pairs = [
            (self.key_encoder(key), self.value_serializer(value))
            for key, value in key_value_pairs
        ]
        await self.store.amset(encoded_pairs)

    def mdelete(self, keys: Sequence[K]) -> None:
        """Delete the given keys and their associated values.

        Args:
            keys: Caller-facing keys; each is encoded before deletion.
        """
        encoded_keys = [self.key_encoder(key) for key in keys]
        self.store.mdelete(encoded_keys)

    async def amdelete(self, keys: Sequence[K]) -> None:
        """Asynchronously delete the given keys and their associated values.

        Args:
            keys: Caller-facing keys; each is encoded before deletion.
        """
        encoded_keys = [self.key_encoder(key) for key in keys]
        await self.store.amdelete(encoded_keys)

    def yield_keys(
        self, *, prefix: Optional[str] = None
    ) -> Union[Iterator[K], Iterator[str]]:
        """Get an iterator over keys that match the given prefix.

        Args:
            prefix: Forwarded unchanged to the wrapped store; it is NOT run
                through ``key_encoder``.

        Yields:
            Keys exactly as the wrapped store yields them (encoded form,
            not decoded back to ``K``).
        """
        # For the time being this does not return K, but str
        # it's for debugging purposes. Should fix this.
        yield from self.store.yield_keys(prefix=prefix)

    async def ayield_keys(
        self, *, prefix: Optional[str] = None
    ) -> Union[AsyncIterator[K], AsyncIterator[str]]:
        """Asynchronously iterate over keys that match the given prefix.

        Args:
            prefix: Forwarded unchanged to the wrapped store; it is NOT run
                through ``key_encoder``.

        Yields:
            Keys exactly as the wrapped store yields them (encoded form,
            not decoded back to ``K``).
        """
        # For the time being this does not return K, but str
        # it's for debugging purposes. Should fix this.
        async for key in self.store.ayield_keys(prefix=prefix):
            yield key