Source code for langchain.agents.openai_assistant.base

from __future__ import annotations

import asyncio
import json
from json import JSONDecodeError
from time import sleep
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import CallbackManager
from langchain_core.load import dumpd
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
from langchain_core.runnables import RunnableConfig, RunnableSerializable, ensure_config
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool

if TYPE_CHECKING:
    import openai
    from openai.types.beta.threads import ThreadMessage
    from openai.types.beta.threads.required_action_function_tool_call import (
        RequiredActionFunctionToolCall,
    )


class OpenAIAssistantFinish(AgentFinish):
    """AgentFinish with run and thread metadata."""

    run_id: str
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        return False


class OpenAIAssistantAction(AgentAction):
    """AgentAction with the info needed to submit a custom tool output
    to an existing run."""

    tool_call_id: str
    run_id: str
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        return False


def _get_openai_client() -> openai.OpenAI:
    try:
        import openai

        return openai.OpenAI()
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from e


def _get_openai_async_client() -> openai.AsyncOpenAI:
    try:
        import openai

        return openai.AsyncOpenAI()
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from e


def _is_assistants_builtin_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
    """Determine if a tool corresponds to an OpenAI Assistants built-in tool."""
    assistants_builtin_tools = ("code_interpreter", "retrieval")
    return (
        isinstance(tool, dict)
        and ("type" in tool)
        and (tool["type"] in assistants_builtin_tools)
    )


def _get_assistants_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
    """Convert a raw function/class to an OpenAI tool.

    Note that OpenAI Assistants support several built-in tools,
    such as "code_interpreter" and "retrieval".
    """
    if _is_assistants_builtin_tool(tool):
        return tool  # type: ignore
    else:
        return convert_to_openai_tool(tool)


OutputType = Union[
    List[OpenAIAssistantAction],
    OpenAIAssistantFinish,
    List["ThreadMessage"],
    List["RequiredActionFunctionToolCall"],
]
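

# Illustrative sketch (not part of the upstream module): what _get_assistants_tool
# does with each kind of input. Built-in Assistants tool dicts pass through
# unchanged; any BaseTool, function, or pydantic class is converted to the OpenAI
# function-tool schema. `word_length` is a hypothetical example tool.
def _example_tool_conversion() -> None:
    from langchain_core.tools import tool

    @tool
    def word_length(word: str) -> int:
        """Return the number of characters in a word."""
        return len(word)

    # Built-in Assistants tools are returned as-is:
    assert _get_assistants_tool({"type": "code_interpreter"}) == {
        "type": "code_interpreter"
    }
    # Everything else goes through convert_to_openai_tool:
    converted = _get_assistants_tool(word_length)
    assert converted["type"] == "function"
    assert converted["function"]["name"] == "word_length"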


class OpenAIAssistantRunnable(RunnableSerializable[Dict, OutputType]):
    """Run an OpenAI Assistant.

    Example using OpenAI tools:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable

            interpreter_assistant = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=[{"type": "code_interpreter"}],
                model="gpt-4-1106-preview"
            )
            output = interpreter_assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and AgentExecutor:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            agent_executor = AgentExecutor(agent=agent, tools=tools)
            agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and custom execution:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain_core.agents import AgentFinish
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            def execute_agent(agent, tools, input):
                tool_map = {tool.name: tool for tool in tools}
                response = agent.invoke(input)
                while not isinstance(response, AgentFinish):
                    tool_outputs = []
                    for action in response:
                        tool_output = tool_map[action.tool].invoke(action.tool_input)
                        tool_outputs.append(
                            {"output": tool_output, "tool_call_id": action.tool_call_id}
                        )
                    response = agent.invoke(
                        {
                            "tool_outputs": tool_outputs,
                            "run_id": action.run_id,
                            "thread_id": action.thread_id
                        }
                    )
                return response

            response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"})
            next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})

    """  # noqa: E501

    client: Any = Field(default_factory=_get_openai_client)
    """OpenAI or AzureOpenAI client."""
    async_client: Any = None
    """OpenAI or AzureOpenAI async client."""
    assistant_id: str
    """OpenAI assistant id."""
    check_every_ms: float = 1_000.0
    """Frequency with which to check run progress in ms."""
    as_agent: bool = False
    """Use as a LangChain agent, compatible with the AgentExecutor."""

    @root_validator()
    def validate_async_client(cls, values: dict) -> dict:
        if values["async_client"] is None:
            import openai

            api_key = values["client"].api_key
            values["async_client"] = openai.AsyncOpenAI(api_key=api_key)
        return values

    @classmethod
    def create_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Create an OpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            model: Assistant model to use.
            client: OpenAI or AzureOpenAI client.
                Will create a default OpenAI client if not specified.

        Returns:
            OpenAIAssistantRunnable configured to run using the created assistant.
        """
        client = client or _get_openai_client()
        assistant = client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=[_get_assistants_tool(tool) for tool in tools],  # type: ignore
            model=model,
            file_ids=kwargs.get("file_ids"),
        )
        return cls(assistant_id=assistant.id, client=client, **kwargs)
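
    # Example (illustrative, not part of the upstream module): wiring in an
    # explicit Azure client instead of the default OpenAI one. The endpoint,
    # key, and API version below are placeholders.
    #
    #     import openai
    #
    #     azure_client = openai.AzureOpenAI(
    #         azure_endpoint="https://<resource>.openai.azure.com/",
    #         api_key="...",
    #         api_version="2024-02-15-preview",
    #     )
    #     assistant = OpenAIAssistantRunnable.create_assistant(
    #         name="langchain assistant",
    #         instructions="You are a personal math tutor.",
    #         tools=[{"type": "code_interpreter"}],
    #         model="gpt-4-1106-preview",  # for Azure, the deployment name
    #         client=azure_client,
    #     )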

    def invoke(
        self, input: dict, config: Optional[RunnableConfig] = None
    ) -> OutputType:
        """Invoke the assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial invocation.
                file_ids: File ids to include in a new run. Used for retrieval.
                message_metadata: Metadata to associate with the new message.
                thread_metadata: Metadata to associate with the new thread. Only
                    relevant when a new thread is created.
                instructions: Additional run instructions.
                model: Override Assistant model for this run.
                tools: Override Assistant tools for this run.
                run_metadata: Metadata to associate with the new run.
            config: Runnable config.

        Returns:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]. Otherwise,
                will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = self._parse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs)
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                thread = {
                    "messages": [
                        {
                            "role": "user",
                            "content": input["content"],
                            "file_ids": input.get("file_ids", []),
                            "metadata": input.get("message_metadata"),
                        }
                    ],
                    "metadata": input.get("thread_metadata"),
                }
                run = self._create_thread_and_run(input, thread)
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                _ = self.client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    file_ids=input.get("file_ids", []),
                    metadata=input.get("message_metadata"),
                )
                run = self._create_run(input)
            # Submitting tool outputs to an existing run, outside the AgentExecutor
            # framework.
            else:
                run = self.client.beta.threads.runs.submit_tool_outputs(**input)
            run = self._wait_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise e
        try:
            response = self._get_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise e
        else:
            run_manager.on_chain_end(response)
        return response
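
    # Example (illustrative, not part of the upstream module): the three input
    # shapes that invoke() dispatches on, assuming as_agent=False so that plain
    # message objects are returned and `assistant` is an OpenAIAssistantRunnable.
    #
    #     # 1. No "thread_id": a new thread and a new run are created.
    #     messages = assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})
    #
    #     # 2. "thread_id" but no "run_id": a user message is appended to the
    #     #    existing thread and a new run is started on it.
    #     assistant.invoke(
    #         {"content": "now add 17.241", "thread_id": messages[0].thread_id}
    #     )
    #
    #     # 3. "run_id" present: the dict is passed straight to
    #     #    client.beta.threads.runs.submit_tool_outputs().
    #     assistant.invoke(
    #         {"tool_outputs": [...], "run_id": "...", "thread_id": "..."}
    #     )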

    @classmethod
    async def acreate_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        async_client: Optional[
            Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]
        ] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Create an AsyncOpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            model: Assistant model to use.
            async_client: AsyncOpenAI client.
                Will create a default async_client if not specified.

        Returns:
            OpenAIAssistantRunnable configured to run using the created assistant.
        """
        async_client = async_client or _get_openai_async_client()
        openai_tools = [_get_assistants_tool(tool) for tool in tools]

        assistant = await async_client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=openai_tools,  # type: ignore
            model=model,
            file_ids=kwargs.get("file_ids"),
        )
        return cls(assistant_id=assistant.id, async_client=async_client, **kwargs)

    async def ainvoke(
        self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> OutputType:
        """Async invoke the assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial invocation.
                file_ids: File ids to include in a new run. Used for retrieval.
                message_metadata: Metadata to associate with the new message.
                thread_metadata: Metadata to associate with the new thread. Only
                    relevant when a new thread is created.
                instructions: Additional run instructions.
                model: Override Assistant model for this run.
                tools: Override Assistant tools for this run.
                run_metadata: Metadata to associate with the new run.
            config: Runnable config.

        Returns:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]. Otherwise,
                will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        config = config or {}
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = await self._aparse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **tool_outputs
                )
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                thread = {
                    "messages": [
                        {
                            "role": "user",
                            "content": input["content"],
                            "file_ids": input.get("file_ids", []),
                            "metadata": input.get("message_metadata"),
                        }
                    ],
                    "metadata": input.get("thread_metadata"),
                }
                run = await self._acreate_thread_and_run(input, thread)
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                _ = await self.async_client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    file_ids=input.get("file_ids", []),
                    metadata=input.get("message_metadata"),
                )
                run = await self._acreate_run(input)
            # Submitting tool outputs to an existing run, outside the AgentExecutor
            # framework.
            else:
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **input
                )
            run = await self._await_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise e
        try:
            response = await self._aget_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise e
        else:
            run_manager.on_chain_end(response)
        return response
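
    # Example (illustrative, not part of the upstream module): end-to-end async
    # usage. Assumes a valid OPENAI_API_KEY in the environment.
    #
    #     async def main():
    #         assistant = await OpenAIAssistantRunnable.acreate_assistant(
    #             name="langchain assistant",
    #             instructions="You are a personal math tutor.",
    #             tools=[{"type": "code_interpreter"}],
    #             model="gpt-4-1106-preview",
    #         )
    #         return await assistant.ainvoke(
    #             {"content": "What's 10 - 4 raised to the 2.7"}
    #         )
    #
    #     messages = asyncio.run(main())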

    def _parse_intermediate_steps(
        self, intermediate_steps: List[Tuple[OpenAIAssistantAction, str]]
    ) -> dict:
        last_action, last_output = intermediate_steps[-1]
        run = self._wait_for_run(last_action.run_id, last_action.thread_id)
        required_tool_call_ids = {
            tc.id for tc in run.required_action.submit_tool_outputs.tool_calls
        }
        tool_outputs = [
            {"output": str(output), "tool_call_id": action.tool_call_id}
            for action, output in intermediate_steps
            if action.tool_call_id in required_tool_call_ids
        ]
        submit_tool_outputs = {
            "tool_outputs": tool_outputs,
            "run_id": last_action.run_id,
            "thread_id": last_action.thread_id,
        }
        return submit_tool_outputs

    def _create_run(self, input: dict) -> Any:
        params = {
            k: v
            for k, v in input.items()
            if k in ("instructions", "model", "tools", "run_metadata")
        }
        return self.client.beta.threads.runs.create(
            input["thread_id"],
            assistant_id=self.assistant_id,
            **params,
        )

    def _create_thread_and_run(self, input: dict, thread: dict) -> Any:
        params = {
            k: v
            for k, v in input.items()
            if k in ("instructions", "model", "tools", "run_metadata")
        }
        run = self.client.beta.threads.create_and_run(
            assistant_id=self.assistant_id,
            thread=thread,
            **params,
        )
        return run

    def _get_response(self, run: Any) -> Any:
        # TODO: Pagination

        if run.status == "completed":
            import openai

            major_version = int(openai.version.VERSION.split(".")[0])
            minor_version = int(openai.version.VERSION.split(".")[1])
            version_gte_1_14 = (major_version > 1) or (
                major_version == 1 and minor_version >= 14
            )

            messages = self.client.beta.threads.messages.list(
                run.thread_id, order="asc"
            )
            new_messages = [msg for msg in messages if msg.run_id == run.id]
            if not self.as_agent:
                return new_messages
            answer: Any = [
                msg_content for msg in new_messages for msg_content in msg.content
            ]
            if all(
                (
                    isinstance(content, openai.types.beta.threads.TextContentBlock)
                    if version_gte_1_14
                    else isinstance(
                        content, openai.types.beta.threads.MessageContentText
                    )
                )
                for content in answer
            ):
                answer = "\n".join(content.text.value for content in answer)
            return OpenAIAssistantFinish(
                return_values={
                    "output": answer,
                    "thread_id": run.thread_id,
                    "run_id": run.id,
                },
                log="",
                run_id=run.id,
                thread_id=run.thread_id,
            )
        elif run.status == "requires_action":
            if not self.as_agent:
                return run.required_action.submit_tool_outputs.tool_calls
            actions = []
            for tool_call in run.required_action.submit_tool_outputs.tool_calls:
                function = tool_call.function
                try:
                    args = json.loads(function.arguments, strict=False)
                except JSONDecodeError as e:
                    raise ValueError(
                        f"Received invalid JSON function arguments: "
                        f"{function.arguments} for function {function.name}"
                    ) from e
                if len(args) == 1 and "__arg1" in args:
                    args = args["__arg1"]
                actions.append(
                    OpenAIAssistantAction(
                        tool=function.name,
                        tool_input=args,
                        tool_call_id=tool_call.id,
                        log="",
                        run_id=run.id,
                        thread_id=run.thread_id,
                    )
                )
            return actions
        else:
            run_info = json.dumps(run.dict(), indent=2)
            raise ValueError(
                f"Unexpected run status: {run.status}. Full run info:\n\n{run_info}"
            )

    def _wait_for_run(self, run_id: str, thread_id: str) -> Any:
        in_progress = True
        while in_progress:
            run = self.client.beta.threads.runs.retrieve(run_id, thread_id=thread_id)
            in_progress = run.status in ("in_progress", "queued")
            if in_progress:
                sleep(self.check_every_ms / 1000)
        return run

    async def _aparse_intermediate_steps(
        self, intermediate_steps: List[Tuple[OpenAIAssistantAction, str]]
    ) -> dict:
        last_action, last_output = intermediate_steps[-1]
        run = await self._await_for_run(last_action.run_id, last_action.thread_id)
        required_tool_call_ids = {
            tc.id for tc in run.required_action.submit_tool_outputs.tool_calls
        }
        tool_outputs = [
            {"output": str(output), "tool_call_id": action.tool_call_id}
            for action, output in intermediate_steps
            if action.tool_call_id in required_tool_call_ids
        ]
        submit_tool_outputs = {
            "tool_outputs": tool_outputs,
            "run_id": last_action.run_id,
            "thread_id": last_action.thread_id,
        }
        return submit_tool_outputs

    async def _acreate_run(self, input: dict) -> Any:
        params = {
            k: v
            for k, v in input.items()
            if k in ("instructions", "model", "tools", "run_metadata")
        }
        return await self.async_client.beta.threads.runs.create(
            input["thread_id"],
            assistant_id=self.assistant_id,
            **params,
        )

    async def _acreate_thread_and_run(self, input: dict, thread: dict) -> Any:
        params = {
            k: v
            for k, v in input.items()
            if k in ("instructions", "model", "tools", "run_metadata")
        }
        run = await self.async_client.beta.threads.create_and_run(
            assistant_id=self.assistant_id,
            thread=thread,
            **params,
        )
        return run

    async def _aget_response(self, run: Any) -> Any:
        # TODO: Pagination

        if run.status == "completed":
            import openai

            major_version = int(openai.version.VERSION.split(".")[0])
            minor_version = int(openai.version.VERSION.split(".")[1])
            version_gte_1_14 = (major_version > 1) or (
                major_version == 1 and minor_version >= 14
            )

            messages = await self.async_client.beta.threads.messages.list(
                run.thread_id, order="asc"
            )
            new_messages = [msg for msg in messages if msg.run_id == run.id]
            if not self.as_agent:
                return new_messages
            answer: Any = [
                msg_content for msg in new_messages for msg_content in msg.content
            ]
            if all(
                (
                    isinstance(content, openai.types.beta.threads.TextContentBlock)
                    if version_gte_1_14
                    else isinstance(
                        content, openai.types.beta.threads.MessageContentText
                    )
                )
                for content in answer
            ):
                answer = "\n".join(content.text.value for content in answer)
            return OpenAIAssistantFinish(
                return_values={
                    "output": answer,
                    "thread_id": run.thread_id,
                    "run_id": run.id,
                },
                log="",
                run_id=run.id,
                thread_id=run.thread_id,
            )
        elif run.status == "requires_action":
            if not self.as_agent:
                return run.required_action.submit_tool_outputs.tool_calls
            actions = []
            for tool_call in run.required_action.submit_tool_outputs.tool_calls:
                function = tool_call.function
                try:
                    args = json.loads(function.arguments, strict=False)
                except JSONDecodeError as e:
                    raise ValueError(
                        f"Received invalid JSON function arguments: "
                        f"{function.arguments} for function {function.name}"
                    ) from e
                if len(args) == 1 and "__arg1" in args:
                    args = args["__arg1"]
                actions.append(
                    OpenAIAssistantAction(
                        tool=function.name,
                        tool_input=args,
                        tool_call_id=tool_call.id,
                        log="",
                        run_id=run.id,
                        thread_id=run.thread_id,
                    )
                )
            return actions
        else:
            run_info = json.dumps(run.dict(), indent=2)
            raise ValueError(
                f"Unexpected run status: {run.status}. Full run info:\n\n{run_info}"
            )

    async def _await_for_run(self, run_id: str, thread_id: str) -> Any:
        in_progress = True
        while in_progress:
            run = await self.async_client.beta.threads.runs.retrieve(
                run_id, thread_id=thread_id
            )
            in_progress = run.status in ("in_progress", "queued")
            if in_progress:
                await asyncio.sleep(self.check_every_ms / 1000)
        return run
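

# Illustrative sketch (not part of the upstream module): _wait_for_run and
# _await_for_run poll the run status every `check_every_ms` milliseconds until it
# leaves the "queued"/"in_progress" states. For long-running code_interpreter
# runs a slower poll saves API calls; the assistant id here is a placeholder.
def _example_slow_polling(assistant_id: str) -> OpenAIAssistantRunnable:
    # Poll every 5 seconds instead of the default 1 second.
    return OpenAIAssistantRunnable(assistant_id=assistant_id, check_every_ms=5_000.0)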