Source code for langchain_experimental.plan_and_execute.executors.base

from abc import abstractmethod
from typing import Any

from langchain.chains.base import Chain
from langchain_core.callbacks.manager import Callbacks

from langchain_experimental.plan_and_execute.schema import StepResponse
from langchain_experimental.pydantic_v1 import BaseModel


[docs]class BaseExecutor(BaseModel): """基础执行器。"""
[docs] @abstractmethod def step( self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any ) -> StepResponse: """采取步骤。"""
[docs] @abstractmethod async def astep( self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any ) -> StepResponse: """进行异步步骤。"""
[docs]class ChainExecutor(BaseExecutor): """链式执行器。""" chain: Chain """要使用的链。"""
[docs] def step( self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any ) -> StepResponse: """采取步骤。""" response = self.chain.run(**inputs, callbacks=callbacks) return StepResponse(response=response)
[docs] async def astep( self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any ) -> StepResponse: """采取步骤。""" response = await self.chain.arun(**inputs, callbacks=callbacks) return StepResponse(response=response)