Documentation

Python 航班 SQL DBAPI 客户端

Python flightsql-dbapi Flight SQL DBAPI库 与使用SQL查询存储在InfluxDB集群数据库中的数据的Python应用程序集成。flightsql-dbapi库使用Flight SQL协议来查询和检索数据。

使用 InfluxDB 3 客户端库

我们推荐使用influxdb3-python Python 客户端库来将 InfluxDB 3 与您的 Python 应用程序代码集成。

InfluxDB 3 客户端库 封装了 Apache Arrow Flight 客户端,并提供了方便的方法用于 写入查询 和处理存储在 InfluxDB 集群中的数据。客户端库可以使用 SQL 或 InfluxQL 进行查询。

安装

Python 的 flightsql-dbapi Flight SQL 库提供了一个 DB API 2 接口和 SQLAlchemy 方言用于 Flight SQL。 安装 flightsql-dbapi 也会安装你将用于处理 Arrow 数据的 pyarrow 库。

在你的终端中,使用 pip 安装 flightsql-dbapi

pip install flightsql-dbapi

导入模块

flightsql-dbapi 包提供了 flightsql 模块。 从该模块中,导入 FlightSQLClient 类方法:

from flightsql import FlightSQLClient
  • flightsql.FlightSQLClient 类:一个用于 初始化客户端 和与 Flight SQL 服务器交互的接口。

API 参考

类 FlightSQLClient

提供一个接口用于 初始化客户端 并与 Flight SQL 服务器交互。

语法

__init__(self, host=None, token=None, metadata=None, features=None)

初始化并返回一个 FlightSQLClient 实例以与服务器进行交互。

初始化客户端

以下示例演示如何使用 Python 和 flightsql-dbapi 以及 DB API 2 接口来实例化为 InfluxDB 数据库配置的 Flight SQL 客户端。

from flightsql import FlightSQLClient

# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='cluster-host.com',
                        token='
DATABASE_TOKEN
'
,
metadata={'database': '
DATABASE_NAME
'
},
features={'metadata-reflection': 'true'})

替换以下内容:

  • DATABASE_TOKEN: 一个数据库令牌,具有对您想查询的数据库的读取权限
  • DATABASE_NAME: 您的 InfluxDB 集群的 数据库 名称

实例方法

FlightSQLClient.execute

发送一个飞行 SQL RPC 请求以执行指定的 SQL 查询。

语法

execute(query: str, call_options: Optional[FlightSQLCallOptions] = None)

示例

# Execute the query
info = client.execute("SELECT * FROM home")

响应包含一个 flight.FlightInfo 对象,该对象包含元数据和一个 endpoints: [...] 列表。每个端点包含以下内容:

  • 一个可以检索查询结果数据的地址列表。
  • 一个 ticket 值,用于标识要 retrieve 的数据。

FlightSQLClient.do_get

传递航班票(从 FlightSQLClient.execute 响应中获得)并检索由票识别的 Arrow 数据。 返回一个 pyarrow.flight.FlightStreamReader 用于流式传输数据。

语法

 do_get(ticket, call_options: Optional[FlightSQLCallOptions] = None)

示例

以下示例展示了如何使用 Python 与 flightsql-dbapipyarrow 查询 InfluxDB 并检索数据。

from flightsql import FlightSQLClient

# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='cluster-host.com',
    token='DATABASE_TOKEN',
    metadata={'database': 'DATABASE_NAME'},
    features={'metadata-reflection': 'true'})

# Execute the query to retrieve FlightInfo
info = client.execute("SELECT * FROM home")

# Extract the token for retrieving data
ticket = info.endpoints[0].ticket

# Use the ticket to request the Arrow data stream.
# Return a FlightStreamReader for streaming the results.
reader = client.do_get(ticket)

# Read all data to a pyarrow.Table
table = reader.read_all()

print(table)

do_get(ticket) 返回一个 pyarrow.flight.FlightStreamReader 用于流式传输 Arrow 记录批次

要从流中读取数据,请调用以下 FlightStreamReader 方法之一:

  • read_all(): 读取所有记录批次作为一个 pyarrow.Table
  • read_chunk(): 读取下一个 RecordBatch 和元数据。
  • read_pandas(): 读取所有记录批次并将它们转换为一个 pandas.DataFrame


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: