##
# Copyright (c) 2024, Chad Juliano, Kinetica DB Inc.
##
"""Kinetica SQL生成LLM API。"""
import json
import logging
import os
import re
from importlib.metadata import version
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
if TYPE_CHECKING:
import gpudb
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
)
from langchain_core.output_parsers.transform import BaseOutputParser
from langchain_core.outputs import ChatGeneration, ChatResult, Generation
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
LOG = logging.getLogger(__name__)
# Kinetica pydantic API datatypes
class _KdtSuggestContext(BaseModel):
    """Pydantic API request type describing one table's LLM context."""

    table: Optional[str] = Field(default=None, title="Name of table")
    description: Optional[str] = Field(default=None, title="Table description")
    columns: List[str] = Field(default=None, title="Table columns list")
    rules: Optional[List[str]] = Field(
        default=None, title="Rules that apply to the table."
    )
    samples: Optional[Dict] = Field(
        default=None, title="Samples that apply to the entire context."
    )

    def to_system_str(self) -> str:
        """Render this context as DDL-style text for the system prompt.

        Returns:
            A ``CREATE TABLE`` statement followed by an optional table
            comment and rule annotations.

        Raises:
            ValueError: If the columns list is empty or missing.
        """
        # Bug fix: the original created the ValueError without raising it,
        # silently letting an empty column list through.
        if not self.columns:
            raise ValueError("columns list can't be null.")

        lines = []
        lines.append(f"CREATE TABLE {self.table} AS")
        lines.append("(")

        columns = []
        for column in self.columns:
            column = column.replace('"', "").strip()
            columns.append(f" {column}")
        lines.append(",\n".join(columns))
        lines.append(");")

        if self.description:
            lines.append(f"COMMENT ON TABLE {self.table} IS '{self.description}';")

        if self.rules and len(self.rules) > 0:
            lines.append(
                f"-- When querying table {self.table} the following rules apply:"
            )
            for rule in self.rules:
                lines.append(f"-- * {rule}")

        result = "\n".join(lines)
        return result
class _KdtSuggestPayload(BaseModel):
    """Pydantic API request type: the question plus its table contexts."""

    question: Optional[str]
    context: List[_KdtSuggestContext]

    def get_system_str(self) -> str:
        """Join the rendered DDL of every context that names a table."""
        rendered = [
            ctx.to_system_str() for ctx in self.context if ctx.table is not None
        ]
        return "\n\n".join(rendered)

    def get_messages(self) -> List[Dict]:
        """Flatten each context's sample question/answer pairs into messages."""
        messages: List[Dict] = []
        for ctx in self.context:
            if ctx.samples is None:
                continue
            for question, answer in ctx.samples.items():
                # unescape double quotes
                fixed_answer = answer.replace("''", "'")
                messages.append(dict(role="user", content=question or ""))
                messages.append(dict(role="assistant", content=fixed_answer))
        return messages

    def to_completion(self) -> Dict:
        """Assemble the complete /chat/completions request body."""
        messages = [dict(role="system", content=self.get_system_str())]
        messages.extend(self.get_messages())
        messages.append(dict(role="user", content=self.question or ""))
        return dict(messages=messages)
class _KdtoSuggestRequest(BaseModel):
    """Pydantic API request type: top-level wrapper around the suggest payload."""

    payload: _KdtSuggestPayload
class _KdtMessage(BaseModel):
    """Pydantic API response type: a single chat message."""

    role: str = Field(default=None, title="One of [user|assistant|system]")
    content: str
class _KdtChoice(BaseModel):
    """Pydantic API response type: one completion choice."""

    index: int
    # The generated SQL is carried in the message content.
    message: _KdtMessage = Field(default=None, title="The generated SQL")
    finish_reason: str
class _KdtUsage(BaseModel):
    """Pydantic API response type: token usage statistics."""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
class _KdtSqlResponse(BaseModel):
    """Pydantic API response type: the completion result."""

    id: str
    object: str
    created: int
    model: str
    choices: List[_KdtChoice]
    usage: _KdtUsage
    prompt: str = Field(default=None, title="The input question")
class _KdtCompletionResponse(BaseModel):
    """Pydantic API response type: top-level wrapper with a status code."""

    status: str
    data: _KdtSqlResponse
class _KineticaLlmFileContextParser:
"""用于解析Kinetica LLM上下文数据文件的解析器。"""
# parse line into a dict containing role and content
PARSER = re.compile(r"^<\|(?P<role>\w+)\|>\W*(?P<content>.*)$", re.DOTALL)
@classmethod
def _removesuffix(cls, text: str, suffix: str) -> str:
if suffix and text.endswith(suffix):
return text[: -len(suffix)]
return text
@classmethod
def parse_dialogue_file(cls, input_file: os.PathLike) -> Dict:
path = Path(input_file)
# schema = path.name.removesuffix(".txt") python 3.9
schema = cls._removesuffix(path.name, ".txt")
lines = open(input_file).read()
return cls.parse_dialogue(lines, schema)
@classmethod
def parse_dialogue(cls, text: str, schema: str) -> Dict:
messages = []
system = None
lines = text.split("<|end|>")
user_message = None
for idx, line in enumerate(lines):
line = line.strip()
if len(line) == 0:
continue
match = cls.PARSER.match(line)
if match is None:
raise ValueError(f"Could not find starting token in: {line}")
groupdict = match.groupdict()
role = groupdict["role"]
if role == "system":
if system is not None:
raise ValueError(f"Only one system token allowed in: {line}")
system = groupdict["content"]
elif role == "user":
if user_message is not None:
raise ValueError(
f"Found user token without assistant token: {line}"
)
user_message = groupdict
elif role == "assistant":
if user_message is None:
raise Exception(f"Found assistant token without user token: {line}")
messages.append(user_message)
messages.append(groupdict)
user_message = None
else:
raise ValueError(f"Unknown token: {role}")
return {"schema": schema, "system": system, "messages": messages}
class KineticaUtil:
    """Kinetica utility functions."""

    @classmethod
    def create_kdbc(
        cls,
        url: Optional[str] = None,
        user: Optional[str] = None,
        passwd: Optional[str] = None,
    ) -> "gpudb.GPUdb":
        """Create a Kinetica connection object and verify the connection.

        If None is passed for one or more of the parameters then an attempt
        will be made to retrieve the value from the related environment
        variable.

        Args:
            url: The Kinetica URL or ``KINETICA_URL`` if None.
            user: The Kinetica user or ``KINETICA_USER`` if None.
            passwd: The Kinetica password or ``KINETICA_PASSWD`` if None.

        Returns:
            The Kinetica connection object.

        Raises:
            ImportError: If the ``gpudb`` package is not installed.
            ValueError: If a parameter is missing from the environment.
        """
        try:
            import gpudb
        except ModuleNotFoundError:
            raise ImportError(
                "Could not import Kinetica python package. "
                "Please install it with `pip install gpudb`."
            )

        url = cls._get_env("KINETICA_URL", url)
        user = cls._get_env("KINETICA_USER", user)
        passwd = cls._get_env("KINETICA_PASSWD", passwd)

        options = gpudb.GPUdb.Options()
        options.username = user
        options.password = passwd
        options.skip_ssl_cert_verification = True
        options.disable_failover = True
        options.logging_level = "INFO"
        kdbc = gpudb.GPUdb(host=url, options=options)

        LOG.info(
            "Connected to Kinetica: {}. (api={}, server={})".format(
                kdbc.get_url(), version("gpudb"), kdbc.server_version
            )
        )
        return kdbc

    @classmethod
    def _get_env(cls, name: str, default: Optional[str]) -> str:
        """Return ``default`` if given, otherwise the environment variable.

        Raises:
            ValueError: If neither a default nor the variable is available.
        """
        if default is not None:
            return default
        result = os.getenv(name)
        if result is not None:
            return result
        raise ValueError(
            f"Parameter was not passed and not found in the environment: {name}"
        )
class ChatKinetica(BaseChatModel):
    """Kinetica LLM Chat Model API.

    Prerequisites for using this API:

    * The ``gpudb`` and ``typeguard`` packages installed.
    * A Kinetica DB instance.
    * Kinetica host specified in ``KINETICA_URL``.
    * Kinetica login specified in ``KINETICA_USER`` and ``KINETICA_PASSWD``.
    * An LLM context that specifies the tables and samples to use for
      inferencing.

    This API is intended to interact with the Kinetica SqlAssist LLM that
    supports generation of SQL from natural language.

    In the Kinetica LLM workflow you create an LLM context in the database that
    provides information needed for inferencing that includes tables, comments,
    rules, and samples. Invoking ``load_messages_from_context()`` will retrieve
    the context information from the database so that it can be used to create
    a chat prompt.

    The chat prompt consists of a ``SystemMessage`` and pairs of
    ``HumanMessage``/``AIMessage`` that contain the samples which are
    question/SQL pairs. You can append pairs to this list but it is not
    intended to facilitate a typical natural language conversation.

    When you create a chain from the chat prompt and execute it, the Kinetica
    LLM will generate SQL from the input. Optionally you can use
    ``KineticaSqlOutputParser`` to execute the SQL and return the result as a
    dataframe.

    The following example creates an LLM using the environment variables for
    the Kinetica connection. This will fail if the API is unable to connect to
    the database.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import ChatKinetica
            kinetica_llm = ChatKinetica()

    If you prefer to pass connection information directly then you can create a
    connection using ``KineticaUtil.create_kdbc()``.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import (
                ChatKinetica, KineticaUtil)

            kdbc = KineticaUtil.create_kdbc(url=url, user=user, passwd=passwd)
            kinetica_llm = ChatKinetica(kdbc=kdbc)
    """

    kdbc: Any = Field(exclude=True)
    """Kinetica DB connection."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Pydantic object validator: create a connection if none was given."""
        kdbc = values.get("kdbc", None)
        if kdbc is None:
            kdbc = KineticaUtil.create_kdbc()
            values["kdbc"] = kdbc
        return values

    @property
    def _llm_type(self) -> str:
        return "kinetica-sqlassist"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        return dict(
            kinetica_version=str(self.kdbc.server_version), api_version=version("gpudb")
        )

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Submit the chat messages to SqlAssist and wrap the generated SQL."""
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")

        dict_messages = [self._convert_message_to_dict(m) for m in messages]
        sql_response = self._submit_completion(dict_messages)

        response_message = sql_response.choices[0].message
        # generated_dict = response_message.model_dump() # pydantic v2
        generated_dict = response_message.dict()

        generated_message = self._convert_message_from_dict(generated_dict)

        llm_output = dict(
            input_tokens=sql_response.usage.prompt_tokens,
            output_tokens=sql_response.usage.completion_tokens,
            model_name=sql_response.model,
        )
        return ChatResult(
            generations=[ChatGeneration(message=generated_message)],
            llm_output=llm_output,
        )

    def load_messages_from_context(self, context_name: str) -> List:
        """Load a langchain prompt from a Kinetica context.

        A Kinetica Context is an object created with the Kinetica Workbench UI
        or with SQL syntax. This function will convert the data in the context
        to a list of messages that can be used as a prompt. The messages will
        contain a ``SystemMessage`` followed by pairs of
        ``HumanMessage``/``AIMessage`` that contain the samples.

        Args:
            context_name: The name of an LLM context in the database.

        Returns:
            A list of messages containing the information from the context.
        """
        # query kinetica for the prompt
        sql = f"GENERATE PROMPT WITH OPTIONS (CONTEXT_NAMES = '{context_name}')"
        result = self._execute_sql(sql)
        prompt = result["Prompt"]
        prompt_json = json.loads(prompt)

        # convert the prompt to messages
        # request = SuggestRequest.model_validate(prompt_json) # pydantic v2
        request = _KdtoSuggestRequest.parse_obj(prompt_json)
        payload = request.payload

        dict_messages = []
        dict_messages.append(dict(role="system", content=payload.get_system_str()))
        dict_messages.extend(payload.get_messages())
        messages = [self._convert_message_from_dict(m) for m in dict_messages]
        return messages

    def _submit_completion(self, messages: List[Dict]) -> _KdtSqlResponse:
        """Submit a /chat/completions request to Kinetica.

        Raises:
            ValueError: If the request or the SQL generation failed; when the
                raw error message embeds a JSON ``response:{...}`` payload,
                the inner message is surfaced instead.
        """
        request = dict(messages=messages)
        request_json = json.dumps(request)
        response_raw = self.kdbc._GPUdb__submit_request_json(
            "/chat/completions", request_json
        )
        response_json = json.loads(response_raw)

        status = response_json["status"]
        if status != "OK":
            message = response_json["message"]
            match_resp = re.compile(r"response:({.*})")
            result = match_resp.search(message)
            if result is not None:
                response = result.group(1)
                response_json = json.loads(response)
                message = response_json["message"]
            raise ValueError(message)

        data = response_json["data"]
        # response = CompletionResponse.model_validate(data) # pydantic v2
        response = _KdtCompletionResponse.parse_obj(data)
        if response.status != "OK":
            raise ValueError("SQL Generation failed")
        return response.data

    def _execute_sql(self, sql: str) -> Dict:
        """Execute an SQL query and return the single result record as a dict."""
        response = self.kdbc.execute_sql_and_decode(
            sql, limit=1, get_column_major=False
        )
        status_info = response["status_info"]
        if status_info["status"] != "OK":
            message = status_info["message"]
            raise ValueError(message)
        records = response["records"]
        if len(records) != 1:
            raise ValueError("No records returned.")
        record = records[0]
        response_dict = {}
        for col, val in record.items():
            response_dict[col] = val
        return response_dict

    @classmethod
    def load_messages_from_datafile(cls, sa_datafile: Path) -> List[BaseMessage]:
        """Load a langchain prompt from a Kinetica context datafile."""
        datafile_dict = _KineticaLlmFileContextParser.parse_dialogue_file(sa_datafile)
        messages = cls._convert_dict_to_messages(datafile_dict)
        return messages

    @classmethod
    def _convert_message_to_dict(cls, message: BaseMessage) -> Dict:
        """Convert a single ``BaseMessage`` to a role/content dict."""
        content = cast(str, message.content)
        if isinstance(message, HumanMessage):
            role = "user"
        elif isinstance(message, AIMessage):
            role = "assistant"
        elif isinstance(message, SystemMessage):
            role = "system"
        else:
            raise ValueError(f"Got unsupported message type: {message}")
        result_message = dict(role=role, content=content)
        return result_message

    @classmethod
    def _convert_message_from_dict(cls, message: Dict) -> BaseMessage:
        """Convert a single role/content dict to a ``BaseMessage``."""
        role = message["role"]
        content = message["content"]
        if role == "user":
            return HumanMessage(content=content)
        elif role == "assistant":
            return AIMessage(content=content)
        elif role == "system":
            return SystemMessage(content=content)
        else:
            raise ValueError(f"Got unsupported role: {role}")

    @classmethod
    def _convert_dict_to_messages(cls, sa_data: Dict) -> List[BaseMessage]:
        """Convert a parsed datafile dict to a list of ``BaseMessage``."""
        schema = sa_data["schema"]
        system = sa_data["system"]
        messages = sa_data["messages"]
        LOG.info(f"Importing prompt for schema: {schema}")

        result_list: List[BaseMessage] = []
        result_list.append(SystemMessage(content=system))
        result_list.extend([cls._convert_message_from_dict(m) for m in messages])
        return result_list
class KineticaSqlResponse(BaseModel):
    """Response containing SQL and the fetched data.

    This object is returned by a chain with ``KineticaSqlOutputParser`` and it
    contains the generated SQL and related Pandas Dataframe fetched from the
    database.
    """

    sql: str = Field(default=None)
    """The generated SQL."""

    # dataframe: "pd.DataFrame" = Field(default=None)
    dataframe: Any = Field(default=None)
    """The Pandas dataframe containing the fetched data."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
class KineticaSqlOutputParser(BaseOutputParser[KineticaSqlResponse]):
    """Fetch and return data from the Kinetica LLM.

    This object is used as the last element of a chain to execute generated SQL
    and it will output a ``KineticaSqlResponse`` containing the SQL and a
    pandas dataframe with the fetched data.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import (
                ChatKinetica, KineticaSqlOutputParser)
            kinetica_llm = ChatKinetica()

            # create chain
            ctx_messages = kinetica_llm.load_messages_from_context(self.context_name)
            ctx_messages.append(("human", "{input}"))
            prompt_template = ChatPromptTemplate.from_messages(ctx_messages)
            chain = (
                prompt_template
                | kinetica_llm
                | KineticaSqlOutputParser(kdbc=kinetica_llm.kdbc)
            )
            sql_response: KineticaSqlResponse = chain.invoke(
                {"input": "What are the female users ordered by username?"}
            )

            assert isinstance(sql_response, KineticaSqlResponse)
            LOG.info(f"SQL Response: {sql_response.sql}")
            assert isinstance(sql_response.dataframe, pd.DataFrame)
    """

    kdbc: Any = Field(exclude=True)
    """Kinetica DB connection."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def parse(self, text: str) -> KineticaSqlResponse:
        """Execute the generated SQL and bundle it with the fetched dataframe."""
        df = self.kdbc.to_df(text)
        return KineticaSqlResponse(sql=text, dataframe=df)

    def parse_result(
        self, result: List[Generation], *, partial: bool = False
    ) -> KineticaSqlResponse:
        """Parse the first generation's text through :meth:`parse`."""
        return self.parse(result[0].text)

    @property
    def _type(self) -> str:
        return "kinetica_sql_output_parser"