Skip to content

Snowflake

SnowflakeReader #

Bases: BaseReader

初始化SnowflakeReader的新实例。

该类使用SQLAlchemy建立与Snowflake的连接,执行查询并将每一行连接成LlamaIndex使用的Document。

属性: engine(可选[Engine]):数据库连接的SQLAlchemy Engine对象。

account(可选[str]):Snowflake账户标识符。
user(可选[str]):Snowflake账户用户名。
password(可选[str]):Snowflake账户的密码。
database(可选[str]):Snowflake数据库名称。
schema(可选[str]):Snowflake模式名称。
warehouse(可选[str]):Snowflake仓库名称。
proxy(可选[str]):连接的代理设置。
Source code in llama_index/readers/snowflake/base.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class SnowflakeReader(BaseReader):
    """初始化SnowflakeReader的新实例。

该类使用SQLAlchemy建立与Snowflake的连接,执行查询并将每一行连接成LlamaIndex使用的Document。

属性:
    engine(可选[Engine]):数据库连接的SQLAlchemy Engine对象。



    account(可选[str]):Snowflake账户标识符。
    user(可选[str]):Snowflake账户用户名。
    password(可选[str]):Snowflake账户的密码。
    database(可选[str]):Snowflake数据库名称。
    schema(可选[str]):Snowflake模式名称。
    warehouse(可选[str]):Snowflake仓库名称。
    proxy(可选[str]):连接的代理设置。"""

    def __init__(
        self,
        account: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        warehouse: Optional[str] = None,
        role: Optional[str] = None,
        proxy: Optional[str] = None,
        engine: Optional[Engine] = None,
    ) -> None:
        """使用可选的连接详细信息、代理配置或直接使用引擎初始化SnowflakeReader。

Args:
    account (Optional[str]): Snowflake账户标识符。
    user (Optional[str]): Snowflake账户用户名。
    password (Optional[str]): Snowflake账户密码。
    database (Optional[str]): Snowflake数据库名称。
    schema (Optional[str]): Snowflake模式名称。
    warehouse (Optional[str]): Snowflake仓库名称。
    role (Optional[str]): Snowflake角色名称。
    proxy (Optional[str]): 连接的代理设置。
    engine (Optional[Engine]): 已存在的SQLAlchemy引擎。
"""
        from snowflake.sqlalchemy import URL

        if engine is None:
            connect_args = {}
            if proxy:
                connect_args["proxy"] = proxy

            # Create an SQLAlchemy engine for Snowflake
            self.engine = create_engine(
                URL(
                    account=account or "",
                    user=user or "",
                    password=password or "",
                    database=database or "",
                    schema=schema or "",
                    warehouse=warehouse or "",
                    role=role or "",
                ),
                connect_args=connect_args,
            )
        else:
            self.engine = engine

        # Create a sessionmaker bound to the engine
        self.Session = sessionmaker(bind=self.engine)

    def execute_query(self, query_string: str) -> List[Any]:
        """执行一个SQL查询并返回获取的结果。

Args:
    query_string(str):要执行的SQL查询。

Returns:
    List[Any]:来自查询的获取结果。
"""
        # Create a session and execute the query
        session = self.Session()
        try:
            result = session.execute(text(query_string))
            return result.fetchall()
        finally:
            # Ensure the session is closed after query execution
            session.close()

    def load_data(self, query: str) -> List[Document]:
        """查询并从数据库中加载数据,返回一个文档列表。

Args:
    query (str): 用于过滤表和行的查询参数。

Returns:
    List[Document]: 一个Document对象的列表。
"""
        documents = []

        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        try:
            result = self.execute_query(query)

            for item in result:
                # fetch each item
                doc_str = ", ".join([str(entry) for entry in item])
                documents.append(Document(text=doc_str))
            return documents
        except Exception as e:
            logger.error(
                f"An error occurred while loading the data: {e}", exc_info=True
            )

execute_query #

execute_query(query_string: str) -> List[Any]

执行一个SQL查询并返回获取的结果。

Returns:

Type Description
List[Any]

List[Any]:来自查询的获取结果。

Source code in llama_index/readers/snowflake/base.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
    def execute_query(self, query_string: str) -> List[Any]:
        """执行一个SQL查询并返回获取的结果。

Args:
    query_string(str):要执行的SQL查询。

Returns:
    List[Any]:来自查询的获取结果。
"""
        # Create a session and execute the query
        session = self.Session()
        try:
            result = session.execute(text(query_string))
            return result.fetchall()
        finally:
            # Ensure the session is closed after query execution
            session.close()

load_data #

load_data(query: str) -> List[Document]

查询并从数据库中加载数据,返回一个文档列表。

Parameters:

Name Type Description Default
query str

用于过滤表和行的查询参数。

required

Returns:

Type Description
List[Document]

List[Document]: 一个Document对象的列表。

Source code in llama_index/readers/snowflake/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
    def load_data(self, query: str) -> List[Document]:
        """查询并从数据库中加载数据,返回一个文档列表。

Args:
    query (str): 用于过滤表和行的查询参数。

Returns:
    List[Document]: 一个Document对象的列表。
"""
        documents = []

        if query is None:
            raise ValueError("A query parameter is necessary to filter the data")

        try:
            result = self.execute_query(query)

            for item in result:
                # fetch each item
                doc_str = ", ".join([str(entry) for entry in item])
                documents.append(Document(text=doc_str))
            return documents
        except Exception as e:
            logger.error(
                f"An error occurred while loading the data: {e}", exc_info=True
            )