Python 客户端库入门
按照这个逐步教程,使用InfluxData客户端库和您喜欢的框架或语言构建一个物联网(IoT)应用程序。
在本教程中,您将使用InfluxDB API和客户端库来构建现代应用程序,同时学习以下内容:
InfluxDB核心概念。
应用程序如何与设备和InfluxDB交互。
如何对应用程序和设备进行API认证。
如何安装客户端库。
如何在InfluxDB中写入和查询数据。
如何使用InfluxData UI库格式化数据和创建可视化。
目录
- 内容
- 设置 InfluxDB
- 介绍物联网入门套件
- 创建应用程序
- 安装 InfluxDB 客户端库
- 配置客户端库
- 构建API
- 创建注册设备的API
- 创建API以列出设备
- 创建物联网虚拟设备
- 写入遥测数据
- 查询遥测数据
- 定义 API 响应
- 安装并运行用户界面
设置 InfluxDB
如果你还没有,创建一个 InfluxDB Cloud 帐户 或 安装 InfluxDB OSS。
物联网入门示例应用假设以下前提条件:
- 一个 InfluxDB 组织 ID
- 一个 API token(例如,一个 All Access token)具有对存储桶的读取和写入权限
- 一个 bucket 名为
iot_center用于存储来自设备的时间序列数据 - 一个 bucket 名为
iot_center_devices用于存储设备元数据和API令牌ID
介绍IoT入门
应用程序架构分为四层:
- InfluxDB API: InfluxDB v2 API.
- IoT 设备: 虚拟或物理设备将 IoT 数据写入 InfluxDB API。
- 用户界面: 向服务器发送请求并在浏览器中渲染视图。
- API: 接收来自用户界面的请求,向InfluxDB发送请求,并处理来自InfluxDB的响应。
有关本教程中引用的完整代码,请参见influxdata/iot-api-python repository。
创建应用程序
创建一个将包含您的 iot-api 项目的目录。 以下示例代码在您的主目录中创建一个 iot-api 目录并切换到新目录:
mkdir ~/iot-api-apps
cd ~/iot-api-apps
使用 Flask,一个轻量级的 Python 网站框架,来创建你的应用程序。
在你的
~/iot-api-apps目录中,打开一个终端并输入以下命令以创建并导航到一个新的项目目录:mkdir iot-api-python && cd $_在您的终端中输入以下命令以创建并激活项目的Python虚拟环境:
# Create a new virtual environment named "virtualenv" # Python 3.8+ python -m venv virtualenv # Activate the virtualenv (OS X & Linux) source virtualenv/bin/activate激活完成后,在终端中输入以下命令以使用
pip包安装程序(随Python一起提供)安装Flask:pip install Flask在您的项目中,创建一个
app.py文件,该文件:- Imports the Flask package.
- Instantiates a Flask application.
- Provides a route to execute the application.
from flask import Flask app = Flask(__name__) @app.route("/") def hello(): return "Hello World!"启动您的应用程序。 以下示例代码在
http://localhost:3001上启动应用程序,启用调试和热重载:export FLASK_ENV=development flask run -h localhost -p 3001在您的浏览器中,访问 http://localhost:3001 以查看“Hello World!”响应。
安装 InfluxDB 客户端库
InfluxDB客户端库提供以下InfluxDB API交互:
- 使用Flux语言查询数据。
- 将数据写入InfluxDB。
- 在后台批处理数据。
- 在失败时自动重试请求。
在终端中输入以下命令以安装客户端库:
pip install influxdb-client
有关客户端库的更多信息,请参阅 influxdata/influxdb-client-python repo。
配置客户端库
InfluxDB 客户端库需要您 InfluxDB 环境中的配置属性。通常,您需要将以下属性作为环境变量提供给您的应用程序:
INFLUX_URLINFLUX_TOKENINFLUX_ORGINFLUX_BUCKETINFLUX_BUCKET_AUTH
要设置客户端配置,请在项目的顶级目录中创建一个 config.ini 文件,并粘贴以下内容以提供必要的 InfluxDB 认证信息:
[APP]
INFLUX_URL = <INFLUX_URL>
INFLUX_TOKEN = <INFLUX_TOKEN>
INFLUX_ORG = <INFLUX_ORG_ID>
INFLUX_BUCKET = iot_center
INFLUX_BUCKET_AUTH = iot_center_devices
替换以下内容:
: 你的 InfluxDB 实例 URL。: 您的 InfluxDB API token,具备查询(读取)桶和为设备创建(写入)授权的权限。: 您的 InfluxDB 组织 ID。
构建API
您的应用程序 API 提供处理来自 UI 的请求的服务器端 HTTP 端点。每个 API 端点负责以下工作:
- 监听来自用户界面的HTTP请求。
- 将请求翻译为InfluxDB API请求。
- 处理InfluxDB API响应并处理错误。
- 响应状态和数据(用于用户界面)。
创建注册设备的API
在这个应用程序中,注册设备是一个包含您的设备ID、授权ID和API令牌的点。
API令牌和授权权限允许设备查询并写入INFLUX_BUCKET。
在此部分,您添加处理来自UI的请求的API端点,创建InfluxDB中的授权,并将注册设备写入INFLUX_BUCKET_AUTH桶。
要了解有关API令牌和授权的更多信息,请参见管理API令牌
应用程序API使用以下 /api/v2 InfluxDB API 端点:
POST /api/v2/query:查询INFLUX_BUCKET_AUTH以获取注册设备。GET /api/v2/buckets: 获取INFLUX_BUCKET的桶 ID。POST /api/v2/authorizations: 为设备创建授权。POST /api/v2/write: 将设备授权写入INFLUX_BUCKET_AUTH。
为设备创建授权
在本节中,您将创建对 INFLUX_BUCKET 的 读取-写入 权限的授权,并为设备接收一个 API 令牌。
下面的示例使用以下步骤来创建授权:
- 实例化
AuthorizationsAPI客户端和BucketsAPI客户端,并进行配置。 - 检索桶ID。
- 使用客户端库向
/api/v2/authorizationsInfluxDB API端点发送POST请求。
创建一个 ./api/devices.py 文件,内容包括:
# Import the dependencies.
import configparser
from datetime import datetime
from uuid import uuid4
# Import client library classes.
from influxdb_client import Authorization, InfluxDBClient, Permission, PermissionResource, Point, WriteOptions
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.write_api import SYNCHRONOUS
from api.sensor import Sensor
# Get the configuration key-value pairs.
config = configparser.ConfigParser()
config.read('config.ini')
def create_authorization(device_id) -> Authorization:
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=os.environ.get('INFLUX_TOKEN'),
org=os.environ.get('INFLUX_ORG'))
authorization_api = AuthorizationsApi(influxdb_client)
# get bucket_id from bucket
buckets_api = BucketsApi(influxdb_client)
buckets = buckets_api.find_bucket_by_name(config.get('APP', 'INFLUX_BUCKET')) # function returns only 1 bucket
bucket_id = buckets.id
org_id = buckets.org_id
desc_prefix = f'IoTCenterDevice: {device_id}'
org_resource = PermissionResource(org_id=org_id, id=bucket_id, type="buckets")
read = Permission(action="read", resource=org_resource)
write = Permission(action="write", resource=org_resource)
permissions = [read, write]
authorization = Authorization(org_id=org_id, permissions=permissions, description=desc_prefix)
request = authorization_api.create_authorization(authorization=authorization)
return request
要创建一个具有 读-写 权限的授权,需要 INFLUX_BUCKET 的桶 ID。
要检索桶 ID,create_authorization(deviceId) 调用
BucketsAPI find_bucket_by_name 函数,该函数向
/api/v2/buckets InfluxDB API 端点发送 GET 请求。
create_authorization(deviceId) 然后在请求体中传递一个新的授权,内容如下:
- 桶 ID。
- 组织 ID。
- 描述:
IoTCenterDevice: DEVICE_ID. - 存储桶的权限列表。
要了解有关API令牌和授权的更多信息,请参阅 管理API令牌。
接下来, 将设备授权写入存储桶。
将设备授权写入存储桶
在InfluxDB中使用设备授权,将设备和授权详情写入INFLUX_BUCKET_AUTH。将设备授权存储在桶中允许你执行以下操作:
- 报告设备授权历史。
- 管理有令牌和没有令牌的设备。
- 将相同的令牌分配给多个设备。
- 刷新令牌。
要将一个点写入InfluxDB,请使用InfluxDB客户端库向/api/v2/write InfluxDB API端点发送POST请求。
在./api/devices.py中,添加以下create_device(device_id)函数:
def create_device(device_id=None):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=config.get('APP', 'INFLUX_TOKEN'),
org=config.get('APP', 'INFLUX_ORG'))
if device_id is None:
device_id = str(uuid4())
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
point = Point('deviceauth') \
.tag("deviceId", device_id) \
.field('key', f'fake_auth_id_{device_id}') \
.field('token', f'fake_auth_token_{device_id}')
client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET_AUTH'), record=point)
# write() returns None on success
if client_response is None:
return device_id
# Return None on failure
return None
create_device(device_id) 接收一个 device_id 并按照以下步骤将数据写入 INFLUX_BUCKET_AUTH:
- 使用来自配置的
url、token和org值初始化InfluxDBClient()。 - 初始化一个
WriteAPI客户端,用于将数据写入 InfluxDB 存储桶。 - 创建一个
Point。 - 使用
write_api.write()将Point写入存储桶。 - 检查是否失败–如果写入成功,
write_api返回None。 - 如果成功,返回
device_id;否则返回None。
该函数写入一个具有以下元素的点:
| 元素 | 名称 | 值 |
|---|---|---|
| 测量 | deviceauth | |
| 标签 | deviceId | 设备 ID |
| 字段 | key | 授权ID |
| 字段 | token | 授权(API)令牌 |
接下来,创建列出设备的API。
创建API以列出设备
添加 /api/devices API 端点,该端点用于检索、处理和列出注册的设备。
创建一个Flux查询,获取每个包含series的最后一行
deviceauth测量结果。 以下示例查询返回包含key字段(授权ID)的行,并排除包含token字段的行(以避免将令牌暴露给UI)。// Flux query finds devices from(bucket:`${INFLUX_BUCKET_AUTH}`) |> range(start: 0) |> filter(fn: (r) => r._measurement == "deviceauth" and r._field != "token") |> last()使用
QueryApi客户端将Flux查询发送到POST /api/v2/queryInfluxDB API端点。在
./api/devices.py中,添加以下内容:def get_device(device_id=None) -> {}: influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'), token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG')) # Queries must be formatted with single and double quotes correctly query_api = QueryApi(influxdb_client) device_filter = '' if device_id: device_id = str(device_id) device_filter = f'r.deviceId == "{device_id}" and r._field != "token"' else: device_filter = f'r._field != "token"' flux_query = f'from(bucket: "{config.get("APP", "INFLUX_BUCKET_AUTH")}") ' \ f'|> range(start: 0) ' \ f'|> filter(fn: (r) => r._measurement == "deviceauth" and {device_filter}) ' \ f'|> last()' response = query_api.query(flux_query) result = [] for table in response: for record in table.records: try: 'updatedAt' in record except KeyError: record['updatedAt'] = record.get_time() record[record.get_field()] = record.get_value() result.append(record.values) return result函数
get_device(device_id)执行以下操作:- Instantiates a
QueryApiclient and sends the Flux query to InfluxDB. - Iterates over the
FluxTablein the response and returns a list of tuples.
- Instantiates a
创建物联网虚拟设备
创建一个 ./api/sensor.py 文件,该文件生成模拟的天气遥测数据。
按照 示例代码 创建物联网虚拟设备。
接下来,为虚拟设备生成数据并 将数据写入 InfluxDB。
写入遥测数据
在本节中,您将遥测数据写入一个 InfluxDB 存储桶。为了写入数据,请使用 InfluxDB 客户端库向 /api/v2/write InfluxDB API 端点发送 POST 请求。
下面的示例使用以下步骤生成数据,然后将其写入InfluxDB:
- 初始化一个
WriteAPI实例。 - 创建一个
Point,包含environment测量和数据字段,分别为温度、湿度、气压、纬度和经度。 - 使用
WriteAPI write方法将数据点发送到 InfluxDB。
在 ./api/devices.py 中,添加以下 write_measurements(device_id) 函数:
def write_measurements(device_id):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=config.get('APP', 'INFLUX_TOKEN'),
org=config.get('APP', 'INFLUX_ORG'))
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
virtual_device = Sensor()
coord = virtual_device.geo()
point = Point("environment") \
.tag("device", device_id) \
.tag("TemperatureSensor", "virtual_bme280") \
.tag("HumiditySensor", "virtual_bme280") \
.tag("PressureSensor", "virtual_bme280") \
.field("Temperature", virtual_device.generate_measurement()) \
.field("Humidity", virtual_device.generate_measurement()) \
.field("Pressure", virtual_device.generate_measurement()) \
.field("Lat", coord['latitude']) \
.field("Lon", coord['latitude']) \
.time(datetime.utcnow())
print(f"Writing: {point.to_line_protocol()}")
client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET'), record=point)
# write() returns None on success
if client_response is None:
# TODO Maybe also return the data that was written
return device_id
# Return None on failure
return None
查询遥测数据
在本节中,您将从 InfluxDB 存储桶中获取遥测数据。
要检索数据,请使用 InfluxDB 客户端库向 /api/v2/query InfluxDB API 端点发送一个 POST 请求。
下面的示例使用以下步骤来获取和处理遥测数据:
- 查询
environment测量值在INFLUX_BUCKET中。 - 通过
device_id过滤结果。 - 返回
influxdata/giraffeUI 库 可以处理的 CSV 数据。
在 ./api/devices.py 中,添加以下 get_measurements(device_id) 函数:
def get_measurements(query):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG'))
query_api = QueryApi(influxdb_client)
result = query_api.query_csv(query,
dialect=Dialect(
header=True,
delimiter=",",
comment_prefix="#",
annotations=['group', 'datatype', 'default'],
date_time_format="RFC3339"))
response = ''
for row in result:
response += (',').join(row) + ('\n')
return response
定义API响应
在 app.py 中,添加匹配传入请求的 API 端点,并响应您模块的结果。
在以下 /api/devices/ 路由示例中,app.py 从 GET 和 POST 请求中检索 device_id,将其传递给 get_device(device_id) 方法,并将结果作为 JSON 数据返回,附带 CORS allow- 头。
@app.route('/api/devices/<string:device_id>', methods=['GET', 'POST'])
def api_get_device(device_id):
if request.method == "OPTIONS": # CORS preflight
return _build_cors_preflight_response()
return _corsify_actual_response(jsonify(devices.get_device(device_id)))
在终端中输入以下命令以重启应用程序:
- 按
CONTROL+C停止应用程序。 flask run -h localhost -p 3001以启动应用程序。
要从您的API获取设备数据,请在浏览器中访问 http://localhost:3001/api/devices。
安装并运行用户界面
influxdata/iot-api-ui 是一个独立的 Next.js React 用户界面,使用您的应用程序 API 向 InfluxDB 写入和查询数据。iot-api-ui 使用 Next.js rewrites 将所有请求路由到 /api/ 路径下的您的 API。
要安装和运行用户界面,请执行以下操作:
在你的
~/iot-api-apps目录中,克隆influxdata/iot-api-ui仓库 并进入iot-api-ui目录,例如:cd ~/iot-api-apps git clone git@github.com:influxdata/iot-api-ui.git cd ./iot-app-ui文件
./.env.development包含默认的配置设置,您可以编辑或覆盖(使用./.env.local文件)。要启动用户界面,请在终端中输入以下命令:
yarn dev要查看设备列表并注册设备,请在浏览器中访问 http://localhost:3000/devices。
要了解有关UI组件的更多信息,请参见 influxdata/iot-api-ui。