from __future__ import annotations
import json
from json import JSONDecodeError
from time import sleep
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
)
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import CallbackManager
from langchain_core.load import dumpd
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
from langchain_core.runnables import RunnableConfig, RunnableSerializable, ensure_config
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
if TYPE_CHECKING:
import openai
from openai.types.beta.threads import ThreadMessage
from openai.types.beta.threads.required_action_function_tool_call import (
RequiredActionFunctionToolCall,
)
class OpenAIAssistantFinish(AgentFinish):
    """AgentFinish carrying the OpenAI run and thread ids it came from."""

    # Id of the OpenAI run that produced this finish.
    run_id: str
    # Id of the OpenAI thread the run belongs to.
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Opt out of LangChain serialization (always returns False)."""
        return False
class OpenAIAssistantAction(AgentAction):
    """AgentAction with the info needed to submit a custom tool's output to an
    existing OpenAI run."""

    # Id of the tool call this action answers; echoed back when submitting output.
    tool_call_id: str
    # Id of the OpenAI run that requested the tool call.
    run_id: str
    # Id of the OpenAI thread the run belongs to.
    thread_id: str

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Opt out of LangChain serialization (always returns False)."""
        return False
def _get_openai_client() -> openai.OpenAI:
    """Build a default synchronous ``openai.OpenAI`` client.

    Raises:
        ImportError: if the ``openai`` package cannot be imported.
        AttributeError: if the installed ``openai`` package has no usable
            ``OpenAI`` client (e.g. a pre-1.x release).
    """
    try:
        import openai

        sync_client = openai.OpenAI()
    except ImportError as exc:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from exc
    except AttributeError as exc:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from exc
    return sync_client
def _get_openai_async_client() -> openai.AsyncOpenAI:
    """Build a default asynchronous ``openai.AsyncOpenAI`` client.

    Raises:
        ImportError: if the ``openai`` package cannot be imported.
        AttributeError: if the installed ``openai`` package has no usable
            ``AsyncOpenAI`` client (e.g. a pre-1.x release).
    """
    try:
        import openai

        async_client = openai.AsyncOpenAI()
    except ImportError as exc:
        raise ImportError(
            "Unable to import openai, please install with `pip install openai`."
        ) from exc
    except AttributeError as exc:
        raise AttributeError(
            "Please make sure you are using a v1.1-compatible version of openai. You "
            'can install with `pip install "openai>=1.1"`.'
        ) from exc
    return async_client
def _is_assistants_builtin_tool(
tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> bool:
"""确定工具是否对应于OpenAI助手内置的工具。"""
assistants_builtin_tools = ("code_interpreter", "retrieval")
return (
isinstance(tool, dict)
and ("type" in tool)
and (tool["type"] in assistants_builtin_tools)
)
def _get_assistants_tool(
    tool: Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool],
) -> Dict[str, Any]:
    """Convert a raw function/class/tool into an OpenAI Assistants tool spec.

    Built-in tool specs such as ``{"type": "code_interpreter"}`` or
    ``{"type": "retrieval"}`` are returned unchanged; anything else is passed
    through ``convert_to_openai_tool``.
    """
    if not _is_assistants_builtin_tool(tool):
        return convert_to_openai_tool(tool)
    return tool  # type: ignore
# Everything OpenAIAssistantRunnable.invoke / ainvoke may return:
# - List[OpenAIAssistantAction] or OpenAIAssistantFinish when used as an agent
#   (as_agent=True);
# - the raw OpenAI types List[ThreadMessage] or
#   List[RequiredActionFunctionToolCall] otherwise.
# The OpenAI types are string forward references because they are only
# imported under TYPE_CHECKING.
OutputType = Union[
    List[OpenAIAssistantAction],
    OpenAIAssistantFinish,
    List["ThreadMessage"],
    List["RequiredActionFunctionToolCall"],
]
class OpenAIAssistantRunnable(RunnableSerializable[Dict, OutputType]):
    """Run an OpenAI Assistant.

    Example using OpenAI tools:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable

            interpreter_assistant = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=[{"type": "code_interpreter"}],
                model="gpt-4-1106-preview"
            )
            output = interpreter_assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and AgentExecutor:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            agent_executor = AgentExecutor(agent=agent, tools=tools)
            agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})

    Example using custom tools and custom execution:
        .. code-block:: python

            from langchain_experimental.openai_assistant import OpenAIAssistantRunnable
            from langchain.agents import AgentExecutor
            from langchain_core.agents import AgentFinish
            from langchain.tools import E2BDataAnalysisTool

            tools = [E2BDataAnalysisTool(api_key="...")]
            agent = OpenAIAssistantRunnable.create_assistant(
                name="langchain assistant e2b tool",
                instructions="You are a personal math tutor. Write and run code to answer math questions.",
                tools=tools,
                model="gpt-4-1106-preview",
                as_agent=True
            )

            def execute_agent(agent, tools, input):
                tool_map = {tool.name: tool for tool in tools}
                response = agent.invoke(input)
                while not isinstance(response, AgentFinish):
                    tool_outputs = []
                    for action in response:
                        tool_output = tool_map[action.tool].invoke(action.tool_input)
                        tool_outputs.append({"output": tool_output, "tool_call_id": action.tool_call_id})
                    response = agent.invoke(
                        {
                            "tool_outputs": tool_outputs,
                            "run_id": action.run_id,
                            "thread_id": action.thread_id
                        }
                    )

                return response

            response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"})
            next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})

    """  # noqa: E501

    client: Any = Field(default_factory=_get_openai_client)
    """OpenAI or AzureOpenAI client."""
    async_client: Any = None
    """OpenAI or AzureOpenAI async client."""
    assistant_id: str
    """OpenAI assistant id."""
    check_every_ms: float = 1_000.0
    """Frequency with which to check run progress, in milliseconds."""
    as_agent: bool = False
    """Use as a LangChain agent, compatible with the AgentExecutor."""

    @root_validator()
    def validate_async_client(cls, values: dict) -> dict:
        """Fill in ``async_client`` when it was not supplied.

        The default is an ``openai.AsyncOpenAI`` built with the sync client's
        ``api_key``.
        """
        # NOTE(review): only the api_key is copied from ``client``; for an
        # AzureOpenAI client the endpoint/api-version config is not carried over,
        # so Azure users presumably need to pass ``async_client`` explicitly.
        if values["async_client"] is None:
            import openai

            api_key = values["client"].api_key
            values["async_client"] = openai.AsyncOpenAI(api_key=api_key)
        return values

    @classmethod
    def create_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Create an OpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            model: Assistant model to use.
            client: OpenAI or AzureOpenAI client.
                Will create a default OpenAI client if not specified.
            **kwargs: Forwarded to the Runnable constructor. ``file_ids``, if
                present, is also passed to ``assistants.create``.

        Returns:
            OpenAIAssistantRunnable configured to run using the created assistant.
        """
        client = client or _get_openai_client()
        assistant = client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=[_get_assistants_tool(tool) for tool in tools],  # type: ignore
            model=model,
            file_ids=kwargs.get("file_ids"),
        )
        return cls(assistant_id=assistant.id, client=client, **kwargs)

    def invoke(
        self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> OutputType:
        """Invoke the assistant.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial
                    invocation.
                file_ids: File ids to include in the new run. Used for retrieval.
                message_metadata: Metadata to associate with the new message.
                thread_metadata: Metadata to associate with the new thread. Only
                    relevant when a new thread is being created.
                instructions: Additional run instructions.
                model: Override the Assistant model for this run.
                tools: Override the Assistant tools for this run.
                run_metadata: Metadata to associate with the new run.
                intermediate_steps: (as_agent only) AgentExecutor steps whose tool
                    outputs are submitted to the pending run.
            config: Runnable config.
            **kwargs: Accepted for Runnable interface compatibility; unused.

        Returns:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish].
                Otherwise, will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = self._parse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = self.client.beta.threads.runs.submit_tool_outputs(**tool_outputs)
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                run = self._create_thread_and_run(input, self._thread_payload(input))
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                self.client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    file_ids=input.get("file_ids", []),
                    metadata=input.get("message_metadata"),
                )
                run = self._create_run(input)
            # Submitting tool outputs to an existing run, outside the
            # AgentExecutor framework.
            else:
                run = self.client.beta.threads.runs.submit_tool_outputs(**input)
            run = self._wait_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        try:
            response = self._get_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise
        else:
            run_manager.on_chain_end(response)
            return response

    @classmethod
    async def acreate_assistant(
        cls,
        name: str,
        instructions: str,
        tools: Sequence[Union[BaseTool, dict]],
        model: str,
        *,
        async_client: Optional[
            Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]
        ] = None,
        **kwargs: Any,
    ) -> OpenAIAssistantRunnable:
        """Asynchronously create an OpenAI Assistant and instantiate the Runnable.

        Args:
            name: Assistant name.
            instructions: Assistant instructions.
            tools: Assistant tools. Can be passed in OpenAI format or as BaseTools.
            model: Assistant model to use.
            async_client: AsyncOpenAI or AsyncAzureOpenAI client.
                Will create a default AsyncOpenAI client if not specified.
            **kwargs: Forwarded to the Runnable constructor. ``file_ids``, if
                present, is also passed to ``assistants.create``.

        Returns:
            OpenAIAssistantRunnable configured to run using the created assistant.
        """
        async_client = async_client or _get_openai_async_client()
        openai_tools = [_get_assistants_tool(tool) for tool in tools]
        assistant = await async_client.beta.assistants.create(
            name=name,
            instructions=instructions,
            tools=openai_tools,  # type: ignore
            model=model,
            file_ids=kwargs.get("file_ids"),
        )
        return cls(assistant_id=assistant.id, async_client=async_client, **kwargs)

    async def ainvoke(
        self, input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> OutputType:
        """Asynchronously invoke the assistant.

        Uses ``self.async_client`` for every OpenAI call, including polling and
        fetching the final messages.

        Args:
            input: Runnable input dict that can have:
                content: User message when starting a new run.
                thread_id: Existing thread to use.
                run_id: Existing run to use. Should only be supplied when providing
                    the tool output for a required action after an initial
                    invocation.
                file_ids: File ids to include in the new run. Used for retrieval.
                message_metadata: Metadata to associate with the new message.
                thread_metadata: Metadata to associate with the new thread. Only
                    relevant when a new thread is being created.
                instructions: Additional run instructions.
                model: Override the Assistant model for this run.
                tools: Override the Assistant tools for this run.
                run_metadata: Metadata to associate with the new run.
                intermediate_steps: (as_agent only) AgentExecutor steps whose tool
                    outputs are submitted to the pending run.
            config: Runnable config.
            **kwargs: Accepted for Runnable interface compatibility; unused.

        Returns:
            If self.as_agent, will return
                Union[List[OpenAIAssistantAction], OpenAIAssistantFinish].
                Otherwise, will return OpenAI types
                Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]].
        """
        # Same config normalisation as the sync ``invoke``.
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            inheritable_tags=config.get("tags"),
            inheritable_metadata=config.get("metadata"),
        )
        run_manager = callback_manager.on_chain_start(
            dumpd(self), input, name=config.get("run_name")
        )
        try:
            # Being run within AgentExecutor and there are tool outputs to submit.
            if self.as_agent and input.get("intermediate_steps"):
                tool_outputs = await self._aparse_intermediate_steps(
                    input["intermediate_steps"]
                )
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **tool_outputs
                )
            # Starting a new thread and a new run.
            elif "thread_id" not in input:
                run = await self._acreate_thread_and_run(
                    input, self._thread_payload(input)
                )
            # Starting a new run in an existing thread.
            elif "run_id" not in input:
                await self.async_client.beta.threads.messages.create(
                    input["thread_id"],
                    content=input["content"],
                    role="user",
                    file_ids=input.get("file_ids", []),
                    metadata=input.get("message_metadata"),
                )
                run = await self._acreate_run(input)
            # Submitting tool outputs to an existing run, outside the
            # AgentExecutor framework.
            else:
                run = await self.async_client.beta.threads.runs.submit_tool_outputs(
                    **input
                )
            run = await self._await_for_run(run.id, run.thread_id)
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        try:
            response = await self._aget_response(run)
        except BaseException as e:
            run_manager.on_chain_error(e, metadata=run.dict())
            raise
        else:
            run_manager.on_chain_end(response)
            return response

    # ------------------------------------------------------------------
    # Shared (client-independent) helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _thread_payload(input: dict) -> dict:
        """Build the ``thread`` argument for ``threads.create_and_run``."""
        return {
            "messages": [
                {
                    "role": "user",
                    "content": input["content"],
                    "file_ids": input.get("file_ids", []),
                    "metadata": input.get("message_metadata"),
                }
            ],
            "metadata": input.get("thread_metadata"),
        }

    @staticmethod
    def _run_params(input: dict) -> dict:
        """Pick the per-run overrides in ``input`` to forward to the OpenAI run call.

        ``run_metadata`` is forwarded under ``metadata``, the keyword the OpenAI
        ``runs.create`` / ``threads.create_and_run`` methods accept; passing
        ``run_metadata`` verbatim would be an unexpected keyword argument.
        """
        params = {
            k: v for k, v in input.items() if k in ("instructions", "model", "tools")
        }
        if "run_metadata" in input:
            params["metadata"] = input["run_metadata"]
        return params

    @staticmethod
    def _tool_outputs_payload(
        intermediate_steps: List[Tuple[OpenAIAssistantAction, str]], run: Any
    ) -> dict:
        """Build ``submit_tool_outputs`` kwargs for the tool calls ``run`` requires.

        Only steps whose ``tool_call_id`` is among the run's pending tool calls
        are included; the run/thread ids come from the last step.
        """
        last_action = intermediate_steps[-1][0]
        required_tool_call_ids = {
            tc.id for tc in run.required_action.submit_tool_outputs.tool_calls
        }
        tool_outputs = [
            {"output": str(output), "tool_call_id": action.tool_call_id}
            for action, output in intermediate_steps
            if action.tool_call_id in required_tool_call_ids
        ]
        return {
            "tool_outputs": tool_outputs,
            "run_id": last_action.run_id,
            "thread_id": last_action.thread_id,
        }

    def _completed_run_response(self, run: Any, new_messages: List[Any]) -> Any:
        """Turn the messages produced by a completed ``run`` into the output.

        Returns ``new_messages`` unchanged unless ``self.as_agent``. Otherwise
        returns an ``OpenAIAssistantFinish`` whose ``output`` is the
        newline-joined text when every content block is text, or the raw list
        of content blocks when any block is not text.
        """
        if not self.as_agent:
            return new_messages
        import openai

        # The text content block class is TextContentBlock on openai>=1.14 and
        # MessageContentText on older releases.
        major_version = int(openai.version.VERSION.split(".")[0])
        minor_version = int(openai.version.VERSION.split(".")[1])
        version_gte_1_14 = (major_version > 1) or (
            major_version == 1 and minor_version >= 14
        )
        text_block_type = (
            openai.types.beta.threads.TextContentBlock
            if version_gte_1_14
            else openai.types.beta.threads.MessageContentText
        )
        answer: Any = [
            msg_content for msg in new_messages for msg_content in msg.content
        ]
        if all(isinstance(content, text_block_type) for content in answer):
            answer = "\n".join(content.text.value for content in answer)
        return OpenAIAssistantFinish(
            return_values={
                "output": answer,
                "thread_id": run.thread_id,
                "run_id": run.id,
            },
            log="",
            run_id=run.id,
            thread_id=run.thread_id,
        )

    def _pending_run_response(self, run: Any) -> Any:
        """Handle a run that is not ``completed``.

        A ``requires_action`` run yields its raw tool calls (or, when
        ``self.as_agent``, one ``OpenAIAssistantAction`` per call); any other
        status raises ``ValueError``.
        """
        if run.status != "requires_action":
            run_info = json.dumps(run.dict(), indent=2)
            raise ValueError(
                f"Unexpected run status: {run.status}. Full run info:\n\n{run_info}"
            )
        tool_calls = run.required_action.submit_tool_outputs.tool_calls
        if not self.as_agent:
            return tool_calls
        actions = []
        for tool_call in tool_calls:
            function = tool_call.function
            try:
                args = json.loads(function.arguments, strict=False)
            except JSONDecodeError as e:
                raise ValueError(
                    f"Received invalid JSON function arguments: "
                    f"{function.arguments} for function {function.name}"
                ) from e
            # Single-input tools are serialized as {"__arg1": value}; unwrap them.
            # The isinstance guard keeps non-object JSON (e.g. a bare number)
            # from crashing on len().
            if isinstance(args, dict) and len(args) == 1 and "__arg1" in args:
                args = args["__arg1"]
            actions.append(
                OpenAIAssistantAction(
                    tool=function.name,
                    tool_input=args,
                    tool_call_id=tool_call.id,
                    log="",
                    run_id=run.id,
                    thread_id=run.thread_id,
                )
            )
        return actions

    # ------------------------------------------------------------------
    # Sync client helpers
    # ------------------------------------------------------------------

    def _parse_intermediate_steps(
        self, intermediate_steps: List[Tuple[OpenAIAssistantAction, str]]
    ) -> dict:
        """Wait for the last step's run and build its ``submit_tool_outputs`` kwargs."""
        last_action = intermediate_steps[-1][0]
        run = self._wait_for_run(last_action.run_id, last_action.thread_id)
        return self._tool_outputs_payload(intermediate_steps, run)

    def _create_run(self, input: dict) -> Any:
        """Start a run of this assistant on the existing thread ``input["thread_id"]``."""
        return self.client.beta.threads.runs.create(
            input["thread_id"],
            assistant_id=self.assistant_id,
            **self._run_params(input),
        )

    def _create_thread_and_run(self, input: dict, thread: dict) -> Any:
        """Create ``thread`` and start a run of this assistant on it in one call."""
        return self.client.beta.threads.create_and_run(
            assistant_id=self.assistant_id,
            thread=thread,
            **self._run_params(input),
        )

    def _get_response(self, run: Any) -> Any:
        """Convert a finished ``run`` into the Runnable output (sync client)."""
        if run.status == "completed":
            # TODO: Pagination
            messages = self.client.beta.threads.messages.list(
                run.thread_id, order="asc"
            )
            new_messages = [msg for msg in messages if msg.run_id == run.id]
            return self._completed_run_response(run, new_messages)
        return self._pending_run_response(run)

    def _wait_for_run(self, run_id: str, thread_id: str) -> Any:
        """Poll the run every ``check_every_ms`` until it is no longer queued or
        in progress, then return it."""
        while True:
            run = self.client.beta.threads.runs.retrieve(run_id, thread_id=thread_id)
            if run.status not in ("in_progress", "queued"):
                return run
            sleep(self.check_every_ms / 1000)

    # ------------------------------------------------------------------
    # Async client helpers
    # ------------------------------------------------------------------

    async def _aparse_intermediate_steps(
        self, intermediate_steps: List[Tuple[OpenAIAssistantAction, str]]
    ) -> dict:
        """Await the last step's run and build its ``submit_tool_outputs`` kwargs."""
        last_action = intermediate_steps[-1][0]
        # Must use the async poller: ``_wait_for_run`` returns a plain Run object,
        # which cannot be awaited.
        run = await self._await_for_run(last_action.run_id, last_action.thread_id)
        return self._tool_outputs_payload(intermediate_steps, run)

    async def _acreate_run(self, input: dict) -> Any:
        """Start a run of this assistant on the existing thread ``input["thread_id"]``."""
        return await self.async_client.beta.threads.runs.create(
            input["thread_id"],
            assistant_id=self.assistant_id,
            **self._run_params(input),
        )

    async def _acreate_thread_and_run(self, input: dict, thread: dict) -> Any:
        """Create ``thread`` and start a run of this assistant on it in one call."""
        return await self.async_client.beta.threads.create_and_run(
            assistant_id=self.assistant_id,
            thread=thread,
            **self._run_params(input),
        )

    async def _aget_response(self, run: Any) -> Any:
        """Convert a finished ``run`` into the Runnable output (async client)."""
        if run.status == "completed":
            # The async list result is consumed with ``async for``; iterating an
            # awaited async page with a plain ``for`` does not yield messages.
            messages = self.async_client.beta.threads.messages.list(
                run.thread_id, order="asc"
            )
            new_messages = [msg async for msg in messages if msg.run_id == run.id]
            return self._completed_run_response(run, new_messages)
        return self._pending_run_response(run)

    async def _await_for_run(self, run_id: str, thread_id: str) -> Any:
        """Poll the run every ``check_every_ms`` until it is no longer queued or
        in progress, then return it, without blocking the event loop."""
        import asyncio

        while True:
            run = await self.async_client.beta.threads.runs.retrieve(
                run_id, thread_id=thread_id
            )
            if run.status not in ("in_progress", "queued"):
                return run
            await asyncio.sleep(self.check_every_ms / 1000)