Core Agent Classes#

Base Types#

Base agent types.

BaseAgent #

Bases: BaseChatEngine, BaseQueryEngine

Base agent.

Source code in llama_index/core/base/agent/types.py
class BaseAgent(BaseChatEngine, BaseQueryEngine):
    """基础代理。"""

    def _get_prompts(self) -> PromptDictType:
        """获取提示。"""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """获取提示模块。"""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """更新提示。"""

    # ===== Query Engine Interface =====
    @trace_method("query")
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        agent_response = self.chat(
            query_bundle.query_str,
            chat_history=[],
        )
        return Response(
            response=str(agent_response), source_nodes=agent_response.source_nodes
        )

    @trace_method("query")
    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        agent_response = await self.achat(
            query_bundle.query_str,
            chat_history=[],
        )
        return Response(
            response=str(agent_response), source_nodes=agent_response.source_nodes
        )

    def stream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        raise NotImplementedError("stream_chat not implemented")

    async def astream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        raise NotImplementedError("astream_chat not implemented")

BaseAgentWorker #

Bases: PromptMixin

Base agent worker.

Source code in llama_index/core/base/agent/types.py
class BaseAgentWorker(PromptMixin):
    """基础代理工作者。"""

    class Config:
        arbitrary_types_allowed = True

    def _get_prompts(self) -> PromptDictType:
        """获取提示。"""
        # TODO: the ReAct agent does not explicitly specify prompts, would need a
        # refactor to expose those prompts
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """获取提示模块。"""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """更新提示。"""

    @abstractmethod
    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """从任务中初始化步骤。"""

    @abstractmethod
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤。"""

    @abstractmethod
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        raise NotImplementedError

    @abstractmethod
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤(流式)。"""
        # TODO: figure out if we need a different type for TaskStepOutput
        raise NotImplementedError

    @abstractmethod
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        raise NotImplementedError

    @abstractmethod
    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """完成任务,在所有步骤都完成之后。"""

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """设置回调管理器。"""
        # TODO: make this abstractmethod (right now will break some agent impls)

    def as_agent(self, **kwargs: Any) -> "AgentRunner":
        """作为代理运行返回。"""
        from llama_index.core.agent.runner.base import AgentRunner

        return AgentRunner(self, **kwargs)
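
A custom worker only needs to fill in the abstract step methods; `as_agent()` then wraps it in an `AgentRunner`. A minimal single-step sketch (the echo behavior and class name are illustrative, not part of the API):

```python
import uuid
from typing import Any

from llama_index.core.base.agent.types import (
    BaseAgentWorker,
    Task,
    TaskStep,
    TaskStepOutput,
)
from llama_index.core.chat_engine.types import AgentChatResponse


class EchoAgentWorker(BaseAgentWorker):
    """Toy worker that answers every task in a single step."""

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        return TaskStep(
            task_id=task.task_id, step_id=str(uuid.uuid4()), input=task.input
        )

    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        response = AgentChatResponse(response=f"You said: {step.input}")
        # No next steps and is_last=True, so the runner stops after this step
        return TaskStepOutput(
            output=response, task_step=step, next_steps=[], is_last=True
        )

    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        return self.run_step(step, task, **kwargs)

    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        raise NotImplementedError

    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        raise NotImplementedError

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        pass


agent = EchoAgentWorker().as_agent()
print(agent.chat("hello"))  # -> "You said: hello"
```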

initialize_step abstractmethod #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize step from task.

Source code in llama_index/core/base/agent/types.py
@abstractmethod
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """从任务中初始化步骤。"""

run_step abstractmethod #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run step.

Source code in llama_index/core/base/agent/types.py
@abstractmethod
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤。"""

arun_step abstractmethod async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run step (async).

Source code in llama_index/core/base/agent/types.py
@abstractmethod
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    raise NotImplementedError

stream_step abstractmethod #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run step (stream).

Source code in llama_index/core/base/agent/types.py
@abstractmethod
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤(流式)。"""
    # TODO: figure out if we need a different type for TaskStepOutput
    raise NotImplementedError

astream_step abstractmethod async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run step (async stream).

Source code in llama_index/core/base/agent/types.py
@abstractmethod
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    raise NotImplementedError

finalize_task abstractmethod #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize task, after all the steps are completed.

Source code in llama_index/core/base/agent/types.py
@abstractmethod
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """完成任务,在所有步骤都完成之后。"""

set_callback_manager #

set_callback_manager(
    callback_manager: CallbackManager,
) -> None

Set callback manager.

Source code in llama_index/core/base/agent/types.py
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """设置回调管理器。"""

as_agent #

as_agent(**kwargs: Any) -> AgentRunner

Return as an agent runner.

Source code in llama_index/core/base/agent/types.py
def as_agent(self, **kwargs: Any) -> "AgentRunner":
    """作为代理运行返回。"""
    from llama_index.core.agent.runner.base import AgentRunner

    return AgentRunner(self, **kwargs)
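
For example, any worker instance can be promoted to a full runner in one call; extra keyword arguments (such as `verbose` here) are forwarded to the `AgentRunner` constructor:

```python
# `worker` is assumed to be any BaseAgentWorker instance (hypothetical)
agent = worker.as_agent(verbose=True)
print(agent.chat("hello"))
```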

Task #

Bases: BaseModel

Agent task.

Represents a "run" of an agent given a user input.

Source code in llama_index/core/base/agent/types.py
class Task(BaseModel):
    """代理任务。

    表示根据用户输入给定的代理运行。"""

    class Config:
        arbitrary_types_allowed = True

    task_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()), type=str, description="Task ID"
    )
    input: str = Field(..., type=str, description="User input")

    # NOTE: this is state that may be modified throughout the course of execution of the task
    memory: BaseMemory = Field(
        ...,
        type=BaseMemory,
        description=(
            "Conversational Memory. Maintains state before execution of this task."
        ),
    )

    callback_manager: CallbackManager = Field(
        default_factory=CallbackManager,
        exclude=True,
        description="Callback manager for the task.",
    )

    extra_state: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Additional user-specified state for a given task. "
            "Can be modified throughout the execution of a task."
        ),
    )
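
Tasks are normally created for you by an agent runner rather than constructed by hand. A short sketch of inspecting one, assuming `agent` is an `AgentRunner`:

```python
task = agent.create_task("Compare the two quarterly reports.")
print(task.task_id)  # auto-generated UUID string
print(task.input)    # "Compare the two quarterly reports."

# extra_state is free-form scratch space shared across the task's steps
task.extra_state["notes"] = []
```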

TaskStep #

Bases: BaseModel

Agent task step.

Represents a single input step within the execution run ("Task") of an agent, given user input.

The output is returned as a TaskStepOutput.

Source code in llama_index/core/base/agent/types.py
class TaskStep(BaseModel):
    """代理任务步骤。

    代表了代理执行运行中的单个输入步骤("任务")给定用户输入。

    输出以 `TaskStepOutput` 的形式返回。"""

    task_id: str = Field(..., description="Task ID")
    step_id: str = Field(..., description="Step ID")
    input: Optional[str] = Field(default=None, description="User input")
    # memory: BaseMemory = Field(
    #     ..., type=BaseMemory, description="Conversational Memory"
    # )
    step_state: Dict[str, Any] = Field(
        default_factory=dict, description="Additional state for a given step."
    )

    # NOTE: the state below may change throughout the course of execution
    # this tracks the relationships to other steps
    next_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict, description="Next steps to be executed."
    )
    prev_steps: Dict[str, "TaskStep"] = Field(
        default_factory=dict,
        description="Previous steps that were dependencies for this step.",
    )
    is_ready: bool = Field(
        default=True, description="Is this step ready to be executed?"
    )

    def get_next_step(
        self,
        step_id: str,
        input: Optional[str] = None,
        step_state: Optional[Dict[str, Any]] = None,
    ) -> "TaskStep":
        """获取下一步的便利函数。

保留任务ID、内存和步骤状态。
"""
        return TaskStep(
            task_id=self.task_id,
            step_id=step_id,
            input=input,
            # memory=self.memory,
            step_state=step_state or self.step_state,
        )

    def link_step(
        self,
        next_step: "TaskStep",
    ) -> None:
        """链接到下一步。

从当前步骤添加到下一步的链接,以及从下一步到当前步骤的链接。
"""
        self.next_steps[next_step.step_id] = next_step
        next_step.prev_steps[self.step_id] = self

get_next_step #

get_next_step(
    step_id: str,
    input: Optional[str] = None,
    step_state: Optional[Dict[str, Any]] = None,
) -> TaskStep

Convenience function to get the next step.

Preserves task_id, memory, and step_state.

Source code in llama_index/core/base/agent/types.py
    def get_next_step(
        self,
        step_id: str,
        input: Optional[str] = None,
        step_state: Optional[Dict[str, Any]] = None,
    ) -> "TaskStep":
        """获取下一步的便利函数。

保留任务ID、内存和步骤状态。
"""
        return TaskStep(
            task_id=self.task_id,
            step_id=step_id,
            input=input,
            # memory=self.memory,
            step_state=step_state or self.step_state,
        )

link_step #

link_step(next_step: TaskStep) -> None

Link to next step.

Adds a link from this step to the next step, and from the next step back to this step.

Source code in llama_index/core/base/agent/types.py
    def link_step(
        self,
        next_step: "TaskStep",
    ) -> None:
        """链接到下一步。

从当前步骤添加到下一步的链接,以及从下一步到当前步骤的链接。
"""
        self.next_steps[next_step.step_id] = next_step
        next_step.prev_steps[self.step_id] = self
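
A short sketch of how the two helpers cooperate when a worker fans out a follow-up step (the step IDs here are arbitrary):

```python
from llama_index.core.base.agent.types import TaskStep

current = TaskStep(task_id="task-1", step_id="step-1", input="gather sources")

# Derive a follow-up step that keeps the same task_id and step_state,
# then record the dependency edge in both directions.
follow_up = current.get_next_step(step_id="step-2", input="write summary")
current.link_step(follow_up)

assert follow_up.step_id in current.next_steps
assert current.step_id in follow_up.prev_steps
```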

TaskStepOutput #

Bases: BaseModel

Agent task step output.

Source code in llama_index/core/base/agent/types.py
class TaskStepOutput(BaseModel):
    """代理任务步骤输出。"""

    output: Any = Field(..., description="Task step output")
    task_step: TaskStep = Field(..., description="Task step input")
    next_steps: List[TaskStep] = Field(..., description="Next steps to be executed.")
    is_last: bool = Field(default=False, description="Is this the last step?")

    def __str__(self) -> str:
        """字符串表示。"""
        return str(self.output)
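
Conceptually, a runner's inner loop consumes these outputs: it queues up `next_steps` and stops once `is_last` is set. A stripped-down sketch of that loop, assuming a `worker` and a `task` already exist (this mirrors what `AgentRunner._run_step` does, without tracing or state bookkeeping):

```python
from collections import deque

step_queue = deque([worker.initialize_step(task)])
while step_queue:
    step = step_queue.popleft()
    step_output = worker.run_step(step, task)
    step_queue.extend(step_output.next_steps)
    if step_output.is_last:
        print(str(step_output))  # __str__ falls through to the wrapped output
        break
```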

Runners#

AgentRunner #

Bases: BaseAgentRunner

Agent runner.

Top-level agent orchestrator that can create tasks, run each step in a task, or run a task end-to-end. Stores state and keeps track of tasks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `agent_worker` | `BaseAgentWorker` | Step executor | *required* |
| `chat_history` | `Optional[List[ChatMessage]]` | Chat history. Defaults to None. | `None` |
| `state` | `Optional[AgentState]` | Agent state. Defaults to None. | `None` |
| `memory` | `Optional[BaseMemory]` | Memory. Defaults to None. | `None` |
| `llm` | `Optional[LLM]` | LLM. Defaults to None. | `None` |
| `callback_manager` | `Optional[CallbackManager]` | Callback manager. Defaults to None. | `None` |
| `init_task_state_kwargs` | `Optional[dict]` | Initial task state kwargs. Defaults to None. | `None` |
Source code in llama_index/core/agent/runner/base.py
class AgentRunner(BaseAgentRunner):
    """代理运行器。

顶层代理协调器,可以创建任务,运行任务中的每个步骤,或者运行端到端的任务。存储状态并跟踪任务。

Args:
    agent_worker (BaseAgentWorker): 步骤执行器
    chat_history (Optional[List[ChatMessage]], optional): 聊天历史。默认为None。
    state (Optional[AgentState], optional): 代理状态。默认为None。
    memory (Optional[BaseMemory], optional): 内存。默认为None。
    llm (Optional[LLM], optional): LLM。默认为None。
    callback_manager (Optional[CallbackManager], optional): 回调管理器。默认为None。
    init_task_state_kwargs (Optional[dict], optional): 初始化任务状态参数。默认为None。"""

    # # TODO: implement this in Pydantic

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[AgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
        default_tool_choice: str = "auto",
        verbose: bool = False,
    ) -> None:
        """初始化。"""
        self.agent_worker = agent_worker
        self.state = state or AgentState()
        self.memory = memory or ChatMemoryBuffer.from_defaults(chat_history, llm=llm)

        # get and set callback manager
        if callback_manager is not None:
            self.agent_worker.set_callback_manager(callback_manager)
            self.callback_manager = callback_manager
        else:
            # TODO: This is *temporary*
            # Stopgap before having a callback on the BaseAgentWorker interface.
            # Doing that requires a bit more refactoring to make sure existing code
            # doesn't break.
            if hasattr(self.agent_worker, "callback_manager"):
                self.callback_manager = (
                    self.agent_worker.callback_manager or CallbackManager()
                )
            else:
                self.callback_manager = CallbackManager()
        self.init_task_state_kwargs = init_task_state_kwargs or {}
        self.delete_task_on_finish = delete_task_on_finish
        self.default_tool_choice = default_tool_choice
        self.verbose = verbose

    @staticmethod
    def from_llm(
        tools: Optional[List[BaseTool]] = None,
        llm: Optional[LLM] = None,
        **kwargs: Any,
    ) -> "AgentRunner":
        from llama_index.core.agent import ReActAgent

        if os.getenv("IS_TESTING"):
            return ReActAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

        try:
            from llama_index.llms.openai import OpenAI  # pants: no-infer-dep
            from llama_index.llms.openai.utils import (
                is_function_calling_model,
            )  # pants: no-infer-dep
        except ImportError:
            raise ImportError(
                "`llama-index-llms-openai` package not found. Please "
                "install by running `pip install llama-index-llms-openai`."
            )

        if isinstance(llm, OpenAI) and is_function_calling_model(llm.model):
            from llama_index.agent.openai import OpenAIAgent  # pants: no-infer-dep

            return OpenAIAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )
        else:
            return ReActAgent.from_tools(
                tools=tools,
                llm=llm,
                **kwargs,
            )

    @property
    def chat_history(self) -> List[ChatMessage]:
        return self.memory.get_all()

    def reset(self) -> None:
        self.memory.reset()
        self.state.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """创建任务。"""
        if not self.init_task_state_kwargs:
            extra_state = kwargs.pop("extra_state", {})
        else:
            if "extra_state" in kwargs:
                raise ValueError(
                    "Cannot specify both `extra_state` and `init_task_state_kwargs`"
                )
            else:
                extra_state = self.init_task_state_kwargs

        callback_manager = kwargs.pop("callback_manager", self.callback_manager)
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=extra_state,
            callback_manager=callback_manager,
            **kwargs,
        )
        # # put input into memory
        # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

        # get initial step from task, and put it in the step queue
        initial_step = self.agent_worker.initialize_step(task)
        task_state = TaskState(
            task=task,
            step_queue=deque([initial_step]),
        )
        # add it to state
        self.state.task_dict[task.task_id] = task_state

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """删除任务。

注意:这不会从内存中删除任何先前的执行。
"""
        self.state.task_dict.pop(task_id)

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """列出任务。"""
        return [task_state.task for task_state in self.state.task_dict.values()]

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """获取任务。"""
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """获取接下来的步骤。"""
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """获取已完成的步骤。"""
        return self.state.get_completed_steps(task_id)

    def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
        """获取任务输出。"""
        completed_steps = self.get_completed_steps(task_id)
        if len(completed_steps) == 0:
            raise ValueError(f"No completed steps for task_id: {task_id}")
        return completed_steps[-1]

    def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
        """获取已完成的任务。"""
        task_states = list(self.state.task_dict.values())
        completed_tasks = []
        for task_state in task_states:
            completed_steps = self.get_completed_steps(task_state.task.task_id)
            if len(completed_steps) > 0 and completed_steps[-1].is_last:
                completed_tasks.append(task_state.task)

        return completed_tasks

    @dispatcher.span
    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """执行步骤。"""
        dispatch_event = dispatcher.get_dispatch_event()

        dispatch_event(AgentRunStepStartEvent(task_id=task_id, step=step, input=input))
        task = self.state.get_task(task_id)
        step_queue = self.state.get_step_queue(task_id)
        step = step or step_queue.popleft()
        if input is not None:
            step.input = input

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that, but it's theoretically possible

        if mode == ChatResponseMode.WAIT:
            cur_step_output = self.agent_worker.run_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")
        # append cur_step_output next steps to queue
        next_steps = cur_step_output.next_steps
        step_queue.extend(next_steps)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        dispatch_event(AgentRunStepEndEvent(step_output=cur_step_output))
        return cur_step_output

    @dispatcher.span
    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        input: Optional[str] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """执行步骤。"""
        dispatch_event = dispatcher.get_dispatch_event()

        dispatch_event(AgentRunStepStartEvent(task_id=task_id, step=step, input=input))
        task = self.state.get_task(task_id)
        step_queue = self.state.get_step_queue(task_id)
        step = step or step_queue.popleft()
        if input is not None:
            step.input = input

        if self.verbose:
            print(f"> Running step {step.step_id}. Step input: {step.input}")

        # TODO: figure out if you can dynamically swap in different step executors
        # not clear when you would do that, but it's theoretically possible
        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")
        # append cur_step_output next steps to queue
        next_steps = cur_step_output.next_steps
        step_queue.extend(next_steps)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        dispatch_event(AgentRunStepEndEvent(step_output=cur_step_output))
        return cur_step_output

    @dispatcher.span
    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤。"""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    @dispatcher.span
    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
        )

    @dispatcher.span
    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(流式)。"""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return self._run_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    @dispatcher.span
    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        step = validate_step_from_args(task_id, input, step, **kwargs)
        return await self._arun_step(
            task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
        )

    @dispatcher.span
    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """完成响应。"""
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        self.agent_worker.finalize_task(self.state.get_task(task_id))

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    @dispatcher.span
    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """与步骤执行器的聊天。"""
        dispatch_event = dispatcher.get_dispatch_event()

        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        dispatch_event(AgentChatWithStepStartEvent(user_msg=message))
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_output = self._run_step(
                task.task_id, mode=mode, tool_choice=tool_choice
            )

            if cur_step_output.is_last:
                result_output = cur_step_output
                break

            # ensure tool_choice does not cause endless loops
            tool_choice = "auto"

        result = self.finalize_response(
            task.task_id,
            result_output,
        )
        dispatch_event(AgentChatWithStepEndEvent(response=result))
        return result

    @dispatcher.span
    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """与步骤执行器的聊天。"""
        dispatch_event = dispatcher.get_dispatch_event()

        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        dispatch_event(AgentChatWithStepStartEvent(user_msg=message))
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_output = await self._arun_step(
                task.task_id, mode=mode, tool_choice=tool_choice
            )

            if cur_step_output.is_last:
                result_output = cur_step_output
                break

            # ensure tool_choice does not cause endless loops
            tool_choice = "auto"

        result = self.finalize_response(
            task.task_id,
            result_output,
        )
        dispatch_event(AgentChatWithStepEndEvent(response=result))
        return result

    @dispatcher.span
    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        # Override tool choice if provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message=message,
                chat_history=chat_history,
                tool_choice=tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @dispatcher.span
    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> AgentChatResponse:
        # Override tool choice if provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message=message,
                chat_history=chat_history,
                tool_choice=tool_choice,
                mode=ChatResponseMode.WAIT,
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @dispatcher.span
    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        # Override tool choice if provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse) or (
                isinstance(chat_response, AgentChatResponse)
                and chat_response.is_dummy_stream
            )
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @dispatcher.span
    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Optional[Union[str, dict]] = None,
    ) -> StreamingAgentChatResponse:
        # Override tool choice if provided as input.
        if tool_choice is None:
            tool_choice = self.default_tool_choice
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse) or (
                isinstance(chat_response, AgentChatResponse)
                and chat_response.is_dummy_stream
            )
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    def undo_step(self, task_id: str) -> None:
        """撤销上一步操作。"""
        raise NotImplementedError("undo_step not implemented")

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama_index/core/agent/runner/base.py
def create_task(self, input: str, **kwargs: Any) -> Task:
    """创建任务。"""
    if not self.init_task_state_kwargs:
        extra_state = kwargs.pop("extra_state", {})
    else:
        if "extra_state" in kwargs:
            raise ValueError(
                "Cannot specify both `extra_state` and `init_task_state_kwargs`"
            )
        else:
            extra_state = self.init_task_state_kwargs

    callback_manager = kwargs.pop("callback_manager", self.callback_manager)
    task = Task(
        input=input,
        memory=self.memory,
        extra_state=extra_state,
        callback_manager=callback_manager,
        **kwargs,
    )
    # # put input into memory
    # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

    # get initial step from task, and put it in the step queue
    initial_step = self.agent_worker.initialize_step(task)
    task_state = TaskState(
        task=task,
        step_queue=deque([initial_step]),
    )
    # add it to state
    self.state.task_dict[task.task_id] = task_state

    return task

delete_task #

delete_task(task_id: str) -> None

Delete task.

NOTE: this will not delete any previous executions from memory.

Source code in llama_index/core/agent/runner/base.py
    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """删除任务。

注意:这不会从内存中删除任何先前的执行。
"""
        self.state.task_dict.pop(task_id)

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama_index/core/agent/runner/base.py
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """列出任务。"""
    return [task_state.task for task_state in self.state.task_dict.values()]

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get task.

Source code in llama_index/core/agent/runner/base.py
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """获取任务。"""
    return self.state.get_task(task_id)

get_upcoming_steps #

get_upcoming_steps(
    task_id: str, **kwargs: Any
) -> List[TaskStep]

Get upcoming steps.

Source code in llama_index/core/agent/runner/base.py
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """获取接下来的步骤。"""
    return list(self.state.get_step_queue(task_id))

get_completed_steps #

get_completed_steps(
    task_id: str, **kwargs: Any
) -> List[TaskStepOutput]

Get completed steps.

Source code in llama_index/core/agent/runner/base.py
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """获取已完成的步骤。"""
    return self.state.get_completed_steps(task_id)

get_task_output #

get_task_output(
    task_id: str, **kwargs: Any
) -> TaskStepOutput

Get task output.

Source code in llama_index/core/agent/runner/base.py
def get_task_output(self, task_id: str, **kwargs: Any) -> TaskStepOutput:
    """获取任务输出。"""
    completed_steps = self.get_completed_steps(task_id)
    if len(completed_steps) == 0:
        raise ValueError(f"No completed steps for task_id: {task_id}")
    return completed_steps[-1]

get_completed_tasks #

get_completed_tasks(**kwargs: Any) -> List[Task]

Get completed tasks.

Source code in llama_index/core/agent/runner/base.py
def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
    """获取已完成的任务。"""
    task_states = list(self.state.task_dict.values())
    completed_tasks = []
    for task_state in task_states:
        completed_steps = self.get_completed_steps(task_state.task.task_id)
        if len(completed_steps) > 0 and completed_steps[-1].is_last:
            completed_tasks.append(task_state.task)

    return completed_tasks

run_step #

run_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run step.

Source code in llama_index/core/agent/runner/base.py
@dispatcher.span
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤。"""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return self._run_step(
        task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
    )

arun_step async #

arun_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run step (async).

Source code in llama_index/core/agent/runner/base.py
@dispatcher.span
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return await self._arun_step(
        task_id, step, input=input, mode=ChatResponseMode.WAIT, **kwargs
    )

stream_step #

stream_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run step (stream).

Source code in llama_index/core/agent/runner/base.py
@dispatcher.span
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(流式)。"""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return self._run_step(
        task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
    )

astream_step async #

astream_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run step (async stream).

Source code in llama_index/core/agent/runner/base.py
@dispatcher.span
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    step = validate_step_from_args(task_id, input, step, **kwargs)
    return await self._arun_step(
        task_id, step, input=input, mode=ChatResponseMode.STREAM, **kwargs
    )

finalize_response #

finalize_response(
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE

Finalize response.

Source code in llama_index/core/agent/runner/base.py
@dispatcher.span
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """完成响应。"""
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # finalize task
    self.agent_worker.finalize_task(self.state.get_task(task_id))

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo previous step.

Source code in llama_index/core/agent/runner/base.py
def undo_step(self, task_id: str) -> None:
    """撤销上一步操作。"""
    raise NotImplementedError("undo_step not implemented")

ParallelAgentRunner #

Bases: BaseAgentRunner

Parallel agent runner.

Executes steps in the queue in parallel. Requires async support.

Source code in llama_index/core/agent/runner/parallel.py
class ParallelAgentRunner(BaseAgentRunner):
    """并行代理运行程序。

    在队列中并行执行步骤。需要异步支持。"""

    def __init__(
        self,
        agent_worker: BaseAgentWorker,
        chat_history: Optional[List[ChatMessage]] = None,
        state: Optional[DAGAgentState] = None,
        memory: Optional[BaseMemory] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        init_task_state_kwargs: Optional[dict] = None,
        delete_task_on_finish: bool = False,
    ) -> None:
        """初始化。"""
        self.memory = memory or ChatMemoryBuffer.from_defaults(chat_history, llm=llm)
        self.state = state or DAGAgentState()
        self.callback_manager = callback_manager or CallbackManager([])
        self.init_task_state_kwargs = init_task_state_kwargs or {}
        self.agent_worker = agent_worker
        self.delete_task_on_finish = delete_task_on_finish

    @property
    def chat_history(self) -> List[ChatMessage]:
        return self.memory.get_all()

    def reset(self) -> None:
        self.memory.reset()

    def create_task(self, input: str, **kwargs: Any) -> Task:
        """创建任务。"""
        task = Task(
            input=input,
            memory=self.memory,
            extra_state=self.init_task_state_kwargs,
            **kwargs,
        )
        # # put input into memory
        # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

        # add it to state
        # get initial step from task, and put it in the step queue
        initial_step = self.agent_worker.initialize_step(task)
        task_state = DAGTaskState(
            task=task,
            root_step=initial_step,
            step_queue=deque([initial_step]),
        )

        self.state.task_dict[task.task_id] = task_state

        return task

    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """删除任务。

注意:这不会从内存中删除任何先前的执行。
"""
        self.state.task_dict.pop(task_id)

    def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
        """获取已完成的任务。"""
        task_states = list(self.state.task_dict.values())
        return [
            task_state.task
            for task_state in task_states
            if len(task_state.completed_steps) > 0
            and task_state.completed_steps[-1].is_last
        ]

    def get_task_output(self, task_id: str) -> TaskStepOutput:
        """获取任务输出。"""
        task_state = self.state.task_dict[task_id]
        if len(task_state.completed_steps) == 0:
            raise ValueError(f"No completed steps for task_id: {task_id}")
        return task_state.completed_steps[-1]

    def list_tasks(self, **kwargs: Any) -> List[Task]:
        """列出任务。"""
        task_states = list(self.state.task_dict.values())
        return [task_state.task for task_state in task_states]

    def get_task(self, task_id: str, **kwargs: Any) -> Task:
        """获取任务。"""
        return self.state.get_task(task_id)

    def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
        """获取接下来的步骤。"""
        return list(self.state.get_step_queue(task_id))

    def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
        """获取已完成的步骤。"""
        return self.state.get_completed_steps(task_id)

    def run_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """执行队列中的步骤。

运行队列中的所有步骤,并清空队列。

假设所有步骤可以并行运行。
"""
        return asyncio_run(self.arun_steps_in_queue(task_id, mode=mode, **kwargs))

    async def arun_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """执行队列中的所有步骤。

假设队列中的所有步骤都已准备就绪。
"""
        # first pop all steps from step_queue
        steps: List[TaskStep] = []
        while len(self.state.get_step_queue(task_id)) > 0:
            steps.append(self.state.get_step_queue(task_id).popleft())

        # take every item in the queue, and run it
        tasks = []
        for step in steps:
            tasks.append(self._arun_step(task_id, step=step, mode=mode, **kwargs))

        return await asyncio.gather(*tasks)

    def _run_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """执行步骤。"""
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output: TaskStepOutput = self.agent_worker.run_step(
                step, task, **kwargs
            )
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    async def _arun_step(
        self,
        task_id: str,
        step: Optional[TaskStep] = None,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """执行步骤。"""
        task = self.state.get_task(task_id)
        task_queue = self.state.get_step_queue(task_id)
        step = step or task_queue.popleft()

        if not step.is_ready:
            raise ValueError(f"Step {step.step_id} is not ready")

        if mode == ChatResponseMode.WAIT:
            cur_step_output = await self.agent_worker.arun_step(step, task, **kwargs)
        elif mode == ChatResponseMode.STREAM:
            cur_step_output = await self.agent_worker.astream_step(step, task, **kwargs)
        else:
            raise ValueError(f"Invalid mode: {mode}")

        for next_step in cur_step_output.next_steps:
            if next_step.is_ready:
                task_queue.append(next_step)

        # add cur_step_output to completed steps
        completed_steps = self.state.get_completed_steps(task_id)
        completed_steps.append(cur_step_output)

        return cur_step_output

    def run_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤。"""
        return self._run_step(task_id, step, mode=ChatResponseMode.WAIT, **kwargs)

    async def arun_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.WAIT, **kwargs
        )

    def stream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(流式)。"""
        return self._run_step(task_id, step, mode=ChatResponseMode.STREAM, **kwargs)

    async def astream_step(
        self,
        task_id: str,
        input: Optional[str] = None,
        step: Optional[TaskStep] = None,
        **kwargs: Any,
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        return await self._arun_step(
            task_id, step, mode=ChatResponseMode.STREAM, **kwargs
        )

    def finalize_response(
        self,
        task_id: str,
        step_output: Optional[TaskStepOutput] = None,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """完成响应。"""
        if step_output is None:
            step_output = self.state.get_completed_steps(task_id)[-1]
        if not step_output.is_last:
            raise ValueError(
                "finalize_response can only be called on the last step output"
            )

        if not isinstance(
            step_output.output,
            (AgentChatResponse, StreamingAgentChatResponse),
        ):
            raise ValueError(
                "When `is_last` is True, cur_step_output.output must be "
                f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
            )

        # finalize task
        self.agent_worker.finalize_task(self.state.get_task(task_id))

        if self.delete_task_on_finish:
            self.delete_task(task_id)

        return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

    def _chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """与步骤执行器的聊天。"""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_outputs = self.run_steps_in_queue(task.task_id, mode=mode)

            # check if a step output is_last
            is_last = any(
                cur_step_output.is_last for cur_step_output in cur_step_outputs
            )
            if is_last:
                if len(cur_step_outputs) > 1:
                    raise ValueError(
                        "More than one step output returned in final step."
                    )
                cur_step_output = cur_step_outputs[0]
                result_output = cur_step_output
                break

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    async def _achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
        mode: ChatResponseMode = ChatResponseMode.WAIT,
    ) -> AGENT_CHAT_RESPONSE_TYPE:
        """与步骤执行器的聊天。"""
        if chat_history is not None:
            self.memory.set(chat_history)
        task = self.create_task(message)

        result_output = None
        while True:
            # pass step queue in as argument, assume step executor is stateless
            cur_step_outputs = await self.arun_steps_in_queue(task.task_id, mode=mode)

            # check if a step output is_last
            is_last = any(
                cur_step_output.is_last for cur_step_output in cur_step_outputs
            )
            if is_last:
                if len(cur_step_outputs) > 1:
                    raise ValueError(
                        "More than one step output returned in final step."
                    )
                cur_step_output = cur_step_outputs[0]
                result_output = cur_step_output
                break

        return self.finalize_response(
            task.task_id,
            result_output,
        )

    @trace_method("chat")
    def chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def achat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> AgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.WAIT
            )
            assert isinstance(chat_response, AgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    def stream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = self._chat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    @trace_method("chat")
    async def astream_chat(
        self,
        message: str,
        chat_history: Optional[List[ChatMessage]] = None,
        tool_choice: Union[str, dict] = "auto",
    ) -> StreamingAgentChatResponse:
        with self.callback_manager.event(
            CBEventType.AGENT_STEP,
            payload={EventPayload.MESSAGES: [message]},
        ) as e:
            chat_response = await self._achat(
                message, chat_history, tool_choice, mode=ChatResponseMode.STREAM
            )
            assert isinstance(chat_response, StreamingAgentChatResponse)
            e.on_end(payload={EventPayload.RESPONSE: chat_response})
        return chat_response

    def undo_step(self, task_id: str) -> None:
        """撤销上一步操作。"""
        raise NotImplementedError("undo_step not implemented")

create_task #

create_task(input: str, **kwargs: Any) -> Task

Create task.

Source code in llama_index/core/agent/runner/parallel.py
def create_task(self, input: str, **kwargs: Any) -> Task:
    """创建任务。"""
    task = Task(
        input=input,
        memory=self.memory,
        extra_state=self.init_task_state_kwargs,
        **kwargs,
    )
    # # put input into memory
    # self.memory.put(ChatMessage(content=input, role=MessageRole.USER))

    # add it to state
    # get initial step from task, and put it in the step queue
    initial_step = self.agent_worker.initialize_step(task)
    task_state = DAGTaskState(
        task=task,
        root_step=initial_step,
        step_queue=deque([initial_step]),
    )

    self.state.task_dict[task.task_id] = task_state

    return task
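
As a quick illustration of the task lifecycle (a sketch; `runner` is assumed to be an instance of the parallel agent runner documented on this page):

# Sketch: create a task and inspect its state before running any steps.
task = runner.create_task("Compare the revenue of companies A and B.")

print(task.task_id)                             # id used by the task APIs below
print(runner.list_tasks())                      # the new task appears here
print(runner.get_upcoming_steps(task.task_id))  # the initial step sits in the queue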

delete_task #

delete_task(task_id: str) -> None

Delete a task.

NOTE: this will not delete any previous executions from memory.

Source code in llama_index/core/agent/runner/parallel.py
    def delete_task(
        self,
        task_id: str,
    ) -> None:
        """删除任务。

注意:这不会从内存中删除任何先前的执行。
"""
        self.state.task_dict.pop(task_id)

get_completed_tasks #

get_completed_tasks(**kwargs: Any) -> List[Task]

Get completed tasks.

Source code in llama_index/core/agent/runner/parallel.py
def get_completed_tasks(self, **kwargs: Any) -> List[Task]:
    """获取已完成的任务。"""
    task_states = list(self.state.task_dict.values())
    return [
        task_state.task
        for task_state in task_states
        if len(task_state.completed_steps) > 0
        and task_state.completed_steps[-1].is_last
    ]

get_task_output #

get_task_output(task_id: str) -> TaskStepOutput

Get the task output.

Source code in llama_index/core/agent/runner/parallel.py
def get_task_output(self, task_id: str) -> TaskStepOutput:
    """获取任务输出。"""
    task_state = self.state.task_dict[task_id]
    if len(task_state.completed_steps) == 0:
        raise ValueError(f"No completed steps for task_id: {task_id}")
    return task_state.completed_steps[-1]

list_tasks #

list_tasks(**kwargs: Any) -> List[Task]

List tasks.

Source code in llama_index/core/agent/runner/parallel.py
def list_tasks(self, **kwargs: Any) -> List[Task]:
    """列出任务。"""
    task_states = list(self.state.task_dict.values())
    return [task_state.task for task_state in task_states]

get_task #

get_task(task_id: str, **kwargs: Any) -> Task

Get a task.

Source code in llama_index/core/agent/runner/parallel.py
def get_task(self, task_id: str, **kwargs: Any) -> Task:
    """获取任务。"""
    return self.state.get_task(task_id)

get_upcoming_steps #

get_upcoming_steps(
    task_id: str, **kwargs: Any
) -> List[TaskStep]

Get upcoming steps.

Source code in llama_index/core/agent/runner/parallel.py
def get_upcoming_steps(self, task_id: str, **kwargs: Any) -> List[TaskStep]:
    """获取接下来的步骤。"""
    return list(self.state.get_step_queue(task_id))

get_completed_steps #

get_completed_steps(
    task_id: str, **kwargs: Any
) -> List[TaskStepOutput]

Get completed steps.

Source code in llama_index/core/agent/runner/parallel.py
def get_completed_steps(self, task_id: str, **kwargs: Any) -> List[TaskStepOutput]:
    """获取已完成的步骤。"""
    return self.state.get_completed_steps(task_id)

run_steps_in_queue #

run_steps_in_queue(
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any
) -> List[TaskStepOutput]

Execute steps in the queue.

Runs all steps in the queue, clearing it along the way.

Assumes all steps can be run in parallel.

Source code in llama_index/core/agent/runner/parallel.py
    def run_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """执行队列中的步骤。

运行队列中的所有步骤,并清空队列。

假设所有步骤可以并行运行。
"""
        return asyncio_run(self.arun_steps_in_queue(task_id, mode=mode, **kwargs))

arun_steps_in_queue async #

arun_steps_in_queue(
    task_id: str,
    mode: ChatResponseMode = ChatResponseMode.WAIT,
    **kwargs: Any
) -> List[TaskStepOutput]

Execute all steps in the queue.

Assumes all steps in the queue are ready to run.

Source code in llama_index/core/agent/runner/parallel.py
    async def arun_steps_in_queue(
        self,
        task_id: str,
        mode: ChatResponseMode = ChatResponseMode.WAIT,
        **kwargs: Any,
    ) -> List[TaskStepOutput]:
        """执行队列中的所有步骤。

假设队列中的所有步骤都已准备就绪。
"""
        # first pop all steps from step_queue
        steps: List[TaskStep] = []
        while len(self.state.get_step_queue(task_id)) > 0:
            steps.append(self.state.get_step_queue(task_id).popleft())

        # take every item in the queue, and run it
        tasks = []
        for step in steps:
            tasks.append(self._arun_step(task_id, step=step, mode=mode, **kwargs))

        return await asyncio.gather(*tasks)
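
Putting these pieces together, a minimal driver loop might look like the following sketch (`runner` as above; each round runs every queued step concurrently):

# Sketch: drain the step queue in parallel rounds until a final step appears.
task = runner.create_task("Outline a three-step research plan, then write it up.")

while True:
    # Runs all queued steps in parallel and enqueues any next steps they produce.
    step_outputs = runner.run_steps_in_queue(task.task_id)
    if any(output.is_last for output in step_outputs):
        break

# Turn the final TaskStepOutput into an agent chat response.
response = runner.finalize_response(task.task_id)
print(str(response))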

run_step #

run_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run a step.

Source code in llama_index/core/agent/runner/parallel.py
def run_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤。"""
    return self._run_step(task_id, step, mode=ChatResponseMode.WAIT, **kwargs)

arun_step async #

arun_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run a step (async).

Source code in llama_index/core/agent/runner/parallel.py
async def arun_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    return await self._arun_step(
        task_id, step, mode=ChatResponseMode.WAIT, **kwargs
    )

stream_step #

stream_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run a step (stream).

Source code in llama_index/core/agent/runner/parallel.py
def stream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(流式)。"""
    return self._run_step(task_id, step, mode=ChatResponseMode.STREAM, **kwargs)

astream_step async #

astream_step(
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any
) -> TaskStepOutput

Run a step (async stream).

Source code in llama_index/core/agent/runner/parallel.py
async def astream_step(
    self,
    task_id: str,
    input: Optional[str] = None,
    step: Optional[TaskStep] = None,
    **kwargs: Any,
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    return await self._arun_step(
        task_id, step, mode=ChatResponseMode.STREAM, **kwargs
    )

finalize_response #

finalize_response(
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE

Finalize the response.

Source code in llama_index/core/agent/runner/parallel.py
def finalize_response(
    self,
    task_id: str,
    step_output: Optional[TaskStepOutput] = None,
) -> AGENT_CHAT_RESPONSE_TYPE:
    """完成响应。"""
    if step_output is None:
        step_output = self.state.get_completed_steps(task_id)[-1]
    if not step_output.is_last:
        raise ValueError(
            "finalize_response can only be called on the last step output"
        )

    if not isinstance(
        step_output.output,
        (AgentChatResponse, StreamingAgentChatResponse),
    ):
        raise ValueError(
            "When `is_last` is True, cur_step_output.output must be "
            f"AGENT_CHAT_RESPONSE_TYPE: {step_output.output}"
        )

    # finalize task
    self.agent_worker.finalize_task(self.state.get_task(task_id))

    if self.delete_task_on_finish:
        self.delete_task(task_id)

    return cast(AGENT_CHAT_RESPONSE_TYPE, step_output.output)

undo_step #

undo_step(task_id: str) -> None

Undo the previous step.

Source code in llama_index/core/agent/runner/parallel.py
def undo_step(self, task_id: str) -> None:
    """撤销上一步操作。"""
    raise NotImplementedError("undo_step not implemented")

Workers#

CustomSimpleAgentWorker #

Bases: BaseModel, BaseAgentWorker

A custom simple agent worker.

"Simple" here means that some of the scaffolding is already set up. Assumptions:

- the agent has tools, an LLM, a callback manager, and a tool retriever
- there is a from_tools convenience constructor
- the agent is sequential, and does not take any additional intermediate inputs.

Args:
    tools (Sequence[BaseTool]): Tools to use for reasoning
    llm (LLM): LLM to use
    callback_manager (CallbackManager): Callback manager
    tool_retriever (Optional[ObjectRetriever[BaseTool]]): Tool retriever
    verbose (bool): Whether to print out reasoning steps

Source code in llama_index/core/agent/custom/simple.py
class CustomSimpleAgentWorker(BaseModel, BaseAgentWorker):
    """自定义简单的代理工作程序。

这里的“简单”是指一些脚手架已经设置好了。
假设:
- 假设代理有工具、llm、回调管理器和工具检索器
- 有一个`from_tools`便利函数
- 假设代理是顺序的,并且不接受任何额外的中间输入。

Args:
工具(Sequence[BaseTool]):用于推理的工具
llm(LLM):要使用的LLM
callback_manager(CallbackManager):回调管理器
tool_retriever(Optional[ObjectRetriever[BaseTool]]):工具检索器
verbose(bool):是否打印推理步骤"""

    tools: Sequence[BaseTool] = Field(..., description="Tools to use for reasoning")
    llm: LLM = Field(..., description="LLM to use")
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]), exclude=True
    )
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = Field(
        default=None, description="Tool retriever"
    )
    verbose: bool = Field(False, description="Whether to print out reasoning steps")

    _get_tools: Callable[[str], Sequence[BaseTool]] = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        tools: Sequence[BaseTool],
        llm: LLM,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        **kwargs: Any,
    ) -> None:
        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

        callback_manager = callback_manager or CallbackManager([])

        super().__init__(
            tools=tools,
            llm=llm,
            callback_manager=callback_manager or CallbackManager([]),
            tool_retriever=tool_retriever,
            verbose=verbose,
            **kwargs,
        )

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "CustomSimpleAgentWorker":
        """方便的构造方法,用于一组BaseTools(可选)。"""
        llm = llm or Settings.llm
        if callback_manager is not None:
            llm.callback_manager = callback_manager
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            llm=llm,
            callback_manager=callback_manager or CallbackManager([]),
            verbose=verbose,
            **kwargs,
        )

    @abstractmethod
    def _initialize_state(self, task: Task, **kwargs: Any) -> Dict[str, Any]:
        """初始化状态。"""

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """从任务中初始化步骤。"""
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        step_state = self._initialize_state(task, **kwargs)
        # if intersecting keys, error
        if set(step_state.keys()).intersection(set(initial_state.keys())):
            raise ValueError(
                f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect."
                f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
            )
        step_state.update(initial_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=step_state,
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """获取工具。"""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """获取任务步骤响应。"""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    @abstractmethod
    def _run_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """运行步骤。

返回:
    元组(agent_response, is_done)
"""

    async def _arun_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """运行步骤(异步)。

如果要异步运行步骤,可以重写此方法。

返回:
    代理响应和是否完成的元组
"""
        raise NotImplementedError(
            "This agent does not support async." "Please implement _arun_step."
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤。"""
        agent_response, is_done = self._run_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        agent_response, is_done = await self._arun_step(
            step.step_state, task, input=step.input
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤(流式)。"""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        raise NotImplementedError("This agent does not support streaming.")

    @abstractmethod
    def _finalize_task(self, state: Dict[str, Any], **kwargs: Any) -> None:
        """完成任务,在所有步骤都完成之后。

State是所有步骤的状态。
"""

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """完成任务,在所有步骤都完成之后。"""
        # add new messages to memory
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()
        self._finalize_task(task.extra_state, **kwargs)

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """设置回调管理器。"""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager
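
To make the scaffolding concrete, here is a minimal hypothetical subclass. Only the three abstract hooks (`_initialize_state`, `_run_step`, `_finalize_task`) need to be implemented; the class name, the prompt text, and `my_llm` are assumptions, not library code:

from typing import Any, Dict, Optional, Tuple

from llama_index.core.agent import CustomSimpleAgentWorker
from llama_index.core.chat_engine.types import AgentChatResponse


class EchoAgentWorker(CustomSimpleAgentWorker):
    """Toy worker that answers in a single step via the configured LLM."""

    def _initialize_state(self, task, **kwargs: Any) -> Dict[str, Any]:
        # Returned keys land in step_state (and later task.extra_state).
        # "sources" and "memory" are reserved by initialize_step.
        return {"count": 0}

    def _run_step(
        self, state: Dict[str, Any], task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        state["count"] += 1
        completion = self.llm.complete(f"Answer concisely: {input or task.input}")
        # is_done=True marks this as the final step.
        return AgentChatResponse(response=str(completion)), True

    def _finalize_task(self, state: Dict[str, Any], **kwargs: Any) -> None:
        # Called once after the final step; clean up custom state here.
        state.pop("count", None)


# worker = EchoAgentWorker.from_tools(tools=[], llm=my_llm, verbose=True)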

from_tools classmethod #

from_tools(
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[
        ObjectRetriever[BaseTool]
    ] = None,
    llm: Optional[LLM] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any
) -> CustomSimpleAgentWorker

Convenience constructor method from a set of BaseTools (Optional).

Source code in llama_index/core/agent/custom/simple.py
@classmethod
def from_tools(
    cls,
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    llm: Optional[LLM] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "CustomSimpleAgentWorker":
    """方便的构造方法,用于一组BaseTools(可选)。"""
    llm = llm or Settings.llm
    if callback_manager is not None:
        llm.callback_manager = callback_manager
    return cls(
        tools=tools or [],
        tool_retriever=tool_retriever,
        llm=llm,
        callback_manager=callback_manager or CallbackManager([]),
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize a step from the task.

Source code in llama_index/core/agent/custom/simple.py
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """从任务中初始化步骤。"""
    sources: List[ToolOutput] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # initialize initial state
    initial_state = {
        "sources": sources,
        "memory": new_memory,
    }

    step_state = self._initialize_state(task, **kwargs)
    # if intersecting keys, error
    if set(step_state.keys()).intersection(set(initial_state.keys())):
        raise ValueError(
            f"Step state keys {step_state.keys()} and initial state keys {initial_state.keys()} intersect."
            f"*NOTE*: initial state keys {initial_state.keys()} are reserved."
        )
    step_state.update(initial_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=step_state,
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama_index/core/agent/custom/simple.py
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """获取工具。"""
    return [adapt_to_async_tool(t) for t in self._get_tools(input)]

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step.

Source code in llama_index/core/agent/custom/simple.py
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤。"""
    agent_response, is_done = self._run_step(
        step.step_state, task, input=step.input
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return response

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async).

Source code in llama_index/core/agent/custom/simple.py
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    agent_response, is_done = await self._arun_step(
        step.step_state, task, input=step.input
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    task.extra_state.update(step.step_state)
    return response

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (stream).

Source code in llama_index/core/agent/custom/simple.py
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤(流式)。"""
    raise NotImplementedError("This agent does not support streaming.")

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async stream).

Source code in llama_index/core/agent/custom/simple.py
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    raise NotImplementedError("This agent does not support streaming.")

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize the task, after all steps are complete.

Source code in llama_index/core/agent/custom/simple.py
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """完成任务,在所有步骤都完成之后。"""
    # add new messages to memory
    task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
    # reset new memory
    task.extra_state["memory"].reset()
    self._finalize_task(task.extra_state, **kwargs)

set_callback_manager #

set_callback_manager(
    callback_manager: CallbackManager,
) -> None

Set the callback manager.

Source code in llama_index/core/agent/custom/simple.py
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """设置回调管理器。"""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager

MultimodalReActAgentWorker #

Bases: BaseAgentWorker

Multimodal ReAct agent worker.

NOTE: this is a BETA feature.

Source code in llama_index/core/agent/react_multimodal/step.py
class MultimodalReActAgentWorker(BaseAgentWorker):
    """多模态ReAct代理工作程序。

**注意**:这是一个测试版功能。"""

    def __init__(
        self,
        tools: Sequence[BaseTool],
        multi_modal_llm: MultiModalLLM,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
    ) -> None:
        self._multi_modal_llm = multi_modal_llm
        self.callback_manager = callback_manager or CallbackManager([])
        self._max_iterations = max_iterations
        self._react_chat_formatter = react_chat_formatter or ReActChatFormatter(
            system_header=REACT_MM_CHAT_SYSTEM_HEADER
        )
        self._output_parser = output_parser or ReActOutputParser()
        self._verbose = verbose

        try:
            from llama_index.multi_modal_llms.openai.utils import (
                generate_openai_multi_modal_chat_message,
            )  # pants: no-infer-dep

            self._add_user_step_to_reasoning = partial(
                add_user_step_to_reasoning,
                generate_chat_message_fn=generate_openai_multi_modal_chat_message,
            )
        except ImportError:
            raise ImportError(
                "`llama-index-multi-modal-llms-openai` package cannot be found. "
                "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
            )

        if len(tools) > 0 and tool_retriever is not None:
            raise ValueError("Cannot specify both tools and tool_retriever")
        elif len(tools) > 0:
            self._get_tools = lambda _: tools
        elif tool_retriever is not None:
            tool_retriever_c = cast(ObjectRetriever[BaseTool], tool_retriever)
            self._get_tools = lambda message: tool_retriever_c.retrieve(message)
        else:
            self._get_tools = lambda _: []

    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        multi_modal_llm: Optional[MultiModalLLM] = None,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "MultimodalReActAgentWorker":
        """方便的构造方法,从BaseTools的集合中(可选)。

注意:kwargs在这一点上应该已经用尽。换句话说,各种上游组件,如BaseSynthesizer(响应合成器)或BaseRetriever应该在它们的构造中从各自的kwargs中取出。

返回:
    ReActAgent
"""
        if multi_modal_llm is None:
            try:
                from llama_index.multi_modal_llms.openai import (
                    OpenAIMultiModal,
                )  # pants: no-infer-dep

                multi_modal_llm = multi_modal_llm or OpenAIMultiModal(
                    model="gpt-4-vision-preview", max_new_tokens=1000
                )
            except ImportError:
                raise ImportError(
                    "`llama-index-multi-modal-llms-openai` package cannot be found. "
                    "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
                )
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            multi_modal_llm=multi_modal_llm,
            max_iterations=max_iterations,
            react_chat_formatter=react_chat_formatter,
            output_parser=output_parser,
            callback_manager=callback_manager,
            verbose=verbose,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """从任务中初始化步骤。"""
        sources: List[ToolOutput] = []
        current_reasoning: List[BaseReasoningStep] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # validation
        if "image_docs" not in task.extra_state:
            raise ValueError("Image docs not found in task extra state.")

        # initialize task state
        task_state = {
            "sources": sources,
            "current_reasoning": current_reasoning,
            "new_memory": new_memory,
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state={"is_first": True, "image_docs": task.extra_state["image_docs"]},
        )

    def get_tools(self, input: str) -> List[AsyncBaseTool]:
        """获取工具。"""
        return [adapt_to_async_tool(t) for t in self._get_tools(input)]

    def _extract_reasoning_step(
        self, output: ChatResponse, is_streaming: bool = False
    ) -> Tuple[str, List[BaseReasoningStep], bool]:
        """从给定的输出中提取推理步骤。

该方法解析输出的消息内容,提取推理步骤,并确定处理是否完成。它还对输出进行验证检查并处理可能的错误。
"""
        if output.message.content is None:
            raise ValueError("Got empty message.")
        message_content = output.message.content
        current_reasoning = []
        try:
            reasoning_step = self._output_parser.parse(message_content, is_streaming)
        except BaseException as exc:
            raise ValueError(f"Could not parse output: {message_content}") from exc
        if self._verbose:
            print_text(f"{reasoning_step.get_content()}\n", color="pink")
        current_reasoning.append(reasoning_step)

        if reasoning_step.is_done:
            return message_content, current_reasoning, True

        reasoning_step = cast(ActionReasoningStep, reasoning_step)
        if not isinstance(reasoning_step, ActionReasoningStep):
            raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

        return message_content, current_reasoning, False

    def _process_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        tools_dict: Dict[str, AsyncBaseTool] = {
            tool.metadata.get_name(): tool for tool in tools
        }
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = tool.call(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        task.extra_state["sources"].append(tool_output)

        observation_step = ObservationReasoningStep(
            observation=str(tool_output), return_direct=tool.metadata.return_direct
        )
        current_reasoning.append(observation_step)
        if self._verbose:
            print_text(f"{observation_step.get_content()}\n", color="blue")
        return current_reasoning, tool.metadata.return_direct

    async def _aprocess_actions(
        self,
        task: Task,
        tools: Sequence[AsyncBaseTool],
        output: ChatResponse,
        is_streaming: bool = False,
    ) -> Tuple[List[BaseReasoningStep], bool]:
        tools_dict = {tool.metadata.name: tool for tool in tools}
        _, current_reasoning, is_done = self._extract_reasoning_step(
            output, is_streaming
        )

        if is_done:
            return current_reasoning, True

        # call tool with input
        reasoning_step = cast(ActionReasoningStep, current_reasoning[-1])
        tool = tools_dict[reasoning_step.action]
        with self.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={
                EventPayload.FUNCTION_CALL: reasoning_step.action_input,
                EventPayload.TOOL: tool.metadata,
            },
        ) as event:
            tool_output = await tool.acall(**reasoning_step.action_input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(tool_output)})

        task.extra_state["sources"].append(tool_output)

        observation_step = ObservationReasoningStep(
            observation=str(tool_output), return_direct=tool.metadata.return_direct
        )
        current_reasoning.append(observation_step)
        if self._verbose:
            print_text(f"{observation_step.get_content()}\n", color="blue")
        return current_reasoning, tool.metadata.return_direct

    def _get_response(
        self,
        current_reasoning: List[BaseReasoningStep],
        sources: List[ToolOutput],
    ) -> AgentChatResponse:
        """从推理步骤中获取响应。"""
        if len(current_reasoning) == 0:
            raise ValueError("No reasoning steps were taken.")
        elif len(current_reasoning) == self._max_iterations:
            raise ValueError("Reached max iterations.")

        if isinstance(current_reasoning[-1], ResponseReasoningStep):
            response_step = cast(ResponseReasoningStep, current_reasoning[-1])
            response_str = response_step.response
        elif (
            isinstance(current_reasoning[-1], ObservationReasoningStep)
            and current_reasoning[-1].return_direct
        ):
            response_str = current_reasoning[-1].observation
        else:
            response_str = current_reasoning[-1].get_content()

        # TODO: add sources from reasoning steps
        return AgentChatResponse(response=response_str, sources=sources)

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """获取任务步骤响应。"""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    def _run_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """运行步骤。"""
        # This is either not None on the first step or if the user specifies
        # an intermediate step in the middle
        if step.input is not None:
            self._add_user_step_to_reasoning(
                step=step,
                memory=task.extra_state["new_memory"],
                current_reasoning=task.extra_state["current_reasoning"],
                verbose=self._verbose,
            )
        # TODO: see if we want to do step-based inputs
        tools = self.get_tools(task.input)

        input_chat = self._react_chat_formatter.format(
            tools,
            chat_history=task.memory.get_all()
            + task.extra_state["new_memory"].get_all(),
            current_reasoning=task.extra_state["current_reasoning"],
        )

        # send prompt
        chat_response = self._multi_modal_llm.chat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = self._process_actions(
            task, tools, output=chat_response
        )
        task.extra_state["current_reasoning"].extend(reasoning_steps)
        agent_response = self._get_response(
            task.extra_state["current_reasoning"], task.extra_state["sources"]
        )
        if is_done:
            task.extra_state["new_memory"].put(
                ChatMessage(content=agent_response.response, role=MessageRole.ASSISTANT)
            )

        return self._get_task_step_response(agent_response, step, is_done)

    async def _arun_step(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """运行步骤。"""
        if step.input is not None:
            self._add_user_step_to_reasoning(
                step=step,
                memory=task.extra_state["new_memory"],
                current_reasoning=task.extra_state["current_reasoning"],
                verbose=self._verbose,
            )
        # TODO: see if we want to do step-based inputs
        tools = self.get_tools(task.input)

        input_chat = self._react_chat_formatter.format(
            tools,
            chat_history=task.memory.get_all()
            + task.extra_state["new_memory"].get_all(),
            current_reasoning=task.extra_state["current_reasoning"],
        )
        # send prompt
        chat_response = await self._multi_modal_llm.achat(input_chat)
        # given react prompt outputs, call tools or return response
        reasoning_steps, is_done = await self._aprocess_actions(
            task, tools, output=chat_response
        )
        task.extra_state["current_reasoning"].extend(reasoning_steps)
        agent_response = self._get_response(
            task.extra_state["current_reasoning"], task.extra_state["sources"]
        )
        if is_done:
            task.extra_state["new_memory"].put(
                ChatMessage(content=agent_response.response, role=MessageRole.ASSISTANT)
            )

        return self._get_task_step_response(agent_response, step, is_done)

    def _run_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """运行步骤。"""
        raise NotImplementedError("Stream step not implemented yet.")

    async def _arun_step_stream(
        self,
        step: TaskStep,
        task: Task,
    ) -> TaskStepOutput:
        """运行步骤。"""
        raise NotImplementedError("Stream step not implemented yet.")

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤。"""
        return self._run_step(step, task)

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        return await self._arun_step(step, task)

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤(流式)。"""
        # TODO: figure out if we need a different type for TaskStepOutput
        return self._run_step_stream(step, task)

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        return await self._arun_step_stream(step, task)

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """完成任务,在所有步骤都完成之后。"""
        # add new messages to memory
        task.memory.set(
            task.memory.get_all() + task.extra_state["new_memory"].get_all()
        )
        # reset new memory
        task.extra_state["new_memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """设置回调管理器。"""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager
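
For orientation, a hypothetical end-to-end sketch follows. The `AgentRunner` wiring, the tool, and the local image path are assumptions; the `image_docs` entry in `extra_state` is required by `initialize_step` above:

from llama_index.core.agent import AgentRunner
from llama_index.core.schema import ImageDocument

# Build the worker and wrap it in a generic agent runner.
worker = MultimodalReActAgentWorker.from_tools(tools=[my_tool], verbose=True)
agent = AgentRunner(agent_worker=worker)

image_docs = [ImageDocument(image_path="chart.png")]  # assumed local file

# initialize_step validates that "image_docs" is present in task.extra_state.
task = agent.create_task(
    "What trend does the chart show?",
    extra_state={"image_docs": image_docs},
)
step_output = agent.run_step(task.task_id)
while not step_output.is_last:
    step_output = agent.run_step(task.task_id)

response = agent.finalize_response(task.task_id)
print(str(response))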

from_tools classmethod #

from_tools(
    tools: Optional[Sequence[BaseTool]] = None,
    tool_retriever: Optional[
        ObjectRetriever[BaseTool]
    ] = None,
    multi_modal_llm: Optional[MultiModalLLM] = None,
    max_iterations: int = 10,
    react_chat_formatter: Optional[
        ReActChatFormatter
    ] = None,
    output_parser: Optional[ReActOutputParser] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any
) -> MultimodalReActAgentWorker

Convenience constructor method from a set of BaseTools (Optional).

NOTE: kwargs should have been exhausted by this point. In other words, the various upstream components such as BaseSynthesizer (response synthesizer) or BaseRetriever should have picked up off their respective kwargs in their constructions.

Returns: ReActAgent

Source code in llama_index/core/agent/react_multimodal/step.py
    @classmethod
    def from_tools(
        cls,
        tools: Optional[Sequence[BaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever[BaseTool]] = None,
        multi_modal_llm: Optional[MultiModalLLM] = None,
        max_iterations: int = 10,
        react_chat_formatter: Optional[ReActChatFormatter] = None,
        output_parser: Optional[ReActOutputParser] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "MultimodalReActAgentWorker":
        """方便的构造方法,从BaseTools的集合中(可选)。

注意:kwargs在这一点上应该已经用尽。换句话说,各种上游组件,如BaseSynthesizer(响应合成器)或BaseRetriever应该在它们的构造中从各自的kwargs中取出。

返回:
    ReActAgent
"""
        if multi_modal_llm is None:
            try:
                from llama_index.multi_modal_llms.openai import (
                    OpenAIMultiModal,
                )  # pants: no-infer-dep

                multi_modal_llm = multi_modal_llm or OpenAIMultiModal(
                    model="gpt-4-vision-preview", max_new_tokens=1000
                )
            except ImportError:
                raise ImportError(
                    "`llama-index-multi-modal-llms-openai` package cannot be found. "
                    "Please install it by using `pip install `llama-index-multi-modal-llms-openai`"
                )
        return cls(
            tools=tools or [],
            tool_retriever=tool_retriever,
            multi_modal_llm=multi_modal_llm,
            max_iterations=max_iterations,
            react_chat_formatter=react_chat_formatter,
            output_parser=output_parser,
            callback_manager=callback_manager,
            verbose=verbose,
        )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize a step from the task.

Source code in llama_index/core/agent/react_multimodal/step.py
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """从任务中初始化步骤。"""
    sources: List[ToolOutput] = []
    current_reasoning: List[BaseReasoningStep] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # validation
    if "image_docs" not in task.extra_state:
        raise ValueError("Image docs not found in task extra state.")

    # initialize task state
    task_state = {
        "sources": sources,
        "current_reasoning": current_reasoning,
        "new_memory": new_memory,
    }
    task.extra_state.update(task_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state={"is_first": True, "image_docs": task.extra_state["image_docs"]},
    )

get_tools #

get_tools(input: str) -> List[AsyncBaseTool]

Get tools.

Source code in llama_index/core/agent/react_multimodal/step.py
def get_tools(self, input: str) -> List[AsyncBaseTool]:
    """获取工具。"""
    return [adapt_to_async_tool(t) for t in self._get_tools(input)]

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step.

Source code in llama_index/core/agent/react_multimodal/step.py
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤。"""
    return self._run_step(step, task)

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async).

Source code in llama_index/core/agent/react_multimodal/step.py
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    return await self._arun_step(step, task)

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (stream).

Source code in llama_index/core/agent/react_multimodal/step.py
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤(流式)。"""
    # TODO: figure out if we need a different type for TaskStepOutput
    return self._run_step_stream(step, task)

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async stream).

Source code in llama_index/core/agent/react_multimodal/step.py
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    return await self._arun_step_stream(step, task)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize the task, after all steps are complete.

Source code in llama_index/core/agent/react_multimodal/step.py
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """完成任务,在所有步骤都完成之后。"""
    # add new messages to memory
    task.memory.set(
        task.memory.get_all() + task.extra_state["new_memory"].get_all()
    )
    # reset new memory
    task.extra_state["new_memory"].reset()

set_callback_manager #

set_callback_manager(
    callback_manager: CallbackManager,
) -> None

Set the callback manager.

Source code in llama_index/core/agent/react_multimodal/step.py
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """设置回调管理器。"""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager

QueryPipelineAgentWorker #

Bases: BaseModel, BaseAgentWorker

Query pipeline agent worker.

The simplest agent worker: takes in a query pipeline as input.

Assumes the first component in the query pipeline is an AgentInputComponent and the last is an AgentFnComponent.

Parameters:

    pipeline (QueryPipeline): Query pipeline (required)
Source code in llama_index/core/agent/custom/pipeline_worker.py
class QueryPipelineAgentWorker(BaseModel, BaseAgentWorker):
    """查询管道代理工作程序。

    最基本的代理工作程序,接收查询管道作为输入。

    假设查询管道中的第一个组件是 `AgentInputComponent`,最后一个是 `AgentFnComponent`。

    Args:
        pipeline (QueryPipeline): 查询管道"""

    pipeline: QueryPipeline = Field(..., description="Query pipeline")
    callback_manager: CallbackManager = Field(..., exclude=True)

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        pipeline: QueryPipeline,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """初始化。"""
        if callback_manager is not None:
            # set query pipeline callback
            pipeline.set_callback_manager(callback_manager)
        else:
            callback_manager = pipeline.callback_manager
        super().__init__(
            pipeline=pipeline,
            callback_manager=callback_manager,
        )
        # validate query pipeline
        self.agent_input_component
        self.agent_components

    @property
    def agent_input_component(self) -> AgentInputComponent:
        """获取代理输入组件。"""
        root_key = self.pipeline.get_root_keys()[0]
        if not isinstance(self.pipeline.module_dict[root_key], AgentInputComponent):
            raise ValueError(
                "Query pipeline first component must be AgentInputComponent, got "
                f"{self.pipeline.module_dict[root_key]}"
            )

        return cast(AgentInputComponent, self.pipeline.module_dict[root_key])

    @property
    def agent_components(self) -> List[AgentFnComponent]:
        """获取代理输出组件。"""
        return _get_agent_components(self.pipeline)

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """从任务中初始化步骤。"""
        sources: List[ToolOutput] = []
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # initialize initial state
        initial_state = {
            "sources": sources,
            "memory": new_memory,
        }

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state=initial_state,
        )

    def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """获取任务步骤响应。"""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤。"""
        # partial agent output component with task and step
        for agent_fn_component in self.agent_components:
            agent_fn_component.partial(task=task, state=step.step_state)

        agent_response, is_done = self.pipeline.run(state=step.step_state, task=task)
        response = self._get_task_step_response(agent_response, step, is_done)
        # sync step state with task state
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步)。"""
        # partial agent output component with task and step
        for agent_fn_component in self.agent_components:
            agent_fn_component.partial(task=task, state=step.step_state)

        agent_response, is_done = await self.pipeline.arun(
            state=step.step_state, task=task
        )
        response = self._get_task_step_response(agent_response, step, is_done)
        task.extra_state.update(step.step_state)
        return response

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """运行步骤(流式)。"""
        raise NotImplementedError("This agent does not support streaming.")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """运行步骤(异步流)。"""
        raise NotImplementedError("This agent does not support streaming.")

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """完成任务,在所有步骤都完成之后。"""
        # add new messages to memory
        task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
        # reset new memory
        task.extra_state["memory"].reset()

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """设置回调管理器。"""
        # TODO: make this abstractmethod (right now will break some agent impls)
        self.callback_manager = callback_manager
        self.pipeline.set_callback_manager(callback_manager)
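
To sketch the expected pipeline shape: the first component must be an `AgentInputComponent` and the last an `AgentFnComponent` whose function returns `(AgentChatResponse, is_done)`. The component functions below and `my_llm` are assumptions, not library code:

from typing import Any, Dict, Tuple

from llama_index.core.agent import QueryPipelineAgentWorker
from llama_index.core.chat_engine.types import AgentChatResponse
from llama_index.core.query_pipeline import (
    AgentFnComponent,
    AgentInputComponent,
    QueryPipeline,
)


def agent_input_fn(task, state: Dict[str, Any]) -> Dict[str, Any]:
    # Seed downstream components with the task input.
    return {"input": task.input}


def agent_output_fn(
    task, state: Dict[str, Any], input: str
) -> Tuple[AgentChatResponse, bool]:
    # Produce this step's response; True marks the step as final.
    completion = my_llm.complete(input)  # assumed LLM
    return AgentChatResponse(response=str(completion)), True


pipeline = QueryPipeline(
    chain=[
        AgentInputComponent(fn=agent_input_fn),
        AgentFnComponent(fn=agent_output_fn),
    ]
)
worker = QueryPipelineAgentWorker(pipeline=pipeline)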

agent_input_component property #

agent_input_component: AgentInputComponent

Get the agent input component.

agent_components property #

agent_components: List[AgentFnComponent]

Get the agent output components.

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

Initialize a step from the task.

Source code in llama_index/core/agent/custom/pipeline_worker.py
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """从任务中初始化步骤。"""
    sources: List[ToolOutput] = []
    # temporary memory for new messages
    new_memory = ChatMemoryBuffer.from_defaults()

    # initialize initial state
    initial_state = {
        "sources": sources,
        "memory": new_memory,
    }

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state=initial_state,
    )

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step.

Source code in llama_index/core/agent/custom/pipeline_worker.py
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤。"""
    # partial agent output component with task and step
    for agent_fn_component in self.agent_components:
        agent_fn_component.partial(task=task, state=step.step_state)

    agent_response, is_done = self.pipeline.run(state=step.step_state, task=task)
    response = self._get_task_step_response(agent_response, step, is_done)
    # sync step state with task state
    task.extra_state.update(step.step_state)
    return response

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async).

Source code in llama_index/core/agent/custom/pipeline_worker.py
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    # partial agent output component with task and step
    for agent_fn_component in self.agent_components:
        agent_fn_component.partial(task=task, state=step.step_state)

    agent_response, is_done = await self.pipeline.arun(
        state=step.step_state, task=task
    )
    response = self._get_task_step_response(agent_response, step, is_done)
    task.extra_state.update(step.step_state)
    return response

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (stream).

Source code in llama_index/core/agent/custom/pipeline_worker.py
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤(流式)。"""
    raise NotImplementedError("This agent does not support streaming.")

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

Run a step (async stream).

Source code in llama_index/core/agent/custom/pipeline_worker.py
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    raise NotImplementedError("This agent does not support streaming.")

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

Finalize the task, after all steps are complete.

Source code in llama_index/core/agent/custom/pipeline_worker.py
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """完成任务,在所有步骤都完成之后。"""
    # add new messages to memory
    task.memory.set(task.memory.get() + task.extra_state["memory"].get_all())
    # reset new memory
    task.extra_state["memory"].reset()

set_callback_manager #

set_callback_manager(
    callback_manager: CallbackManager,
) -> None

Set the callback manager.

Source code in llama_index/core/agent/custom/pipeline_worker.py
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """设置回调管理器。"""
    # TODO: make this abstractmethod (right now will break some agent impls)
    self.callback_manager = callback_manager
    self.pipeline.set_callback_manager(callback_manager)