classRedisChatStore(BaseChatStore):"""Redis chat store."""redis_url:str=Field(default="redis://localhost:6379",description="Redis URL.")ttl:Optional[int]=Field(default=None,description="Time to live in seconds.")_redis_client:Optional[Redis]=PrivateAttr()_aredis_client:Optional[AsyncRedis]=PrivateAttr()def__init__(self,redis_url:str="redis://localhost:6379",redis_client:Optional[Redis]=None,aredis_client:Optional[AsyncRedis]=None,ttl:Optional[int]=None,**kwargs:Any,)->None:"""Initialize."""super().__init__(ttl=ttl)self._redis_client=redis_clientorself._get_client(redis_url,**kwargs)self._aredis_client=aredis_clientorself._aget_client(redis_url,**kwargs)@classmethoddefclass_name(cls)->str:"""Get class name."""return"RedisChatStore"defset_messages(self,key:str,messages:List[ChatMessage])->None:"""Set messages for a key."""self._redis_client.delete(key)formessageinmessages:self.add_message(key,message)ifself.ttl:self._redis_client.expire(key,self.ttl)asyncdefaset_messages(self,key:str,messages:List[ChatMessage])->None:awaitself._aredis_client.delete(key)formessageinmessages:awaitself.async_add_message(key,message)ifself.ttl:awaitself._aredis_client.expire(key,self.ttl)defget_messages(self,key:str)->List[ChatMessage]:"""Get messages for a key."""items=self._redis_client.lrange(key,0,-1)iflen(items)==0:return[]items_json=[json.loads(m.decode("utf-8"))forminitems]return[_dict_to_message(d)fordinitems_json]asyncdefaget_messages(self,key:str)->List[ChatMessage]:"""Get messages for a key."""items=awaitself._aredis_client.lrange(key,0,-1)iflen(items)==0:return[]items_json=[json.loads(m.decode("utf-8"))forminitems]return[_dict_to_message(d)fordinitems_json]defadd_message(self,key:str,message:ChatMessage,idx:Optional[int]=None)->None:"""Add a message for a key."""ifidxisNone:item=json.dumps(_message_to_dict(message))self._redis_client.rpush(key,item)else:self._insert_element_at_index(key,idx,message)ifself.ttl:self._redis_client.expire(key,self.ttl)asyncdefasync_add_message(self,key:str,message:ChatMessage,idx:Optional[int]=None)->None:"""Add a message for a key."""ifidxisNone:item=json.dumps(_message_to_dict(message))awaitself._aredis_client.rpush(key,item)else:awaitself._ainsert_element_at_index(key,idx,message)ifself.ttl:awaitself._aredis_client.expire(key,self.ttl)defdelete_messages(self,key:str)->Optional[List[ChatMessage]]:"""Delete messages for a key."""self._redis_client.delete(key)returnNoneasyncdefadelete_messages(self,key:str)->Optional[List[ChatMessage]]:"""Delete messages for a key."""awaitself._aredis_client.delete(key)returnNonedefdelete_message(self,key:str,idx:int)->Optional[ChatMessage]:"""Delete specific message for a key."""current_list=self._redis_client.lrange(key,0,-1)if0<=idx<len(current_list):removed_item=current_list.pop(idx)self._redis_client.delete(key)self._redis_client.lpush(key,*current_list)returnremoved_itemelse:returnNoneasyncdefadelete_message(self,key:str,idx:int)->Optional[ChatMessage]:"""Delete specific message for a key."""current_list=awaitself._aredis_client.lrange(key,0,-1)if0<=idx<len(current_list):removed_item=current_list.pop(idx)awaitself._aredis_client.delete(key)awaitself._aredis_client.lpush(key,*current_list)returnremoved_itemelse:returnNonedefdelete_last_message(self,key:str)->Optional[ChatMessage]:"""Delete last message for a key."""returnself._redis_client.rpop(key)defget_keys(self)->List[str]:"""Get all keys."""return[key.decode("utf-8")forkeyinself._redis_client.keys("*")]def_insert_element_at_index(self,key:str,index:int,message:ChatMessage)->List[ChatMessage]:# Step 1: Retrieve the current listcurrent_list=self.get_messages(key)# Step 2: Insert the new element at the desired index in the local listcurrent_list.insert(index,message)# Step 3: Push the modified local list back to Redisself._redis_client.delete(key)# Remove the existing listself.set_messages(key,current_list)returnself.get_messages(key)asyncdef_ainsert_element_at_index(self,key:str,index:int,message:ChatMessage)->List[ChatMessage]:# Step 1: Retrieve the current listcurrent_list=awaitself.aget_messages(key)# Step 2: Insert the new element at the desired index in the local listcurrent_list.insert(index,message)# Step 3: Push the modified local list back to Redisawaitself._aredis_client.delete(key)# Remove the existing listawaitself.aset_messages(key,current_list)returnawaitself.aget_messages(key)def_redis_cluster_client(self,redis_url:str,**kwargs:Any)->"Redis":returnRedisCluster.from_url(redis_url,**kwargs)# type: ignoredef_aredis_cluster_client(self,redis_url:str,**kwargs:Any)->"AsyncRedis":returnAsyncRedisCluster.from_url(redis_url,**kwargs)def_check_for_cluster(self,redis_client:Union["Redis","AsyncRedis"])->bool:try:cluster_info=redis_client.info("cluster")returncluster_info["cluster_enabled"]==1exceptredis.exceptions.RedisError:returnFalsedef_redis_sentinel_parser(self,redis_url:str,**kwargs)->Tuple[str,List[Tuple[str,int]]]:""" Helper method to parse an (un-official) redis+sentinel url and create a Sentinel connection to fetch the final redis client connection to a replica-master for read-write operations. If username and/or password for authentication is given the same credentials are used for the Redis Sentinel as well as Redis Server. With this implementation using a redis url only it is not possible to use different data for authentication on booth systems. """parsed_url=urlparse(redis_url)# sentinel needs list with (host, port) tuple, use default port if none availablesentinel_list=[(parsed_url.hostnameor"localhost",parsed_url.portor26379)]ifparsed_url.path:# "/mymaster/0" first part is service name, optional second part is db numberpath_parts=parsed_url.path.split("/")service_name=path_parts[1]or"mymaster"iflen(path_parts)>2:kwargs["db"]=path_parts[2]else:service_name="mymaster"sentinel_args={}ifparsed_url.password:sentinel_args["password"]=parsed_url.passwordkwargs["password"]=parsed_url.passwordifparsed_url.username:sentinel_args["username"]=parsed_url.usernamekwargs["username"]=parsed_url.username# check for all SSL related properties and copy them into sentinel_kwargs too,# add client_name alsoforarginkwargs:ifarg.startswith("ssl")orarg=="client_name":sentinel_args[arg]=kwargs[arg]returnsentinel_args,sentinel_list,service_name,kwargsdef_redis_sentinel_client(self,redis_url:str,**kwargs:Any)->"Redis":(sentinel_args,sentinel_list,service_name,kwargs,)=self._redis_sentinel_parser(redis_url,**kwargs)# sentinel user/pass is part of sentinel_kwargs, user/pass for redis server# connection as direct parameter in kwargssentinel_client=Sentinel(sentinel_list,sentinel_kwargs=sentinel_args,**kwargs)# redis server might have password but not sentinel - fetch this error and try# again without pass, everything else cannot be handled here -> user neededtry:sentinel_client.execute_command("ping")exceptredis.exceptions.AuthenticationError:exception_info=sys.exc_info()exception=exception_info[1]orNoneifexceptionisnotNoneand"no password is set"inexception.args[0]:logging.warning(msg="Redis sentinel connection configured with password but Sentinel \ answered NO PASSWORD NEEDED - Please check Sentinel configuration")sentinel_client=Sentinel(sentinel_list,**kwargs)else:raisereturnsentinel_client.master_for(service_name)def_aredis_sentinel_client(self,redis_url:str,**kwargs:Any)->"AsyncRedis":(sentinel_args,sentinel_list,service_name,kwargs,)=self._redis_sentinel_parser(redis_url,**kwargs)sentinel_client=AsyncSentinel(sentinel_list,sentinel_kwargs=sentinel_args,**kwargs)try:asyncio.run(sentinel_client.execute_command("ping"))exceptredis.exceptions.AuthenticationError:exception_info=sys.exc_info()exception=exception_info[1]orNoneifexceptionisnotNoneand"no password is set"inexception.args[0]:logging.warning(msg="Redis sentinel connection configured with password but Sentinel \ answered NO PASSWORD NEEDED - Please check Sentinel configuration")sentinel_client=AsyncSentinel(sentinel_list,**kwargs)else:raisereturnsentinel_client.master_for(service_name)def_get_client(self,redis_url:str,**kwargs:Any)->"Redis":""" Get a redis client from the connection url given. This helper accepts urls for Redis server (TCP with/without TLS or UnixSocket) as well as Redis Sentinel connections. Redis Cluster is not supported. Before creating a connection the existence of the database driver is checked an and ValueError raised otherwise To use, you should have the ``redis`` python package installed. Example: .. code-block:: python redis_client = get_client( redis_url="redis://username:password@localhost:6379" ) To use a redis replication setup with multiple redis server and redis sentinels set "redis_url" to "redis+sentinel://" scheme. With this url format a path is needed holding the name of the redis service within the sentinels to get the correct redis server connection. The default service name is "mymaster". The optional second part of the path is the redis db number to connect to. An optional username or password is used for booth connections to the rediserver and the sentinel, different passwords for server and sentinel are not supported. And as another constraint only one sentinel instance can be given: Example: .. code-block:: python redis_client = get_client( redis_url="redis+sentinel://username:password@sentinelhost:26379/mymaster/0" ) """# Initialize with necessary components.redis_client:"Redis"# check if normal redis:// or redis+sentinel:// urlifredis_url.startswith("redis+sentinel"):redis_client=self._redis_sentinel_client(redis_url,**kwargs)elifredis_url.startswith("rediss+sentinel"):# sentinel with TLS support enableskwargs["ssl"]=Trueif"ssl_cert_reqs"notinkwargs:kwargs["ssl_cert_reqs"]="none"redis_client=self._redis_sentinel_client(redis_url,**kwargs)else:# connect to redis server from url, reconnect with cluster client if neededredis_client=redis.from_url(redis_url,**kwargs)ifself._check_for_cluster(redis_client):redis_client.close()redis_client=self._redis_cluster_client(redis_url,**kwargs)returnredis_clientdef_aget_client(self,redis_url:str,**kwargs:Any)->"AsyncRedis":aredis_client:"AsyncRedis"# check if normal redis:// or redis+sentinel:// urlifredis_url.startswith("redis+sentinel"):aredis_client=self._aredis_sentinel_client(redis_url,**kwargs)elifredis_url.startswith("rediss+sentinel"):# sentinel with TLS support enableskwargs["ssl"]=Trueif"ssl_cert_reqs"notinkwargs:kwargs["ssl_cert_reqs"]="none"aredis_client=self._aredis_sentinel_client(redis_url,**kwargs)else:# connect to redis server from url, reconnect with cluster client if neededaredis_client=redis.asyncio.from_url(redis_url,**kwargs)redis_client=redis.from_url(redis_url,**kwargs)is_cluster=self._check_for_cluster(redis_client)redis_client.close()ifis_cluster:asyncio.create_task(aredis_client.close())aredis_client=self._aredis_cluster_client(redis_url,**kwargs)returnaredis_client
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
6162636465666768
defset_messages(self,key:str,messages:List[ChatMessage])->None:"""Set messages for a key."""self._redis_client.delete(key)formessageinmessages:self.add_message(key,message)ifself.ttl:self._redis_client.expire(key,self.ttl)
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
7879808182838485
defget_messages(self,key:str)->List[ChatMessage]:"""Get messages for a key."""items=self._redis_client.lrange(key,0,-1)iflen(items)==0:return[]items_json=[json.loads(m.decode("utf-8"))forminitems]return[_dict_to_message(d)fordinitems_json]
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
8788899091929394
asyncdefaget_messages(self,key:str)->List[ChatMessage]:"""Get messages for a key."""items=awaitself._aredis_client.lrange(key,0,-1)iflen(items)==0:return[]items_json=[json.loads(m.decode("utf-8"))forminitems]return[_dict_to_message(d)fordinitems_json]
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
96 97 98 99100101102103104105106107
defadd_message(self,key:str,message:ChatMessage,idx:Optional[int]=None)->None:"""Add a message for a key."""ifidxisNone:item=json.dumps(_message_to_dict(message))self._redis_client.rpush(key,item)else:self._insert_element_at_index(key,idx,message)ifself.ttl:self._redis_client.expire(key,self.ttl)
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
109110111112113114115116117118119120
asyncdefasync_add_message(self,key:str,message:ChatMessage,idx:Optional[int]=None)->None:"""Add a message for a key."""ifidxisNone:item=json.dumps(_message_to_dict(message))awaitself._aredis_client.rpush(key,item)else:awaitself._ainsert_element_at_index(key,idx,message)ifself.ttl:awaitself._aredis_client.expire(key,self.ttl)
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
132133134135136137138139140141142
defdelete_message(self,key:str,idx:int)->Optional[ChatMessage]:"""Delete specific message for a key."""current_list=self._redis_client.lrange(key,0,-1)if0<=idx<len(current_list):removed_item=current_list.pop(idx)self._redis_client.delete(key)self._redis_client.lpush(key,*current_list)returnremoved_itemelse:returnNone
Source code in llama-index-integrations/storage/chat_store/llama-index-storage-chat-store-redis/llama_index/storage/chat_store/redis/base.py
144145146147148149150151152153154
asyncdefadelete_message(self,key:str,idx:int)->Optional[ChatMessage]:"""Delete specific message for a key."""current_list=awaitself._aredis_client.lrange(key,0,-1)if0<=idx<len(current_list):removed_item=current_list.pop(idx)awaitself._aredis_client.delete(key)awaitself._aredis_client.lpush(key,*current_list)returnremoved_itemelse:returnNone