Python航班客户端
Apache Arrow Python bindings 与 Python 脚本和应用程序集成,以查询存储在 InfluxDB 中的数据。
使用 InfluxDB 3 客户端库
我们推荐使用influxdb3-python Python 客户端库来将 InfluxDB 3 与您的 Python 应用程序代码集成。
InfluxDB 3 客户端库 封装了 Apache Arrow Flight 客户端,并提供了方便的方法用于 写入、查询 和处理存储在 InfluxDB 集群中的数据。客户端库可以使用 SQL 或 InfluxQL 进行查询。
以下示例展示了如何使用 pyarrow.flight 和 pandas Python模块来查询和格式化存储在InfluxDB集群数据库中的数据:
# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
import json
import pandas
import tabulate
# Downsampling query groups data into 2-hour bins
sql="""
SELECT DATE_BIN(INTERVAL '2 hours', time) AS time,
room,
selector_max(temp, time)['value'] AS 'max temp',
selector_min(temp, time)['value'] AS 'min temp',
avg(temp) AS 'average temp'
FROM home
GROUP BY
1,
room
ORDER BY room, 1"""
flight_ticket = Ticket(json.dumps({
"namespace_name": "DATABASE_NAME",
"sql_query": sql,
"query_type": "sql"
}))
token = (b"authorization", bytes(f"Bearer DATABASE_TOKEN".encode('utf-8')))
options = FlightCallOptions(headers=[token])
client = FlightClient(f"grpc+tls://cluster-host.com:443")
reader = client.do_get(flight_ticket, options)
arrow_table = reader.read_all()
# Use pyarrow and pandas to view and analyze data
data_frame = arrow_table.to_pandas()
print(data_frame.to_markdown())
# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
import json
import pandas
import tabulate
# Downsampling query groups data into 2-hour bins
influxql="""
SELECT FIRST(temp)
FROM home
WHERE room = 'kitchen'
AND time >= now() - 100d
AND time <= now() - 10d
GROUP BY time(2h)"""
flight_ticket = Ticket(json.dumps({
"namespace_name": "DATABASE_NAME",
"sql_query": influxql,
"query_type": "influxql"
}))
token = (b"authorization", bytes(f"Bearer DATABASE_TOKEN".encode('utf-8')))
options = FlightCallOptions(headers=[token])
client = FlightClient(f"grpc+tls://cluster-host.com:443")
reader = client.do_get(flight_ticket, options)
arrow_table = reader.read_all()
# Use pyarrow and pandas to view and analyze data
data_frame = arrow_table.to_pandas()
print(data_frame.to_markdown())
替换以下内容:
DATABASE_NAME: 你的 InfluxDB 集群数据库DATABASE_TOKEN: 一个数据库令牌,具有足够权限访问指定的数据库