"""Redis cache implementation for LangChain."""
from __future__ import annotations
import asyncio
import hashlib
import json
from typing import Any, List, Optional, Union
import numpy as np
from langchain_core.caches import RETURN_VAL_TYPE, BaseCache
from langchain_core.embeddings import Embeddings
from langchain_core.load.dump import dumps
from langchain_core.load.load import loads
from pydantic.v1 import Field as FieldV1
from redis import Redis
from redis.commands.json.path import Path
from redis.exceptions import ResponseError
from redisvl.extensions.llmcache import ( # type: ignore[import]
SemanticCache as RedisVLSemanticCache,
)
from redisvl.schema.fields import VectorDataType # type: ignore[import]
from redisvl.utils.vectorize import BaseVectorizer # type: ignore[import]
from langchain_redis.version import __full_lib_name__
[docs]
class EmbeddingsVectorizer(BaseVectorizer):
# BaseVectorizer is a pydantic.v1.BaseModel so we need to use pydantic.v1.Field.
embeddings: Embeddings = FieldV1(...)
model: str = FieldV1(default="custom_embeddings")
class Config:
arbitrary_types_allowed = True
[docs]
def __init__(self, embeddings: Embeddings):
dims = len(embeddings.embed_query("test"))
super().__init__(model="custom_embeddings", dims=dims, embeddings=embeddings)
[docs]
def encode(
self,
texts: Union[str, List[str]],
dtype: Union[str, VectorDataType],
**kwargs: Any,
) -> np.ndarray:
if isinstance(dtype, VectorDataType):
dtype = dtype.value.lower()
if isinstance(texts, str):
return np.array(self.embeddings.embed_query(texts), dtype=dtype)
return np.array(self.embeddings.embed_documents(texts), dtype=dtype)
[docs]
def embed(
self,
text: str,
dtype: Union[str, VectorDataType] = "float32",
**kwargs: Any,
) -> List[float]:
return self.encode(text, dtype, **kwargs).tolist()
[docs]
def embed_many(
self,
texts: List[str],
dtype: Union[str, VectorDataType] = "float32",
**kwargs: Any,
) -> List[List[float]]:
return self.encode(texts, dtype, **kwargs).tolist()
[docs]
async def aembed(
self,
text: str,
dtype: Union[str, VectorDataType] = "float32",
**kwargs: Any,
) -> List[float]:
return await asyncio.to_thread(self.embed, text, dtype, **kwargs)
[docs]
async def aembed_many(
self,
texts: List[str],
dtype: Union[str, VectorDataType] = "float32",
**kwargs: Any,
) -> List[List[float]]:
return await asyncio.to_thread(self.embed_many, texts, dtype, **kwargs)
[docs]
class RedisCache(BaseCache):
"""Redis cache implementation for LangChain.
This class provides a Redis-based caching mechanism for LangChain, allowing
storage and retrieval of language model responses.
Attributes:
redis (Redis): The Redis client instance.
ttl (Optional[int]): Time-to-live for cache entries in seconds.
If None, entries don't expire.
prefix (Optional[str]): Prefix for all keys stored in Redis.
Args:
redis_url (str): The URL of the Redis instance to connect to.
Defaults to "redis://localhost:6379".
ttl (Optional[int]): Time-to-live for cache entries in seconds.
Defaults to None (no expiration).
prefix (Optional[str]): Prefix for all keys stored in Redis.
Defaults to "redis".
redis (Optional[Redis]): An existing Redis client instance.
If provided, redis_url is ignored.
Example:
.. code-block:: python
from langchain_redis import RedisCache
from langchain_core.globals import set_llm_cache
# Create a Redis cache instance
redis_cache = RedisCache(redis_url="redis://localhost:6379", ttl=3600)
# Set it as the global LLM cache
set_llm_cache(redis_cache)
# Now, when you use an LLM, it will automatically use this cache
Note:
- This cache implementation uses Redis JSON capabilities to store
structured data.
- The cache key is created using MD5 hashes of the prompt and LLM string.
- If TTL is set, cache entries will automatically expire
after the specified duration.
- The prefix can be used to namespace cache entries.
"""
[docs]
def __init__(
self,
redis_url: str = "redis://localhost:6379",
ttl: Optional[int] = None,
prefix: Optional[str] = "redis",
redis_client: Optional[Redis] = None,
):
self.redis = redis_client or Redis.from_url(redis_url)
try:
self.redis.client_setinfo("LIB-NAME", __full_lib_name__) # type: ignore
except ResponseError:
# Fall back to a simple log echo
self.redis.echo(__full_lib_name__)
self.ttl = ttl
self.prefix = prefix
def _key(self, prompt: str, llm_string: str) -> str:
"""Create a key for the cache."""
prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
llm_string_hash = hashlib.md5(llm_string.encode()).hexdigest()
return f"{self.prefix}:{prompt_hash}:{llm_string_hash}"
[docs]
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up the result of a previous language model call in the Redis cache.
This method checks if there's a cached result for the given prompt and language
model combination.
Args:
prompt (str): The input prompt for which to look up the cached result.
llm_string (str): A string representation of the language model and
its parameters.
Returns:
Optional[RETURN_VAL_TYPE]: The cached result if found, or None if not
present in the cache.
The result is typically a list containing a single Generation object.
Example:
.. code-block:: python
cache = RedisCache(redis_url="redis://localhost:6379")
prompt = "What is the capital of France?"
llm_string = "openai/gpt-3.5-turbo"
result = cache.lookup(prompt, llm_string)
if result:
print("Cache hit:", result[0].text)
else:
print("Cache miss")
Note:
- The method uses an MD5 hash of the prompt and llm_string to create
the cache key.
- The cached value is stored as JSON and parsed back into a
Generation object.
- If the key exists but the value is None or cannot be parsed,
None is returned.
- This method is typically called internally by LangChain, but can be used
directly for manual cache interactions.
"""
key = self._key(prompt, llm_string)
result = self.redis.json().get(key)
if result:
return [loads(json.dumps(result))]
return None
[docs]
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update the cache with a new result for a given prompt and language model.
This method stores a new result in the Redis cache for the specified prompt and
language model combination.
Args:
prompt (str): The input prompt associated with the result.
llm_string (str): A string representation of the language model
and its parameters.
return_val (RETURN_VAL_TYPE): The result to be cached, typically a list
containing a single Generation object.
Returns:
None
Example:
.. code-block:: python
from langchain_core.outputs import Generation
cache = RedisCache(redis_url="redis://localhost:6379", ttl=3600)
prompt = "What is the capital of France?"
llm_string = "openai/gpt-3.5-turbo"
result = [Generation(text="The capital of France is Paris.")]
cache.update(prompt, llm_string, result)
Note:
- The method uses an MD5 hash of the prompt and llm_string to create the
cache key.
- The result is stored as JSON in Redis.
- If a TTL (Time To Live) was specified when initializing the cache,
it will be applied to this entry.
- This method is typically called internally by LangChain after a language
model generates a response, but it can be used directly
for manual cache updates.
- If the cache already contains an entry for this prompt and llm_string,
it will be overwritten.
"""
key = self._key(prompt, llm_string)
json_value = json.loads(dumps(return_val[0]))
self.redis.json().set(key, Path.root_path(), json_value)
if self.ttl is not None:
self.redis.expire(key, self.ttl)
[docs]
def clear(self, **kwargs: Any) -> None:
"""Clear all entries in the Redis cache that match the cache prefix.
This method removes all cache entries that start with the specified prefix.
Args:
**kwargs: Additional keyword arguments. Currently not used, but included
for potential future extensions.
Returns:
None
Example:
.. code-block:: python
cache = RedisCache(
redis_url="redis://localhost:6379",
prefix="my_cache"
)
# Add some entries to the cache
cache.update("prompt1", "llm1", [Generation(text="Result 1")])
cache.update("prompt2", "llm2", [Generation(text="Result 2")])
# Clear all entries
cache.clear()
# After this, all entries with keys starting with "my_cache:"
# will be removed
Note:
- This method uses Redis SCAN to iterate over keys, which is safe
for large datasets.
- It deletes keys in batches of 100 to optimize performance.
- Only keys that start with the specified prefix (default is "redis:")
will be deleted.
- This operation is irreversible. Make sure you want to clear all cached
data before calling this method.
- If no keys match the prefix, the method will complete without any errors.
"""
cursor = 0
pipe = self.redis.pipeline()
while True:
try:
cursor, keys = self.redis.scan(
cursor, match=f"{self.prefix}:*", count=100
) # type: ignore[misc]
if keys:
pipe.delete(*keys)
pipe.execute()
if cursor == 0:
break
finally:
pipe.reset()
[docs]
class RedisSemanticCache(BaseCache):
"""Redis-based semantic cache implementation for LangChain.
This class provides a semantic caching mechanism using Redis and vector similarity
search. It allows for storing and retrieving language model responses based on the
semantic similarity of prompts, rather than exact string matching.
Attributes:
redis (Redis): The Redis client instance.
embeddings (Embeddings): The embedding function to use for encoding prompts.
cache (RedisVLSemanticCache): The underlying RedisVL semantic cache instance.
Args:
embeddings (Embeddings): The embedding function to use for encoding prompts.
redis_url (str): The URL of the Redis instance to connect to.
Defaults to "redis://localhost:6379".
distance_threshold (float): The maximum distance for considering a cache hit.
Defaults to 0.2.
ttl (Optional[int]): Time-to-live for cache entries in seconds.
Defaults to None (no expiration).
name (Optional[str]): Name for the cache index. Defaults to "llmcache".
prefix (Optional[str]): Prefix for all keys stored in Redis.
Defaults to "llmcache".
redis (Optional[Redis]): An existing Redis client instance.
If provided, redis_url is ignored.
Example:
.. code-block:: python
from langchain_redis import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
from langchain_core.globals import set_llm_cache
embeddings = OpenAIEmbeddings()
semantic_cache = RedisSemanticCache(
embeddings=embeddings,
redis_url="redis://localhost:6379",
distance_threshold=0.1
)
set_llm_cache(semantic_cache)
# Now, when you use an LLM, it will automatically use this semantic cache
Note:
- This cache uses vector similarity search to find semantically similar prompts.
- The distance_threshold determines how similar a prompt must be to trigger
a cache hit.
- Lowering the distance_threshold increases precision but may reduce cache hits.
- The cache uses the RedisVL library for efficient vector storage and retrieval.
- Semantic caching can be more flexible than exact matching, allowing cache hits
for prompts that are semantically similar but not identical.
"""
[docs]
def __init__(
self,
embeddings: Embeddings,
redis_url: str = "redis://localhost:6379",
distance_threshold: float = 0.2,
ttl: Optional[int] = None,
name: Optional[str] = "llmcache",
prefix: Optional[str] = "llmcache",
redis_client: Optional[Redis] = None,
):
self.redis = redis_client or Redis.from_url(redis_url)
self.embeddings = embeddings
self.prefix = prefix
vectorizer = EmbeddingsVectorizer(embeddings=self.embeddings)
self.cache = RedisVLSemanticCache(
vectorizer=vectorizer,
redis_client=self.redis,
distance_threshold=distance_threshold,
ttl=ttl,
name=name,
prefix=prefix,
)
[docs]
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up the result of a previous language model call in the
Redis semantic cache.
This method checks if there's a cached result for a semantically similar prompt
and the same language model combination.
Args:
prompt (str): The input prompt for which to look up the cached result.
llm_string (str): A string representation of the language model
and its parameters.
Returns:
Optional[RETURN_VAL_TYPE]: The cached result if a semantically similar
prompt is found,
or None if no suitable match is present in the cache. The result
is typically a list containing a single Generation object.
Example:
.. code-block:: python
from langchain_openai import OpenAIEmbeddings
cache = RedisSemanticCache(
embeddings=OpenAIEmbeddings(),
redis_url="redis://localhost:6379"
)
prompt = "What's the capital city of France?"
llm_string = "openai/gpt-3.5-turbo"
result = cache.lookup(prompt, llm_string)
if result:
print("Semantic cache hit:", result[0].text)
else:
print("Semantic cache miss")
Note:
- This method uses vector similarity search to find semantically
similar prompts.
- The prompt is embedded using the provided embedding function.
- The method checks for cached results within the distance
threshold specified during cache initialization.
- If multiple results are within the threshold, the most similar
one is returned.
- The llm_string is used to ensure the cached result is from the
same language model.
- This method is typically called internally by LangChain, but can
be used directly for manual cache interactions.
- Unlike exact matching, this may return results for prompts that
are semantically similar but not identical to the input.
"""
vector = self.cache._vectorize_prompt(prompt)
results = self.cache.check(vector=vector)
if results:
for result in results:
if result.get("metadata", {}).get("llm_string") == llm_string:
try:
return [
loads(gen_str)
for gen_str in json.loads(result.get("response"))
]
except (json.JSONDecodeError, TypeError):
return None
return None
[docs]
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update the semantic cache with a new result for a given prompt
and language model.
This method stores a new result in the Redis semantic cache for the
specified prompt and language model combination, using vector embedding
for semantic similarity.
Args:
prompt (str): The input prompt associated with the result.
llm_string (str): A string representation of the language model
and its parameters.
return_val (RETURN_VAL_TYPE): The result to be cached, typically a list
containing a single Generation object.
Returns:
None
Example:
.. code-block:: python
from langchain_core.outputs import Generation
from langchain_openai import OpenAIEmbeddings
cache = RedisSemanticCache(
embeddings=OpenAIEmbeddings(),
redis_url="redis://localhost:6379"
)
prompt = "What is the capital of France?"
llm_string = "openai/gpt-3.5-turbo"
result = [Generation(text="The capital of France is Paris.")]
cache.update(prompt, llm_string, result)
Note:
- The method uses the provided embedding function to convert the prompt
into a vector.
- The vector, along with the prompt, llm_string, and result, is stored in
the Redis cache.
- If a TTL (Time To Live) was specified when initializing the cache, it will
be applied to this entry.
- This method is typically called internally by LangChain after a language
model generates a response, but it can be used directly for manual
cache updates.
- Unlike exact matching caches, this allows for semantic similarity
lookups later.
- If the cache already contains very similar entries, this will add a
new entry rather than overwriting.
- The effectiveness of the cache depends on the quality of the embedding
function used.
"""
serialized_response = json.dumps([dumps(gen) for gen in return_val])
vector = self.cache._vectorize_prompt(prompt)
self.cache.store(
prompt=prompt,
response=serialized_response,
vector=vector,
metadata={"llm_string": llm_string},
)
[docs]
def clear(self, **kwargs: Any) -> None:
"""Clear all entries in the Redis semantic cache.
This method removes all cache entries from the semantic cache.
Args:
**kwargs: Additional keyword arguments. Currently not used, but included
for potential future extensions.
Returns:
None
Example:
.. code-block:: python
from langchain_openai import OpenAIEmbeddings
cache = RedisSemanticCache(
embeddings=OpenAIEmbeddings(),
redis_url="redis://localhost:6379",
name="my_semantic_cache"
)
# Add some entries to the cache
cache.update(
"What is the capital of France?",
"llm1",
[Generation(text="Paris")]
)
cache.update(
"Who wrote Romeo and Juliet?",
"llm2",
[Generation(text="Shakespeare")]
)
# Clear all entries
cache.clear()
# After this, all entries in the semantic cache will be removed
Note:
- This method clears all entries in the semantic cache, regardless of their
content or similarity.
- It uses the underlying cache implementation's clear method, which
efficiently removes all entries.
- This operation is irreversible. Make sure you want to clear all cached
data before calling this method.
- After clearing, the cache will be empty, but the index structure is
maintained and ready for new entries.
- This method is useful for resetting the cache or clearing out old data,
especially if the nature of the queries or the embedding model has
changed significantly.
"""
self.cache.clear()
def _key(self, prompt: str, llm_string: str) -> str:
"""Create a key for the cache."""
prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
llm_string_hash = hashlib.md5(llm_string.encode()).hexdigest()
return f"{self.prefix}:{prompt_hash}:{llm_string_hash}"
[docs]
def name(self) -> str:
"""Get the name of the semantic cache index.
This method returns the name of the index used for the semantic cache in Redis.
Returns:
str: The name of the semantic cache index.
Example:
.. code-block:: python
from langchain_openai import OpenAIEmbeddings
cache = RedisSemanticCache(
embeddings=OpenAIEmbeddings(),
redis_url="redis://localhost:6379",
name="my_custom_cache"
)
index_name = cache.name()
print(f"The semantic cache is using index: {index_name}")
Note:
- The index name is set during the initialization of the RedisSemanticCache.
- If no custom name was provided during initialization,
a default name is used.
- This name is used internally to identify and manage the semantic cache
in Redis.
- Knowing the index name can be useful for debugging or for direct
interactions with the Redis database outside of this cache interface.
"""
return self.cache.index.name
[docs]
async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Async look up based on prompt and llm_string.
A cache implementation is expected to generate a key from the 2-tuple
of prompt and llm_string (e.g., by concatenating them with a delimiter).
Args:
prompt: a string representation of the prompt.
In the case of a Chat model, the prompt is a non-trivial
serialization of the prompt into the language model.
llm_string: A string representation of the LLM configuration.
This is used to capture the invocation parameters of the LLM
(e.g., model name, temperature, stop tokens, max tokens, etc.).
These invocation parameters are serialized into a string
representation.
Returns:
On a cache miss, return None. On a cache hit, return the cached value.
The cached value is a list of Generations (or subclasses).
"""
vector = await self.cache._avectorize_prompt(prompt)
results = await self.cache.acheck(vector=vector)
if results:
for result in results:
if result.get("metadata", {}).get("llm_string") == llm_string:
try:
return [
loads(gen_str)
for gen_str in json.loads(result.get("response"))
]
except (json.JSONDecodeError, TypeError):
return None
return None
[docs]
async def aupdate(
self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
) -> None:
"""Async update cache based on prompt and llm_string.
The prompt and llm_string are used to generate a key for the cache.
The key should match that of the look up method.
Args:
prompt: a string representation of the prompt.
In the case of a Chat model, the prompt is a non-trivial
serialization of the prompt into the language model.
llm_string: A string representation of the LLM configuration.
This is used to capture the invocation parameters of the LLM
(e.g., model name, temperature, stop tokens, max tokens, etc.).
These invocation parameters are serialized into a string
representation.
return_val: The value to be cached. The value is a list of Generations
(or subclasses).
"""
serialized_response = json.dumps([dumps(gen) for gen in return_val])
vector = self.cache._vectorize_prompt(prompt)
await self.cache.astore(
prompt=prompt,
response=serialized_response,
vector=vector,
metadata={"llm_string": llm_string},
)
[docs]
async def aclear(self, **kwargs: Any) -> None:
"""Async clear cache that can take additional keyword arguments."""
await self.cache.aclear()