Documentation

贡献新的Kapacitor输出节点

如果您还没有,请查看Kapacitor 贡献指南以获取有关如何开始贡献的信息。

目标

向Kapacitor添加一个新节点,该节点可以将数据输出到自定义端点。
在本指南中,假设我们想将数据输出到一个虚构的内部数据库HouseDB。

概述

Kapacitor通过一个管道处理数据。 管道正式上是一个有向无环图(DAG)。 基本思想是图中的每个节点代表数据的某种处理形式,每个边在节点之间传递数据。 为了添加一个新的节点类型,需要编写两个组件:

  1. 用于创建和配置节点的API (TICKscript),以及
  2. 数据处理步骤的实现。

在我们的例子中,数据处理步骤是将数据输出到HouseDB。

代码通过两个Go包来反映这些要求。

  1. pipeline: 这个包定义了可用的节点类型及其配置方式。
  2. kapacitor: 该包提供了pipeline包中定义的每个节点的实现。

为了使API(即TICK脚本)干净易读,节点的定义与节点的实现相分离。

更新 TICKscript

首先,我们需要更新 TICKscript,以便用户可以定义我们的新节点。 TICKscript 应该是什么样子才能将数据发送到 HouseDB? 要连接到 HouseDB 实例,我们需要一个 URL 和一个数据库名称,因此我们需要一种提供该信息的方法。 这个怎么样?

    node
        |houseDBOut()
            .url('house://housedb.example.com')
            .database('metrics')

为了更新 TICKscript 以支持这些新方法,我们需要编写一个实现 pipeline.Node 接口的 Go 类型。 可以在 这里 找到该接口 以及通过 pipeline.node 类型的完整实现。 由于 Node 的实现已经为我们完成,我们只需使用它。 首先我们需要一个名字。HouseDBOutNode 遵循命名约定。 让我们定义一个 Go struct,通过组合实现该接口。 在 pipeline 目录中创建一个名为 housedb_out.go 的文件,内容如下:

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node
}

就这样,我们在Go中有一个实现所需接口的类型。 为了允许使用 .url.database 方法,我们只需在类型上定义同名字段。 第一个字母需要大写,以便被导出。 字段需要被导出非常重要,因为它们将被 kapacitor 包中的节点消费。 其余部分的名称应与方法名称保持相同的大小写。 TICKscript将在运行时处理大小写匹配。 更新 housedb_out.go 文件。

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node

    // URL for connecting to HouseDB
    Url string

    // Database name
    Database string
}

接下来,我们需要一种一致的方法来创建我们节点的新实例。 但要做到这一点,我们需要考虑这个节点如何连接到其他节点。 由于在Kapacitor看来我们是一个输出节点,因此这是管道的终点。 我们将不提供任何出边,图形在这个节点结束。 我们想象中的HouseDB是灵活的,可以批量存储数据或作为单个数据点。 因此,我们不关心HouseDBOutNode节点接收哪种类型的数据。 考虑到这些事实,我们可以定义一个函数来创建一个新的HouseDBOutNode。 将这个函数添加到housedb_out.go文件的末尾:

// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
    return &HouseDBOutNode{
        node: node{
            desc: "housedb",
            wants: wants,
            provides: NoEdge,
        }
    }
}

通过明确声明节点的边的类型 wantsprovides,Kapacitor 将进行必要的类型检查,以防止无效的管道。

最后,我们需要添加一个新的 chaining method,以便用户可以将 HouseDBOutNodes 连接到他们现有的管道中。 一个 chaining method 是创建一个新节点并将其作为调用节点的子节点添加的方法。 实际上,该方法将节点连接在一起。 pipeline.chainnode 类型包含所有可以用于链接节点的方法集合。 一旦我们将我们的方法添加到该类型中,任何其他节点现在都可以与 HouseDBOutNode 链接。 将此函数添加到 pipeline/node.go 文件的末尾:

// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
    h := newHouseDBOutNode(c.Provides())
    c.linkChild(h)
    return h
}

我们现在已经定义了所有必要的组成部分,以便 TICKscripts 可以定义 HouseDBOutNodes:

    node
        |houseDBOut() // added as a method to the 'chainnode' type
            .url('house://housedb.example.com') // added as a field to the HouseDBOutNode
            .database('metrics') // added as a field to the HouseDBOutNode

实现 HouseDB 输出

现在,既然 TICKscript 可以定义我们的新输出节点,我们需要实际提供一个实现,以便 Kapacitor 知道如何处理该节点。
pipeline 包中的每个节点在 kapacitor 包中都有一个同名的节点。
创建一个名为 housedb_out.go 的文件,并将其放在仓库的根目录中。
将下面的内容放入该文件。

package kapacitor

import (
    "github.com/influxdb/kapacitor/pipeline"
)

type HouseDBOutNode struct {
    // Include the generic node implementation
    node
    // Keep a reference to the pipeline node
    h *pipeline.HouseDBOutNode
}

这个 kapacitor 包还定义了一个名为 Node 的接口,并通过 kapacitor.node 类型提供了默认实现。再次使用组合来实现接口。注意,我们还有一个字段将包含我们刚刚定义的 pipeline.HouseDBOutNode 的实例。这个 pipeline.HouseDBOutNode 像一个配置结构,告诉 kapacitor.HouseDBOutNode 它需要做什么才能完成工作。

现在我们有了一个结构体,让我们定义一个函数来创建我们新结构体的实例。new*Node 方法在 kapacitor 包中遵循以下约定:

func newNodeName(et *ExecutingTask, n *pipeline.NodeName) (*NodeName, error) {}

在我们的例子中,我们想要定义一个名为 newHouseDBOutNode 的函数。将以下方法添加到 housedb_out.go 文件中。

func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
    h := &HouseDBOutNode{
        // pass in necessary fields to the 'node' struct
        node: node{Node: n, et: et, diag: d},
        // Keep a reference to the pipeline.HouseDBOutNode
        h: n,
    }
    // Set the function to be called when running the node
    // more on this in a bit.
    h.node.runF = h.runOut
    return h
}

为了创建我们的节点实例,我们需要将其与pipeline包中的节点关联起来。 这可以通过task.go文件中的createNode方法中的switch语句来完成。 继续我们的示例:

// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
    switch t := p.(type) {
    ...
	case *pipeline.HouseDBOutNode:
		n, err = newHouseDBOutNode(et, t, d)
    ...
}

现在我们已经关联了我们的两种类型,让我们回到实现输出代码上。注意在 newHouseDBOutNode 函数中的这一行 h.node.runF = h.runOut。这一行设置了在节点开始执行时将被调用的 kapacitor.HouseDBOutNode 的方法。现在我们需要定义 runOut 方法。在文件 housedb_out.go 中添加这个方法:

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
    return nil
}

通过这个更改,HouseDBOutNode 在语法上是完整的,但尚未执行任何操作。 让我们给它一些任务!

正如我们之前所学,节点通过边进行通信。 有一个 Go 类型 edge.Edge 用于处理这种通信。 我们想要做的就是从边读取数据并将其发送到 HouseDB。 数据以 edge.Message 类型的形式表示。 节点使用 edge.Consumer 读取消息,节点通过实现 edge.Receiver 接口来处理消息。 ConsumerReceiver 接口都可以在 这里 找到。

我们通过组合在 HouseDBOutNode 中包含的 node 类型提供了一个名为 ins 的边列表。由于 HouseDBOutNode 只能有一个父节点,我们关心的边是第 0 条边。我们可以使用 NewConsumerWithReceiver 函数从边中消费和处理消息。

// NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewConsumerWithReceiver(e Edge, r Receiver) Consumer {
	return &consumer{
		edge: e,
		r:    r,
	}
}

让我们更新 runOut 来使用这个函数读取和处理消息。

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
	consumer := edge.NewConsumerWithReceiver(
		n.ins[0],
		h,
	)
	return consumer.Consume()
}

剩下的就是让 HouseDBOutNode 实现 Receiver 接口,并编写一个接受一批点并将其写入 HouseDB 的函数。 为了方便起见,我们可以使用 edge.BatchBuffer 来接收批量消息。 我们还可以将单个点消息转换为只包含一个点的批量消息。

func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BeginBatch(begin)
}

func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BatchPoint(bp)
}

func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
    msg := h.batchBuffer.BufferedBatchMessage(end)
    return msg, h.write(msg)
}

func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
	batch := edge.NewBufferedBatchMessage(
		edge.NewBeginBatchMessage(
			p.Name(),
			p.Tags(),
			p.Dimensions().ByName,
			p.Time(),
			1,
		),
		[]edge.BatchPointMessage{
			edge.NewBatchPointMessage(
				p.Fields(),
				p.Tags(),
				p.Time(),
			),
		},
		edge.NewEndBatchMessage(),
	)
    return p, h.write(batch)
}

func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}
func (h *HouseDBOutNode) Done() {}

// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
    // Implement writing to HouseDB here...
    return nil
}

一旦我们实现了 write 方法,我们就完成了。 当数据到达 HouseDBOutNode 时,它将被写入指定的 HouseDB 实例。

总结

我们首先在pipeline包中编写了一个节点(文件路径:pipeline/housedb_out.go),以定义用于向HouseDB实例发送数据的TICKscript API。 然后,我们在kapacitor包中编写了该节点的实现(文件路径:housedb_out.go)。 我们还更新了pipeline/node.go以添加一个新的链式方法,并更新了task.go以关联这两种类型。

以下是完整的文件内容:

管道/住房数据库输出.go:

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node

    // URL for connecting to HouseDB
    Url string

    // Database name
    Database string
}

// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
    return &HouseDBOutNode{
        node: node{
            desc: "housedb",
            wants: wants,
            provides: NoEdge,
        }
    }
}

housedb_out.go

package kapacitor

import (
    "github.com/influxdb/kapacitor/pipeline"
)

type HouseDBOutNode struct {
    // Include the generic node implementation
    node
    // Keep a reference to the pipeline node
    h *pipeline.HouseDBOutNode
    // Buffer for a batch of points
    batchBuffer *edge.BatchBuffer
}

func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
    h := &HouseDBOutNode{
        // pass in necessary fields to the 'node' struct
        node: node{Node: n, et: et, diag: d},
        // Keep a reference to the pipeline.HouseDBOutNode
        h: n,
        // Buffer for a batch of points
        batchBuffer: new(edge.BatchBuffer),
    }
    // Set the function to be called when running the node
    h.node.runF = h.runOut
    return h
}

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
	consumer := edge.NewConsumerWithReceiver(
		n.ins[0],
		h,
	)
	return consumer.Consume()
}

func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BeginBatch(begin)
}

func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BatchPoint(bp)
}

func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
    msg := h.batchBuffer.BufferedBatchMessage(end)
    return msg, h.write(msg)
}

func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
	batch := edge.NewBufferedBatchMessage(
		edge.NewBeginBatchMessage(
			p.Name(),
			p.Tags(),
			p.Dimensions().ByName,
			p.Time(),
			1,
		),
		[]edge.BatchPointMessage{
			edge.NewBatchPointMessage(
				p.Fields(),
				p.Tags(),
				p.Time(),
			),
		},
		edge.NewEndBatchMessage(),
	)
    return p, h.write(batch)
}

func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}
func (h *HouseDBOutNode) Done() {}

// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
    // Implement writing to HouseDB here...
    return nil
}

pipeline/node.go(仅显示新的链式方法):

...
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
    h := newHouseDBOutNode(c.Provides())
    c.linkChild(h)
    return h
}
...

任务.go(只显示新的案例):

...
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
    switch t := p.(type) {
    ...
	case *pipeline.HouseDBOutNode:
		n, err = newHouseDBOutNode(et, t, d)
    ...
}
...

记录您的新节点

由于TICKscript是其自身的语言,我们构建了一个类似于 godoc 的小工具,名为 tickdoctickdoc 从代码中的注释生成文档。 tickdoc 工具理解两个特殊注释,以帮助它生成干净的文档。

  1. tick:ignore: 可以添加到任何字段、方法、函数或结构体。tickdoc 将跳过它并不会为其生成任何文档。这对于忽略通过属性方法设置的字段最为有用。
  2. tick:property:仅添加到方法中。通知 tickdoc 该方法是 property method 而不是 chaining method

将这些注释单独放在一行上,tickdoc 将找到它并相应地执行。否则,请正常记录您的代码,tickdoc 将处理其余部分。

贡献非输出节点。

写入任何节点(不仅仅是输出节点)是一个非常类似的过程,留给读者作为练习。 有些事情可能会有所不同:

第一个区别是,如果您的新节点可以将数据发送到子节点,它将希望使用pipeline.chainnode 实现pipeline.Node 接口,位于pipeline 包中。
例如:

package pipeline

type MyCustomNode struct {
    // Include pipeline.chainnode so we have all the chaining methods available
    // to our new node
    chainnode

}

func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
    m := &MyCustomNode{
        chainnode: newBasicChainNode("mycustom", e, e),
    }
    n.linkChild(m)
    return m
}

第二个区别是可以定义一个方法,该方法在管道节点上设置字段并返回相同的实例,以创建一个 property method。 示例:

package pipeline

type MyCustomNode struct {
    // Include pipeline.chainnode so we have all the chaining methods available
    // to our new node
    chainnode

    // Mark this field as ignored for docs
    // Since it is set via the Names method below
    // tick:ignore
    NameList []string `tick:"Names"`

}

func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
    m := &MyCustomNode{
        chainnode: newBasicChainNode("mycustom", e, e),
    }
    n.linkChild(m)
    return m
}

// Set the NameList field on the node via this method.
//
// Example:
//    node.names('name0', 'name1')
//
// Use the tickdoc comment 'tick:property' to mark this method
// as a 'property method'
// tick:property
func (m *MyCustomNode) Names(name ...string) *MyCustomNode {
    m.NameList = name
    return m
}


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: