Documentation

使用Go查询数据

使用 InfluxDB influxdb3-go Go 客户端库包和 SQL 或 InfluxQL 来查询存储在 InfluxDB 中的数据。通过 Flight+gRPC 协议执行查询并检索数据,然后使用常见的 Go 工具处理数据。

开始使用Go查询InfluxDB

以下示例展示了如何使用 Go 和 influxdb3-go 模块来创建客户端并查询 InfluxDB 集群数据库。

安装 Go

请按照Go下载和安装说明安装适合您系统的最新版本Go编程语言。

创建 Go 模块目录

  1. 在您的项目目录中,创建一个新的模块目录并进入其中。

    mkdir influxdb_go_client && cd $_
    
  2. 输入以下命令以初始化一个新的 Go 模块:

    go mod init influxdb_go_client
    

安装依赖

在您的终端中,输入以下命令以下载并安装客户端库:

go get github.com/InfluxCommunity/influxdb3-go/v2

安装了依赖项后,您准备好查询和分析存储在 InfluxDB 数据库中的数据。

执行查询

以下示例展示了如何创建一个 InfluxDB client,使用客户端查询方法选择测量中的所有字段,然后访问查询结果数据和元数据。

在你的 influxdb_go_client 模块目录中,创建一个名为 query.go 的文件,并输入以下样本之一以使用 SQL 或 InfluxQL 进行查询。

在示例代码中替换以下配置值:

使用SQL查询

// query.go
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
)

func Query() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= '2022-01-02T08:00:00Z' AND time <= '2022-01-02T20:00:00Z'` // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data by row. iterator2, err = client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go time package to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(time.Time)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码完成以下操作:

  1. 定义一个 main 包来用于你的模块,并导入你将在代码中使用的包。
  2. 定义一个 Query() 函数。
  3. 使用 InfluxDB 凭证实例化 influxdb3 客户端,并将其分配给 client 变量。
  4. 定义一个延迟函数,当 Query() 执行完毕时关闭客户端。
  5. 定义要执行的SQL查询。
  6. 调用客户端的 Query(ctx context.Context, query string) 方法,并将 SQL 字符串作为 query 参数传递。Query() 返回以下内容:
    • *influxdb3.QueryIterator: 用于从查询结果流中读取数据的自定义迭代器。
    • error: Flight 请求错误。

使用 InfluxQL 查询

// query.go

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
)

func InfluxQL() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= 1641124000s AND time <= 1641124000s + 8h` queryOptions := influxdb3.QueryOptions{ QueryType: influxdb3.InfluxQL, } // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data row by row. iterator2, err = client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go time package to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(time.Time)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码完成以下操作:

  1. 定义一个 main 包用于你的模块,并导入你将在代码中使用的包。

  2. 定义一个 Query() 函数。

  3. 使用InfluxDB凭据实例化influxdb3客户端,并将其分配给client变量。

  4. 定义了一个延迟函数,当 Query() 执行完毕后关闭客户端。

  5. 定义一个要执行的InfluxQL查询。

  6. 调用以下客户端方法:

    QueryWithOptions(ctx context.Context, options *QueryOptions, query string)

    并传递以下参数:

    • 选项: 一个 QueryOptions 结构体,其中 QueryType 属性设置为 influxdb3.InfluxQL
    • query: 一个字符串。要执行的 SQL 或 InfluxQL 查询。 QueryWithOptions 返回以下内容:
      • *influxdb3.QueryIterator: 一个自定义迭代器,提供对查询结果数据和元数据的访问。
      • error: 一个 Flight 请求错误。

运行示例

  1. 在你的 influxdb_go_client 模块目录中,创建一个名为 main.go 的文件。

  2. main.go 中,输入以下示例代码以定义一个 main() 可执行函数,该函数调用 Query() 函数:

    package main
    
    func main() {
      Query()
    }
    
  3. 在你的终端中,输入以下命令以安装必要的包、构建模块和运行程序:

    go build && go run influxdb_go_client
    

    程序执行 main() 函数,该函数写入数据并将查询结果打印到控制台。



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: