Skip to content

Redis

RedisChatStore #

Bases: BaseChatStore

Redis 聊天存储。

Source code in llama_index/storage/chat_store/redis/base.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class RedisChatStore(BaseChatStore):
    """Chat store backed by Redis.

    Each chat key maps to a Redis list whose elements are JSON-serialized
    messages. Messages are appended with RPUSH, so the list reads oldest
    first. When ``ttl`` is set, write operations refresh the key's expiry.
    """

    redis_client: Any = Field(description="Redis client.")
    ttl: Optional[int] = Field(default=None, description="Time to live in seconds.")

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        redis_client: Optional[Any] = None,
        ttl: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the store.

        Args:
            redis_url: Connection URL; only used when ``redis_client`` is not
                supplied.
            redis_client: An already constructed client to use as-is.
            ttl: Expiry in seconds applied to a key after writes. ``None``
                (or 0) disables expiry handling.
            **kwargs: Forwarded to the client factory when the client is
                built from ``redis_url``.
        """
        # Compare against None explicitly: a caller-supplied client must never
        # be replaced just because it happens to evaluate as falsy.
        if redis_client is None:
            redis_client = self._get_client(redis_url, **kwargs)
        super().__init__(redis_client=redis_client, ttl=ttl)

    @classmethod
    def class_name(cls) -> str:
        """Return the class name."""
        return "RedisChatStore"

    @staticmethod
    def _decode_message(raw: Any) -> ChatMessage:
        """Turn one raw Redis list element into a ChatMessage.

        Clients created with ``decode_responses=True`` hand back ``str``
        instead of ``bytes``; both forms are accepted.
        """
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        return _dict_to_message(json.loads(text))

    def set_messages(self, key: str, messages: List[ChatMessage]) -> None:
        """Replace the messages stored under ``key`` with ``messages``."""
        self.redis_client.delete(key)
        for message in messages:
            self.add_message(key, message)

        if self.ttl:
            self.redis_client.expire(key, self.ttl)

    def get_messages(self, key: str) -> List[ChatMessage]:
        """Return all messages stored under ``key``, oldest first.

        Returns an empty list when nothing is stored for the key.
        """
        items = self.redis_client.lrange(key, 0, -1)
        return [self._decode_message(item) for item in items]

    def add_message(
        self, key: str, message: ChatMessage, idx: Optional[int] = None
    ) -> None:
        """Add a message to ``key``.

        Args:
            key: Chat key.
            message: Message to store.
            idx: Position to insert at. ``None`` appends to the end of the
                list; otherwise the list is rewritten with the message
                inserted at that index.
        """
        if idx is None:
            item = json.dumps(_message_to_dict(message))
            self.redis_client.rpush(key, item)
        else:
            self._insert_element_at_index(key, idx, message)

        if self.ttl:
            self.redis_client.expire(key, self.ttl)

    def delete_messages(self, key: str) -> Optional[List[ChatMessage]]:
        """Delete every message stored under ``key``.

        Always returns ``None``; the deleted messages are not read back.
        """
        self.redis_client.delete(key)
        return None

    def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
        """Delete the message at position ``idx`` under ``key``.

        Args:
            key: Chat key.
            idx: Zero-based position of the message to remove. Negative or
                out-of-range values remove nothing.

        Returns:
            The removed message, or ``None`` when ``idx`` is out of range.
        """
        current_list = list(self.redis_client.lrange(key, 0, -1))
        if not 0 <= idx < len(current_list):
            return None

        removed_item = current_list.pop(idx)
        self.redis_client.delete(key)
        # RPUSH keeps the original order (LPUSH would reverse it), and it must
        # be skipped for an empty remainder because it needs at least one value.
        if current_list:
            self.redis_client.rpush(key, *current_list)
            # Deleting the key dropped its expiry; restore it like other writes.
            if self.ttl:
                self.redis_client.expire(key, self.ttl)
        return self._decode_message(removed_item)

    def delete_last_message(self, key: str) -> Optional[ChatMessage]:
        """Remove and return the newest message under ``key``.

        Returns ``None`` when the list is empty or the key does not exist.
        """
        raw = self.redis_client.rpop(key)
        if raw is None:
            return None
        return self._decode_message(raw)

    def get_keys(self) -> List[str]:
        """Return every key matching ``*`` in the connected database."""
        return [
            key.decode("utf-8") if isinstance(key, bytes) else key
            for key in self.redis_client.keys("*")
        ]

    def _insert_element_at_index(
        self, key: str, index: int, message: ChatMessage
    ) -> List[ChatMessage]:
        """Insert ``message`` at ``index`` by rewriting the whole list.

        ``index`` follows ``list.insert`` semantics. Returns the messages as
        read back from Redis after the rewrite.
        """
        current_list = self.get_messages(key)
        current_list.insert(index, message)

        # set_messages deletes the key itself before pushing the new list.
        self.set_messages(key, current_list)
        return self.get_messages(key)

    def _redis_cluster_client(self, redis_url: str, **kwargs: Any) -> "Redis":
        """Build a cluster client from ``redis_url``."""
        return RedisCluster.from_url(redis_url, **kwargs)  # type: ignore

    def _check_for_cluster(self, redis_client: "Redis") -> bool:
        """Return True when the server's cluster info reports cluster mode.

        Any Redis error while querying, or a reply without the
        ``cluster_enabled`` field, is treated as "not a cluster".
        """
        try:
            cluster_info = redis_client.info("cluster")
        except redis.exceptions.RedisError:
            return False
        return cluster_info.get("cluster_enabled") == 1

    def _redis_sentinel_client(self, redis_url: str, **kwargs: Any) -> "Redis":
        """Build a client for the master behind a Redis Sentinel.

        Parses an (unofficial) ``redis+sentinel://`` URL, connects to the
        Sentinel and returns the client for the master of the named service.

        If a username and/or password is present in the URL, the same
        credentials are used for both the Sentinel and the Redis server;
        this URL format cannot express different credentials for the two.

        Args:
            redis_url: ``redis+sentinel://[user:pass@]host[:port]/service[/db]``.
                The port defaults to 26379 and the service to ``mymaster``.
            **kwargs: Extra connection options. ``ssl*`` options and
                ``client_name`` are also applied to the Sentinel connection.
        """
        parsed_url = urlparse(redis_url)
        # sentinel needs list with (host, port) tuple, use default port if none available
        sentinel_list = [(parsed_url.hostname or "localhost", parsed_url.port or 26379)]

        # "/mymaster/0": first part is the service name, the optional second
        # part is the db number. Missing or empty parts keep the defaults.
        service_name = "mymaster"
        path_parts = parsed_url.path.split("/")
        if len(path_parts) > 1 and path_parts[1]:
            service_name = path_parts[1]
        if len(path_parts) > 2 and path_parts[2]:
            kwargs["db"] = path_parts[2]

        sentinel_args = {}
        if parsed_url.password:
            sentinel_args["password"] = parsed_url.password
            kwargs["password"] = parsed_url.password
        if parsed_url.username:
            sentinel_args["username"] = parsed_url.username
            kwargs["username"] = parsed_url.username

        # Copy all SSL related options, and client_name, to the Sentinel
        # connection as well.
        for arg, value in kwargs.items():
            if arg.startswith("ssl") or arg == "client_name":
                sentinel_args[arg] = value

        # sentinel user/pass is part of sentinel_kwargs, user/pass for redis server
        # connection as direct parameter in kwargs
        sentinel_client = redis.sentinel.Sentinel(
            sentinel_list, sentinel_kwargs=sentinel_args, **kwargs
        )

        # The redis server might have a password while the Sentinel has none:
        # detect that specific error and retry without Sentinel credentials.
        # Anything else cannot be handled here and is re-raised.
        try:
            sentinel_client.execute_command("ping")
        except redis.exceptions.AuthenticationError as exc:
            if "no password is set" not in str(exc):
                raise
            logging.warning(
                "Redis sentinel connection configured with password but Sentinel "
                "answered NO PASSWORD NEEDED - Please check Sentinel configuration"
            )
            sentinel_client = redis.sentinel.Sentinel(sentinel_list, **kwargs)

        return sentinel_client.master_for(service_name)

    def _get_client(self, redis_url: str, **kwargs: Any) -> "Redis":
        """Build a redis client from a connection URL.

        Accepts plain Redis server URLs as well as Redis Sentinel URLs. For a
        plain URL the server is asked whether cluster mode is enabled; if it
        is, the connection is closed and replaced by a cluster client.

        Requires the ``redis`` python package.

        Example:
            .. code-block:: python

                redis_client = get_client(
                    redis_url="redis://username:password@localhost:6379"
                )

        For a replication setup with multiple Redis servers and Redis
        Sentinel, use the ``redis+sentinel://`` scheme (or
        ``rediss+sentinel://`` for TLS). The URL path names the Redis service
        to look up in the Sentinel (default ``mymaster``); an optional second
        path segment selects the database number.

        An optional username or password in the URL is used for both the
        Redis server and the Sentinel; different credentials for the two are
        not supported. Only one Sentinel instance can be given.

        Example:
            .. code-block:: python

                redis_client = get_client(
                    redis_url="redis+sentinel://username:password@sentinelhost:26379/mymaster/0"
                )
        """
        redis_client: "Redis"
        # check if normal redis:// or redis+sentinel:// url
        if redis_url.startswith("redis+sentinel"):
            redis_client = self._redis_sentinel_client(redis_url, **kwargs)
        elif redis_url.startswith("rediss+sentinel"):
            # Sentinel with TLS enabled.
            kwargs["ssl"] = True
            # NOTE(review): defaulting ssl_cert_reqs to "none" looks like it
            # turns off certificate verification unless the caller overrides
            # it - confirm this default is intended.
            kwargs.setdefault("ssl_cert_reqs", "none")
            redis_client = self._redis_sentinel_client(redis_url, **kwargs)
        else:
            # connect to redis server from url, reconnect with cluster client if needed
            redis_client = redis.from_url(redis_url, **kwargs)
            if self._check_for_cluster(redis_client):
                redis_client.close()
                redis_client = self._redis_cluster_client(redis_url, **kwargs)
        return redis_client

class_name classmethod #

class_name() -> str

获取类名。

Source code in llama_index/storage/chat_store/redis/base.py
43
44
45
46
@classmethod
def class_name(cls) -> str:
    """Return the identifier string of this chat store class."""
    name = "RedisChatStore"
    return name

set_messages #

set_messages(key: str, messages: List[ChatMessage]) -> None

为一个键设置消息。

Source code in llama_index/storage/chat_store/redis/base.py
48
49
50
51
52
53
54
55
def set_messages(self, key: str, messages: List[ChatMessage]) -> None:
    """Replace whatever is stored under ``key`` with ``messages``.

    The key is deleted first, then every message is added in order through
    ``add_message``. If a TTL is configured, the key's expiry is set last.
    """
    self.redis_client.delete(key)
    for entry in messages:
        self.add_message(key, entry)

    ttl = self.ttl
    if ttl:
        self.redis_client.expire(key, ttl)

get_messages #

get_messages(key: str) -> List[ChatMessage]

获取特定键的消息。

Source code in llama_index/storage/chat_store/redis/base.py
57
58
59
60
61
62
63
64
def get_messages(self, key: str) -> List[ChatMessage]:
    """Return all messages stored under ``key``, oldest first.

    Every list element is parsed as JSON and converted to a ChatMessage.
    Returns an empty list when nothing is stored for the key.
    """
    items = self.redis_client.lrange(key, 0, -1)
    messages = []
    for item in items:
        # Clients created with decode_responses=True return str, not bytes.
        text = item.decode("utf-8") if isinstance(item, bytes) else item
        messages.append(_dict_to_message(json.loads(text)))
    return messages

add_message #

add_message(
    key: str,
    message: ChatMessage,
    idx: Optional[int] = None,
) -> None

为键添加一条消息。

Source code in llama_index/storage/chat_store/redis/base.py
66
67
68
69
70
71
72
73
74
75
76
77
def add_message(
    self, key: str, message: ChatMessage, idx: Optional[int] = None
) -> None:
    """Store ``message`` under ``key``.

    With ``idx`` given, insertion is delegated to
    ``_insert_element_at_index``; otherwise the JSON-serialized message is
    pushed onto the end of the list. If a TTL is configured, the key's
    expiry is set afterwards.
    """
    if idx is not None:
        self._insert_element_at_index(key, idx, message)
    else:
        payload = json.dumps(_message_to_dict(message))
        self.redis_client.rpush(key, payload)

    ttl = self.ttl
    if ttl:
        self.redis_client.expire(key, ttl)

delete_messages #

delete_messages(key: str) -> Optional[List[ChatMessage]]

删除指定键下的全部消息。

Source code in llama_index/storage/chat_store/redis/base.py
79
80
81
82
def delete_messages(self, key: str) -> Optional[List[ChatMessage]]:
    """Remove the entry stored under ``key``.

    Issues a delete for the key on the Redis client. Always returns
    ``None``; the removed messages are not read back.
    """
    client = self.redis_client
    client.delete(key)
    return None

delete_message #

delete_message(key: str, idx: int) -> Optional[ChatMessage]

删除指定键中位于给定索引处的那条消息。

Source code in llama_index/storage/chat_store/redis/base.py
84
85
86
87
88
89
90
91
92
93
94
def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
    """Delete the message at position ``idx`` under ``key``.

    The list is read, the element removed locally, and the remainder
    written back to Redis.

    Args:
        key: Chat key.
        idx: Zero-based position of the message to remove. Negative or
            out-of-range values remove nothing.

    Returns:
        The removed message, or ``None`` when ``idx`` is out of range.
    """
    current_list = list(self.redis_client.lrange(key, 0, -1))
    if not 0 <= idx < len(current_list):
        return None

    removed_item = current_list.pop(idx)
    self.redis_client.delete(key)
    # RPUSH keeps the original order (LPUSH would reverse it), and it must be
    # skipped for an empty remainder because it needs at least one value.
    if current_list:
        self.redis_client.rpush(key, *current_list)
        # Deleting the key dropped its expiry; restore it like other writes.
        if self.ttl:
            self.redis_client.expire(key, self.ttl)

    # Clients created with decode_responses=True return str, not bytes.
    if isinstance(removed_item, bytes):
        removed_item = removed_item.decode("utf-8")
    return _dict_to_message(json.loads(removed_item))

delete_last_message #

delete_last_message(key: str) -> Optional[ChatMessage]

删除指定键的最后一条消息。

Source code in llama_index/storage/chat_store/redis/base.py
96
97
98
def delete_last_message(self, key: str) -> Optional[ChatMessage]:
    """Remove and return the newest message under ``key``.

    Pops the last list element and converts it to a ChatMessage, matching
    the declared return type instead of handing back the raw payload.

    Returns:
        The removed message, or ``None`` when nothing could be popped.
    """
    raw = self.redis_client.rpop(key)
    if raw is None:
        return None
    # Clients created with decode_responses=True return str, not bytes.
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return _dict_to_message(json.loads(raw))

get_keys #

get_keys() -> List[str]

获取所有的键。

Source code in llama_index/storage/chat_store/redis/base.py
100
101
102
def get_keys(self) -> List[str]:
    """Return every key matching ``*`` in the connected database.

    Keys returned as ``bytes`` are decoded as UTF-8; keys already returned
    as ``str`` (clients created with ``decode_responses=True``) pass through
    unchanged.
    """
    return [
        key.decode("utf-8") if isinstance(key, bytes) else key
        for key in self.redis_client.keys("*")
    ]