Source code for langchain_experimental.plan_and_execute.executors.base
from abc import abstractmethod
from typing import Any
from langchain.chains.base import Chain
from langchain_core.callbacks.manager import Callbacks
from langchain_experimental.plan_and_execute.schema import StepResponse
from langchain_experimental.pydantic_v1 import BaseModel
[docs]class BaseExecutor(BaseModel):
"""基础执行器。"""
[docs] @abstractmethod
def step(
self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any
) -> StepResponse:
"""采取步骤。"""
[docs] @abstractmethod
async def astep(
self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any
) -> StepResponse:
"""进行异步步骤。"""
[docs]class ChainExecutor(BaseExecutor):
"""链式执行器。"""
chain: Chain
"""要使用的链。"""
[docs] def step(
self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any
) -> StepResponse:
"""采取步骤。"""
response = self.chain.run(**inputs, callbacks=callbacks)
return StepResponse(response=response)
[docs] async def astep(
self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any
) -> StepResponse:
"""采取步骤。"""
response = await self.chain.arun(**inputs, callbacks=callbacks)
return StepResponse(response=response)