"""OpenAI Assistants v2 runnable for LangChain agents (``langchain.agents.openai_assistant.base_v2``)."""

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Optional,
    Sequence,
    Type,
    Union,
)

from langchain_core._api import beta
from langchain_core.callbacks import CallbackManager
from langchain_core.load import dumpd
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
from langchain_core.runnables import RunnableConfig, ensure_config
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool

from langchain.agents.openai_assistant.base import OpenAIAssistantRunnable, OutputType

if TYPE_CHECKING:
    import openai
    from openai._types import NotGiven
    from openai.types.beta.assistant import ToolResources as AssistantToolResources


def _get_openai_client() -> openai.OpenAI:
    try:
        import openai

        return openai.OpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.23-compatible version of openai. You "
            'can install with `pip install "openai>=1.23"`.'
        ) from e


def _get_openai_async_client() -> openai.AsyncOpenAI:
    try:
        import openai

        return openai.AsyncOpenAI(default_headers={"OpenAI-Beta": "assistants=v2"})
    except ImportError as e:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from e
    except AttributeError as e:
        raise AttributeError(
            "Please make sure you are using a v1.23-compatible version of openai. You "
            'can install with `pip install "openai>=1.23"`.'
        ) from e


def _convert_file_ids_into_attachments(file_ids: list) -> list:
    """将文件ID转换为附件
文件搜索和代码解释器将默认打开
"""
    attachments = []
    for id in file_ids:
        attachments.append(
            {
                "file_id": id,
                "tools": [{"type": "file_search"}, {"type": "code_interpreter"}],
            }
        )
    return attachments


def _is_assistants_builtin_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
    """确定工具是否对应于OpenAI助手内置工具。"""
    assistants_builtin_tools = ("code_interpreter", "retrieval")
    return (
        isinstance(tool, dict)
        and ("type" in tool)
        and (tool["type"] in assistants_builtin_tools)
    )


def _get_assistants_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
    """Convert a raw function/class into an OpenAI tool dict.

    Note that OpenAI Assistants support several built-in tools, such as
    "code_interpreter" and "retrieval"; those are passed through unchanged
    while everything else is converted to the OpenAI function-tool format.

    Args:
        tool: A tool in OpenAI dict format, or a BaseTool/function/class.

    Returns:
        The tool in OpenAI dict format.
    """
    if not _is_assistants_builtin_tool(tool):
        return convert_to_openai_tool(tool)
    return tool  # type: ignore


@beta()
class OpenAIAssistantV2Runnable(OpenAIAssistantRunnable):
    """Run an OpenAI Assistant (Assistants v2 API).

    Example using OpenAI tools:
        .. code-block:: python

            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable

            interpreter_assistant = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant",
                instructions=(
                    "You are a personal math tutor. "
                    "Write and run code to answer math questions."
                ),
                tools=[{"type": "code_interpreter"}],
                model="gpt-4-1106-preview",
            )
            output = interpreter_assistant.invoke(
                {"content": "What's 10 - 4 raised to the 2.7"}
            )

    Example using custom tools and AgentExecutor:
        .. code-block:: python

            from langchain.agents import AgentExecutor
            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions=(
                    "You are a personal math tutor. "
                    "Write and run code to answer math questions."
                ),
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True,
            )

            agent_executor = AgentExecutor(agent=agent, tools=tools)
            agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and custom execution:
        .. code-block:: python

            from langchain.agents import AgentExecutor
            from langchain.agents.openai_assistant import OpenAIAssistantV2Runnable
            from langchain_core.agents import AgentFinish
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantV2Runnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions=(
                    "You are a personal math tutor. "
                    "Write and run code to answer math questions."
                ),
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True,
            )

            def execute_agent(agent, tools, input):
                tool_map = {tool.name: tool for tool in tools}
                response = agent.invoke(input)
                while not isinstance(response, AgentFinish):
                    tool_outputs = []
                    for action in response:
                        tool_output = tool_map[action.tool].invoke(action.tool_input)
                        tool_outputs.append(
                            {
                                "output": tool_output,
                                "tool_call_id": action.tool_call_id,
                            }
                        )
                    response = agent.invoke(
                        {
                            "tool_outputs": tool_outputs,
                            "run_id": action.run_id,
                            "thread_id": action.thread_id,
                        }
                    )
                return response

            response = execute_agent(
                agent, tools, {"content": "What's 10 - 4 raised to the 2.7"}
            )
            next_response = execute_agent(
                agent,
                tools,
                {"content": "now add 17.241", "thread_id": response.thread_id},
            )
    """  # noqa: E501

    client: Any = Field(default_factory=_get_openai_client)
    """OpenAI or AzureOpenAI client."""
    async_client: Any = None
    """OpenAI or AzureOpenAI async client."""
    assistant_id: str
    """OpenAI assistant id."""
    check_every_ms: float = 1_000.0
    """Frequency with which to check run progress in ms."""
    as_agent: bool = False
    """Use as a LangChain agent, compatible with the AgentExecutor."""

    @root_validator()
    def validate_async_client(cls, values: dict) -> dict:
        """Default ``async_client`` from the sync client's API key when unset."""
        if values["async_client"] is None:
            import openai

            api_key = values["client"].api_key
            # NOTE(review): unlike _get_openai_async_client(), this client is
            # created WITHOUT the "OpenAI-Beta: assistants=v2" default header —
            # confirm whether the v2 header is required here as well.
            values["async_client"] = openai.AsyncOpenAI(api_key=api_key)
        return values
[docs] @classmethod def create_assistant( cls, name: str, instructions: str, tools: Sequence[Union[BaseTool, dict]], model: str, *, client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None, tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None, **kwargs: Any, ) -> OpenAIAssistantRunnable: """创建一个OpenAI助手并实例化Runnable。 参数: name: 助手名称。 instructions: 助手指令。 tools: 助手工具。可以以OpenAI格式或BaseTools形式传递。 tool_resources: 助手工具资源。可以以OpenAI格式传递。 model: 要使用的助手模型。 client: OpenAI或AzureOpenAI客户端。 如果未指定,将创建默认的OpenAI客户端(助手v2)。 返回: 配置为使用创建的助手运行的OpenAIAssistantRunnable。 """ client = client or _get_openai_client() if tool_resources is None: from openai._types import NOT_GIVEN tool_resources = NOT_GIVEN assistant = client.beta.assistants.create( name=name, instructions=instructions, tools=[_get_assistants_tool(tool) for tool in tools], # type: ignore tool_resources=tool_resources, model=model, ) return cls(assistant_id=assistant.id, client=client, **kwargs)
[docs] def invoke( self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> OutputType: """调用助手。 参数: input: 可运行的输入字典,可以包括: content: 启动新运行时的用户消息。 thread_id: 要使用的现有线程。 run_id: 要使用的现有运行。仅在在初始调用后提供所需操作的工具输出时才应提供。 file_ids: (已弃用)要包含在新运行中的文件ID。请改用'attachments'。 attachments: 要包含在新运行中的助手文件(v2 API)。 message_metadata: 与新消息关联的元数据。 thread_metadata: 与新线程关联的元数据。仅在创建新线程时相关。 instructions: 额外的运行指令。 model: 为此运行覆盖助手模型。 tools: 为此运行覆盖助手工具。 tool_resources: 为此运行覆盖助手工具资源(v2 API)。 run_metadata: 与新运行关联的元数据。 config: 可运行的配置: 返回: 如果self.as_agent,则返回 Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]。否则, 将返回OpenAI类型 Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]]。 """ config = ensure_config(config) callback_manager = CallbackManager.configure( inheritable_callbacks=config.get("callbacks"), inheritable_tags=config.get("tags"), inheritable_metadata=config.get("metadata"), ) run_manager = callback_manager.on_chain_start( dumpd(self), input, name=config.get("run_name") ) files = _convert_file_ids_into_attachments(kwargs.get("file_ids", [])) attachments = kwargs.get("attachments", []) + files try: # Being run within AgentExecutor and there are tool outputs to submit. if self.as_agent and input.get("intermediate_steps"): tool_outputs = self._parse_intermediate_steps( input["intermediate_steps"] ) run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs) # Starting a new thread and a new run. elif "thread_id" not in input: thread = { "messages": [ { "role": "user", "content": input["content"], "attachments": attachments, "metadata": input.get("message_metadata"), } ], "metadata": input.get("thread_metadata"), } run = self._create_thread_and_run(input, thread) # Starting a new run in an existing thread. 
elif "run_id" not in input: _ = self.client.beta.threads.messages.create( input["thread_id"], content=input["content"], role="user", attachments=attachments, metadata=input.get("message_metadata"), ) run = self._create_run(input) # Submitting tool outputs to an existing run, outside the AgentExecutor # framework. else: run = self.client.beta.threads.runs.submit_tool_outputs(**input) run = self._wait_for_run(run.id, run.thread_id) except BaseException as e: run_manager.on_chain_error(e) raise e try: response = self._get_response(run) except BaseException as e: run_manager.on_chain_error(e, metadata=run.dict()) raise e else: run_manager.on_chain_end(response) return response
[docs] @classmethod async def acreate_assistant( cls, name: str, instructions: str, tools: Sequence[Union[BaseTool, dict]], model: str, *, async_client: Optional[ Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI] ] = None, tool_resources: Optional[Union[AssistantToolResources, dict, NotGiven]] = None, **kwargs: Any, ) -> OpenAIAssistantRunnable: """创建一个AsyncOpenAI助手并实例化Runnable。 参数: name: 助手名称。 instructions: 助手指令。 tools: 助手工具。可以以OpenAI格式或BaseTools形式传递。 tool_resources: 助手工具资源。可以以OpenAI格式传递。 model: 要使用的助手模型。 async_client: AsyncOpenAI客户端。 如果未指定,将创建默认的async_client。 返回: 配置为使用创建的助手运行的AsyncOpenAIAssistantRunnable。 """ async_client = async_client or _get_openai_async_client() if tool_resources is None: from openai._types import NOT_GIVEN tool_resources = NOT_GIVEN openai_tools = [_get_assistants_tool(tool) for tool in tools] assistant = await async_client.beta.assistants.create( name=name, instructions=instructions, tools=openai_tools, # type: ignore tool_resources=tool_resources, model=model, ) return cls(assistant_id=assistant.id, async_client=async_client, **kwargs)
[docs] async def ainvoke( self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> OutputType: """异步调用助手。 参数: input: 可运行输入字典,可以包括: content: 启动新运行时的用户消息。 thread_id: 要使用的现有线程。 run_id: 要使用的现有运行。仅在在初始调用后为必需操作提供工具输出时才应提供。 file_ids: (已弃用)要包含在新运行中的文件ID。请改用'attachments'。 attachments: 要包含在新运行中的助手文件(v2 API)。 message_metadata: 与新消息关联的元数据。 thread_metadata: 与新线程关联的元数据。仅在创建新线程时相关。 instructions: 附加运行说明。 model: 为此运行覆盖助手模型。 tools: 为此运行覆盖助手工具。 tool_resources: 为此运行覆盖助手工具资源(v2 API)。 run_metadata: 与新运行关联的元数据。 config: 可运行配置: 返回: 如果self.as_agent,则返回 Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]。否则, 将返回OpenAI类型 Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]]。 """ config = config or {} callback_manager = CallbackManager.configure( inheritable_callbacks=config.get("callbacks"), inheritable_tags=config.get("tags"), inheritable_metadata=config.get("metadata"), ) run_manager = callback_manager.on_chain_start( dumpd(self), input, name=config.get("run_name") ) files = _convert_file_ids_into_attachments(kwargs.get("file_ids", [])) attachments = kwargs.get("attachments", []) + files try: # Being run within AgentExecutor and there are tool outputs to submit. if self.as_agent and input.get("intermediate_steps"): tool_outputs = self._parse_intermediate_steps( input["intermediate_steps"] ) run = await self.async_client.beta.threads.runs.submit_tool_outputs( **tool_outputs ) # Starting a new thread and a new run. elif "thread_id" not in input: thread = { "messages": [ { "role": "user", "content": input["content"], "attachments": attachments, "metadata": input.get("message_metadata"), } ], "metadata": input.get("thread_metadata"), } run = await self._acreate_thread_and_run(input, thread) # Starting a new run in an existing thread. 
elif "run_id" not in input: _ = await self.async_client.beta.threads.messages.create( input["thread_id"], content=input["content"], role="user", attachments=attachments, metadata=input.get("message_metadata"), ) run = await self._acreate_run(input) # Submitting tool outputs to an existing run, outside the AgentExecutor # framework. else: run = await self.async_client.beta.threads.runs.submit_tool_outputs( **input ) run = await self._await_for_run(run.id, run.thread_id) except BaseException as e: run_manager.on_chain_error(e) raise e try: response = self._get_response(run) except BaseException as e: run_manager.on_chain_error(e, metadata=run.dict()) raise e else: run_manager.on_chain_end(response) return response
def _create_run(self, input: dict) -> Any: params = { k: v for k, v in input.items() if k in ("instructions", "model", "tools", "tool_resources", "run_metadata") } return self.client.beta.threads.runs.create( input["thread_id"], assistant_id=self.assistant_id, **params, ) def _create_thread_and_run(self, input: dict, thread: dict) -> Any: params = { k: v for k, v in input.items() if k in ("instructions", "model", "tools", "tool_resources", "run_metadata") } run = self.client.beta.threads.create_and_run( assistant_id=self.assistant_id, thread=thread, **params, ) return run async def _acreate_run(self, input: dict) -> Any: params = { k: v for k, v in input.items() if k in ("instructions", "model", "tools", "tool_resources" "run_metadata") } return await self.async_client.beta.threads.runs.create( input["thread_id"], assistant_id=self.assistant_id, **params, ) async def _acreate_thread_and_run(self, input: dict, thread: dict) -> Any: params = { k: v for k, v in input.items() if k in ("instructions", "model", "tools", "tool_resources", "run_metadata") } run = await self.async_client.beta.threads.create_and_run( assistant_id=self.assistant_id, thread=thread, **params, ) return run