高级文本到SQL查询管道¶
在本指南中,我们将向您展示如何使用我们的查询管道语法在您的数据上设置文本到SQL查询管道。
这使您可以灵活地使用其他技术来增强文本到SQL的功能。我们将在下面的部分中展示这些内容:
- 查询时表格检索:动态检索文本到SQL提示中的相关表格。
- 查询时样本行检索:嵌入/索引每一行,并动态检索文本到SQL提示中每个表格的示例行。
我们的开箱即用的管道包括我们的NLSQLTableQueryEngine
和SQLTableRetrieverQueryEngine
。(如果您想查看使用这些模块的文本到SQL指南,请查看这里)。本指南实现了这些模块的高级版本,使您能够最大限度地灵活应用到您自己的设置中。
注意: 任何文本到SQL应用都应该意识到执行任意SQL查询可能存在安全风险。建议采取必要的预防措施,例如使用受限角色、只读数据库、沙盒等。
加载和摄入数据¶
加载数据¶
我们使用WikiTableQuestions数据集(Pasupat和Liang 2015)作为我们的测试数据集。
我们遍历一个文件夹中的所有csv文件,将每个文件存储在一个sqlite数据库中(然后我们将在每个表模式上构建一个对象索引)。
%pip install llama-index-llms-openai
!wget "https://github.com/ppasupat/WikiTableQuestions/releases/download/v1.0.2/WikiTableQuestions-1.0.2-compact.zip" -O data.zip
!unzip data.zip
import pandas as pd
from pathlib import Path
data_dir = Path("./WikiTableQuestions/csv/200-csv")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = []
for csv_file in csv_files:
print(f"processing file: {csv_file}")
try:
df = pd.read_csv(csv_file)
dfs.append(df)
except Exception as e:
print(f"Error parsing {csv_file}: {str(e)}")
从每个表中提取表名和摘要¶
在这里,我们使用gpt-3.5来从每个表中提取表名(带下划线)和摘要,使用我们的Pydantic程序。
tableinfo_dir = "WikiTableQuestions_TableInfo"
!mkdir {tableinfo_dir}
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.llms.openai import OpenAI
class TableInfo(BaseModel):
"""关于结构化表的信息。"""
table_name: str = Field(
..., description="表名(必须使用下划线,不能有空格)"
)
table_summary: str = Field(
..., description="表的简短、简洁的摘要/标题"
)
prompt_str = """\
给我一个符合以下JSON格式的表摘要。
- 表名必须对该表唯一,并在简洁的同时描述该表。
- 不要输出通用的表名(例如:table,my_table)。
不要将表名设置为以下内容之一:{exclude_table_name_list}
表格:
{table_str}
摘要:"""
program = LLMTextCompletionProgram.from_defaults(
output_cls=TableInfo,
llm=OpenAI(model="gpt-3.5-turbo"),
prompt_template_str=prompt_str,
)
import json
def _get_tableinfo_with_index(idx: int) -> str:
results_gen = Path(tableinfo_dir).glob(f"{idx}_*")
results_list = list(results_gen)
if len(results_list) == 0:
return None
elif len(results_list) == 1:
path = results_list[0]
return TableInfo.parse_file(path)
else:
raise ValueError(
f"匹配索引的文件超过一个:{list(results_gen)}"
)
table_names = set()
table_infos = []
for idx, df in enumerate(dfs):
table_info = _get_tableinfo_with_index(idx)
if table_info:
table_infos.append(table_info)
else:
while True:
df_str = df.head(10).to_csv()
table_info = program(
table_str=df_str,
exclude_table_name_list=str(list(table_names)),
)
table_name = table_info.table_name
print(f"处理表格:{table_name}")
if table_name not in table_names:
table_names.add(table_name)
break
else:
# 再试一次
print(f"表格名 {table_name} 已存在,正在尝试再次处理。")
pass
out_file = f"{tableinfo_dir}/{idx}_{table_name}.json"
json.dump(table_info.dict(), open(out_file, "w"))
table_infos.append(table_info)
将数据放入SQL数据库¶
我们使用sqlalchemy
,这是一个流行的SQL数据库工具包,用于加载所有的表格。
# 将数据放入sqlite数据库
from sqlalchemy import (
create_engine, # 创建引擎
MetaData, # 元数据
Table, # 表
Column, # 列
String, # 字符串
Integer, # 整数
)
import re # 导入re模块
# 创建一个经过清理的列名的函数
def sanitize_column_name(col_name):
# 移除特殊字符并用下划线替换空格
return re.sub(r"\W+", "_", col_name)
# 使用SQLAlchemy从DataFrame创建表的函数
def create_table_from_dataframe(
df: pd.DataFrame, table_name: str, engine, metadata_obj
):
# 清理列名
sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
df = df.rename(columns=sanitized_columns)
# 根据DataFrame的列和数据类型动态创建列
columns = [
Column(col, String if dtype == "object" else Integer)
for col, dtype in zip(df.columns, df.dtypes)
]
# 使用定义的列创建表
table = Table(table_name, metadata_obj, *columns)
# 在数据库中创建表
metadata_obj.create_all(engine)
# 将DataFrame中的数据插入表中
with engine.connect() as conn:
for _, row in df.iterrows():
insert_stmt = table.insert().values(**row.to_dict())
conn.execute(insert_stmt)
conn.commit()
engine = create_engine("sqlite:///:memory:") # 创建内存中的sqlite引擎
metadata_obj = MetaData() # 创建元数据对象
for idx, df in enumerate(dfs):
tableinfo = _get_tableinfo_with_index(idx)
print(f"Creating table: {tableinfo.table_name}") # 创建表
create_table_from_dataframe(df, tableinfo.table_name, engine, metadata_obj) # 从DataFrame创建表
# 设置Arize Phoenix用于日志记录/可观测性
import phoenix as px
import llama_index.core
px.launch_app()
llama_index.core.set_global_handler("arize_phoenix")
🌍 To view the Phoenix app in your browser, visit http://127.0.0.1:6006/ 📺 To view the Phoenix app in a notebook, run `px.active_session().view()` 📖 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix
对象索引,检索器,SQL数据库
from llama_index.core.objects import (
SQLTableNodeMapping,
ObjectIndex,
SQLTableSchema,
)
from llama_index.core import SQLDatabase, VectorStoreIndex
sql_database = SQLDatabase(engine)
table_node_mapping = SQLTableNodeMapping(sql_database)
table_schema_objs = [
SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
for t in table_infos
] # 为每个表添加一个SQLTableSchema
obj_index = ObjectIndex.from_objects(
table_schema_objs,
table_node_mapping,
VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)
SQLRetriever + 表解析器¶
这个项目旨在使用SQLRetriever和表解析器来处理和分析结构化数据。SQLRetriever是一个用于执行SQL查询的Python库,而表解析器用于解析和处理表格数据。通过结合这两个工具,可以更轻松地从数据库中提取数据,并对其进行进一步的分析和处理。
from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent
sql_retriever = SQLRetriever(sql_database)
def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
"""获取表上下文字符串。"""
context_strs = []
for table_schema_obj in table_schema_objs:
table_info = sql_database.get_single_table_info(
table_schema_obj.table_name
)
if table_schema_obj.context_str:
table_opt_context = " 表描述为:"
table_opt_context += table_schema_obj.context_str
table_info += table_opt_context
context_strs.append(table_info)
return "\n\n".join(context_strs)
table_parser_component = FnComponent(fn=get_table_context_str)
Text-to-SQL Prompt + 输出解析器¶
这个项目包含两部分:
- Text-to-SQL Prompt:这是一个用于将自然语言问题转换成SQL查询的模型。
- 输出解析器:这是一个用于解析Text-to-SQL模型输出的工具,以便更容易地理解和使用生成的SQL查询。
在这个项目中,我们将翻译python文件中的markdown内容。
from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core import PromptTemplate
from llama_index.core.query_pipeline import FnComponent
from llama_index.core.llms import ChatResponse
def parse_response_to_sql(response: ChatResponse) -> str:
"""将响应解析为SQL。"""
response = response.message.content
sql_query_start = response.find("SQLQuery:")
if sql_query_start != -1:
response = response[sql_query_start:]
# TODO: 在Python 3.9+后使用removeprefix移动
if response.startswith("SQLQuery:"):
response = response[len("SQLQuery:") :]
sql_result_start = response.find("SQLResult:")
if sql_result_start != -1:
response = response[:sql_result_start]
return response.strip().strip("```").strip()
sql_parser_component = FnComponent(fn=parse_response_to_sql)
text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
dialect=engine.dialect.name
)
print(text2sql_prompt.template)
Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return the most interesting examples in the database. Never query for all the columns from a specific table, only ask for a few relevant columns given the question. Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Pay attention to which column is in which table. Also, qualify column names with the table name when needed. You are required to use the following format, each taking one line: Question: Question here SQLQuery: SQL Query to run SQLResult: Result of the SQLQuery Answer: Final answer here Only use tables listed below. {schema} Question: {query_str} SQLQuery:
响应合成提示
response_synthesis_prompt_str = (
"Given an input question, synthesize a response from the query results.\n"
"Query: {query_str}\n"
"SQL: {sql_query}\n"
"SQL Response: {context_str}\n"
"Response: "
)
response_synthesis_prompt = PromptTemplate(
response_synthesis_prompt_str,
)
llm = OpenAI(model="gpt-3.5-turbo")
定义查询管道¶
现在组件已经就位,让我们来定义查询管道!
from llama_index.core.query_pipeline import (
QueryPipeline as QP,
Link,
InputComponent,
CustomQueryComponent,
)
qp = QP(
modules={
"input": InputComponent(),
"table_retriever": obj_retriever,
"table_output_parser": table_parser_component,
"text2sql_prompt": text2sql_prompt,
"text2sql_llm": llm,
"sql_output_parser": sql_parser_component,
"sql_retriever": sql_retriever,
"response_synthesis_prompt": response_synthesis_prompt,
"response_synthesis_llm": llm,
},
verbose=True,
)
qp.add_chain(["input", "table_retriever", "table_output_parser"])
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
"sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
"sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")
可视化查询管道¶
查询管道语法的一个非常好的特性是可以通过networkx轻松地将其可视化为图形。
from pyvis.network import Network
net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(qp.dag)
# 将网络保存为"text2sql_dag.html"
net.write_html("text2sql_dag.html")
from IPython.display import display, HTML
# 读取HTML文件的内容
with open("text2sql_dag.html", "r") as file:
html_content = file.read()
# 显示HTML内容
display(HTML(html_content))
运行一些查询!¶
现在我们已经准备好在整个流水线上运行一些查询了。
response = qp.run(
query="What was the year that The Notorious B.I.G was signed to Bad Boy?"
)
print(str(response))
> Running module input with input: query: What was the year that The Notorious B.I.G was signed to Bad Boy? > Running module table_retriever with input: input: What was the year that The Notorious B.I.G was signed to Bad Boy? > Running module table_output_parser with input: table_schema_objs: [SQLTableSchema(table_name='Bad_Boy_Artists', context_str='List of artists signed to Bad Boy Records and their album releases'), SQLTableSchema(table_name='Bad_Boy_Artists', context_str='List of artis... > Running module text2sql_prompt with input: query_str: What was the year that The Notorious B.I.G was signed to Bad Boy? schema: Table 'Bad_Boy_Artists' has columns: Act (VARCHAR), Year_signed (INTEGER), _Albums_released_under_Bad_Boy (VARCHAR), and foreign keys: . The table description is: List of artists signed to Bad Boy Rec... > Running module text2sql_llm with input: messages: Given an input question, first create a syntactically correct sqlite query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return... > Running module sql_output_parser with input: response: assistant: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' SQLResult: 1993 Answer: The Notorious B.I.G was signed to Bad Boy in 1993. RAW RESPONSE SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' SQLResult: 1993 Answer: The Notorious B.I.G was signed to Bad Boy in 1993. > Running module sql_retriever with input: input: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' > Running module response_synthesis_prompt with input: query_str: What was the year that The Notorious B.I.G was signed to Bad Boy? sql_query: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' context_str: [NodeWithScore(node=TextNode(id_='4ae2f8fc-b803-4238-8433-7a431c2df391', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='c336a1cbf9... > Running module response_synthesis_llm with input: messages: Given an input question, synthesize a response from the query results. Query: What was the year that The Notorious B.I.G was signed to Bad Boy? SQL: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act =... assistant: The Notorious B.I.G was signed to Bad Boy in 1993.
response = qp.run(query="Who won best director in the 1972 academy awards")
print(str(response))
> Running module input with input: query: Who won best directory in the 1972 academy awards > Running module table_retriever with input: input: Who won best directory in the 1972 academy awards > Running module table_output_parser with input: table_schema_objs: [SQLTableSchema(table_name='Academy_Awards_1972', context_str='List of award categories and nominees for the 1972 Academy Awards'), SQLTableSchema(table_name='Academy_Awards_1972', context_str='List o... > Running module text2sql_prompt with input: query_str: Who won best directory in the 1972 academy awards schema: Table 'Academy_Awards_1972' has columns: Award (VARCHAR), Category (VARCHAR), Nominee (VARCHAR), Result (VARCHAR), and foreign keys: . The table description is: List of award categories and nominees f... > Running module text2sql_llm with input: messages: Given an input question, first create a syntactically correct sqlite query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return... > Running module sql_output_parser with input: response: assistant: SELECT Nominee FROM Academy_Awards_1972 WHERE Category = 'Best Director' AND Result = 'Won' SQLResult: The result of the SQLQuery will be the name of the director who won the Best Director ... RAW RESPONSE SELECT Nominee FROM Academy_Awards_1972 WHERE Category = 'Best Director' AND Result = 'Won' SQLResult: The result of the SQLQuery will be the name of the director who won the Best Director award in the 1972 Academy Awards. Answer: The winner of the Best Director award in the 1972 Academy Awards was [Director's Name]. > Running module sql_retriever with input: input: SELECT Nominee FROM Academy_Awards_1972 WHERE Category = 'Best Director' AND Result = 'Won' > Running module response_synthesis_prompt with input: query_str: Who won best directory in the 1972 academy awards sql_query: SELECT Nominee FROM Academy_Awards_1972 WHERE Category = 'Best Director' AND Result = 'Won' context_str: [NodeWithScore(node=TextNode(id_='2ebd2cb3-7836-4f93-9898-4c0798da4a41', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='a74ca5f33c... > Running module response_synthesis_llm with input: messages: Given an input question, synthesize a response from the query results. Query: Who won best directory in the 1972 academy awards SQL: SELECT Nominee FROM Academy_Awards_1972 WHERE Category = 'Best Dire... assistant: The winner for Best Director in the 1972 Academy Awards was William Friedkin.
response = qp.run(query="What was the term of Pasquale Preziosa?")
print(str(response))
> Running module input with input: query: What was the term of Pasquale Preziosa? > Running module table_retriever with input: input: What was the term of Pasquale Preziosa? > Running module table_output_parser with input: table_schema_objs: [SQLTableSchema(table_name='Italian_Presidents', context_str='List of Italian Presidents and their terms in office'), SQLTableSchema(table_name='Italian_Presidents', context_str='List of Italian Presi... > Running module text2sql_prompt with input: query_str: What was the term of Pasquale Preziosa? schema: Table 'Italian_Presidents' has columns: Name (VARCHAR), Term_start (VARCHAR), Term_end (VARCHAR), and foreign keys: . The table description is: List of Italian Presidents and their terms in office Ta... > Running module text2sql_llm with input: messages: Given an input question, first create a syntactically correct sqlite query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return... > Running module sql_output_parser with input: response: assistant: SELECT Term_start, Term_end FROM Italian_Presidents WHERE Name = 'Pasquale Preziosa' SQLResult: Term_start = '2006-05-18', Term_end = '2006-05-22' Answer: Pasquale Preziosa's term was from ... RAW RESPONSE SELECT Term_start, Term_end FROM Italian_Presidents WHERE Name = 'Pasquale Preziosa' SQLResult: Term_start = '2006-05-18', Term_end = '2006-05-22' Answer: Pasquale Preziosa's term was from May 18, 2006 to May 22, 2006. > Running module sql_retriever with input: input: SELECT Term_start, Term_end FROM Italian_Presidents WHERE Name = 'Pasquale Preziosa' > Running module response_synthesis_prompt with input: query_str: What was the term of Pasquale Preziosa? sql_query: SELECT Term_start, Term_end FROM Italian_Presidents WHERE Name = 'Pasquale Preziosa' context_str: [NodeWithScore(node=TextNode(id_='75dfe777-3186-4a57-8969-9e33fb8ab41a', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='99f2d91e91... > Running module response_synthesis_llm with input: messages: Given an input question, synthesize a response from the query results. Query: What was the term of Pasquale Preziosa? SQL: SELECT Term_start, Term_end FROM Italian_Presidents WHERE Name = 'Pasquale Pr... assistant: Pasquale Preziosa's term started on 25 February 2013 and he is currently the incumbent.
2. 高级功能2:具有查询时间行检索(以及表检索)的文本到SQL转换¶
在前面的示例中的一个问题是,如果用户查询要求“The Notorious BIG”,但艺术家的存储方式是“The Notorious B.I.G”,那么生成的SELECT语句可能不会返回任何匹配项。
我们可以通过为每个表检索少量示例行来缓解这个问题。一个简单的选择是只取前k行。相反,我们嵌入、索引并检索与用户查询相关的k行,为文本到SQL LLM提供生成SQL时最具上下文相关性的信息。
现在我们扩展我们的查询流程。
为每个表建立索引¶
我们嵌入/索引每个表的行,从而每个表对应一个索引。
from llama_index.core import VectorStoreIndex, load_index_from_storage
from sqlalchemy import text
from llama_index.core.schema import TextNode
from llama_index.core import StorageContext
import os
from pathlib import Path
from typing import Dict
def index_all_tables(
sql_database: SQLDatabase, table_index_dir: str = "table_index_dir"
) -> Dict[str, VectorStoreIndex]:
"""索引所有表格。"""
if not Path(table_index_dir).exists():
os.makedirs(table_index_dir)
vector_index_dict = {}
engine = sql_database.engine
for table_name in sql_database.get_usable_table_names():
print(f"正在索引表格中的行:{table_name}")
if not os.path.exists(f"{table_index_dir}/{table_name}"):
# 从表中获取所有行
with engine.connect() as conn:
cursor = conn.execute(text(f'SELECT * FROM "{table_name}"'))
result = cursor.fetchall()
row_tups = []
for row in result:
row_tups.append(tuple(row))
# 索引每一行,放入向量存储索引
nodes = [TextNode(text=str(t)) for t in row_tups]
# 放入向量存储索引(默认使用OpenAIEmbeddings)
index = VectorStoreIndex(nodes)
# 保存索引
index.set_index_id("vector_index")
index.storage_context.persist(f"{table_index_dir}/{table_name}")
else:
# 重建存储上下文
storage_context = StorageContext.from_defaults(
persist_dir=f"{table_index_dir}/{table_name}"
)
# 加载索引
index = load_index_from_storage(
storage_context, index_id="vector_index"
)
vector_index_dict[table_name] = index
return vector_index_dict
vector_index_dict = index_all_tables(sql_database)
Indexing rows in table: Academy_Awards_1972 Indexing rows in table: Actress_Awards Indexing rows in table: Actress_Awards_Table Indexing rows in table: Actress_Filmography Indexing rows in table: Afrikaans_Language_Translations Indexing rows in table: Airport_Information Indexing rows in table: Average_Temperature_Precipitation Indexing rows in table: Average_Temperature_and_Precipitation Indexing rows in table: BBC_Radio_Costs Indexing rows in table: Bad_Boy_Artists Indexing rows in table: Boxing_Matches Indexing rows in table: Club_Performance_Norway Indexing rows in table: Disappeared_Persons Indexing rows in table: Drop Events Indexing rows in table: European_Football_Standings Indexing rows in table: Football_Team_Records Indexing rows in table: Gortynia_Municipalities Indexing rows in table: Grammy_Awards Indexing rows in table: Italian_Presidents Indexing rows in table: Kentucky_Derby_Winners Indexing rows in table: Kinase_Cancer_Relationships Indexing rows in table: Kodachrome_Film Indexing rows in table: New_Mexico_Officials Indexing rows in table: Number_Encoding_Probability Indexing rows in table: Peak_Chart_Positions Indexing rows in table: Political Positions of Lord Beaverbrook Indexing rows in table: Radio_Stations Indexing rows in table: Renaissance_Discography Indexing rows in table: Schools_in_Ohio Indexing rows in table: Temperature_and_Precipitation Indexing rows in table: Voter_Party_Statistics Indexing rows in table: Voter_Registration_Statistics Indexing rows in table: Yamato_District_Area_Population Indexing rows in table: Yearly_Deaths_and_Accidents
test_retriever = vector_index_dict["Bad_Boy_Artists"].as_retriever(
similarity_top_k=1
)
nodes = test_retriever.retrieve("P. Diddy")
print(nodes[0].get_content())
('Diddy', 1993, '6')
定义扩展的表解析器组件¶
我们扩展了我们的 table_parser_component
的功能,不仅可以返回相关的表模式,还可以返回每个表模式的相关行。
现在它接受 table_schema_objs
(表检索器的输出),还接受原始的 query_str
,然后用于检索相关行的向量检索。
from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent
sql_retriever = SQLRetriever(sql_database)
def get_table_context_and_rows_str(
query_str: str, table_schema_objs: List[SQLTableSchema]
):
"""获取表上下文字符串。"""
context_strs = []
for table_schema_obj in table_schema_objs:
# 首先附加表信息 + 额外上下文
table_info = sql_database.get_single_table_info(
table_schema_obj.table_name
)
if table_schema_obj.context_str:
table_opt_context = " 表描述为:"
table_opt_context += table_schema_obj.context_str
table_info += table_opt_context
# 同时查找向量索引以返回相关表行
vector_retriever = vector_index_dict[
table_schema_obj.table_name
].as_retriever(similarity_top_k=2)
relevant_nodes = vector_retriever.retrieve(query_str)
if len(relevant_nodes) > 0:
table_row_context = "\n以下是一些相关示例行(值与上述列的顺序相同)\n"
for node in relevant_nodes:
table_row_context += str(node.get_content()) + "\n"
table_info += table_row_context
context_strs.append(table_info)
return "\n\n".join(context_strs)
table_parser_component = FnComponent(fn=get_table_context_and_rows_str)
定义扩展查询管道¶
这看起来与第1节中的查询管道相似,但是使用了升级版的table_parser_component。
from llama_index.core.query_pipeline import (
QueryPipeline as QP,
Link,
InputComponent,
CustomQueryComponent,
)
qp = QP(
modules={
"input": InputComponent(),
"table_retriever": obj_retriever,
"table_output_parser": table_parser_component,
"text2sql_prompt": text2sql_prompt,
"text2sql_llm": llm,
"sql_output_parser": sql_parser_component,
"sql_retriever": sql_retriever,
"response_synthesis_prompt": response_synthesis_prompt,
"response_synthesis_llm": llm,
},
verbose=True,
)
qp.add_link("input", "table_retriever")
qp.add_link("input", "table_output_parser", dest_key="query_str")
qp.add_link(
"table_retriever", "table_output_parser", dest_key="table_schema_objs"
)
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
"sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
"sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")
from pyvis.network import Network
net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(qp.dag)
net.show("text2sql_dag.html")
运行一些查询¶
现在,即使数据库中的条目与查询不完全匹配,我们也可以查询相关的条目。
response = qp.run(
query="What was the year that The Notorious BIG was signed to Bad Boy?"
)
print(str(response))
> Running module input with input: query: What was the year that The Notorious BIG was signed to Bad Boy? > Running module table_retriever with input: input: What was the year that The Notorious BIG was signed to Bad Boy? > Running module table_output_parser with input: query_str: What was the year that The Notorious BIG was signed to Bad Boy? table_schema_objs: [SQLTableSchema(table_name='Bad_Boy_Artists', context_str='List of artists signed to Bad Boy Records and their album releases'), SQLTableSchema(table_name='Bad_Boy_Artists', context_str='List of artis... > Running module text2sql_prompt with input: query_str: What was the year that The Notorious BIG was signed to Bad Boy? schema: Table 'Bad_Boy_Artists' has columns: Act (VARCHAR), Year_signed (INTEGER), _Albums_released_under_Bad_Boy (VARCHAR), and foreign keys: . The table description is: List of artists signed to Bad Boy Rec... > Running module text2sql_llm with input: messages: Given an input question, first create a syntactically correct sqlite query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return... > Running module sql_output_parser with input: response: assistant: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' SQLResult: 1993 Answer: The Notorious BIG was signed to Bad Boy in 1993. RAW RESPONSE SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' SQLResult: 1993 Answer: The Notorious BIG was signed to Bad Boy in 1993. > Running module sql_retriever with input: input: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' > Running module response_synthesis_prompt with input: query_str: What was the year that The Notorious BIG was signed to Bad Boy? sql_query: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = 'The Notorious B.I.G' context_str: [NodeWithScore(node=TextNode(id_='23214862-784c-4f2b-b489-39d61ea96580', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='c336a1cbf9... > Running module response_synthesis_llm with input: messages: Given an input question, synthesize a response from the query results. Query: What was the year that The Notorious BIG was signed to Bad Boy? SQL: SELECT Year_signed FROM Bad_Boy_Artists WHERE Act = '... assistant: The Notorious BIG was signed to Bad Boy in 1993.