Python 客户端库
使用 InfluxDB Python client library 将 InfluxDB 集成到 Python 脚本和应用程序中。
本指南假设您对Python和InfluxDB有一定的熟悉度。
如果您刚开始,请参见 Get started with InfluxDB。
开始之前
安装 InfluxDB Python 库:
pip install influxdb-client确保 InfluxDB 正在运行。 如果在本地运行 InfluxDB,请访问 http://localhost:8086。 (如果使用 InfluxDB Cloud,请访问您的 InfluxDB Cloud UI 的 URL。 例如: https://us-west-2-1.aws.cloud2.influxdata.com。)
使用Python将数据写入InfluxDB
我们将使用Python库以行协议写入一些数据。
在你的Python程序中,导入InfluxDB客户端库,并使用它将数据写入InfluxDB。
import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS定义一些变量,命名为你的 bucket、organization 和 token。
bucket = "<my-bucket>" org = "<my-org>" token = "<my-token>" # Store the URL of your InfluxDB instance url="http://localhost:8086"实例化客户端。
InfluxDBClient对象需要三个命名参数:url、org和token。传入命名参数。client = influxdb_client.InfluxDBClient( url=url, token=token, org=org )这个
InfluxDBClient对象有一个write_api方法,用于配置。使用 写客户端 来实例化一个
client对象和write_api方法。使用write_api方法来配置写入器对象。write_api = client.write_api(write_options=SYNCHRONOUS)创建一个 point 对象,并使用 API 写入对象的
write方法将其写入 InfluxDB。写入方法需要三个参数:bucket,org, 和record。p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) write_api.write(bucket=bucket, org=org, record=p)
完整示例编写脚本
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "<my-bucket>"
org = "<my-org>"
token = "<my-token>"
# Store the URL of your InfluxDB instance
url="http://localhost:8086"
client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
# Write script
write_api = client.write_api(write_options=SYNCHRONOUS)
p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, org=org, record=p)
使用Python从InfluxDB查询数据
实例化查询客户端。
query_api = client.query_api()创建一个 Flux 查询,然后将其格式化为 Python 字符串。
query = 'from(bucket:"my-bucket")\ |> range(start: -10m)\ |> filter(fn:(r) => r._measurement == "my_measurement")\ |> filter(fn:(r) => r.location == "Prague")\ |> filter(fn:(r) => r._field == "temperature")'查询客户端将Flux查询发送到InfluxDB,并返回一个具有表结构的Flux对象。
将
query()方法传递两个命名参数:org和query。result = query_api.query(org=org, query=query)遍历Flux对象中的表和记录。
- 使用
get_value()方法来返回值。 - 使用
get_field()方法返回字段。
results = [] for table in result: for record in table.records: results.append((record.get_field(), record.get_value())) print(results) [(temperature, 25.3)]- 使用
Flux对象提供以下方法来访问您的数据:
get_measurement(): 返回记录的测量名称。get_field(): 返回字段名称。get_value(): 返回实际字段值。values: 返回列值的映射。values.get(": 返回给定列的记录值。") get_time(): 返回记录的时间。get_start(): 返回当前表中所有记录的包含下限时间。get_stop(): 返回当前表中所有记录的独占上时间界限。
完整示例查询脚本
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "<my-bucket>"
org = "<my-org>"
token = "<my-token>"
# Store the URL of your InfluxDB instance
url="http://localhost:8086"
client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
# Query script
query_api = client.query_api()
query = 'from(bucket:"my-bucket")\
|> range(start: -10m)\
|> filter(fn:(r) => r._measurement == "my_measurement")\
|> filter(fn:(r) => r.location == "Prague")\
|> filter(fn:(r) => r._field == "temperature")'
result = query_api.query(org=org, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_field(), record.get_value()))
print(results)
[(temperature, 25.3)]
有关更多信息,请参见GitHub上的Python客户端自述文件。