class SubQuestionQueryEngine(BaseQueryEngine):
    """
    Sub question query engine.

    A query engine that breaks down a complex query (e.g. compare and contrast)
    into many sub questions and their target query engine for execution.
    After executing all sub questions, all responses are gathered and sent to
    the response synthesizer to produce the final response.

    Args:
        question_gen (BaseQuestionGenerator): A module for generating sub questions
            given a complex question and tools.
        response_synthesizer (BaseSynthesizer): A response synthesizer for
            generating the final response.
        query_engine_tools (Sequence[QueryEngineTool]): Tools to answer the
            sub questions.
        verbose (bool): whether to print intermediate questions and answers.
            Defaults to True.
        use_async (bool): whether to execute the sub questions with asyncio.
            Defaults to True.
    """

    def __init__(
        self,
        question_gen: BaseQuestionGenerator,
        response_synthesizer: BaseSynthesizer,
        query_engine_tools: Sequence[QueryEngineTool],
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = True,
        use_async: bool = False,
    ) -> None:
        self._question_gen = question_gen
        self._response_synthesizer = response_synthesizer
        self._metadatas = [x.metadata for x in query_engine_tools]
        self._query_engines = {
            tool.metadata.name: tool.query_engine for tool in query_engine_tools
        }
        self._verbose = verbose
        self._use_async = use_async
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "question_gen": self._question_gen,
            "response_synthesizer": self._response_synthesizer,
        }

    @classmethod
    def from_defaults(
        cls,
        query_engine_tools: Sequence[QueryEngineTool],
        llm: Optional[LLM] = None,
        question_gen: Optional[BaseQuestionGenerator] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        verbose: bool = True,
        use_async: bool = True,
    ) -> "SubQuestionQueryEngine":
        callback_manager = Settings.callback_manager
        if len(query_engine_tools) > 0:
            callback_manager = query_engine_tools[0].query_engine.callback_manager

        llm = llm or Settings.llm
        if question_gen is None:
            try:
                from llama_index.question_gen.openai import (
                    OpenAIQuestionGenerator,
                )  # pants: no-infer-dep

                # try to use OpenAI function calling based question generator.
                # if incompatible, use general LLM question generator
                question_gen = OpenAIQuestionGenerator.from_defaults(llm=llm)
            except ImportError as e:
                raise ImportError(
                    "`llama-index-question-gen-openai` package cannot be found. "
                    "Please install it by using "
                    "`pip install llama-index-question-gen-openai`"
                )
            except ValueError:
                question_gen = LLMQuestionGenerator.from_defaults(llm=llm)

        synth = response_synthesizer or get_response_synthesizer(
            llm=llm,
            callback_manager=callback_manager,
            use_async=use_async,
        )

        return cls(
            question_gen,
            synth,
            query_engine_tools,
            callback_manager=callback_manager,
            verbose=verbose,
            use_async=use_async,
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            sub_questions = self._question_gen.generate(self._metadatas, query_bundle)

            colors = get_color_mapping([str(i) for i in range(len(sub_questions))])

            if self._verbose:
                print_text(f"Generated {len(sub_questions)} sub questions.\n")

            if self._use_async:
                tasks = [
                    self._aquery_subq(sub_q, color=colors[str(ind)])
                    for ind, sub_q in enumerate(sub_questions)
                ]

                qa_pairs_all = run_async_tasks(tasks)
                qa_pairs_all = cast(
                    List[Optional[SubQuestionAnswerPair]], qa_pairs_all
                )
            else:
                qa_pairs_all = [
                    self._query_subq(sub_q, color=colors[str(ind)])
                    for ind, sub_q in enumerate(sub_questions)
                ]

            # filter out sub questions that failed
            qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))

            nodes = [self._construct_node(pair) for pair in qa_pairs]

            source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
            response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            sub_questions = await self._question_gen.agenerate(
                self._metadatas, query_bundle
            )

            colors = get_color_mapping([str(i) for i in range(len(sub_questions))])

            if self._verbose:
                print_text(f"Generated {len(sub_questions)} sub questions.\n")

            tasks = [
                self._aquery_subq(sub_q, color=colors[str(ind)])
                for ind, sub_q in enumerate(sub_questions)
            ]

            qa_pairs_all = await asyncio.gather(*tasks)
            qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)

            # filter out sub questions that failed
            qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))

            nodes = [self._construct_node(pair) for pair in qa_pairs]

            source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]

            response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    def _construct_node(self, qa_pair: SubQuestionAnswerPair) -> NodeWithScore:
        node_text = (
            f"Sub question: {qa_pair.sub_q.sub_question}\nResponse: {qa_pair.answer}"
        )
        return NodeWithScore(node=TextNode(text=node_text))

    async def _aquery_subq(
        self, sub_q: SubQuestion, color: Optional[str] = None
    ) -> Optional[SubQuestionAnswerPair]:
        try:
            with self.callback_manager.event(
                CBEventType.SUB_QUESTION,
                payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
            ) as event:
                question = sub_q.sub_question
                query_engine = self._query_engines[sub_q.tool_name]

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)

                response = await query_engine.aquery(question)
                response_text = str(response)

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)

                qa_pair = SubQuestionAnswerPair(
                    sub_q=sub_q, answer=response_text, sources=response.source_nodes
                )

                event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})

            return qa_pair
        except ValueError:
            logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
            return None

    def _query_subq(
        self, sub_q: SubQuestion, color: Optional[str] = None
    ) -> Optional[SubQuestionAnswerPair]:
        try:
            with self.callback_manager.event(
                CBEventType.SUB_QUESTION,
                payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
            ) as event:
                question = sub_q.sub_question
                query_engine = self._query_engines[sub_q.tool_name]

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)

                response = query_engine.query(question)
                response_text = str(response)

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)

                qa_pair = SubQuestionAnswerPair(
                    sub_q=sub_q, answer=response_text, sources=response.source_nodes
                )

                event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})

            return qa_pair
        except ValueError:
            logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
            return None
Source code in llama-index-core/llama_index/core/query_engine/sub_question_query_engine.py
class SubQuestionAnswerPair(BaseModel):
    """
    Pair of the sub question and optionally its answer
    (if it's been answered yet).
    """

    sub_q: SubQuestion
    answer: Optional[str] = None
    sources: List[NodeWithScore] = Field(default_factory=list)
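Each successfully answered sub question is captured as one of these pairs; `_construct_node` above flattens a pair into a `TextNode` that the response synthesizer consumes, while `sources` carries the retrieved source nodes through to the final response. A small illustrative sketch, with hand-built values standing in for what the engine would normally produce:

```python
from llama_index.core.question_gen.types import SubQuestion
from llama_index.core.query_engine.sub_question_query_engine import (
    SubQuestionAnswerPair,
)

# Hypothetical, hand-built example of the structure the engine produces.
sub_q = SubQuestion(
    sub_question="What was revenue in 2022?", tool_name="report_2022"
)
pair = SubQuestionAnswerPair(sub_q=sub_q, answer="Revenue was $10M.")

# Mirrors what _construct_node does before synthesis.
node_text = f"Sub question: {pair.sub_q.sub_question}\nResponse: {pair.answer}"
print(node_text)
```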