Skip to content

Introspective

IntrospectiveAgentWorker #

Bases: BaseAgentWorker

内省代理工作者(Introspective Agent Worker)。

该代理工作者实现了反思人工智能代理模式。它通过纯粹的确定性方式将工作委托给其他两个代理来实现。

该代理执行的任务(再次通过委托)是对查询生成响应并对响应进行反思和修正。该代理将任务委托给(可选地)首先是一个“main_agent_worker”,它生成查询的初始响应。然后将这个初始响应传递给“reflective_agent_worker”来执行初始响应的反思和修正。如果没有提供“main_agent_worker”,则可以选择跳过。在这种情况下,用户输入的查询将被假定为包含需要经过反思和修正的原始响应。

属性: reflective_agent_worker(BaseAgentWorker):负责执行初始响应的反思和修正的反思代理。 main_agent_worker(Optional[BaseAgentWorker],可选):负责为用户查询生成初始响应的主代理。默认为None。如果为None,则用户输入被假定为初始响应。 verbose(bool,可选):是否应该详细执行。默认为False。 callback_manager(Optional[CallbackManager],可选):回调管理器。默认为None。

Source code in llama_index/agent/introspective/step.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class IntrospectiveAgentWorker(BaseAgentWorker):
    """Introspective agent worker.

    This agent worker implements the reflection AI-agent pattern. It does so
    purely deterministically by delegating the work to two other agents.

    The task this agent performs (again, via delegation) is to generate a
    response to a query and to perform reflection and correction on that
    response. The agent delegates the task first (optionally) to a
    `main_agent_worker`, which generates the initial response to the query.
    That initial response is then passed to the `reflective_agent_worker`,
    which performs reflection and correction of the initial response. The
    `main_agent_worker` may be omitted; in that case the user's input is
    assumed to already contain the original response that should undergo
    reflection and correction.

    Attributes:
        reflective_agent_worker (BaseAgentWorker): Reflective agent
            responsible for performing reflection and correction of the
            initial response.
        main_agent_worker (Optional[BaseAgentWorker], optional): Main agent
            responsible for generating the initial response to the user
            query. Defaults to None. If None, the user input is assumed to
            be the initial response.
        verbose (bool, optional): Whether execution should be verbose.
            Defaults to False.
        callback_manager (Optional[CallbackManager], optional): Callback
            manager. Defaults to None.
    """

    def __init__(
        self,
        reflective_agent_worker: BaseAgentWorker,
        main_agent_worker: Optional[BaseAgentWorker] = None,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """Initialize params."""
        self._verbose = verbose
        self._main_agent_worker = main_agent_worker
        self._reflective_agent_worker = reflective_agent_worker
        self.callback_manager = callback_manager or CallbackManager([])

    @classmethod
    def from_defaults(
        cls,
        reflective_agent_worker: BaseAgentWorker,
        main_agent_worker: Optional[BaseAgentWorker] = None,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> "IntrospectiveAgentWorker":
        """Create an IntrospectiveAgentWorker from params.

        Similar to `from_defaults` in other classes, this method will infer
        defaults for various parameters, including an LLM, if not specified.
        """
        return cls(
            main_agent_worker=main_agent_worker,
            reflective_agent_worker=reflective_agent_worker,
            verbose=verbose,
            callback_manager=callback_manager,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""
        # temporary memories for messages produced by the two delegate agents
        main_memory = ChatMemoryBuffer.from_defaults()
        reflective_memory = ChatMemoryBuffer.from_defaults()

        # seed the main memory with the task's current chat history
        for message in task.memory.get():
            main_memory.put(message)

        # per-delegate scratch state: each delegate gets its own memory and
        # its own list of sources
        task.extra_state.update(
            {
                "main": {"memory": main_memory, "sources": []},
                "reflection": {"memory": reflective_memory, "sources": []},
            }
        )

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
        )

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        """Return the task's base history plus both delegates' new messages."""
        # BUG FIX: the original expression began with a stray unary plus
        # (`+task.memory.get()`), which raises
        # `TypeError: bad operand type for unary +: 'list'` on every call.
        return (
            task.memory.get()
            + task.extra_state["main"]["memory"].get_all()
            + task.extra_state["reflection"]["memory"].get_all()
        )

    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        Runs the main agent (if supplied) to produce the initial response,
        then runs the reflective agent on that response. This worker always
        completes in a single step (`is_last=True`).
        """
        # run main agent if one is supplied; otherwise assume the user input
        # is the original response to be reflected on and corrected
        if self._main_agent_worker is not None:
            main_agent_messages = task.extra_state["main"]["memory"].get()
            # CONSISTENCY FIX: pass `verbose` through to the delegate agent,
            # as the async counterpart (`arun_step`) already does.
            main_agent = self._main_agent_worker.as_agent(
                chat_history=main_agent_messages, verbose=self._verbose
            )
            main_agent_response = main_agent.chat(task.input)
            original_response = main_agent_response.response
            task.extra_state["main"]["sources"] = main_agent_response.sources
            task.extra_state["main"]["memory"] = main_agent.memory
        else:
            add_user_step_to_memory(
                step, task.extra_state["main"]["memory"], verbose=self._verbose
            )
            original_response = step.input
            # fictitious agent's initial response (to get the
            # reflection/correction cycle started)
            task.extra_state["main"]["memory"].put(
                ChatMessage(content=original_response, role="assistant")
            )

        # run reflective agent on the initial response
        reflective_agent_messages = task.extra_state["main"]["memory"].get()
        reflective_agent = self._reflective_agent_worker.as_agent(
            chat_history=reflective_agent_messages, verbose=self._verbose
        )
        reflective_agent_response = reflective_agent.chat(original_response)
        task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
        task.extra_state["reflection"]["memory"] = reflective_agent.memory

        # final answer carries the corrected response and the union of
        # sources collected by both delegates
        agent_response = AgentChatResponse(
            response=str(reflective_agent_response.response),
            sources=task.extra_state["main"]["sources"]
            + task.extra_state["reflection"]["sources"],
        )

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=True,
            next_steps=[],
        )

    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async). Mirrors `run_step` with async delegate calls."""
        # run main agent if one is supplied otherwise assume user input
        # is the original response to be reflected on and subsequently corrected
        if self._main_agent_worker is not None:
            main_agent_messages = task.extra_state["main"]["memory"].get()
            main_agent = self._main_agent_worker.as_agent(
                chat_history=main_agent_messages, verbose=self._verbose
            )
            main_agent_response = await main_agent.achat(task.input)
            original_response = main_agent_response.response
            task.extra_state["main"]["sources"] = main_agent_response.sources
            task.extra_state["main"]["memory"] = main_agent.memory
        else:
            add_user_step_to_memory(
                step, task.extra_state["main"]["memory"], verbose=self._verbose
            )
            original_response = step.input
            # fictitious agent's initial response (to get reflection/correction cycle started)
            task.extra_state["main"]["memory"].put(
                ChatMessage(content=original_response, role="assistant")
            )

        # run reflective agent on the initial response
        reflective_agent_messages = task.extra_state["main"]["memory"].get()
        reflective_agent = self._reflective_agent_worker.as_agent(
            chat_history=reflective_agent_messages, verbose=self._verbose
        )
        reflective_agent_response = await reflective_agent.achat(original_response)
        task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
        task.extra_state["reflection"]["memory"] = reflective_agent.memory

        agent_response = AgentChatResponse(
            response=str(reflective_agent_response.response),
            sources=task.extra_state["main"]["sources"]
            + task.extra_state["reflection"]["sources"],
        )

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=True,
            next_steps=[],
        )

    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream). Not supported by this worker."""
        raise NotImplementedError("Stream not supported for introspective agent")

    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported by this worker."""
        raise NotImplementedError("Stream not supported for introspective agent")

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed.

        Writes the accumulated conversation back into the task memory,
        replacing the main agent's (initial) final message with the
        reflected/corrected one.
        """
        # main memory contains the initial response as its final message
        main_memory = task.extra_state["main"]["memory"].get_all()
        # last message of the reflection memory is the corrected response
        final_corrected_message = task.extra_state["reflection"]["memory"].get_all()[-1]
        # swap main worker's response with the reflected/corrected one
        finalized_task_memory = main_memory[:-1] + [final_corrected_message]
        task.memory.set(finalized_task_memory)

from_defaults classmethod #

from_defaults(
    reflective_agent_worker: BaseAgentWorker,
    main_agent_worker: Optional[BaseAgentWorker] = None,
    verbose: bool = False,
    callback_manager: Optional[CallbackManager] = None,
    **kwargs: Any
) -> IntrospectiveAgentWorker

从参数中创建一个IntrospectiveAgentWorker。

类似于其他类中的from_defaults,该方法将推断各种参数的默认值,包括LLM(如果未指定)。

Source code in llama_index/agent/introspective/step.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
    @classmethod
    def from_defaults(
        cls,
        reflective_agent_worker: BaseAgentWorker,
        main_agent_worker: Optional[BaseAgentWorker] = None,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> "IntrospectiveAgentWorker":
        """从参数中创建一个IntrospectiveAgentWorker。

类似于其他类中的`from_defaults`,该方法将推断各种参数的默认值,包括LLM(如果未指定)。
"""
        return cls(
            main_agent_worker=main_agent_worker,
            reflective_agent_worker=reflective_agent_worker,
            verbose=verbose,
            callback_manager=callback_manager,
            **kwargs,
        )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

从任务中初始化步骤。

Source code in llama_index/agent/introspective/step.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """从任务中初始化步骤。"""
    # temporary memory for new messages
    main_memory = ChatMemoryBuffer.from_defaults()
    reflective_memory = ChatMemoryBuffer.from_defaults()

    # put current history in new memory
    messages = task.memory.get()
    for message in messages:
        main_memory.put(message)

    # initialize task state
    task_state = {
        "main": {
            "memory": main_memory,
            "sources": [],
        },
        "reflection": {"memory": reflective_memory, "sources": []},
    }
    task.extra_state.update(task_state)

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
    )

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤。

Source code in llama_index/agent/introspective/step.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤。"""
    # run main agent
    if self._main_agent_worker is not None:
        main_agent_messages = task.extra_state["main"]["memory"].get()
        main_agent = self._main_agent_worker.as_agent(
            chat_history=main_agent_messages
        )
        main_agent_response = main_agent.chat(task.input)
        original_response = main_agent_response.response
        task.extra_state["main"]["sources"] = main_agent_response.sources
        task.extra_state["main"]["memory"] = main_agent.memory
    else:
        add_user_step_to_memory(
            step, task.extra_state["main"]["memory"], verbose=self._verbose
        )
        original_response = step.input
        task.extra_state["main"]["memory"].put(
            ChatMessage(content=original_response, role="assistant")
        )

    # run reflective agent
    reflective_agent_messages = task.extra_state["main"]["memory"].get()
    reflective_agent = self._reflective_agent_worker.as_agent(
        chat_history=reflective_agent_messages
    )
    reflective_agent_response = reflective_agent.chat(original_response)
    task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
    task.extra_state["reflection"]["memory"] = reflective_agent.memory

    agent_response = AgentChatResponse(
        response=str(reflective_agent_response.response),
        sources=task.extra_state["main"]["sources"]
        + task.extra_state["reflection"]["sources"],
    )

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=True,
        next_steps=[],
    )

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步)。

Source code in llama_index/agent/introspective/step.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步)。"""
    # run main agent if one is supplied otherwise assume user input
    # is the original response to be reflected on and subsequently corrected
    if self._main_agent_worker is not None:
        main_agent_messages = task.extra_state["main"]["memory"].get()
        main_agent = self._main_agent_worker.as_agent(
            chat_history=main_agent_messages, verbose=self._verbose
        )
        main_agent_response = await main_agent.achat(task.input)
        original_response = main_agent_response.response
        task.extra_state["main"]["sources"] = main_agent_response.sources
        task.extra_state["main"]["memory"] = main_agent.memory
    else:
        add_user_step_to_memory(
            step, task.extra_state["main"]["memory"], verbose=self._verbose
        )
        original_response = step.input
        # fictitious agent's initial response (to get reflection/correction cycle started)
        task.extra_state["main"]["memory"].put(
            ChatMessage(content=original_response, role="assistant")
        )

    # run reflective agent
    reflective_agent_messages = task.extra_state["main"]["memory"].get()
    reflective_agent = self._reflective_agent_worker.as_agent(
        chat_history=reflective_agent_messages, verbose=self._verbose
    )
    reflective_agent_response = await reflective_agent.achat(original_response)
    task.extra_state["reflection"]["sources"] = reflective_agent_response.sources
    task.extra_state["reflection"]["memory"] = reflective_agent.memory

    agent_response = AgentChatResponse(
        response=str(reflective_agent_response.response),
        sources=task.extra_state["main"]["sources"]
        + task.extra_state["reflection"]["sources"],
    )

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        is_last=True,
        next_steps=[],
    )

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(流式)。

Source code in llama_index/agent/introspective/step.py
211
212
213
214
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """运行步骤(流式)。"""
    raise NotImplementedError("Stream not supported for introspective agent")

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步流)。

Source code in llama_index/agent/introspective/step.py
216
217
218
219
220
221
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """运行步骤(异步流)。"""
    raise NotImplementedError("Stream not supported for introspective agent")

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

完成任务,在所有步骤都完成之后。

Source code in llama_index/agent/introspective/step.py
223
224
225
226
227
228
229
230
231
232
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """完成任务,在所有步骤都完成之后。"""
    # add new messages to memory
    main_memory = task.extra_state["main"][
        "memory"
    ].get_all()  # contains initial response as final message
    final_corrected_message = task.extra_state["reflection"]["memory"].get_all()[-1]
    # swap main workers response with the reflected/corrected one
    finalized_task_memory = main_memory[:-1] + [final_corrected_message]
    task.memory.set(finalized_task_memory)

SelfReflectionAgentWorker #

Bases: BaseModel, BaseAgentWorker

自我反思代理工作者。

该代理执行对给定响应的反思,随后进行校正,需要注意的是,该反思实现受到两项工作的启发:

  1. Reflexion: Language Agents with Verbal Reinforcement Learning, by Shinn et al. (2023) (https://arxiv.org/pdf/2303.11366.pdf)
  2. CRITIC: Large Language Models Can Self-Correct with Tool-Interactive Critiquing, by Gou et al. (2024) (https://arxiv.org/pdf/2305.11738.pdf)

该代理对初始响应执行反思和校正的循环,直到生成满意的校正或达到最大循环次数为止。为了进行反思,该代理利用用户指定的LLM以及PydanticProgram(通过structured_predict)生成一个结构化输出,其中包含LLM生成的对当前响应的反思。在反思之后,再次使用相同的用户指定LLM,但这次使用另一个PydanticProgram生成一个结构化输出,其中包含LLM生成的对先前生成的反思的当前响应的校正版本。

属性: max_iterations(int,可选):反思和校正的最大次数。默认为DEFAULT_MAX_ITERATIONS。 callback_manager(Optional[CallbackManager],可选):回调管理器。默认为None。 llm(Optional[LLM],可选):用于执行反思和校正的LLM。目前必须是OpenAI LLM。默认为None。 verbose(bool,可选):是否应该详细执行。默认为False。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class SelfReflectionAgentWorker(BaseModel, BaseAgentWorker):
    """Self-reflection agent worker.

    This agent performs a reflection on a given response, followed by a
    correction. It should be noted that this reflection implementation was
    inspired by two works:

    1. Reflexion: Language Agents with Verbal Reinforcement Learning, by Shinn et al. (2023)
        (https://arxiv.org/pdf/2303.11366.pdf)
    2. CRITIC: Large Language Models Can Self-Correct with Tool-Interactive Critiquing, by Gou et al. (2024)
       (https://arxiv.org/pdf/2305.11738.pdf)

    This agent performs cycles of reflection and correction on an initial
    response until a satisfactory correction has been generated or the
    maximum number of cycles has been reached. To perform reflection, this
    agent utilizes a user-specified LLM along with a PydanticProgram (via
    `structured_predict`) to generate a structured output that contains an
    LLM-generated reflection of the current response. After reflection, the
    same user-specified LLM is used again, this time with another
    PydanticProgram, to generate a structured output that contains an
    LLM-generated corrected version of the current response against the
    previously generated reflection.

    Attributes:
        max_iterations (int, optional): Maximum number of reflection-and-
            correction cycles. Defaults to DEFAULT_MAX_ITERATIONS.
        callback_manager (Optional[CallbackManager], optional): Callback
            manager. Defaults to None.
        llm (Optional[LLM], optional): LLM used to perform reflection and
            correction. Must be an OpenAI LLM at this time. Defaults to
            None.
        verbose (bool, optional): Whether execution should be verbose.
            Defaults to False."""

    # NOTE(review): `Field(default=CallbackManager([]))` creates one shared
    # default CallbackManager instance across all instances that don't pass
    # their own — confirm this sharing is intended (a default_factory would
    # give per-instance defaults).
    callback_manager: CallbackManager = Field(default=CallbackManager([]))
    max_iterations: int = Field(default=DEFAULT_MAX_ITERATIONS)
    _llm: LLM = PrivateAttr()
    _verbose: bool = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        callback_manager: Optional[CallbackManager] = None,
        llm: Optional[LLM] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """__init__."""
        # private attrs are set before pydantic's __init__ runs
        self._llm = llm
        self._verbose = verbose

        super().__init__(
            callback_manager=callback_manager or CallbackManager([]),
            max_iterations=max_iterations,
            **kwargs,
        )

    @classmethod
    def from_defaults(
        cls,
        llm: Optional[LLM] = None,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "SelfReflectionAgentWorker":
        """Convenience constructor.

        Falls back to an OpenAI `gpt-4-turbo-preview` LLM (temperature 0)
        when no `llm` is supplied; raises ImportError if the OpenAI
        integration package is not installed.
        """
        if llm is None:
            try:
                from llama_index.llms.openai import OpenAI
            except ImportError:
                raise ImportError(
                    "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
                )
            llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

        return cls(
            llm=llm,
            max_iterations=max_iterations,
            callback_manager=callback_manager,
            verbose=verbose,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize step from task."""
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # put current history in new memory
        messages = task.memory.get()
        for message in messages:
            new_memory.put(message)

        # initialize task state
        task_state = {
            "new_memory": new_memory,
            "sources": [],
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            # "count" tracks completed reflection/correction iterations and
            # is threaded through to subsequent steps via step_state
            step_state={"count": 0},
        )

    def _remove_correction_str_prefix(self, correct_msg: str) -> str:
        """Helper to strip the correction prefix when formatting the final response."""
        return correct_msg.replace(CORRECT_RESPONSE_PREFIX, "")

    @dispatcher.span
    def _reflect(
        self, chat_history: List[ChatMessage]
    ) -> Tuple[Reflection, ChatMessage]:
        """Reflect on the trajectory.

        Returns the structured `Reflection` plus a user-role ChatMessage
        carrying the rendered critique.
        """
        reflection = self._llm.structured_predict(
            Reflection,
            PromptTemplate(REFLECTION_PROMPT_TEMPLATE),
            chat_history=messages_to_prompt(chat_history),
        )

        if self._verbose:
            print(f"> Reflection: {reflection.dict()}")

        # end state: return user message
        reflection_output_str = (
            f"Is Done: {reflection.is_done}\nCritique: {reflection.feedback}"
        )
        critique = REFLECTION_RESPONSE_TEMPLATE.format(
            reflection_output=reflection_output_str
        )

        return reflection, ChatMessage.from_str(critique, role="user")

    @dispatcher.span
    def _correct(self, input_str: str, critique: str) -> ChatMessage:
        # Generate a corrected version of `input_str` against `critique`
        # and wrap it as an assistant message with the correction prefix.
        correction = self._llm.structured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            feedback=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        One reflection pass over the latest response, followed (if needed)
        by one correction. The step output is final when the reflection
        says the response is done or `max_iterations` is reached.
        """
        state = step.step_state
        state["count"] += 1

        # the last message is the previous correction (or initial response)
        messages = task.extra_state["new_memory"].get()
        prev_correct_str = messages[-1].content
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            prev_correct_str
        )

        # reflect phase
        reflection, reflection_msg = self._reflect(chat_history=messages)
        is_done = reflection.is_done

        # NOTE(review): `content=reflection_msg` passes a ChatMessage object
        # rather than its string content — presumably `reflection_msg.content`
        # was intended; confirm how ChatMessage stringifies non-str content.
        critique_msg = ChatMessage(role=MessageRole.USER, content=reflection_msg)
        task.extra_state["new_memory"].put(critique_msg)

        # correction phase
        if is_done:
            # no correction to be made prev correction is sufficient
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=prev_correct_str_without_prefix,
                )
            )
            new_steps = []
        else:
            # generate a new correction
            correct_msg = self._correct(
                input_str=prev_correct_str_without_prefix,
                critique=reflection_msg.content,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            if self.max_iterations == state["count"]:
                # this will be the last iteration
                task.extra_state["new_memory"].put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT,
                        content=correct_str_without_prefix,
                    )
                )
                agent_response = AgentChatResponse(response=correct_str_without_prefix)
                new_steps = []
            else:
                # another round of reflection/correction will take place
                task.extra_state["new_memory"].put(correct_msg)
                agent_response = AgentChatResponse(response=str(correct_msg))
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            # NOTE(review): bitwise `|` on booleans — behaves like logical
            # `or` here, but `or` would be the idiomatic spelling.
            is_last=is_done | (self.max_iterations == state["count"]),
            next_steps=new_steps,
        )

    # Async methods
    @dispatcher.span
    async def _areflect(
        self, chat_history: List[ChatMessage]
    ) -> Tuple[Reflection, ChatMessage]:
        """Reflect on the trajectory (async counterpart of `_reflect`)."""
        reflection = await self._llm.astructured_predict(
            Reflection,
            PromptTemplate(REFLECTION_PROMPT_TEMPLATE),
            chat_history=messages_to_prompt(chat_history),
        )

        if self._verbose:
            print(f"> Reflection: {reflection.dict()}")

        # end state: return user message
        reflection_output_str = (
            f"Is Done: {reflection.is_done}\nCritique: {reflection.feedback}"
        )
        critique = REFLECTION_RESPONSE_TEMPLATE.format(
            reflection_output=reflection_output_str
        )

        return reflection, ChatMessage.from_str(critique, role="user")

    @dispatcher.span
    async def _acorrect(self, input_str: str, critique: str) -> ChatMessage:
        # Async counterpart of `_correct`.
        correction = await self._llm.astructured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            feedback=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async). Mirrors `run_step` with async LLM calls."""
        state = step.step_state
        state["count"] += 1

        messages = task.extra_state["new_memory"].get()
        prev_correct_str = messages[-1].content
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            prev_correct_str
        )

        # reflect
        reflection, reflection_msg = await self._areflect(chat_history=messages)
        is_done = reflection.is_done

        # NOTE(review): same as the sync path — `content=reflection_msg`
        # passes a ChatMessage object, not `reflection_msg.content`; confirm.
        critique_msg = ChatMessage(role=MessageRole.USER, content=reflection_msg)
        task.extra_state["new_memory"].put(critique_msg)

        # correction phase
        if is_done:
            # no correction to be made prev correction is sufficient
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=prev_correct_str_without_prefix,
                )
            )
            new_steps = []
        else:
            # generate a new correction
            correct_msg = await self._acorrect(
                input_str=prev_correct_str_without_prefix,
                critique=reflection_msg.content,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            if self.max_iterations == state["count"]:
                # this will be the last iteration
                task.extra_state["new_memory"].put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT,
                        content=correct_str_without_prefix,
                    )
                )
                agent_response = AgentChatResponse(response=correct_str_without_prefix)
                new_steps = []
            else:
                # another round of reflection/correction will take place
                task.extra_state["new_memory"].put(correct_msg)
                agent_response = AgentChatResponse(response=str(correct_msg))
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done | (self.max_iterations == state["count"]),
            next_steps=new_steps,
        )

    # Stream methods
    @dispatcher.span
    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (stream). Not supported by this worker."""
        raise NotImplementedError("Stream not supported for self reflection agent")

    @dispatcher.span
    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported by this worker."""
        raise NotImplementedError("Stream not supported for self reflection agent")

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        # NOTE(review): `self.prefix_messages` is not defined anywhere in
        # this class as shown here — presumably inherited/provided
        # elsewhere; verify it exists before calling this method.
        return (
            self.prefix_messages
            + task.memory.get()
            + task.extra_state["new_memory"].get_all()
        )

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize task, after all the steps are completed."""
        # add new messages to memory
        task.memory.set(task.extra_state["new_memory"].get_all())
        # reset new memory
        task.extra_state["new_memory"].reset()

from_defaults classmethod #

from_defaults(
    llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any
) -> SelfReflectionAgentWorker

方便的构造函数。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@classmethod
def from_defaults(
    cls,
    llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "SelfReflectionAgentWorker":
    """Convenience constructor.

    Falls back to an OpenAI LLM when none is supplied; raises ImportError
    if the OpenAI integration package is not installed.
    """
    chosen_llm = llm
    if chosen_llm is None:
        try:
            from llama_index.llms.openai import OpenAI
        except ImportError:
            raise ImportError(
                "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
            )
        chosen_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

    return cls(
        llm=chosen_llm,
        max_iterations=max_iterations,
        callback_manager=callback_manager,
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

从任务中初始化步骤。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize the first step from the task."""
    # Scratch memory seeded with the task's current chat history.
    new_memory = ChatMemoryBuffer.from_defaults()
    for past_message in task.memory.get():
        new_memory.put(past_message)

    # Register per-task state consumed by subsequent run_step calls.
    task.extra_state.update({"new_memory": new_memory, "sources": []})

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state={"count": 0},
    )

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
@dispatcher.span
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step.

    Performs one reflection/correction cycle: reflect on the latest
    correction in memory and, if it is not yet satisfactory, generate a
    new correction. Stops when reflection says done or max_iterations
    is reached.
    """
    state = step.step_state
    state["count"] += 1

    messages = task.extra_state["new_memory"].get()
    # The most recent message holds the previous correction (or original input).
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # reflect phase
    reflection, reflection_msg = self._reflect(chat_history=messages)
    is_done = reflection.is_done

    # NOTE(review): `content` receives the reflection message object itself,
    # not its `.content` string — confirm this is intended.
    critique_msg = ChatMessage(role=MessageRole.USER, content=reflection_msg)
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # no correction to be made prev correction is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT,
                content=prev_correct_str_without_prefix,
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = self._correct(
            input_str=prev_correct_str_without_prefix,
            critique=reflection_msg.content,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if self.max_iterations == state["count"]:
            # this will be the last iteration
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=correct_str_without_prefix,
                )
            )
            agent_response = AgentChatResponse(response=correct_str_without_prefix)
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(response=str(correct_msg))
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        # `|` acts as logical OR here because both operands are bools
        is_last=is_done | (self.max_iterations == state["count"]),
        next_steps=new_steps,
    )

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步)。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@dispatcher.span
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async).

    Async variant of `run_step`: one reflection/correction cycle using
    the async reflect/correct helpers.
    """
    state = step.step_state
    state["count"] += 1

    messages = task.extra_state["new_memory"].get()
    # The most recent message holds the previous correction (or original input).
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # reflect
    reflection, reflection_msg = await self._areflect(chat_history=messages)
    is_done = reflection.is_done

    # NOTE(review): `content` receives the reflection message object itself,
    # not its `.content` string — confirm this is intended.
    critique_msg = ChatMessage(role=MessageRole.USER, content=reflection_msg)
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # no correction to be made prev correction is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT,
                content=prev_correct_str_without_prefix,
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = await self._acorrect(
            input_str=prev_correct_str_without_prefix,
            critique=reflection_msg.content,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        if self.max_iterations == state["count"]:
            # this will be the last iteration
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=correct_str_without_prefix,
                )
            )
            agent_response = AgentChatResponse(response=correct_str_without_prefix)
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(response=str(correct_msg))
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        # `|` acts as logical OR here because both operands are bools
        is_last=is_done | (self.max_iterations == state["count"]),
        next_steps=new_steps,
    )

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(流式)。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
432
433
434
435
436
@dispatcher.span
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (streaming).

    Streaming output is not implemented for the self-reflection agent.
    """
    error_message = "Stream not supported for self reflection agent"
    raise NotImplementedError(error_message)

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步流)。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
438
439
440
441
442
443
444
@dispatcher.span
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream).

    Async streaming output is not implemented for the self-reflection agent.
    """
    error_message = "Stream not supported for self reflection agent"
    raise NotImplementedError(error_message)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

完成任务,在所有步骤都完成之后。

Source code in llama_index/agent/introspective/reflective/self_reflection.py
453
454
455
456
457
458
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize the task, once all steps are complete.

    Persists messages accumulated during the task, then clears the
    temporary buffer.
    """
    new_memory = task.extra_state["new_memory"]
    task.memory.set(new_memory.get_all())
    new_memory.reset()

ToolInteractiveReflectionAgentWorker #

Bases: BaseModel, BaseAgentWorker

工具交互式反射代理工作者。

该代理工作者实现了Gou, Zhibin等人在ICLR(2024年)中介绍的CRITIC反射框架。 (来源:https://arxiv.org/pdf/2305.11738)

CRITIC代表“使用工具交互式批评进行校正”。它通过使用外部工具(例如,使用Google搜索工具进行事实检查)对任务/查询的响应进行反思,随后使用批评生成校正的响应。它通过工具交互式反思和校正循环,直到满足特定的停止条件或达到最大迭代次数。

该代理将批评子任务委托给用户提供的critique_agent_worker,其类型为FunctionCallingAgentWorker,即它使用工具执行任务。对于校正,它使用用户指定的correction_llm,其中包含一个PydanticProgram(使用llm.structured_predict动态确定),以产生结构化输出,即包含由correction_llm生成的校正的Correction

属性: critique_agent_worker(FunctionCallingAgentWorker):负责执行批评反思的批评代理。 critique_template(str):包含有关批评代理应如何执行反思的说明的模板。 max_iterations(int,可选):允许的最大反思和校正循环次数。默认为DEFAULT_MAX_ITERATIONS = 5。 stopping_callable(Optional[StoppingCallable],可选):一个可选的停止条件,作用于批评反思字符串,并返回一个布尔值,以确定最新的校正是否足够。默认为None。 correction_llm(Optional[LLM],可选):用于根据批评或反思产生校正响应的LLM。默认为None。 callback_manager(Optional[CallbackManager],可选):回调管理器。默认为None。 verbose(bool,可选):是否应该详细执行。默认为False。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class ToolInteractiveReflectionAgentWorker(BaseModel, BaseAgentWorker):
    """Tool-Interactive Reflection Agent Worker.

    This agent worker implements the CRITIC reflection framework introduced
    by Gou, Zhibin et al. in ICLR (2024). (Source: https://arxiv.org/pdf/2305.11738)

    CRITIC stands for "Correcting with tool-interactive critiquing". It reflects
    on a response to a task/query using external tools (e.g. fact checking with
    a Google search tool) and subsequently uses the critique to generate a
    corrected response. It cycles through tool-interactive reflection and
    correction until a specific stopping condition is met or the maximum number
    of iterations is reached.

    This agent delegates the critique subtask to a user-supplied
    `critique_agent_worker` of type `FunctionCallingAgentWorker`, i.e. it
    performs the task using tools. For correction, it uses a user-specified
    `correction_llm` with a PydanticProgram (dynamically determined via
    llm.structured_predict) to produce a structured output, namely a
    `Correction` containing the correction generated by the `correction_llm`.

    Attributes:
        critique_agent_worker (FunctionCallingAgentWorker): Critique agent
            responsible for performing the critique reflection.
        critique_template (str): Template containing instructions for how the
            critique agent should perform the reflection.
        max_iterations (int, optional): Maximum number of reflection/correction
            cycles permitted. Defaults to DEFAULT_MAX_ITERATIONS = 5.
        stopping_callable (Optional[StoppingCallable], optional): Optional
            stopping condition that operates on the critique reflection string
            and returns a bool to determine whether the latest correction is
            sufficient. Defaults to None.
        correction_llm (Optional[LLM], optional): LLM used to produce corrected
            responses based on the critique or reflection. Defaults to None.
        callback_manager (Optional[CallbackManager], optional): Callback
            manager. Defaults to None.
        verbose (bool, optional): Whether execution should be verbose.
            Defaults to False."""

    callback_manager: CallbackManager = Field(default=CallbackManager([]))
    max_iterations: int = Field(default=DEFAULT_MAX_ITERATIONS)
    stopping_callable: Optional[StoppingCallable] = Field(
        default=None,
        description="Optional function that operates on critique string to see if no more corrections are needed.",
    )
    # Private (non-validated) attributes holding the delegated workers/LLM.
    _critique_agent_worker: FunctionCallingAgentWorker = PrivateAttr()
    _critique_template: str = PrivateAttr()
    _correction_llm: LLM = PrivateAttr()
    _verbose: bool = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        critique_agent_worker: FunctionCallingAgentWorker,
        critique_template: str,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        stopping_callable: Optional[StoppingCallable] = None,
        correction_llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize the worker."""
        # NOTE(review): private attrs are assigned before super().__init__();
        # this ordering appears deliberate for pydantic PrivateAttr — do not reorder.
        self._critique_agent_worker = critique_agent_worker
        self._critique_template = critique_template
        self._verbose = verbose
        self._correction_llm = correction_llm

        super().__init__(
            callback_manager=callback_manager,
            max_iterations=max_iterations,
            stopping_callable=stopping_callable,
            **kwargs,
        )

    @classmethod
    def from_defaults(
        cls,
        critique_agent_worker: FunctionCallingAgentWorker,
        critique_template: str,
        correction_llm: Optional[LLM] = None,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        stopping_callable: Optional[StoppingCallable] = None,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "ToolInteractiveReflectionAgentWorker":
        """Convenience constructor; defaults the correction LLM to OpenAI."""
        if correction_llm is None:
            try:
                from llama_index.llms.openai import OpenAI
            except ImportError:
                raise ImportError(
                    "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
                )
            correction_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

        return cls(
            critique_agent_worker=critique_agent_worker,
            critique_template=critique_template,
            correction_llm=correction_llm,
            max_iterations=max_iterations,
            stopping_callable=stopping_callable,
            callback_manager=callback_manager or CallbackManager([]),
            verbose=verbose,
            **kwargs,
        )

    def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
        """Initialize the first step from the task."""
        # temporary memory for new messages
        new_memory = ChatMemoryBuffer.from_defaults()

        # put current history in new memory
        messages = task.memory.get()
        for message in messages:
            new_memory.put(message)

        # initialize task state
        task_state = {
            "new_memory": new_memory,
            "sources": [],
        }
        task.extra_state.update(task_state)

        return TaskStep(
            task_id=task.task_id,
            step_id=str(uuid.uuid4()),
            input=task.input,
            step_state={"count": 0},
        )

    def _remove_correction_str_prefix(self, correct_msg: str) -> str:
        """Helper to strip the correction prefix when formatting the final response."""
        return correct_msg.replace(CORRECT_RESPONSE_PREFIX, "")

    @dispatcher.span
    def _critique(self, input_str: str) -> AgentChatResponse:
        # Delegate the critique to the (tool-using) critique agent.
        agent = self._critique_agent_worker.as_agent(verbose=self._verbose)
        critique = agent.chat(self._critique_template.format(input_str=input_str))
        if self._verbose:
            print(f"Critique: {critique.response}", flush=True)
        return critique

    @dispatcher.span
    def _correct(self, input_str: str, critique: str) -> ChatMessage:
        # Structured prediction returns a `Correction` pydantic object.
        correction = self._correction_llm.structured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            critique=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step.

        Performs one critique/correction cycle on the most recent correction.
        """
        state = step.step_state
        state["count"] += 1

        messages = task.extra_state["new_memory"].get()
        # The most recent message holds the previous correction (or original input).
        prev_correct_str = messages[-1].content
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            prev_correct_str
        )

        # critique phase
        critique_response = self._critique(input_str=prev_correct_str_without_prefix)
        task.extra_state["sources"].extend(critique_response.sources)

        is_done = False
        if self.stopping_callable:
            is_done = self.stopping_callable(critique_str=critique_response.response)

        critique_msg = ChatMessage(
            role=MessageRole.USER, content=critique_response.response
        )
        task.extra_state["new_memory"].put(critique_msg)

        # correction phase
        if is_done:
            # no correction to be made prev correction is sufficient
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
                )
            )
            new_steps = []
        else:
            # generate a new correction
            correct_msg = self._correct(
                input_str=prev_correct_str_without_prefix,
                critique=critique_response.response,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            # reached max iterations, no further reflection/correction cycles
            if self.max_iterations == state["count"]:
                task.extra_state["new_memory"].put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                    )
                )
                agent_response = AgentChatResponse(response=correct_str_without_prefix)
                new_steps = []
            else:
                # another round of reflection/correction will take place
                task.extra_state["new_memory"].put(correct_msg)
                agent_response = AgentChatResponse(
                    response=str(correct_msg), sources=critique_response.sources
                )
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            # `|` acts as logical OR here because both operands are bools
            is_last=is_done | (self.max_iterations == state["count"]),
            next_steps=new_steps,
        )

    # Async Methods
    @dispatcher.span
    async def _acritique(self, input_str: str) -> AgentChatResponse:
        # Async counterpart of _critique.
        agent = self._critique_agent_worker.as_agent(verbose=self._verbose)
        critique = await agent.achat(
            self._critique_template.format(input_str=input_str)
        )
        if self._verbose:
            print(f"Critique: {critique.response}", flush=True)
        return critique

    @dispatcher.span
    async def _acorrect(self, input_str: str, critique: str) -> ChatMessage:
        # Async counterpart of _correct.
        correction = await self._correction_llm.astructured_predict(
            Correction,
            PromptTemplate(CORRECT_PROMPT_TEMPLATE),
            input_str=input_str,
            critique=critique,
        )

        correct_response_str = CORRECT_RESPONSE_FSTRING.format(
            correction=correction.correction
        )
        if self._verbose:
            print(f"Correction: {correction.correction}", flush=True)
        return ChatMessage.from_str(correct_response_str, role="assistant")

    @dispatcher.span
    @trace_method("run_step")
    async def arun_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async).

        Async variant of `run_step`: one critique/correction cycle.
        """
        state = step.step_state
        state["count"] += 1

        messages = task.extra_state["new_memory"].get()
        prev_correct_str = messages[-1].content
        prev_correct_str_without_prefix = self._remove_correction_str_prefix(
            prev_correct_str
        )

        # critique phase
        critique_response = await self._acritique(
            input_str=prev_correct_str_without_prefix
        )
        task.extra_state["sources"].extend(critique_response.sources)

        is_done = False
        if self.stopping_callable:
            is_done = self.stopping_callable(critique_str=critique_response.response)

        critique_msg = ChatMessage(
            role=MessageRole.USER, content=critique_response.response
        )
        task.extra_state["new_memory"].put(critique_msg)

        # correction phase
        if is_done:
            # no correction to be made prev correction is sufficient
            agent_response = AgentChatResponse(
                response=prev_correct_str_without_prefix,
                sources=task.extra_state["sources"],
            )
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
                )
            )
            new_steps = []
        else:
            # generate a new correction
            correct_msg = await self._acorrect(
                input_str=prev_correct_str_without_prefix,
                critique=critique_response.response,
            )
            correct_str_without_prefix = self._remove_correction_str_prefix(
                correct_msg.content
            )

            # reached max iterations, no further reflection/correction cycles
            if self.max_iterations == state["count"]:
                task.extra_state["new_memory"].put(
                    ChatMessage(
                        role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                    )
                )
                agent_response = AgentChatResponse(response=correct_str_without_prefix)
                new_steps = []
            else:
                # another round of reflection/correction will take place
                task.extra_state["new_memory"].put(correct_msg)
                agent_response = AgentChatResponse(
                    response=str(correct_msg), sources=critique_response.sources
                )
                new_steps = [
                    step.get_next_step(
                        step_id=str(uuid.uuid4()),
                        # NOTE: input is unused
                        input=None,
                        step_state=state,
                    )
                ]

        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            # `|` acts as logical OR here because both operands are bools
            is_last=is_done | (self.max_iterations == state["count"]),
            next_steps=new_steps,
        )

    # Stream methods
    @dispatcher.span
    @trace_method("run_step")
    def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
        """Run step (streaming). Not supported."""
        raise NotImplementedError(
            "Stream not supported for tool-interactive reflection agent"
        )

    @dispatcher.span
    @trace_method("run_step")
    async def astream_step(
        self, step: TaskStep, task: Task, **kwargs: Any
    ) -> TaskStepOutput:
        """Run step (async stream). Not supported."""
        raise NotImplementedError(
            "Stream not supported for tool-interactive reflection agent"
        )

    def get_all_messages(self, task: Task) -> List[ChatMessage]:
        """Return prefix messages plus task history and new-memory messages."""
        return (
            self.prefix_messages
            + task.memory.get()
            + task.extra_state["new_memory"].get_all()
        )

    def finalize_task(self, task: Task, **kwargs: Any) -> None:
        """Finalize the task, once all steps are complete."""
        # add new messages to memory
        task.memory.set(task.extra_state["new_memory"].get_all())
        # reset new memory
        task.extra_state["new_memory"].reset()

from_defaults classmethod #

from_defaults(
    critique_agent_worker: FunctionCallingAgentWorker,
    critique_template: str,
    correction_llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    stopping_callable: Optional[StoppingCallable] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any
) -> ToolInteractiveReflectionAgentWorker

方便的构造方法,用于一组BaseTools(可选)。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@classmethod
def from_defaults(
    cls,
    critique_agent_worker: FunctionCallingAgentWorker,
    critique_template: str,
    correction_llm: Optional[LLM] = None,
    max_iterations: int = DEFAULT_MAX_ITERATIONS,
    stopping_callable: Optional[StoppingCallable] = None,
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> "ToolInteractiveReflectionAgentWorker":
    """Convenience constructor from a set of BaseTools (Optional).

    Falls back to an OpenAI correction LLM when none is supplied.
    """
    resolved_llm = correction_llm
    if resolved_llm is None:
        try:
            from llama_index.llms.openai import OpenAI
        except ImportError:
            raise ImportError(
                "Missing OpenAI LLMs. Please run `pip install llama-index-llms-openai`."
            )
        resolved_llm = OpenAI(model="gpt-4-turbo-preview", temperature=0)

    return cls(
        critique_agent_worker=critique_agent_worker,
        critique_template=critique_template,
        correction_llm=resolved_llm,
        max_iterations=max_iterations,
        stopping_callable=stopping_callable,
        callback_manager=callback_manager or CallbackManager([]),
        verbose=verbose,
        **kwargs,
    )

initialize_step #

initialize_step(task: Task, **kwargs: Any) -> TaskStep

从任务中初始化步骤。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def initialize_step(self, task: Task, **kwargs: Any) -> TaskStep:
    """Initialize the first step from the task."""
    # Scratch memory seeded with the task's current chat history.
    new_memory = ChatMemoryBuffer.from_defaults()
    for past_message in task.memory.get():
        new_memory.put(past_message)

    # Register per-task state consumed by subsequent run_step calls.
    task.extra_state.update({"new_memory": new_memory, "sources": []})

    return TaskStep(
        task_id=task.task_id,
        step_id=str(uuid.uuid4()),
        input=task.input,
        step_state={"count": 0},
    )

run_step #

run_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
@dispatcher.span
@trace_method("run_step")
def run_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step.

    Performs one critique/correction cycle: critique the most recent
    correction (via the tool-using critique agent) and, if the stopping
    condition is not met, generate a new correction.
    """
    state = step.step_state
    state["count"] += 1

    messages = task.extra_state["new_memory"].get()
    # The most recent message holds the previous correction (or original input).
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # critique phase
    critique_response = self._critique(input_str=prev_correct_str_without_prefix)
    task.extra_state["sources"].extend(critique_response.sources)

    # Without a stopping_callable, the loop only ends at max_iterations.
    is_done = False
    if self.stopping_callable:
        is_done = self.stopping_callable(critique_str=critique_response.response)

    critique_msg = ChatMessage(
        role=MessageRole.USER, content=critique_response.response
    )
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # no correction to be made prev correction is sufficient
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = self._correct(
            input_str=prev_correct_str_without_prefix,
            critique=critique_response.response,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        # reached max iterations, no further reflection/correction cycles
        if self.max_iterations == state["count"]:
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                )
            )
            agent_response = AgentChatResponse(response=correct_str_without_prefix)
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(
                response=str(correct_msg), sources=critique_response.sources
            )
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        # `|` acts as logical OR here because both operands are bools
        is_last=is_done | (self.max_iterations == state["count"]),
        next_steps=new_steps,
    )

arun_step async #

arun_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步)。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
@dispatcher.span
@trace_method("run_step")
async def arun_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async).

    Performs one critique/correction cycle on the most recent correction
    stored in the task's new memory, and schedules another cycle unless
    the stopping condition fires or the max-iteration limit is reached.

    Args:
        step: The current task step; its ``step_state`` carries the cycle count.
        task: The task being executed; ``extra_state`` holds the working
            memory and accumulated sources.
        **kwargs: Unused, accepted for interface compatibility.

    Returns:
        TaskStepOutput: The agent response for this cycle plus any next steps.
    """
    state = step.step_state
    state["count"] += 1  # number of reflection/correction cycles so far

    # The latest message in new memory holds the previous correction;
    # strip its correction prefix before critiquing it.
    messages = task.extra_state["new_memory"].get()
    prev_correct_str = messages[-1].content
    prev_correct_str_without_prefix = self._remove_correction_str_prefix(
        prev_correct_str
    )

    # critique phase
    critique_response = await self._acritique(
        input_str=prev_correct_str_without_prefix
    )
    task.extra_state["sources"].extend(critique_response.sources)

    # An optional user-supplied callable decides, from the critique text,
    # whether no further correction is needed.
    is_done = False
    if self.stopping_callable:
        is_done = self.stopping_callable(critique_str=critique_response.response)

    critique_msg = ChatMessage(
        role=MessageRole.USER, content=critique_response.response
    )
    task.extra_state["new_memory"].put(critique_msg)

    # correction phase
    if is_done:
        # No correction to be made; the previous correction is sufficient.
        agent_response = AgentChatResponse(
            response=prev_correct_str_without_prefix,
            sources=task.extra_state["sources"],
        )
        task.extra_state["new_memory"].put(
            ChatMessage(
                role=MessageRole.ASSISTANT, content=prev_correct_str_without_prefix
            )
        )
        new_steps = []
    else:
        # generate a new correction
        correct_msg = await self._acorrect(
            input_str=prev_correct_str_without_prefix,
            critique=critique_response.response,
        )
        correct_str_without_prefix = self._remove_correction_str_prefix(
            correct_msg.content
        )

        # reached max iterations, no further reflection/correction cycles
        if self.max_iterations == state["count"]:
            task.extra_state["new_memory"].put(
                ChatMessage(
                    role=MessageRole.ASSISTANT, content=correct_str_without_prefix
                )
            )
            agent_response = AgentChatResponse(response=correct_str_without_prefix)
            new_steps = []
        else:
            # another round of reflection/correction will take place
            task.extra_state["new_memory"].put(correct_msg)
            agent_response = AgentChatResponse(
                response=str(correct_msg), sources=critique_response.sources
            )
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                    step_state=state,
                )
            ]

    return TaskStepOutput(
        output=agent_response,
        task_step=step,
        # Logical `or` (was bitwise `|`): the step is last either because
        # the stopping condition fired or the iteration limit was hit.
        is_last=is_done or (self.max_iterations == state["count"]),
        next_steps=new_steps,
    )

stream_step #

stream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(流式)。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
404
405
406
407
408
409
410
@dispatcher.span
@trace_method("run_step")
def stream_step(self, step: TaskStep, task: Task, **kwargs: Any) -> TaskStepOutput:
    """Run step (stream).

    Streaming is not implemented for this agent worker.
    """
    message = "Stream not supported for tool-interactive reflection agent"
    raise NotImplementedError(message)

astream_step async #

astream_step(
    step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput

运行步骤(异步流)。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
412
413
414
415
416
417
418
419
420
@dispatcher.span
@trace_method("run_step")
async def astream_step(
    self, step: TaskStep, task: Task, **kwargs: Any
) -> TaskStepOutput:
    """Run step (async stream).

    Async streaming is not implemented for this agent worker.
    """
    message = "Stream not supported for tool-interactive reflection agent"
    raise NotImplementedError(message)

finalize_task #

finalize_task(task: Task, **kwargs: Any) -> None

完成任务,在所有步骤都完成之后。

Source code in llama_index/agent/introspective/reflective/tool_interactive_reflection.py
429
430
431
432
433
434
def finalize_task(self, task: Task, **kwargs: Any) -> None:
    """Finalize task, after all the steps are completed.

    Flushes the conversation accumulated in the task's scratch memory
    into the task's main memory, then clears the scratch memory.
    """
    scratch_memory = task.extra_state["new_memory"]
    # Persist the new messages into the task's main memory.
    task.memory.set(scratch_memory.get_all())
    # Reset the scratch memory now that its contents are persisted.
    scratch_memory.reset()