Skip to content

Redis

RedisKVStore #

Bases: BaseKVStore

Redis KV 存储。

Parameters:

Name Type Description Default
redis_uri str

Redis URI

'redis://127.0.0.1:6379'
redis_client Any

Redis 客户端

None
async_redis_client Any

异步 Redis 客户端

None

引发: ValueError: 如果未安装 redis-py

示例: >>> from llama_index.storage.kvstore.redis import RedisKVStore >>> # 创建一个 RedisKVStore >>> redis_kv_store = RedisKVStore( >>> redis_url="redis://127.0.0.1:6379")

Source code in llama_index/storage/kvstore/redis/base.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class RedisKVStore(BaseKVStore):
    """Redis KV 存储。

Args:
    redis_uri (str): Redis URI
    redis_client (Any): Redis 客户端
    async_redis_client (Any): 异步 Redis 客户端

引发:
    ValueError: 如果未安装 redis-py

示例:
    >>> from llama_index.storage.kvstore.redis import RedisKVStore
    >>> # 创建一个 RedisKVStore
    >>> redis_kv_store = RedisKVStore(
    >>>     redis_url="redis://127.0.0.1:6379")"""

    def __init__(
        self,
        redis_uri: Optional[str] = "redis://127.0.0.1:6379",
        redis_client: Optional[Redis] = None,
        async_redis_client: Optional[AsyncRedis] = None,
        **kwargs: Any,
    ) -> None:
        # user could inject customized redis client.
        # for instance, redis have specific TLS connection, etc.
        if redis_client is not None:
            self._redis_client = redis_client

            # create async client from sync client
            if async_redis_client is not None:
                self._async_redis_client = async_redis_client
            else:
                try:
                    self._async_redis_client = AsyncRedis.from_url(
                        self._redis_client.connection_pool.connection_kwargs["url"]
                    )
                except Exception:
                    print(
                        "Could not create async redis client from sync client, "
                        "pass in `async_redis_client` explicitly."
                    )
                    self._async_redis_client = None
        elif redis_uri is not None:
            # otherwise, try initializing redis client
            try:
                # connect to redis from url
                self._redis_client = Redis.from_url(redis_uri, **kwargs)
                self._async_redis_client = AsyncRedis.from_url(redis_uri, **kwargs)
            except ValueError as e:
                raise ValueError(f"Redis failed to connect: {e}")
        else:
            raise ValueError("Either 'redis_client' or redis_url must be provided.")

    def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        self._redis_client.hset(name=collection, key=key, value=json.dumps(val))

    async def aput(
        self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
    ) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        await self._async_redis_client.hset(
            name=collection, key=key, value=json.dumps(val)
        )

    def put_all(
        self,
        kv_pairs: List[Tuple[str, dict]],
        collection: str = DEFAULT_COLLECTION,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """将键值对字典放入存储中。

Args:
    kv_pairs(List[Tuple[str, dict]]):键值对
    collection(str):集合名称
"""
        with self._redis_client.pipeline() as pipe:
            cur_batch = 0
            for key, val in kv_pairs:
                pipe.hset(name=collection, key=key, value=json.dumps(val))
                cur_batch += 1

                if cur_batch >= batch_size:
                    cur_batch = 0
                    pipe.execute()

            if cur_batch > 0:
                pipe.execute()

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        val_str = self._redis_client.hget(name=collection, key=key)
        if val_str is None:
            return None
        return json.loads(val_str)

    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        val_str = await self._async_redis_client.hget(name=collection, key=key)
        if val_str is None:
            return None
        return json.loads(val_str)

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店获取所有的数值。"""
        collection_kv_dict = {}
        for key, val_str in self._redis_client.hscan_iter(name=collection):
            value = dict(json.loads(val_str))
            collection_kv_dict[key.decode()] = value
        return collection_kv_dict

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """从商店获取所有的数值。"""
        collection_kv_dict = {}
        async for key, val_str in self._async_redis_client.hscan_iter(name=collection):
            value = dict(json.loads(val_str))
            collection_kv_dict[key.decode()] = value
        return collection_kv_dict

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        deleted_num = self._redis_client.hdel(collection, key)
        return bool(deleted_num > 0)

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        deleted_num = await self._async_redis_client.hdel(collection, key)
        return bool(deleted_num > 0)

    @classmethod
    def from_host_and_port(
        cls,
        host: str,
        port: int,
    ) -> "RedisKVStore":
        """从Redis主机和端口加载一个RedisKVStore。

Args:
    host (str): Redis主机
    port (int): Redis端口
"""
        url = f"redis://{host}:{port}".format(host=host, port=port)
        return cls(redis_uri=url)

    @classmethod
    def from_redis_client(cls, redis_client: Any) -> "RedisKVStore":
        """从Redis客户端加载一个RedisKVStore。

Args:
    redis_client(Redis):Redis客户端
"""
        return cls(redis_client=redis_client)

put #

put(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/redis/base.py
68
69
70
71
72
73
74
75
76
    def put(self, key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        self._redis_client.hset(name=collection, key=key, value=json.dumps(val))

aput async #

aput(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/redis/base.py
78
79
80
81
82
83
84
85
86
87
88
89
90
    async def aput(
        self, key: str, val: dict, collection: str = DEFAULT_COLLECTION
    ) -> None:
        """将一个键值对放入存储中。

Args:
    key(str):键
    val(dict):值
    collection(str):集合名称
"""
        await self._async_redis_client.hset(
            name=collection, key=key, value=json.dumps(val)
        )

put_all #

put_all(
    kv_pairs: List[Tuple[str, dict]],
    collection: str = DEFAULT_COLLECTION,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> None

将键值对字典放入存储中。

Source code in llama_index/storage/kvstore/redis/base.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
    def put_all(
        self,
        kv_pairs: List[Tuple[str, dict]],
        collection: str = DEFAULT_COLLECTION,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """将键值对字典放入存储中。

Args:
    kv_pairs(List[Tuple[str, dict]]):键值对
    collection(str):集合名称
"""
        with self._redis_client.pipeline() as pipe:
            cur_batch = 0
            for key, val in kv_pairs:
                pipe.hset(name=collection, key=key, value=json.dumps(val))
                cur_batch += 1

                if cur_batch >= batch_size:
                    cur_batch = 0
                    pipe.execute()

            if cur_batch > 0:
                pipe.execute()

get #

get(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

从存储中获取一个值。

Source code in llama_index/storage/kvstore/redis/base.py
117
118
119
120
121
122
123
124
125
126
127
    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        val_str = self._redis_client.hget(name=collection, key=key)
        if val_str is None:
            return None
        return json.loads(val_str)

aget async #

aget(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

从存储中获取一个值。

Source code in llama_index/storage/kvstore/redis/base.py
129
130
131
132
133
134
135
136
137
138
139
140
141
    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """从存储中获取一个值。

Args:
    key(str):键
    collection(str):集合名称
"""
        val_str = await self._async_redis_client.hget(name=collection, key=key)
        if val_str is None:
            return None
        return json.loads(val_str)

get_all #

get_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从商店获取所有的数值。

Source code in llama_index/storage/kvstore/redis/base.py
143
144
145
146
147
148
149
def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """从商店获取所有的数值。"""
    collection_kv_dict = {}
    for key, val_str in self._redis_client.hscan_iter(name=collection):
        value = dict(json.loads(val_str))
        collection_kv_dict[key.decode()] = value
    return collection_kv_dict

aget_all async #

aget_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从商店获取所有的数值。

Source code in llama_index/storage/kvstore/redis/base.py
151
152
153
154
155
156
157
async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """从商店获取所有的数值。"""
    collection_kv_dict = {}
    async for key, val_str in self._async_redis_client.hscan_iter(name=collection):
        value = dict(json.loads(val_str))
        collection_kv_dict[key.decode()] = value
    return collection_kv_dict

delete #

delete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/redis/base.py
159
160
161
162
163
164
165
166
167
    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        deleted_num = self._redis_client.hdel(collection, key)
        return bool(deleted_num > 0)

adelete async #

adelete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/redis/base.py
169
170
171
172
173
174
175
176
177
    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """从存储中删除一个值。

Args:
    key (str): 键
    collection (str): 集合名称
"""
        deleted_num = await self._async_redis_client.hdel(collection, key)
        return bool(deleted_num > 0)

from_host_and_port classmethod #

from_host_and_port(host: str, port: int) -> RedisKVStore

从Redis主机和端口加载一个RedisKVStore。

Parameters:

Name Type Description Default
host str

Redis主机

required
port int

Redis端口

required
Source code in llama_index/storage/kvstore/redis/base.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    @classmethod
    def from_host_and_port(
        cls,
        host: str,
        port: int,
    ) -> "RedisKVStore":
        """从Redis主机和端口加载一个RedisKVStore。

Args:
    host (str): Redis主机
    port (int): Redis端口
"""
        url = f"redis://{host}:{port}".format(host=host, port=port)
        return cls(redis_uri=url)

from_redis_client classmethod #

from_redis_client(redis_client: Any) -> RedisKVStore

从Redis客户端加载一个RedisKVStore。

Source code in llama_index/storage/kvstore/redis/base.py
194
195
196
197
198
199
200
201
    @classmethod
    def from_redis_client(cls, redis_client: Any) -> "RedisKVStore":
        """从Redis客户端加载一个RedisKVStore。

Args:
    redis_client(Redis):Redis客户端
"""
        return cls(redis_client=redis_client)