Source code for langchain_community.storage.redis

from typing import Any, Iterator, List, Optional, Sequence, Tuple, cast

from langchain_core.stores import ByteStore

from langchain_community.utilities.redis import get_client


[docs]class RedisStore(ByteStore): """基于Redis作为底层存储的BaseStore实现。 示例: 创建一个RedisStore实例并对其执行操作: .. code-block:: python # 使用Redis连接实例化RedisStore from langchain_community.storage import RedisStore from langchain_community.utilities.redis import get_client client = get_client('redis://localhost:6379') redis_store = RedisStore(client) # 为键设置值 redis_store.mset([("key1", b"value1"), ("key2", b"value2")]) # 获取键的值 values = redis_store.mget(["key1", "key2"]) # [b"value1", b"value2"] # 删除键 redis_store.mdelete(["key1"]) # 遍历键 for key in redis_store.yield_keys(): print(key) # noqa: T201"""
[docs] def __init__( self, *, client: Any = None, redis_url: Optional[str] = None, client_kwargs: Optional[dict] = None, ttl: Optional[int] = None, namespace: Optional[str] = None, ) -> None: """使用Redis连接初始化RedisStore。 必须提供Redis客户端或带有可选client_kwargs的redis_url。 参数: client:Redis连接实例 redis_url:redis URL client_kwargs:传递给Redis客户端的关键字参数 ttl:如果提供,以秒为单位设置键的过期时间,如果为None,则键永远不会过期 namespace:如果提供,所有键都将以此命名空间为前缀 """ try: from redis import Redis except ImportError as e: raise ImportError( "The RedisStore requires the redis library to be installed. " "pip install redis" ) from e if client and redis_url or client and client_kwargs: raise ValueError( "Either a Redis client or a redis_url with optional client_kwargs " "must be provided, but not both." ) if client: if not isinstance(client, Redis): raise TypeError( f"Expected Redis client, got {type(client).__name__} instead." ) _client = client else: if not redis_url: raise ValueError( "Either a Redis client or a redis_url must be provided." ) _client = get_client(redis_url, **(client_kwargs or {})) self.client = _client if not isinstance(ttl, int) and ttl is not None: raise TypeError(f"Expected int or None, got {type(ttl)} instead.") self.ttl = ttl self.namespace = namespace
def _get_prefixed_key(self, key: str) -> str: """获取带有命名空间前缀的键。 参数: key(str):原始键。 返回: str:带有命名空间前缀的键。 """ delimiter = "/" if self.namespace: return f"{self.namespace}{delimiter}{key}" return key
[docs] def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]: """获取与给定键相关联的值。""" return cast( List[Optional[bytes]], self.client.mget([self._get_prefixed_key(key) for key in keys]), )
[docs] def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None: """设置给定的键值对。""" pipe = self.client.pipeline() for key, value in key_value_pairs: pipe.set(self._get_prefixed_key(key), value, ex=self.ttl) pipe.execute()
[docs] def mdelete(self, keys: Sequence[str]) -> None: """删除给定的键。""" _keys = [self._get_prefixed_key(key) for key in keys] self.client.delete(*_keys)
[docs] def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]: """在存储中生成密钥。""" if prefix: pattern = self._get_prefixed_key(prefix) else: pattern = self._get_prefixed_key("*") scan_iter = cast(Iterator[bytes], self.client.scan_iter(match=pattern)) for key in scan_iter: decoded_key = key.decode("utf-8") if self.namespace: relative_key = decoded_key[len(self.namespace) + 1 :] yield relative_key else: yield decoded_key