InfluxDB 3 的 Python 客户端库
InfluxDB 3 influxdb3-python Python 客户端库 将 InfluxDB 集群的写入和查询操作与 Python 脚本和应用程序集成。
InfluxDB 客户端库提供可配置的批量数据写入 InfluxDB 集群。客户端库可以用于构造行协议数据,将其他格式的数据转换为行协议,并批量写入行协议数据到 InfluxDB HTTP APIs。
InfluxDB 3 客户端库可以使用 SQL 或 InfluxQL 查询 InfluxDB 集群。influxdb3-python Python 客户端库将 Apache Arrow pyarrow.flight 客户端包装成一个方便的 InfluxDB 3 接口,用于执行 SQL 和 InfluxQL 查询,请求服务器元数据,并使用 gRPC 通过 Flight 协议从 InfluxDB 集群中检索数据。
本页面中的代码示例使用获取入门家居传感器示例数据。
安装
使用 pip 安装客户端库和依赖项:
pip install influxdb3-python
导入模块
该 influxdb3-python 客户端库包提供了 influxdb_client_3 模块。
导入模块:
import influxdb_client_3
从模块中导入特定的类方法:
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
influxdb_client_3.InfluxDBClient3: 一个与InfluxDB交互的类influxdb_client_3.Point: 用于构建时间序列数据点的类influxdb_client_3.WriteOptions:用于配置客户端写入选项的类。
API 参考
该 influxdb_client_3 模块包括以下类和函数。
类
类 InfluxDBClient3
提供与InfluxDB API交互的接口,以用于数据的写入和查询。
该 InfluxDBClient3 构造函数初始化并返回一个客户端实例,包含以下内容:
- 一个配置用于写入数据库的单例 写客户端。
- 一个配置为查询数据库的航班客户端单例。
参数
host(string): InfluxDB实例的主机URL。database(字符串): 使用于写入和查询的数据库。token(字符串): 一个具有读/写权限的数据库令牌。- 可选
write_client_options(字典): 写入InfluxDB时要使用的选项。 如果None,写入是 同步。 - 可选
flight_client_options(dict): 查询 InfluxDB 时使用的选项。
书写模式
在写入数据时,客户端使用以下模式之一:
同步写入
默认。当在InfluxDBClient3初始化期间未提供write_client_options时,写入操作是同步的。 在同步模式下写入数据时,客户端会立即尝试将提供的数据写入InfluxDB,不会重试失败的请求,也不会调用响应回调。
示例:使用同步(非批处理)默认值初始化客户端
以下示例初始化一个客户端,用于在InfluxDB集群数据库中写入和查询数据。 由于未指定 write_client_options,客户端使用默认的 同步写入 模式。
–>
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
替换以下内容:
要明确指定同步模式,使用 write_options=SYNCHRONOUS 创建一个客户端,例如:
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS
wco = write_client_options(write_options=SYNCHRONOUS)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco,
flight_client_options=None)
替换以下内容:
批量写入
批量写入在高效的批量数据操作中尤其有用。选项包括设置批量大小、刷新间隔、重试间隔等。
批量写入将多个写入组合到对 InfluxDB 的单个请求中。
在批处理模式下,客户端将记录添加到批处理中,然后安排批处理写入 InfluxDB。
客户端在达到 write_client_options.batch_size 或 write_client_options.flush_interval 后将批处理写入 InfluxDB。
如果写入失败,客户端将根据 write_client_options 的重试选项重新安排写入。
配置写入客户端选项
使用 WriteOptions 和 write_client_options 来配置客户端的批量写入和响应处理:
- 实例化
WriteOptions。要使用批处理默认值,请调用构造函数而不指定参数。 - 调用
write_client_options并使用write_options参数来指定前一步骤的WriteOptions实例。 指定回调参数(success、error 和 retry)以在成功或出错时调用函数。 - 实例化
InfluxDBClient3并使用write_client_options参数指定前一步的dict输出。
示例:使用批量默认值和回调初始化客户端
下面的示例展示了如何使用默认的批处理模式并指定响应状态的回调函数(成功、错误或可重试错误)。
from influxdb_client_3 import(InfluxDBClient3,
write_client_options,
WriteOptions,
InfluxDBError)
status = None
# Define callbacks for write responses
def success(self, data: str):
status = "Success writing batch: data: {data}"
assert status.startswith('Success'), f"Expected {status} to be success"
def error(self, data: str, err: InfluxDBError):
status = f"Error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
def retry(self, data: str, err: InfluxDBError):
status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
# Instantiate WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
# Use the with...as statement to ensure the file is properly closed and resources
# are released.
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv',
timestamp_column='time', tag_columns=["room"], write_precision='s')
替换以下内容:
InfluxDBClient3 实例方法
InfluxDBClient3.write
将记录或记录列表写入InfluxDB。
参数
record(record 或列表): 要写入的记录或记录列表。记录可以是一个Point对象,一个表示点的字典,一个线协议字符串,或一个DataFrame。database(字符串):要写入的数据库。默认是写入为客户端指定的数据库。- **
**kwargs**:额外的写入选项——例如:write_precision(字符串):可选。默认值是"ns"。 指定时间戳在record中的精度("ms","s","us","ns")。write_client_options(字典):可选。 指定回调函数和批量写入模式的选项。 要生成dict,请使用write_client_options函数。
示例:写一行协议字符串
替换以下内容:
示例:使用点写入数据
该 influxdb_client_3.Point 类提供了一个接口,用于构造一个测量的数据点,并设置该数据点的字段、标签和时间戳。以下示例展示了如何创建一个 Point 对象,然后将数据写入 InfluxDB。
from influxdb_client_3 import Point, InfluxDBClient3
point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.write(point)
以下示例代码执行一个 InfluxQL 查询以检索已写入的数据:
# Execute an InfluxQL query
table = client.query(query='''SELECT DISTINCT(temp) as val
FROM home
WHERE temp > 21.0
AND time >= now() - 10m''', language="influxql")
# table is a pyarrow.Table
df = table.to_pandas()
assert 21.5 in df['val'].values, f"Expected value in {df['val']}"
替换以下内容:
示例:使用字典写入数据
InfluxDBClient3 可以将字典对象序列化为线协议。 如果您将一个 dict 传递给 InfluxDBClient3.write,客户端期望 dict 具有以下 点 属性:
- measurement (string): 测量名称
- tags (dict): 标签键值对的字典
- fields (dict): 字段键值对的字典
- 时间: 记录的时间戳
以下示例显示了如何定义一个 dict,它表示一个点,然后将数据写入 InfluxDB。
替换以下内容:
InfluxDBClient3.write_file
将数据从文件写入InfluxDB。执行是同步的。
参数
file(string): 一个包含要写入InfluxDB记录的文件路径。 文件名必须以以下支持的扩展名之一结尾。 有关编码和格式化数据的更多信息,请参阅每种支持的格式的文档:.feather: Feather.parquet: Parquet.csv: Comma-separated values.json: JSON.orc: ORC
measurement_name(字符串): 为文件中的记录定义测量名称。 指定的值优先于文件中的measurement和iox::measurement列。 如果未指定参数的值,并且文件中存在measurement列,则使用measurement列的值作为测量名称。 如果未指定参数的值,并且不存在measurement列,则使用iox::measurement列的值作为测量名称。tag_columns(list): 标签列名称。 不在列表中且未通过其他参数指定的列被视为字段。timestamp_column(字符串): 包含时间戳的列名。默认值为'time'。database(str): 要写入的数据库。默认情况下写入为客户端指定的数据库。file_parser_options(可调用): 一个提供额外参数给文件解析器的函数。**kwargs: 传递给WriteAPI的附加选项–例如:write_precision(string): Optional. Default is"ns". Specifies the precision ("ms","s","us","ns") for timestamps inrecord.write_client_options(dict): Optional. Specifies callback functions and options for batch writing mode. To generate thedict, use thewrite_client_optionsfunction.
示例:在写入文件数据时使用批处理选项
下面的示例展示了如何为批处理、重试和响应回调指定自定义写入选项,以及如何将 CSV 和 JSON 文件中的数据写入 InfluxDB:
from influxdb_client_3 import(InfluxDBClient3, write_client_options,
WritePrecision, WriteOptions, InfluxDBError)
# Define the result object
result = {
'config': None,
'status': None,
'data': None,
'error': None
}
# Define callbacks for write responses
def success_callback(self, data: str):
result['config'] = self
result['status'] = 'success'
result['data'] = data
assert result['data'] != None, f"Expected {result['data']}"
print("Successfully wrote data: {result['data']}")
def error_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}"
def retry_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'retry_error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}"
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=success_callback,
error_callback=error_callback,
retry_callback=retry_callback,
write_options=write_options)
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time',
tag_columns=["room"], write_precision='s')
client.write_file(file='./data/home-sensor-data.json', timestamp_column='time',
tag_columns=["room"], write_precision='s')
替换以下内容:
InfluxDBClient3.query
发送一个航班请求以执行指定的 SQL 或 InfluxQL 查询。 返回查询结果中的所有数据作为一个 Arrow 表 (pyarrow.Table 实例)。
参数
query(字符串): 要执行的SQL或InfluxQL。language(字符串): 在query参数中使用的查询语言——"sql"或"influxql"。默认值是"sql"。mode(字符串):指定从pyarrow.flight.FlightStreamReader返回的输出。 默认值为"all"。all:读取流的全部内容并返回为pyarrow.Table。chunk:读取下一个消息(一个FlightStreamChunk)并返回data和app_metadata。 如果没有更多消息,则返回null。pandas:读取流的内容并返回为pandas.DataFrame。reader:将FlightStreamReader转换为pyarrow.RecordBatchReader。schema:返回流中所有记录批次的模式。
**kwargs:FlightCallOptions
示例:使用SQL查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'")
# Filter columns.
print(table.select(['room', 'temp']))
# Use PyArrow to aggregate data.
print(table.group_by('hum').aggregate([]))
在示例中,替换以下内容:
示例:使用 InfluxQL 查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= -90d"
table = client.query(query=query, language="influxql")
# Filter columns.
print(table.select(['room', 'temp']))
示例:从数据流中读取所有数据并返回一个 pandas DataFrame
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
pd = client.query(query=query, mode="pandas")
# Print the pandas DataFrame formatted as a Markdown table.
print(pd.to_markdown())
示例:查看流中所有批次的模式
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("""SELECT *
from home
WHERE time >= now() - INTERVAL '90 days'""")
# View the table schema.
print(table.schema)
示例:获取结果架构且没有数据
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
schema = client.query(query=query, mode="schema")
print(schema)
指定超时时间
传递 timeout= 给 FlightCallOptions 以使用自定义超时。
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
client.query(query=query, timeout=5)
InfluxDBClient3.close
将批次中所有剩余记录发送到 InfluxDB,然后关闭底层写入客户端和 Flight 客户端以释放资源。
示例:关闭一个客户
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.close()
类 点
提供一个接口,用于构建测量的时间序列数据点,并设置字段、标签和时间戳。
from influxdb_client_3 import Point
point = Point("home").tag("room", "Living Room").field("temp", 72)
查看如何 使用点写入数据。
类 WriteOptions
提供一个接口用于构建自定义批量写入行为的选项,例如批量大小和重试。
from influxdb_client_3 import WriteOptions
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
查看如何使用批处理选项来写入数据。
参数
batch_size: 默认值为1000。flush_interval: 默认值是1000.jitter_interval: 默认是0。retry_interval: 默认值为5000。max_retries: 默认值是5.max_retry_delay: 默认为125000。max_retry_time: 默认值为180000。exponential_base: 默认是2。max_close_wait: 默认值是300000。write_scheduler:默认值为ThreadPoolScheduler(max_workers=1)。
函数
函数 write_client_options(**kwargs)
返回一个 dict,包含指定的写客户端选项。
参数
该函数接受以下关键字参数:
write_options(WriteOptions): 指定客户端是使用同步模式还是批处理模式写入数据。如果使用批处理模式,客户端将使用指定的批处理选项。point_settings(dict): 客户端在将数据写入InfluxDB时将添加到每个点的默认标签。success_callback(可调用): 如果使用批处理模式,在数据成功写入InfluxDB后调用的函数 (HTTP 状态204)error_callback(可调用): 如果使用批处理模式,当数据未成功写入时要调用的函数(响应的HTTP状态码不是204)retry_callback(可调用): 如果使用批处理模式,当请求是重试时(使用批处理模式)且数据未成功写入,调用的函数
示例:为批处理写入实例化选项
from influxdb_client_3 import write_client_options, WriteOptions
from influxdb_client_3.write_client.client.write_api import WriteType
def success():
print("Success")
def error():
print("Error")
def retry():
print("Retry error")
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
assert wco['success_callback']
assert wco['error_callback']
assert wco['retry_callback']
assert wco['write_options'].write_type == WriteType.batching
示例:为同步写入实例化选项
from influxdb_client_3 import write_client_options, SYNCHRONOUS
from influxdb_client_3.write_client.client.write_api import WriteType
wco = write_client_options(write_options=SYNCHRONOUS)
assert wco['write_options'].write_type == WriteType.synchronous
函数 flight_client_options(**kwargs)
返回一个 dict,包含指定的 FlightClient 参数。
参数
kwargs:pyarrow.flight.FlightClient参数的关键字参数
示例:指定根证书路径
from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
flight_client_options=flight_client_options(tls_root_certs=cert))
替换以下内容:
常量
influxdb_client_3.SYNCHRONOUS: 表示同步写入模式influxdb_client_3.WritePrecision: 表示写入精度的枚举类
异常
influxdb_client_3.InfluxDBError: 与 InfluxDB 相关错误引发的异常类