编写基于套接字的用户定义函数(UDFs)
在 另一个示例 中,我们看到了如何为自定义异常检测工作负载编写基于过程的 UDF。在这个示例中,我们将学习如何编写一个简单的基于套接字的 UDF。
什么是用户定义函数 (UDF)?
UDF是用户定义函数,可以与Kapacitor通信以处理数据。 Kapacitor将向其发送数据,UDF可以以新数据或已修改的数据作出响应。 UDF可以用任何支持protocol buffer的语言编写。
套接字UDF和进程UDF之间有什么区别?
- 一个过程 UDF,是 Kapacitor 的子进程,通过 STDIN/STDOUT 与 Kapacitor 进行通信,并完全由 Kapacitor 管理。
- 一个套接字 UDF 是与配置的 unix 域套接字进行通信的独立于 Kapacitor 的进程。该进程本身不由 Kapacitor 管理。
使用进程 UDF 可能比套接字 UDF 更简单,因为 Kapacitor 会为您生成进程并管理所有内容。另一方面,您可能希望对 UDF 进程本身有更多控制,并仅将套接字暴露给 Kapacitor。一个常见的用例是在 Docker 容器中运行 Kapacitor,而在另一个容器中运行 UDF,通过 Docker 卷暴露套接字。
在这两种情况下,协议是相同的,唯一的区别是传输机制。还要注意,由于多个Kapacitor任务可以使用相同的UDF,对于基于进程的UDF,每次使用UDF时都会生成一个新的子进程。相比之下,对于基于套接字的UDF,每次使用UDF时都会建立一个新的套接字连接。如果您多个地方使用相同的UDF,使用套接字UDF可能更好,以保持运行进程的数量较少。
编写用户定义函数字
用户定义函数通过协议缓冲区请求/响应系统与Kapacitor进行通信。 我们提供了Go和Python中该通信层的实现。 由于另一个示例使用了Python,我们将在这里使用Go版本。
我们的示例将实现一个 mirror 用户定义函数,它简单地将收到的所有数据反射回 Kapacitor 服务器。这个示例实际上是测试套件的一部分,Python 和 Go 的实现可以在 这里 找到。
生命周期
在我们编写任何代码之前,让我们看看套接字用户定义函数的生命周期:
- UDF 进程已启动,与 Kapacitor 独立。
- 该进程在 Unix 域套接字上监听。
- Kapacitor 连接到套接字并查询有关 UDFs 选项的基本信息。
- 启用了一个Kapacitor任务,该任务使用UDF,并且Kapacitor与套接字建立了新的连接。
- 该任务通过套接字连接读取和写入数据。
- 如果任务因任何原因停止,套接字连接将关闭。
主方法
我们需要编写一个启动并监听套接字的程序。
以下代码是一个主函数,它在默认路径上的套接字上监听,
或者在作为-socket标志指定的自定义路径上监听。
package main
import (
"flag"
"log"
"net"
)
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// More to come here...
}
将上述代码放在一个名为 main.go 的临时目录中。
上述代码可以通过 go run main.go 运行,但此时它将在监听套接字后立即退出。
代理
如前所述,Kapacitor 为用户定义函数(UDF)提供了一个名为 agent 的通信层实现。我们的代码只需实现一个接口即可利用 agent 逻辑。
我们需要实现的接口如下:
// The Agent calls the appropriate methods on the Handler as it receives requests over a socket.
//
// Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent.
// Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
// These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
//
// The Handler is called from a single goroutine, meaning methods will not be called concurrently.
//
// To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
type Handler interface {
// Return the InfoResponse. Describing the properties of this Handler
Info() (*agent.InfoResponse, error)
// Initialize the Handler with the provided options.
Init(*agent.InitRequest) (*agent.InitResponse, error)
// Create a snapshot of the running state of the handler.
Snapshot() (*agent.SnapshotResponse, error)
// Restore a previous snapshot.
Restore(*agent.RestoreRequest) (*agent.RestoreResponse, error)
// A batch has begun.
BeginBatch(*agent.BeginBatch) error
// A point has arrived.
Point(*agent.Point) error
// The batch is complete.
EndBatch(*agent.EndBatch) error
// Gracefully stop the Handler.
// No other methods will be called.
Stop()
}
处理程序
让我们定义我们自己的类型,以便开始实现 Handler 接口。更新 main.go 文件如下:
package main
import (
"flag"
"log"
"net"
"github.com/influxdata/kapacitor/udf/agent"
)
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
// We need a reference to the agent so we can write data
// back to Kapacitor.
agent *agent.Agent
}
func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
return &mirrorHandler{agent: agent}
}
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// More to come here...
}
现在让我们添加初始化UDF所需的每个方法。 接下来的这些方法实现了上述UDF生命周期第3步中描述的行为,Kapacitor连接到套接字以查询有关UDF的基本信息。
将这些方法添加到 main.go 文件中:
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
// We want a stream edge
Wants: agent.EdgeType_STREAM,
// We provide a stream edge
Provides: agent.EdgeType_STREAM,
// We expect no options.
Options: map[string]*agent.OptionInfo{},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
// Since we expected no options this method is trivial
// and we return success.
init := &agent.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
目前,我们简单的镜像 UDF 不需要任何选项,因此这些方法是微不足道的。 在本示例的末尾,我们将修改代码以接受自定义选项。
现在Kapacitor知道我们的UDF使用了哪些边缘类型和选项,我们需要实现处理数据的方法。
将此方法添加到 main.go 文件中,它将通过代理将接收到的每个点发送回 Kapacitor:
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
请注意,agent 有一个用于响应的通道,这是因为您的 UDF 可以随时向 Kapacitor 发送数据,因此它不需要在响应中以接收一个点。
因此,我们需要关闭通道以让 agent 知道我们将不再发送任何数据,这可以通过 Stop 方法完成。一旦 agent 在 handler 上调用 Stop,将不会再调用其他方法,并且 agent 不会停止,直到通道关闭。这给 UDF 提供了在关机之前清除任何剩余数据的机会:
// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
// Close the channel since we won't be sending any more data to Kapacitor
close(h.agent.Responses)
}
尽管我们已经实现了大部分处理程序的实现,但仍然缺少一些方法。具体来说,关于批处理和快照/恢复的方法缺失,但由于我们不需要它们,我们将仅提供简单的实现:
// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
return &agent.RestoreResponse{
Success: true,
}, nil
}
// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
return errors.New("batching not supported")
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
return nil
}
服务器
在这一点上,我们已经完成了Handler接口的完整实现。
在上面的生命周期第4步中,Kapacitor为任务中的每次使用都与UDF建立了新的连接。由于我们的UDF进程可能同时处理多个连接,因此我们需要为每个连接创建一个新的agent和handler的机制。
为此提供了一个 server,它期待 Accepter 接口的实现:
type Accepter interface {
// Accept new connections from the listener and handle them accordingly.
// The typical action is to create a new Agent with the connection as both its in and out objects.
Accept(net.Conn)
}
这里是一个简单的 accepter,它为每个新连接创建一个新的 agent 和 mirrorHandler。将其添加到 main.go 文件中:
type accepter struct {
count int64
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
count := acc.count
acc.count++
a := agent.New(conn, conn)
h := newMirrorHandler(a)
a.Handler = h
log.Println("Starting agent for connection", count)
a.Start()
go func() {
err := a.Wait()
if err != nil {
log.Fatal(err)
}
log.Printf("Agent for connection %d finished", count)
}()
}
现在所有的部分都到位了,我们可以更新我们的 main 函数以启动 server。将之前提供的 main 函数替换为:
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// Create server that listens on the socket
s := agent.NewServer(l, &accepter{})
// Setup signal handler to stop Server on various signals
s.StopOnSignals(os.Interrupt, syscall.SIGTERM)
log.Println("Server listening on", addr.String())
err = s.Serve()
if err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
开始 UDF
此时我们准备开始 UDF。
以下是完整的 main.go 文件供参考:
package main
import (
"errors"
"flag"
"log"
"net"
"os"
"syscall"
"github.com/influxdata/kapacitor/udf/agent"
)
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
agent *agent.Agent
}
func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
return &mirrorHandler{agent: agent}
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
Wants: agent.EdgeType_STREAM,
Provides: agent.EdgeType_STREAM,
Options: map[string]*agent.OptionInfo{},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
init := &agent.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
return &agent.RestoreResponse{
Success: true,
}, nil
}
// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
return errors.New("batching not supported")
}
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
return nil
}
// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
close(h.agent.Responses)
}
type accepter struct {
count int64
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
count := acc.count
acc.count++
a := agent.New(conn, conn)
h := newMirrorHandler(a)
a.Handler = h
log.Println("Starting agent for connection", count)
a.Start()
go func() {
err := a.Wait()
if err != nil {
log.Fatal(err)
}
log.Printf("Agent for connection %d finished", count)
}()
}
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// Create server that listens on the socket
s := agent.NewServer(l, &accepter{})
// Setup signal handler to stop Server on various signals
s.StopOnSignals(os.Interrupt, syscall.SIGTERM)
log.Println("Server listening on", addr.String())
err = s.Serve()
if err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
运行 go run main.go 来启动 UDF。
如果你收到关于套接字正在使用的错误,
只需删除套接字文件,然后再次尝试运行 UDF。
配置Kapacitor与UDF进行通信
现在我们的UDF已经准备好了,我们需要告诉Kapacitor我们的UDF套接字位置,并给它一个名称,以便我们可以使用它。将以下内容添加到您的Kapacitor配置文件中:
[udf]
[udf.functions]
[udf.functions.mirror]
socket = "/tmp/mirror.sock"
timeout = "10s"
开始 Kapacitor
启动 Kapacitor,你应该在 Kapacitor 日志和 UDF 进程日志中看到它与 UDF 的连接。
试试看
获取一个现有任务,并在 TICKscript 管道的任何位置添加 @mirror() 以查看其效果。
这是一个示例TICKscript,需要保存到一个文件中:
dbrp "telegraf"."autogen"
stream
|from()
.measurement('cpu')
@mirror()
|alert()
.crit(lambda: "usage_idle" < 30)
像这样从您的终端定义上述警报:
kapacitor define mirror_udf_example -tick path/to/above/script.tick
开始任务:
kapacitor enable mirror_udf_example
检查任务的状态:
kapacitor show mirror_udf_example
添加自定义字段
现在让我们更改 UDF 以向数据添加字段。 我们可以使用 Info/Init 方法来定义和使用 UDF 上的选项,所以让我们指定要添加的字段的名称。
更新 mirrorHandler 类型和方法 Info 和 Init 如下:
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
agent *agent.Agent
name string
value float64
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
Wants: agent.EdgeType_STREAM,
Provides: agent.EdgeType_STREAM,
Options: map[string]*agent.OptionInfo{
"field": {ValueTypes: []agent.ValueType{
agent.ValueType_STRING,
agent.ValueType_DOUBLE,
}},
},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (h *mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
init := &agent.InitResponse{
Success: true,
Error: "",
}
for _, opt := range r.Options {
switch opt.Name {
case "field":
h.name = opt.Values[0].Value.(*agent.OptionValue_StringValue).StringValue
h.value = opt.Values[1].Value.(*agent.OptionValue_DoubleValue).DoubleValue
}
}
if h.name == "" {
init.Success = false
init.Error = "must supply field"
}
return init, nil
}
现在我们可以在点上设置其名称和值。
更新Point方法:
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
if p.FieldsDouble == nil {
p.FieldsDouble = make(map[string]float64)
}
p.FieldsDouble[h.name] = h.value
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
重新启动UDF进程并再次尝试。
指定要使用的字段名称和值与.field(name, value)方法。
您可以在mirror UDF后添加|log()以查看新字段确实已被创建。
dbrp "telegraf"."autogen"
stream
|from()
.measurement('cpu')
@mirror()
.field('mycustom_field', 42.0)
|log()
|alert()
.cirt(lambda: "usage_idle" < 30)
总结
在这个时刻,您应该能够使用套接字或基于进程的方法编写自定义 UDF。 UDF 的用途广泛,从连续查询中作为自定义降采样逻辑,自定义异常检测算法,或者只是一个“处理”您的数据的系统。
下一步
如果你想了解更多,这里有一些开始的地方:
- 修改镜像 UDF,使其功能类似于 DefaultNode。 在不是始终覆盖字段的情况下,仅在字段不存在时设置它。 还增加了对设置标签以及字段的支持。
- 将镜像 UDF 更改为在批次上运行,而不是在流上。这需要更改
Info方法中的边缘类型,以及实现BeginBatch和EndBatch方法。 - 查看其他示例并修改一个以便完成您现有需求中的类似操作。