# Source code for langchain_community.chat_models.kinetica

##
# Copyright (c) 2024, Chad Juliano, Kinetica DB Inc.
##
"""Kinetica SQL生成LLM API。"""

import json
import logging
import os
import re
from importlib.metadata import version
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

if TYPE_CHECKING:
    import gpudb

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.output_parsers.transform import BaseOutputParser
from langchain_core.outputs import ChatGeneration, ChatResult, Generation
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator

LOG = logging.getLogger(__name__)

# Kinetica pydantic API datatypes


class _KdtSuggestContext(BaseModel):
    """pydantic API request type"""

    table: Optional[str] = Field(default=None, title="Name of table")
    description: Optional[str] = Field(default=None, title="Table description")
    columns: List[str] = Field(default=None, title="Table columns list")
    rules: Optional[List[str]] = Field(
        default=None, title="Rules that apply to the table."
    )
    samples: Optional[Dict] = Field(
        default=None, title="Samples that apply to the entire context."
    )

    def to_system_str(self) -> str:
        """Render this table context as a DDL-style system prompt fragment.

        Returns:
            A ``CREATE TABLE`` statement followed by an optional table comment
            and optional ``--`` rule lines.

        Raises:
            ValueError: If the columns list is empty or missing.
        """
        lines = []
        lines.append(f"CREATE TABLE {self.table} AS")
        lines.append("(")

        # Fix: the original constructed the ValueError without raising it, so a
        # missing columns list fell through to iterating None below.
        if not self.columns:
            raise ValueError("columns list can't be null.")

        columns = []
        for column in self.columns:
            column = column.replace('"', "").strip()
            columns.append(f"   {column}")
        lines.append(",\n".join(columns))
        lines.append(");")

        if self.description:
            lines.append(f"COMMENT ON TABLE {self.table} IS '{self.description}';")

        if self.rules and len(self.rules) > 0:
            lines.append(
                f"-- When querying table {self.table} the following rules apply:"
            )
            for rule in self.rules:
                lines.append(f"-- * {rule}")

        result = "\n".join(lines)
        return result


class _KdtSuggestPayload(BaseModel):
    """pydantic API request type"""

    question: Optional[str]
    context: List[_KdtSuggestContext]

    def get_system_str(self) -> str:
        """Join the system strings of every context that names a table."""
        parts = [
            table_context.to_system_str()
            for table_context in self.context
            if table_context.table is not None
        ]
        return "\n\n".join(parts)

    def get_messages(self) -> List[Dict]:
        """Flatten the sample question/answer pairs into chat messages."""
        messages: List[Dict] = []
        for ctx in self.context:
            if ctx.samples is None:
                continue
            for question, answer in ctx.samples.items():
                # unescape doubled single quotes (SQL-style escaping)
                unescaped = answer.replace("''", "'")
                messages.append(dict(role="user", content=question or ""))
                messages.append(dict(role="assistant", content=unescaped))
        return messages

    def to_completion(self) -> Dict:
        """Assemble the full message list for a /chat/completions request."""
        messages = [dict(role="system", content=self.get_system_str())]
        messages.extend(self.get_messages())
        messages.append(dict(role="user", content=self.question or ""))
        return dict(messages=messages)


class _KdtoSuggestRequest(BaseModel):
    """pydantic API request type"""

    payload: _KdtSuggestPayload


class _KdtMessage(BaseModel):
    """pydantic API response type"""

    role: str = Field(default=None, title="One of [user|assistant|system]")
    content: str


class _KdtChoice(BaseModel):
    """pydantic API response type"""

    index: int
    message: _KdtMessage = Field(default=None, title="The generated SQL")
    finish_reason: str


class _KdtUsage(BaseModel):
    """pydantic API response type"""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class _KdtSqlResponse(BaseModel):
    """pydantic API response type"""

    id: str
    object: str
    created: int
    model: str
    choices: List[_KdtChoice]
    usage: _KdtUsage
    prompt: str = Field(default=None, title="The input question")


class _KdtCompletionResponse(BaseModel):
    """pydantic API response type"""

    status: str
    data: _KdtSqlResponse


class _KineticaLlmFileContextParser:
    """用于解析Kinetica LLM上下文数据文件的解析器。"""

    # parse line into a dict containing role and content
    PARSER = re.compile(r"^<\|(?P<role>\w+)\|>\W*(?P<content>.*)$", re.DOTALL)

    @classmethod
    def _removesuffix(cls, text: str, suffix: str) -> str:
        if suffix and text.endswith(suffix):
            return text[: -len(suffix)]
        return text

    @classmethod
    def parse_dialogue_file(cls, input_file: os.PathLike) -> Dict:
        path = Path(input_file)
        # schema = path.name.removesuffix(".txt") python 3.9
        schema = cls._removesuffix(path.name, ".txt")

        lines = open(input_file).read()
        return cls.parse_dialogue(lines, schema)

    @classmethod
    def parse_dialogue(cls, text: str, schema: str) -> Dict:
        messages = []
        system = None

        lines = text.split("<|end|>")
        user_message = None

        for idx, line in enumerate(lines):
            line = line.strip()

            if len(line) == 0:
                continue

            match = cls.PARSER.match(line)
            if match is None:
                raise ValueError(f"Could not find starting token in: {line}")

            groupdict = match.groupdict()
            role = groupdict["role"]

            if role == "system":
                if system is not None:
                    raise ValueError(f"Only one system token allowed in: {line}")
                system = groupdict["content"]
            elif role == "user":
                if user_message is not None:
                    raise ValueError(
                        f"Found user token without assistant token: {line}"
                    )
                user_message = groupdict
            elif role == "assistant":
                if user_message is None:
                    raise Exception(f"Found assistant token without user token: {line}")
                messages.append(user_message)
                messages.append(groupdict)
                user_message = None
            else:
                raise ValueError(f"Unknown token: {role}")

        return {"schema": schema, "system": system, "messages": messages}


class KineticaUtil:
    """Kinetica utility functions."""

    @classmethod
    def create_kdbc(
        cls,
        url: Optional[str] = None,
        user: Optional[str] = None,
        passwd: Optional[str] = None,
    ) -> "gpudb.GPUdb":
        """Create a Kinetica connection object and verify the connection.

        If None is passed for one or more of the parameters then an attempt
        will be made to retrieve the value from the related environment
        variable.

        Args:
            url: The Kinetica URL or ``KINETICA_URL`` if None.
            user: The Kinetica user or ``KINETICA_USER`` if None.
            passwd: The Kinetica password or ``KINETICA_PASSWD`` if None.

        Returns:
            The Kinetica connection object.

        Raises:
            ImportError: If the ``gpudb`` package is not installed.
            ValueError: If a parameter is missing from both the arguments and
                the environment.
        """
        try:
            import gpudb
        except ModuleNotFoundError:
            raise ImportError(
                "Could not import Kinetica python package. "
                "Please install it with `pip install gpudb`."
            )

        url = cls._get_env("KINETICA_URL", url)
        user = cls._get_env("KINETICA_USER", user)
        passwd = cls._get_env("KINETICA_PASSWD", passwd)

        options = gpudb.GPUdb.Options()
        options.username = user
        options.password = passwd
        options.skip_ssl_cert_verification = True
        options.disable_failover = True
        options.logging_level = "INFO"
        kdbc = gpudb.GPUdb(host=url, options=options)

        LOG.info(
            "Connected to Kinetica: {}. (api={}, server={})".format(
                kdbc.get_url(), version("gpudb"), kdbc.server_version
            )
        )
        return kdbc

    @classmethod
    def _get_env(cls, name: str, default: Optional[str]) -> str:
        """Get an environment variable or use the provided default."""
        if default is not None:
            return default
        result = os.getenv(name)
        if result is not None:
            return result
        raise ValueError(
            f"Parameter was not passed and not found in the environment: {name}"
        )
class ChatKinetica(BaseChatModel):
    """Kinetica LLM Chat Model API.

    Prerequisites for using this API:

    * The ``gpudb`` and ``typeguard`` packages installed.
    * A Kinetica DB instance.
    * The Kinetica host specified in ``KINETICA_URL``.
    * The Kinetica login specified in ``KINETICA_USER`` and ``KINETICA_PASSWD``.
    * An LLM context that specifies the tables and samples to use for
      inferencing.

    This API is intended to interact with the Kinetica SqlAssist LLM that
    supports generation of SQL from natural language.

    In the Kinetica LLM workflow you create an LLM context in the database that
    provides information needed for inferencing that includes tables,
    annotations, rules, and samples. Invoking
    ``load_messages_from_context()`` will retrieve the context information from
    the database so that it can be used to create a chat prompt.

    The chat prompt consists of a ``SystemMessage`` and pairs of
    ``HumanMessage``/``AIMessage`` that contain the samples which are
    question/SQL pairs. You can append pairs of samples to this list but it is
    not intended to facilitate a typical natural language conversation.

    When you create a chain from the chat prompt and execute it, the Kinetica
    LLM will generate SQL from the input. Optionally you can use
    ``KineticaSqlOutputParser`` to execute the SQL and return the result as a
    dataframe.

    The following example creates an LLM using the environment variables for
    the Kinetica connection. This will fail if the API is unable to connect to
    the database.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import ChatKinetica
            kinetica_llm = ChatKinetica()

    If you prefer to pass connection information directly then you can create
    a connection using ``KineticaUtil.create_kdbc()``.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import (
                ChatKinetica, KineticaUtil)
            kdbc = KineticaUtil.create_kdbc(url=url, user=user, passwd=passwd)
            kinetica_llm = ChatKinetica(kdbc=kdbc)
    """

    kdbc: Any = Field(exclude=True)
    """Kinetica DB connection."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Pydantic object validator: create a connection if none was given."""
        kdbc = values.get("kdbc", None)
        if kdbc is None:
            kdbc = KineticaUtil.create_kdbc()
            values["kdbc"] = kdbc
        return values

    @property
    def _llm_type(self) -> str:
        return "kinetica-sqlassist"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        return dict(
            kinetica_version=str(self.kdbc.server_version),
            api_version=version("gpudb"),
        )

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate SQL from the chat messages via the SqlAssist LLM."""
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        dict_messages = [self._convert_message_to_dict(m) for m in messages]
        sql_response = self._submit_completion(dict_messages)

        response_message = sql_response.choices[0].message
        # generated_dict = response_message.model_dump()  # pydantic v2
        generated_dict = response_message.dict()

        generated_message = self._convert_message_from_dict(generated_dict)

        llm_output = dict(
            input_tokens=sql_response.usage.prompt_tokens,
            output_tokens=sql_response.usage.completion_tokens,
            model_name=sql_response.model,
        )
        return ChatResult(
            generations=[ChatGeneration(message=generated_message)],
            llm_output=llm_output,
        )

    def load_messages_from_context(self, context_name: str) -> List:
        """Load a langchain prompt from a Kinetica context.

        A Kinetica Context is an object created with the Kinetica Workbench UI
        or with SQL syntax. This function will convert the data in the context
        to a list of messages that can be used as a prompt. The messages will
        contain a ``SystemMessage`` followed by pairs of
        ``HumanMessage``/``AIMessage`` that contain the samples.

        Args:
            context_name: The name of an LLM context in the database.

        Returns:
            A list of messages containing the information from the context.
        """
        # query kinetica for the prompt
        sql = f"GENERATE PROMPT WITH OPTIONS (CONTEXT_NAMES = '{context_name}')"
        result = self._execute_sql(sql)
        prompt = result["Prompt"]
        prompt_json = json.loads(prompt)

        # convert the prompt to messages
        # request = SuggestRequest.model_validate(prompt_json)  # pydantic v2
        request = _KdtoSuggestRequest.parse_obj(prompt_json)
        payload = request.payload

        dict_messages = []
        dict_messages.append(dict(role="system", content=payload.get_system_str()))
        dict_messages.extend(payload.get_messages())
        messages = [self._convert_message_from_dict(m) for m in dict_messages]
        return messages

    def _submit_completion(self, messages: List[Dict]) -> _KdtSqlResponse:
        """Submit a /chat/completions request to Kinetica."""
        request = dict(messages=messages)
        request_json = json.dumps(request)
        response_raw = self.kdbc._GPUdb__submit_request_json(
            "/chat/completions", request_json
        )
        response_json = json.loads(response_raw)

        status = response_json["status"]
        if status != "OK":
            message = response_json["message"]
            # The server embeds a JSON error payload in the message text;
            # extract its "message" field when present.
            match_resp = re.compile(r"response:({.*})")
            result = match_resp.search(message)
            if result is not None:
                response = result.group(1)
                response_json = json.loads(response)
                message = response_json["message"]
            raise ValueError(message)

        data = response_json["data"]
        # response = CompletionResponse.model_validate(data)  # pydantic v2
        response = _KdtCompletionResponse.parse_obj(data)
        if response.status != "OK":
            raise ValueError("SQL Generation failed")
        return response.data

    def _execute_sql(self, sql: str) -> Dict:
        """Execute an SQL query and return the single-record result as a dict."""
        response = self.kdbc.execute_sql_and_decode(
            sql, limit=1, get_column_major=False
        )
        status_info = response["status_info"]
        if status_info["status"] != "OK":
            message = status_info["message"]
            raise ValueError(message)

        records = response["records"]
        if len(records) != 1:
            raise ValueError("No records returned.")
        record = records[0]

        response_dict = {}
        for col, val in record.items():
            response_dict[col] = val
        return response_dict

    @classmethod
    def load_messages_from_datafile(cls, sa_datafile: Path) -> List[BaseMessage]:
        """Load a langchain prompt from a Kinetica context datafile."""
        datafile_dict = _KineticaLlmFileContextParser.parse_dialogue_file(sa_datafile)
        messages = cls._convert_dict_to_messages(datafile_dict)
        return messages

    @classmethod
    def _convert_message_to_dict(cls, message: BaseMessage) -> Dict:
        """Convert a single ``BaseMessage`` to a role/content dict."""
        content = cast(str, message.content)
        if isinstance(message, HumanMessage):
            role = "user"
        elif isinstance(message, AIMessage):
            role = "assistant"
        elif isinstance(message, SystemMessage):
            role = "system"
        else:
            raise ValueError(f"Got unsupported message type: {message}")

        result_message = dict(role=role, content=content)
        return result_message

    @classmethod
    def _convert_message_from_dict(cls, message: Dict) -> BaseMessage:
        """Convert a single role/content dict to a ``BaseMessage``."""
        role = message["role"]
        content = message["content"]
        if role == "user":
            return HumanMessage(content=content)
        elif role == "assistant":
            return AIMessage(content=content)
        elif role == "system":
            return SystemMessage(content=content)
        else:
            raise ValueError(f"Got unsupported role: {role}")

    @classmethod
    def _convert_dict_to_messages(cls, sa_data: Dict) -> List[BaseMessage]:
        """Convert a parsed datafile dict to a list of ``BaseMessage``."""
        schema = sa_data["schema"]
        system = sa_data["system"]
        messages = sa_data["messages"]
        LOG.info(f"Importing prompt for schema: {schema}")

        result_list: List[BaseMessage] = []
        result_list.append(SystemMessage(content=system))
        result_list.extend([cls._convert_message_from_dict(m) for m in messages])
        return result_list
class KineticaSqlResponse(BaseModel):
    """Response containing SQL and the fetched data.

    This object is returned by a chain with ``KineticaSqlOutputParser`` and it
    contains the generated SQL and related Pandas Dataframe fetched from the
    database.
    """

    sql: str = Field(default=None)
    """The generated SQL."""

    # dataframe: "pd.DataFrame" = Field(default=None)
    dataframe: Any = Field(default=None)
    """The Pandas dataframe containing the fetched data."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
class KineticaSqlOutputParser(BaseOutputParser[KineticaSqlResponse]):
    """Fetch and return data from the Kinetica LLM.

    This object is used as the last element of a chain to execute generated
    SQL and it will output a ``KineticaSqlResponse`` containing the SQL and a
    pandas dataframe with the fetched data.

    Example:
        .. code-block:: python

            from langchain_community.chat_models.kinetica import (
                ChatKinetica, KineticaSqlOutputParser)
            kinetica_llm = ChatKinetica()

            # create chain
            ctx_messages = kinetica_llm.load_messages_from_context(self.context_name)
            ctx_messages.append(("human", "{input}"))
            prompt_template = ChatPromptTemplate.from_messages(ctx_messages)
            chain = (
                prompt_template
                | kinetica_llm
                | KineticaSqlOutputParser(kdbc=kinetica_llm.kdbc)
            )
            sql_response: KineticaSqlResponse = chain.invoke(
                {"input": "What are the female users ordered by username?"}
            )

            assert isinstance(sql_response, KineticaSqlResponse)
            LOG.info(f"SQL Response: {sql_response.sql}")
            assert isinstance(sql_response.dataframe, pd.DataFrame)
    """

    kdbc: Any = Field(exclude=True)
    """Kinetica database connection."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def parse(self, text: str) -> KineticaSqlResponse:
        """Execute the generated SQL and wrap the result in a response."""
        df = self.kdbc.to_df(text)
        return KineticaSqlResponse(sql=text, dataframe=df)

    def parse_result(
        self, result: List[Generation], *, partial: bool = False
    ) -> KineticaSqlResponse:
        """Parse the first generation's text as SQL."""
        return self.parse(result[0].text)

    @property
    def _type(self) -> str:
        return "kinetica_sql_output_parser"