Documentation

Python 客户端库入门

按照这个逐步教程,使用InfluxData客户端库和您喜欢的框架或语言构建一个物联网(IoT)应用程序。

在本教程中,您将使用InfluxDB API和客户端库来构建现代应用程序,同时学习以下内容:

  • InfluxDB核心概念。

  • 应用程序如何与设备和InfluxDB交互。

  • 如何对应用程序和设备进行API认证。

  • 如何安装客户端库。

  • 如何在InfluxDB中写入和查询数据。

  • 如何使用InfluxData UI库格式化数据和创建可视化。

目录

设置 InfluxDB

如果你还没有,创建一个 InfluxDB Cloud 帐户安装 InfluxDB OSS

物联网入门示例应用假设以下前提条件:

  • 一个 InfluxDB 组织 ID
  • 一个 API token(例如,一个 All Access token)具有对存储桶的读取和写入权限
  • 一个 bucket 名为 iot_center 用于存储来自设备的时间序列数据
  • 一个 bucket 名为 iot_center_devices 用于存储设备元数据和API令牌ID

介绍IoT入门

应用程序架构分为四层:

  • InfluxDB API: InfluxDB v2 API.
  • IoT 设备: 虚拟或物理设备将 IoT 数据写入 InfluxDB API。
  • 用户界面: 向服务器发送请求并在浏览器中渲染视图。
  • API: 接收来自用户界面的请求,向InfluxDB发送请求,并处理来自InfluxDB的响应。

有关本教程中引用的完整代码,请参见influxdata/iot-api-python repository

创建应用程序

创建一个将包含您的 iot-api 项目的目录。 以下示例代码在您的主目录中创建一个 iot-api 目录并切换到新目录:

mkdir ~/iot-api-apps
cd ~/iot-api-apps

使用 Flask,一个轻量级的 Python 网站框架,来创建你的应用程序。

  1. 在你的 ~/iot-api-apps 目录中,打开一个终端并输入以下命令以创建并导航到一个新的项目目录:

    mkdir iot-api-python && cd $_
    
  2. 在您的终端中输入以下命令以创建并激活项目的Python虚拟环境:

    # Create a new virtual environment named "virtualenv"
    # Python 3.8+
    python -m venv virtualenv
    
    # Activate the virtualenv (OS X & Linux)
    source virtualenv/bin/activate
    
  3. 激活完成后,在终端中输入以下命令以使用pip包安装程序(随Python一起提供)安装Flask:

    pip install Flask
    
  4. 在您的项目中,创建一个 app.py 文件,该文件:

    1. Imports the Flask package.
    2. Instantiates a Flask application.
    3. Provides a route to execute the application.
    from flask import Flask
    app = Flask(__name__)
    
    @app.route("/")
    def hello():
      return "Hello World!"
    

    启动您的应用程序。 以下示例代码在 http://localhost:3001 上启动应用程序,启用调试和热重载:

    export FLASK_ENV=development
    flask run -h localhost -p 3001
    

    在您的浏览器中,访问 http://localhost:3001 以查看“Hello World!”响应。

安装 InfluxDB 客户端库

InfluxDB客户端库提供以下InfluxDB API交互:

  • 使用Flux语言查询数据。
  • 将数据写入InfluxDB。
  • 在后台批处理数据。
  • 在失败时自动重试请求。

在终端中输入以下命令以安装客户端库:

pip install influxdb-client

有关客户端库的更多信息,请参阅 influxdata/influxdb-client-python repo

配置客户端库

InfluxDB 客户端库需要您 InfluxDB 环境中的配置属性。通常,您需要将以下属性作为环境变量提供给您的应用程序:

  • INFLUX_URL
  • INFLUX_TOKEN
  • INFLUX_ORG
  • INFLUX_BUCKET
  • INFLUX_BUCKET_AUTH

要设置客户端配置,请在项目的顶级目录中创建一个 config.ini 文件,并粘贴以下内容以提供必要的 InfluxDB 认证信息:

[APP]
INFLUX_URL = <INFLUX_URL>
INFLUX_TOKEN = <INFLUX_TOKEN>
INFLUX_ORG = <INFLUX_ORG_ID>
INFLUX_BUCKET = iot_center
INFLUX_BUCKET_AUTH = iot_center_devices

替换以下内容:

  • : 你的 InfluxDB 实例 URL。
  • : 您的 InfluxDB API token,具备查询(读取)桶和为设备创建(写入)授权的权限。
  • : 您的 InfluxDB 组织 ID。

构建API

您的应用程序 API 提供处理来自 UI 的请求的服务器端 HTTP 端点。每个 API 端点负责以下工作:

  1. 监听来自用户界面的HTTP请求。
  2. 将请求翻译为InfluxDB API请求。
  3. 处理InfluxDB API响应并处理错误。
  4. 响应状态和数据(用于用户界面)。

创建注册设备的API

在这个应用程序中,注册设备是一个包含您的设备ID、授权ID和API令牌的点。 API令牌和授权权限允许设备查询并写入INFLUX_BUCKET。 在此部分,您添加处理来自UI的请求的API端点,创建InfluxDB中的授权,并将注册设备写入INFLUX_BUCKET_AUTH桶。 要了解有关API令牌和授权的更多信息,请参见管理API令牌

应用程序API使用以下 /api/v2 InfluxDB API 端点:

  • POST /api/v2/query:查询INFLUX_BUCKET_AUTH以获取注册设备。
  • GET /api/v2/buckets: 获取 INFLUX_BUCKET 的桶 ID。
  • POST /api/v2/authorizations: 为设备创建授权。
  • POST /api/v2/write: 将设备授权写入 INFLUX_BUCKET_AUTH

为设备创建授权

在本节中,您将创建对 INFLUX_BUCKET读取-写入 权限的授权,并为设备接收一个 API 令牌。 下面的示例使用以下步骤来创建授权:

  1. 实例化 AuthorizationsAPI 客户端和 BucketsAPI 客户端,并进行配置。
  2. 检索桶ID。
  3. 使用客户端库向/api/v2/authorizations InfluxDB API端点发送POST请求。

创建一个 ./api/devices.py 文件,内容包括:

# Import the dependencies.
import configparser
from datetime import datetime
from uuid import uuid4

# Import client library classes.
from influxdb_client import Authorization, InfluxDBClient, Permission, PermissionResource, Point, WriteOptions
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.write_api import SYNCHRONOUS

from api.sensor import Sensor

# Get the configuration key-value pairs.

config = configparser.ConfigParser()
config.read('config.ini')

def create_authorization(device_id) -> Authorization:
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=os.environ.get('INFLUX_TOKEN'),
                                     org=os.environ.get('INFLUX_ORG'))

    authorization_api = AuthorizationsApi(influxdb_client)
    # get bucket_id from bucket
    buckets_api = BucketsApi(influxdb_client)
    buckets = buckets_api.find_bucket_by_name(config.get('APP', 'INFLUX_BUCKET'))  # function returns only 1 bucket
    bucket_id = buckets.id
    org_id = buckets.org_id
    desc_prefix = f'IoTCenterDevice: {device_id}'
    org_resource = PermissionResource(org_id=org_id, id=bucket_id, type="buckets")
    read = Permission(action="read", resource=org_resource)
    write = Permission(action="write", resource=org_resource)
    permissions = [read, write]
    authorization = Authorization(org_id=org_id, permissions=permissions, description=desc_prefix)
    request = authorization_api.create_authorization(authorization=authorization)
    return request

要创建一个具有 - 权限的授权,需要 INFLUX_BUCKET 的桶 ID。 要检索桶 ID,create_authorization(deviceId) 调用 BucketsAPI find_bucket_by_name 函数,该函数向 /api/v2/buckets InfluxDB API 端点发送 GET 请求。 create_authorization(deviceId) 然后在请求体中传递一个新的授权,内容如下:

  • 桶 ID。
  • 组织 ID。
  • 描述: IoTCenterDevice: DEVICE_ID.
  • 存储桶的权限列表。

要了解有关API令牌和授权的更多信息,请参阅 管理API令牌

接下来, 将设备授权写入存储桶

将设备授权写入存储桶

在InfluxDB中使用设备授权,将设备和授权详情写入INFLUX_BUCKET_AUTH。将设备授权存储在桶中允许你执行以下操作:

  • 报告设备授权历史。
  • 管理有令牌和没有令牌的设备。
  • 将相同的令牌分配给多个设备。
  • 刷新令牌。

要将一个点写入InfluxDB,请使用InfluxDB客户端库向/api/v2/write InfluxDB API端点发送POST请求。 在./api/devices.py中,添加以下create_device(device_id)函数:

def create_device(device_id=None):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=config.get('APP', 'INFLUX_TOKEN'),
                                     org=config.get('APP', 'INFLUX_ORG'))
    if device_id is None:
        device_id = str(uuid4())
    write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
    point = Point('deviceauth') \
        .tag("deviceId", device_id) \
        .field('key', f'fake_auth_id_{device_id}') \
        .field('token', f'fake_auth_token_{device_id}')
    client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET_AUTH'), record=point)
    # write() returns None on success
    if client_response is None:
        return device_id
    # Return None on failure
    return None

create_device(device_id) 接收一个 device_id 并按照以下步骤将数据写入 INFLUX_BUCKET_AUTH

  1. 使用来自配置的 urltokenorg 值初始化 InfluxDBClient()
  2. 初始化一个 WriteAPI 客户端,用于将数据写入 InfluxDB 存储桶。
  3. 创建一个 Point
  4. 使用 write_api.write()Point 写入存储桶。
  5. 检查是否失败–如果写入成功, write_api 返回 None
  6. 如果成功,返回 device_id;否则返回 None

该函数写入一个具有以下元素的点:

元素名称
测量deviceauth
标签deviceId设备 ID
字段key授权ID
字段token授权(API)令牌

接下来,创建列出设备的API

创建API以列出设备

添加 /api/devices API 端点,该端点用于检索、处理和列出注册的设备。

  1. 创建一个Flux查询,获取每个包含series的最后一行deviceauth测量结果。 以下示例查询返回包含key字段(授权ID)的行,并排除包含token字段的行(以避免将令牌暴露给UI)。

    // Flux query finds devices
     from(bucket:`${INFLUX_BUCKET_AUTH}`)
          |> range(start: 0)
          |> filter(fn: (r) => r._measurement == "deviceauth" and r._field != "token")
          |> last()
    
  2. 使用QueryApi客户端将Flux查询发送到POST /api/v2/query InfluxDB API端点。

    ./api/devices.py 中,添加以下内容:

    def get_device(device_id=None) -> {}:
        influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                         token=os.environ.get('INFLUX_TOKEN'),
                                         org=os.environ.get('INFLUX_ORG'))
        # Queries must be formatted with single and double quotes correctly
        query_api = QueryApi(influxdb_client)
        device_filter = ''
        if device_id:
            device_id = str(device_id)
            device_filter = f'r.deviceId == "{device_id}" and r._field != "token"'
        else:
            device_filter = f'r._field != "token"'
    
        flux_query = f'from(bucket: "{config.get("APP", "INFLUX_BUCKET_AUTH")}") ' \
                     f'|> range(start: 0) ' \
                     f'|> filter(fn: (r) => r._measurement == "deviceauth" and {device_filter}) ' \
                     f'|> last()'
    
        response = query_api.query(flux_query)
        result = []
        for table in response:
            for record in table.records:
                try:
                    'updatedAt' in record
                except KeyError:
                    record['updatedAt'] = record.get_time()
                    record[record.get_field()] = record.get_value()
                result.append(record.values)
        return result
    

    函数 get_device(device_id) 执行以下操作:

    1. Instantiates a QueryApi client and sends the Flux query to InfluxDB.
    2. Iterates over the FluxTable in the response and returns a list of tuples.

创建物联网虚拟设备

创建一个 ./api/sensor.py 文件,该文件生成模拟的天气遥测数据。 按照 示例代码 创建物联网虚拟设备。

接下来,为虚拟设备生成数据并 将数据写入 InfluxDB

写入遥测数据

在本节中,您将遥测数据写入一个 InfluxDB 存储桶。为了写入数据,请使用 InfluxDB 客户端库向 /api/v2/write InfluxDB API 端点发送 POST 请求。

下面的示例使用以下步骤生成数据,然后将其写入InfluxDB:

  1. 初始化一个 WriteAPI 实例。
  2. 创建一个 Point,包含 environment 测量和数据字段,分别为温度、湿度、气压、纬度和经度。
  3. 使用 WriteAPI write 方法将数据点发送到 InfluxDB。

./api/devices.py 中,添加以下 write_measurements(device_id) 函数:

def write_measurements(device_id):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=config.get('APP', 'INFLUX_TOKEN'),
                                     org=config.get('APP', 'INFLUX_ORG'))
    write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
    virtual_device = Sensor()
    coord = virtual_device.geo()
    point = Point("environment") \
        .tag("device", device_id) \
        .tag("TemperatureSensor", "virtual_bme280") \
        .tag("HumiditySensor", "virtual_bme280") \
        .tag("PressureSensor", "virtual_bme280") \
        .field("Temperature", virtual_device.generate_measurement()) \
        .field("Humidity", virtual_device.generate_measurement()) \
        .field("Pressure", virtual_device.generate_measurement()) \
        .field("Lat", coord['latitude']) \
        .field("Lon", coord['latitude']) \
        .time(datetime.utcnow())
    print(f"Writing: {point.to_line_protocol()}")
    client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET'), record=point)
    # write() returns None on success
    if client_response is None:
        # TODO Maybe also return the data that was written
        return device_id
    # Return None on failure
    return None

查询遥测数据

在本节中,您将从 InfluxDB 存储桶中获取遥测数据。
要检索数据,请使用 InfluxDB 客户端库向 /api/v2/query InfluxDB API 端点发送一个 POST 请求。
下面的示例使用以下步骤来获取和处理遥测数据:

  1. 查询 environment 测量值在 INFLUX_BUCKET 中。
  2. 通过 device_id 过滤结果。
  3. 返回 influxdata/giraffe UI 库 可以处理的 CSV 数据。

./api/devices.py 中,添加以下 get_measurements(device_id) 函数:

def get_measurements(query):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG'))
    query_api = QueryApi(influxdb_client)
    result = query_api.query_csv(query,
                                 dialect=Dialect(
                                       header=True,
                                       delimiter=",",
                                       comment_prefix="#",
                                       annotations=['group', 'datatype', 'default'],
                                       date_time_format="RFC3339"))
    response = ''
    for row in result:
        response += (',').join(row) + ('\n')
    return response

定义API响应

app.py 中,添加匹配传入请求的 API 端点,并响应您模块的结果。 在以下 /api/devices/ 路由示例中,app.pyGETPOST 请求中检索 device_id,将其传递给 get_device(device_id) 方法,并将结果作为 JSON 数据返回,附带 CORS allow- 头。

@app.route('/api/devices/<string:device_id>', methods=['GET', 'POST'])
def api_get_device(device_id):
    if request.method == "OPTIONS": # CORS preflight
        return _build_cors_preflight_response()
    return _corsify_actual_response(jsonify(devices.get_device(device_id)))

在终端中输入以下命令以重启应用程序:

  1. CONTROL+C停止应用程序。
  2. flask run -h localhost -p 3001 以启动应用程序。

要从您的API获取设备数据,请在浏览器中访问 http://localhost:3001/api/devices

安装并运行用户界面

influxdata/iot-api-ui 是一个独立的 Next.js React 用户界面,使用您的应用程序 API 向 InfluxDB 写入和查询数据。iot-api-ui 使用 Next.js rewrites 将所有请求路由到 /api/ 路径下的您的 API。

要安装和运行用户界面,请执行以下操作:

  1. 在你的 ~/iot-api-apps 目录中,克隆 influxdata/iot-api-ui 仓库 并进入 iot-api-ui 目录,例如:

    cd ~/iot-api-apps
    git clone git@github.com:influxdata/iot-api-ui.git
    cd ./iot-app-ui
    
  2. 文件 ./.env.development 包含默认的配置设置,您可以编辑或覆盖(使用 ./.env.local 文件)。

  3. 要启动用户界面,请在终端中输入以下命令:

    yarn dev
    

    要查看设备列表并注册设备,请在浏览器中访问 http://localhost:3000/devices

要了解有关UI组件的更多信息,请参见 influxdata/iot-api-ui



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: