将数据从 InfluxDB OSS 写入 InfluxDB Cloud
要将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请使用 Flux
to() 或
experimental.to() 函数。
使用单个查询执行一次性写入数据或使用 InfluxDB 任务
定期将数据写入 InfluxDB Cloud。
将写入 InfluxDB OSS 的数据复制到 InfluxDB Cloud
要将所有写入复制到InfluxDB OSS实例到InfluxDB Cloud实例,使用 InfluxDB replication streams。
InfluxDB Cloud 速率限制
写入 InfluxDB Cloud 的请求受限于与您的 InfluxDB Cloud 定价计划相关的速率限制。
从 InfluxDB OSS 查询数据。
(可选) 过滤 或处理数据以写入 InfluxDB Cloud。
使用
to或experimental.to将数据写入 InfluxDB Cloud。对于大多数用例,to()是正确的函数,但根据您正在写入的数据的结构,可能需要使用experimental.to。使用以下指南:
to(): 使用 to 将字段键写入
_field列,将字段值写入_value列。experimental.to(): 用于将列名中的数据写入对应的字段键,并将列值写入字段值。
提供以下参数给任一函数:
- bucket: 要写入的InfluxDB Cloud bucket
- host: InfluxDB Cloud 区域 URL
- org: InfluxDB Cloud 组织
- token: InfluxDB Cloud API令牌
(推荐) 为了将您的原始 API 令牌保留在查询之外,请将您的 InfluxDB Cloud API 令牌作为 InfluxDB 秘密 存储在您的 InfluxDB OSS 实例中,并使用
secrets.get()按如下示例检索秘密值(选择您使用的函数以查看正确的格式):
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
import "experimental"
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> experimental.to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
to() 函数的输入和输出数据
to()需要_time、_measurement、_field和_value列。to()将所有其他列写为标签,其中列名是标签键,而列值是标签值。
输入数据
| _时间 | _测量 | 示例标签 | _字段 | _值 |
|---|---|---|---|---|
| 2021-01-01T00:00:00Z | 例子-m | A | 温度 | 80.0 |
| 2021-01-01T00:01:00Z | example-m | A | 温度 | 80.3 |
| 2021-01-01T00:02:00Z | example-m | A | 温度 | 81.1 |
| _时间 | _测量 | 示例标签 | _字段 | _值 |
|---|---|---|---|---|
| 2021-01-01T00:00:00Z | example-m | A | rpm | 4023 |
| 2021-01-01T00:01:00Z | 示例-m | A | rpm | 4542 |
| 2021-01-01T00:02:00Z | example-m | A | 转速 | 4901 |
输出行协议
example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
experimental.to()需要_time和_measurement列。- 在组键中的列(除了
_measurement)被解析为标签,其中列名是标签键,列值是标签值。 - 不在分组键中的列(除了
_time_)被解析为字段,其中列名称是字段键,列值是字段值。
输入数据
组键 = [_measurement, exampleTag]
| 时间 | 测量 | 示例标签 | 温度 | 转速 |
|---|---|---|---|---|
| 2021-01-01T00:00:00Z | example-m | A | 80.0 | 4023 |
| 2021-01-01T00:01:00Z | example-m | A | 80.3 | 4542 |
| 2021-01-01T00:02:00Z | example-m | A | 81.1 | 4901 |
输出行协议
example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
示例
下采样并将数据写入InfluxDB Cloud
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> aggregateWindow(every: 1m, fn: last)
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
将最小值、最大值和平均值写入InfluxDB Cloud
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
data = from(bucket: "example-oss-bucket")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "example-measurement")
min = data |> aggregateWindow(every: 10m, fn: min) |> map(fn: (r) => ({ r with _field: "{$r._field}_min" }))
max = data |> aggregateWindow(every: 10m, fn: max) |> map(fn: (r) => ({ r with _field: "{$r._field}_max" }))
mean = data |> aggregateWindow(every: 10m, fn: mean) |> map(fn: (r) => ({ r with _field: "{$r._field}_mean" }))
union(tables: [min, max, mean])
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
自动化将数据从 InfluxDB OSS 写入 InfluxDB Cloud
为了自动和定期地将数据从 InfluxDB OSS 写入 InfluxDB Cloud,创建一个任务 在您的 InfluxDB OSS 实例中,定期查询、处理和写入数据到 InfluxDB Cloud。
import "influxdata/influxdb/tasks"
option task = {name: "Downsample to InfluxDB Cloud", every: 1h}
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> aggregateWindow(every: 1m, fn: last)
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)