Documentation

编写基于套接字的用户定义函数(UDFs)

另一个示例 中,我们看到了如何为自定义异常检测工作负载编写基于过程的 UDF。在这个示例中,我们将学习如何编写一个简单的基于套接字的 UDF。

什么是用户定义函数 (UDF)?

UDF是用户定义函数,可以与Kapacitor通信以处理数据。 Kapacitor将向其发送数据,UDF可以以新数据或已修改的数据作出响应。 UDF可以用任何支持protocol buffer的语言编写。

套接字UDF和进程UDF之间有什么区别?

  • 一个过程 UDF,是 Kapacitor 的子进程,通过 STDIN/STDOUT 与 Kapacitor 进行通信,并完全由 Kapacitor 管理。
  • 一个套接字 UDF 是与配置的 unix 域套接字进行通信的独立于 Kapacitor 的进程。该进程本身不由 Kapacitor 管理。

使用进程 UDF 可能比套接字 UDF 更简单,因为 Kapacitor 会为您生成进程并管理所有内容。另一方面,您可能希望对 UDF 进程本身有更多控制,并仅将套接字暴露给 Kapacitor。一个常见的用例是在 Docker 容器中运行 Kapacitor,而在另一个容器中运行 UDF,通过 Docker 卷暴露套接字。

在这两种情况下,协议是相同的,唯一的区别是传输机制。还要注意,由于多个Kapacitor任务可以使用相同的UDF,对于基于进程的UDF,每次使用UDF时都会生成一个新的子进程。相比之下,对于基于套接字的UDF,每次使用UDF时都会建立一个新的套接字连接。如果您多个地方使用相同的UDF,使用套接字UDF可能更好,以保持运行进程的数量较少。

编写用户定义函数字

用户定义函数通过协议缓冲区请求/响应系统与Kapacitor进行通信。 我们提供了Go和Python中该通信层的实现。 由于另一个示例使用了Python,我们将在这里使用Go版本。

我们的示例将实现一个 mirror 用户定义函数,它简单地将收到的所有数据反射回 Kapacitor 服务器。这个示例实际上是测试套件的一部分,Python 和 Go 的实现可以在 这里 找到。

生命周期

在我们编写任何代码之前,让我们看看套接字用户定义函数的生命周期:

  1. UDF 进程已启动,与 Kapacitor 独立。
  2. 该进程在 Unix 域套接字上监听。
  3. Kapacitor 连接到套接字并查询有关 UDFs 选项的基本信息。
  4. 启用了一个Kapacitor任务,该任务使用UDF,并且Kapacitor与套接字建立了新的连接。
  5. 该任务通过套接字连接读取和写入数据。
  6. 如果任务因任何原因停止,套接字连接将关闭。

主方法

我们需要编写一个启动并监听套接字的程序。 以下代码是一个主函数,它在默认路径上的套接字上监听, 或者在作为-socket标志指定的自定义路径上监听。

package main

import (
    "flag"
    "log"
    "net"
)


var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

将上述代码放在一个名为 main.go 的临时目录中。 上述代码可以通过 go run main.go 运行,但此时它将在监听套接字后立即退出。

代理

如前所述,Kapacitor 为用户定义函数(UDF)提供了一个名为 agent 的通信层实现。我们的代码只需实现一个接口即可利用 agent 逻辑。

我们需要实现的接口如下:

// The Agent calls the appropriate methods on the Handler as it receives requests over a socket.
//
// Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent.
// Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
// These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
//
// The Handler is called from a single goroutine, meaning methods will not be called concurrently.
//
// To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
type Handler interface {
    // Return the InfoResponse. Describing the properties of this Handler
    Info() (*agent.InfoResponse, error)
    // Initialize the Handler with the provided options.
    Init(*agent.InitRequest) (*agent.InitResponse, error)
    // Create a snapshot of the running state of the handler.
    Snapshot() (*agent.SnapshotResponse, error)
    // Restore a previous snapshot.
    Restore(*agent.RestoreRequest) (*agent.RestoreResponse, error)

    // A batch has begun.
    BeginBatch(*agent.BeginBatch) error
    // A point has arrived.
    Point(*agent.Point) error
    // The batch is complete.
    EndBatch(*agent.EndBatch) error

    // Gracefully stop the Handler.
    // No other methods will be called.
    Stop()
}

处理程序

让我们定义我们自己的类型,以便开始实现 Handler 接口。更新 main.go 文件如下:

package main

import (
    "flag"
    "log"
    "net"

    "github.com/influxdata/kapacitor/udf/agent"
)



// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    // We need a reference to the agent so we can write data
    // back to Kapacitor.
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

现在让我们添加初始化UDF所需的每个方法。 接下来的这些方法实现了上述UDF生命周期第3步中描述的行为,Kapacitor连接到套接字以查询有关UDF的基本信息。

将这些方法添加到 main.go 文件中:


// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        // We want a stream edge
        Wants:    agent.EdgeType_STREAM,
        // We provide a stream edge
        Provides: agent.EdgeType_STREAM,
        // We expect no options.
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    // Since we expected no options this method is trivial
    // and we return success.
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

目前,我们简单的镜像 UDF 不需要任何选项,因此这些方法是微不足道的。 在本示例的末尾,我们将修改代码以接受自定义选项。

现在Kapacitor知道我们的UDF使用了哪些边缘类型和选项,我们需要实现处理数据的方法。

将此方法添加到 main.go 文件中,它将通过代理将接收到的每个点发送回 Kapacitor:

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

请注意,agent 有一个用于响应的通道,这是因为您的 UDF 可以随时向 Kapacitor 发送数据,因此它不需要在响应中以接收一个点。

因此,我们需要关闭通道以让 agent 知道我们将不再发送任何数据,这可以通过 Stop 方法完成。一旦 agenthandler 上调用 Stop,将不会再调用其他方法,并且 agent 不会停止,直到通道关闭。这给 UDF 提供了在关机之前清除任何剩余数据的机会:

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    // Close the channel since we won't be sending any more data to Kapacitor
    close(h.agent.Responses)
}

尽管我们已经实现了大部分处理程序的实现,但仍然缺少一些方法。具体来说,关于批处理和快照/恢复的方法缺失,但由于我们不需要它们,我们将仅提供简单的实现:

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

服务器

在这一点上,我们已经完成了Handler接口的完整实现。 在上面的生命周期第4步中,Kapacitor为任务中的每次使用都与UDF建立了新的连接。由于我们的UDF进程可能同时处理多个连接,因此我们需要为每个连接创建一个新的agenthandler的机制。

为此提供了一个 server,它期待 Accepter 接口的实现:

type Accepter interface {
    // Accept new connections from the listener and handle them accordingly.
    // The typical action is to create a new Agent with the connection as both its in and out objects.
    Accept(net.Conn)
}

这里是一个简单的 accepter,它为每个新连接创建一个新的 agentmirrorHandler。将其添加到 main.go 文件中:

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

现在所有的部分都到位了,我们可以更新我们的 main 函数以启动 server。将之前提供的 main 函数替换为:

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

开始 UDF

此时我们准备开始 UDF。
以下是完整的 main.go 文件供参考:

package main

import (
    "errors"
    "flag"
    "log"
    "net"
    "os"
    "syscall"

    "github.com/influxdata/kapacitor/udf/agent"
)

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}

// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    close(h.agent.Responses)
}

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

运行 go run main.go 来启动 UDF。 如果你收到关于套接字正在使用的错误, 只需删除套接字文件,然后再次尝试运行 UDF。

配置Kapacitor与UDF进行通信

现在我们的UDF已经准备好了,我们需要告诉Kapacitor我们的UDF套接字位置,并给它一个名称,以便我们可以使用它。将以下内容添加到您的Kapacitor配置文件中:

[udf]
[udf.functions]
    [udf.functions.mirror]
        socket = "/tmp/mirror.sock"
        timeout = "10s"

开始 Kapacitor

启动 Kapacitor,你应该在 Kapacitor 日志和 UDF 进程日志中看到它与 UDF 的连接。

试试看

获取一个现有任务,并在 TICKscript 管道的任何位置添加 @mirror() 以查看其效果。

这是一个示例TICKscript,需要保存到一个文件中:

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
    |alert()
        .crit(lambda: "usage_idle" < 30)

像这样从您的终端定义上述警报:

kapacitor define mirror_udf_example -tick path/to/above/script.tick

开始任务:

kapacitor enable mirror_udf_example

检查任务的状态:

kapacitor show mirror_udf_example

添加自定义字段

现在让我们更改 UDF 以向数据添加字段。 我们可以使用 Info/Init 方法来定义和使用 UDF 上的选项,所以让我们指定要添加的字段的名称。

更新 mirrorHandler 类型和方法 InfoInit 如下:

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
    name  string
    value float64
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options: map[string]*agent.OptionInfo{
            "field": {ValueTypes: []agent.ValueType{
                agent.ValueType_STRING,
                agent.ValueType_DOUBLE,
            }},
        },
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (h *mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    for _, opt := range r.Options {
        switch opt.Name {
        case "field":
            h.name = opt.Values[0].Value.(*agent.OptionValue_StringValue).StringValue
            h.value = opt.Values[1].Value.(*agent.OptionValue_DoubleValue).DoubleValue
        }
    }

    if h.name == "" {
        init.Success = false
        init.Error = "must supply field"
    }
    return init, nil
}

现在我们可以在点上设置其名称和值。
更新Point方法:

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    if p.FieldsDouble == nil {
        p.FieldsDouble = make(map[string]float64)
    }
    p.FieldsDouble[h.name] = h.value

    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

重新启动UDF进程并再次尝试。 指定要使用的字段名称和值与.field(name, value)方法。 您可以在mirror UDF后添加|log()以查看新字段确实已被创建。

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
        .field('mycustom_field', 42.0)
    |log()
    |alert()
        .cirt(lambda: "usage_idle" < 30)

总结

在这个时刻,您应该能够使用套接字或基于进程的方法编写自定义 UDF。 UDF 的用途广泛,从连续查询中作为自定义降采样逻辑,自定义异常检测算法,或者只是一个“处理”您的数据的系统。

下一步

如果你想了解更多,这里有一些开始的地方:

  • 修改镜像 UDF,使其功能类似于 DefaultNode。 在不是始终覆盖字段的情况下,仅在字段不存在时设置它。 还增加了对设置标签以及字段的支持。
  • 将镜像 UDF 更改为在批次上运行,而不是在流上。这需要更改 Info 方法中的边缘类型,以及实现 BeginBatchEndBatch 方法。
  • 查看其他示例并修改一个以便完成您现有需求中的类似操作。


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: