Documentation

将数据从 InfluxDB OSS 写入 InfluxDB Cloud

要将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请使用 Flux to()experimental.to() 函数。 使用单个查询执行一次性写入数据或使用 InfluxDB 任务 定期将数据写入 InfluxDB Cloud

将写入 InfluxDB OSS 的数据复制到 InfluxDB Cloud

要将所有写入复制到InfluxDB OSS实例到InfluxDB Cloud实例,使用 InfluxDB replication streams

InfluxDB Cloud 速率限制

写入 InfluxDB Cloud 的请求受限于与您的 InfluxDB Cloud 定价计划相关的速率限制。

  1. 从 InfluxDB OSS 查询数据。

  2. (可选) 过滤 或处理数据以写入 InfluxDB Cloud。

  3. 使用 toexperimental.to 将数据写入 InfluxDB Cloud。对于大多数用例,to() 是正确的函数,但根据您正在写入的数据的结构,可能需要使用 experimental.to

    使用以下指南:

    • to(): 使用 to 将字段键写入 _field 列,将字段值写入 _value 列。

    • experimental.to(): 用于将列名中的数据写入对应的字段键,并将列值写入字段值。

    请参阅输入和输出示例,适用于to()函数

  4. 提供以下参数给任一函数:

    • bucket: 要写入的InfluxDB Cloud bucket
    • host: InfluxDB Cloud 区域 URL
    • org: InfluxDB Cloud 组织
    • token: InfluxDB Cloud API令牌
  5. (推荐) 为了将您的原始 API 令牌保留在查询之外,请将您的 InfluxDB Cloud API 令牌作为 InfluxDB 秘密 存储在您的 InfluxDB OSS 实例中,并使用 secrets.get() 按如下示例检索秘密值(选择您使用的函数以查看正确的格式):

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )
import "experimental"
import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> experimental.to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

to() 函数的输入和输出数据

  • to() 需要 _time_measurement_field_value 列。
  • to() 将所有其他列写为标签,其中列名是标签键,而列值是标签值。

输入数据

时间测量示例标签字段
2021-01-01T00:00:00Z例子-mA温度80.0
2021-01-01T00:01:00Zexample-mA温度80.3
2021-01-01T00:02:00Zexample-mA温度81.1
_时间_测量示例标签_字段_值
2021-01-01T00:00:00Zexample-mArpm4023
2021-01-01T00:01:00Z示例-mArpm4542
2021-01-01T00:02:00Zexample-mA转速4901

输出行协议

example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
  • experimental.to() 需要 _time_measurement 列。
  • 组键中的(除了_measurement)被解析为标签,其中列名是标签键,列值是标签值。
  • 不在分组键中的(除了 _time_)被解析为字段,其中列名称是字段键,列值是字段值。

输入数据

组键 = [_measurement, exampleTag]

时间测量示例标签温度转速
2021-01-01T00:00:00Zexample-mA80.04023
2021-01-01T00:01:00Zexample-mA80.34542
2021-01-01T00:02:00Zexample-mA81.14901

输出行协议

example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000

示例

下采样并将数据写入InfluxDB Cloud

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> aggregateWindow(every: 1m, fn: last)
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

将最小值、最大值和平均值写入InfluxDB Cloud

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

data = from(bucket: "example-oss-bucket")
    |> range(start: -30m)
    |> filter(fn: (r) => r._measurement == "example-measurement")

min = data |> aggregateWindow(every: 10m, fn: min) |> map(fn: (r) => ({ r with _field: "{$r._field}_min" }))
max = data |> aggregateWindow(every: 10m, fn: max) |> map(fn: (r) => ({ r with _field: "{$r._field}_max" }))
mean = data |> aggregateWindow(every: 10m, fn: mean) |> map(fn: (r) => ({ r with _field: "{$r._field}_mean" }))

union(tables: [min, max, mean])
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

自动化将数据从 InfluxDB OSS 写入 InfluxDB Cloud

为了自动和定期地将数据从 InfluxDB OSS 写入 InfluxDB Cloud,创建一个任务 在您的 InfluxDB OSS 实例中,定期查询、处理和写入数据到 InfluxDB Cloud。

import "influxdata/influxdb/tasks"

option task = {name: "Downsample to InfluxDB Cloud", every: 1h}

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> aggregateWindow(every: 1m, fn: last)
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: