import json
import logging
from time import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict

from langchain_elasticsearch._utilities import async_with_user_agent_header
from langchain_elasticsearch.client import create_async_elasticsearch_client

if TYPE_CHECKING:
    from elasticsearch import AsyncElasticsearch

logger = logging.getLogger(__name__)


class AsyncElasticsearchChatMessageHistory(BaseChatMessageHistory):
    """Chat message history that stores history in Elasticsearch.

    Each message is stored as its own document containing the ``session_id``
    (keyword), a ``created_at`` timestamp in epoch milliseconds (date) and the
    JSON-serialized message under ``history`` (text).

    Args:
        index: Name of the index to use. It is created on first use if it
            does not exist yet.
        session_id: Arbitrary key that is used to store the messages of a
            single chat session.
        es_connection: Optional pre-existing Elasticsearch connection.
        es_url: URL of the Elasticsearch instance to connect to.
        es_cloud_id: Cloud ID of the Elasticsearch instance to connect to.
        es_user: Username to use when connecting to Elasticsearch.
        es_api_key: API key to use when connecting to Elasticsearch.
        es_password: Password to use when connecting to Elasticsearch.
        esnsure_ascii: Passed (as a bool) to ``json.dumps(ensure_ascii=...)``
            when storing messages. When truthy, non-ASCII characters are
            escaped in the stored JSON. Defaults to True. The misspelled
            name is kept for backward compatibility.

    Raises:
        ValueError: If neither ``es_connection`` nor ``es_url``/``es_cloud_id``
            is provided.

    For synchronous applications, use the `ElasticsearchChatMessageHistory`
    class. For asynchronous applications, use the
    `AsyncElasticsearchChatMessageHistory` class.
    """

    def __init__(
        self,
        index: str,
        session_id: str,
        *,
        es_connection: Optional["AsyncElasticsearch"] = None,
        es_url: Optional[str] = None,
        es_cloud_id: Optional[str] = None,
        es_user: Optional[str] = None,
        es_api_key: Optional[str] = None,
        es_password: Optional[str] = None,
        esnsure_ascii: Optional[bool] = True,
    ):
        self.index: str = index
        self.session_id: str = session_id
        self.ensure_ascii = esnsure_ascii

        # Prefer an explicitly passed client; otherwise build one from the
        # connection info.
        if es_connection is not None:
            self.client = es_connection
        elif es_url is not None or es_cloud_id is not None:
            try:
                self.client = create_async_elasticsearch_client(
                    url=es_url,
                    username=es_user,
                    password=es_password,
                    cloud_id=es_cloud_id,
                    api_key=es_api_key,
                )
            except Exception as err:
                logger.error(f"Error connecting to Elasticsearch: {err}")
                raise
        else:
            raise ValueError(
                "Either provide a pre-existing Elasticsearch connection, "
                "or valid credentials for creating a new connection."
            )

        # Wrap the client with the user-agent helper (presumably adds a
        # "langchain-py-ms" user-agent header; see _utilities to confirm).
        self.client = async_with_user_agent_header(self.client, "langchain-py-ms")
        # Set by create_if_missing() once the index is known to exist, so the
        # existence check is not repeated on every call.
        self.created = False

    async def create_if_missing(self) -> None:
        """Create the backing index with the chat-history mapping if needed.

        After one successful check or creation, ``self.created`` is set and
        later calls return immediately.
        """
        if self.created:
            return

        if await self.client.indices.exists(index=self.index):
            logger.debug(
                f"Chat history index {self.index} already exists, "
                "skipping creation."
            )
        else:
            logger.debug(f"Creating index {self.index} for storing chat history.")
            await self.client.indices.create(
                index=self.index,
                mappings={
                    "properties": {
                        "session_id": {"type": "keyword"},
                        "created_at": {"type": "date"},
                        "history": {"type": "text"},
                    }
                },
            )

        self.created = True

    async def aget_messages(self) -> List[BaseMessage]:  # type: ignore[override]
        """Retrieve the messages of this session from Elasticsearch.

        Pages through all documents whose ``session_id`` matches, 100 per
        request, sorted by ascending ``created_at`` and using ``search_after``.

        Returns:
            The session's messages, ordered by ``created_at`` ascending.

        Raises:
            elasticsearch.ApiError: If a search request fails (logged, then
                re-raised).
        """
        from elasticsearch import ApiError

        await self.create_if_missing()

        search_after: Dict[str, Any] = {}
        items: List[Dict[str, Any]] = []
        while True:
            try:
                result = await self.client.search(
                    index=self.index,
                    query={"term": {"session_id": self.session_id}},
                    sort="created_at:asc",
                    size=100,
                    **search_after,
                )
            except ApiError as err:
                logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
                raise

            hits = result["hits"]["hits"] if result else []
            if not hits:
                break

            items.extend(json.loads(hit["_source"]["history"]) for hit in hits)
            # NOTE(review): created_at (millisecond resolution) is the only
            # sort key. Documents sharing a timestamp across a page boundary
            # could therefore be skipped by search_after. Elasticsearch
            # recommends a unique tiebreaker for search_after pagination.
            # Confirm whether this matters for this workload.
            search_after = {"search_after": hits[-1]["sort"]}

        return messages_from_dict(items)

    async def aadd_message(self, message: BaseMessage) -> None:
        """Add a message to the chat session in Elasticsearch.

        The message is stored as a new document stamped with the current time
        in epoch milliseconds. It is indexed with ``refresh=True`` so it is
        visible to the next search.

        Raises:
            elasticsearch.ApiError: If index creation or indexing fails
                (logged, then re-raised).
        """
        # Import before the try block. If the import failed inside it,
        # evaluating `except ApiError` would raise UnboundLocalError instead
        # of surfacing the ImportError.
        from elasticsearch import ApiError

        try:
            await self.create_if_missing()
            await self.client.index(
                index=self.index,
                document={
                    "session_id": self.session_id,
                    "created_at": round(time() * 1000),
                    "history": json.dumps(
                        message_to_dict(message),
                        ensure_ascii=bool(self.ensure_ascii),
                    ),
                },
                refresh=True,
            )
        except ApiError as err:
            logger.error(f"Could not add message to Elasticsearch: {err}")
            raise

    async def aclear(self) -> None:
        """Delete every stored message of this session from Elasticsearch.

        Raises:
            elasticsearch.ApiError: If index creation or the delete-by-query
                fails (logged, then re-raised).
        """
        # Imported outside the try block for the same reason as in
        # aadd_message: a failed import must not become an UnboundLocalError.
        from elasticsearch import ApiError

        try:
            await self.create_if_missing()
            await self.client.delete_by_query(
                index=self.index,
                query={"term": {"session_id": self.session_id}},
                refresh=True,
            )
        except ApiError as err:
            logger.error(f"Could not clear session memory in Elasticsearch: {err}")
            raise