贡献新的Kapacitor输出节点
如果您还没有,请查看Kapacitor 贡献指南以获取有关如何开始贡献的信息。
目标
向Kapacitor添加一个新节点,该节点可以将数据输出到自定义端点。
在本指南中,假设我们想将数据输出到一个虚构的内部数据库HouseDB。
概述
Kapacitor通过一个管道处理数据。 管道正式上是一个有向无环图(DAG)。 基本思想是图中的每个节点代表数据的某种处理形式,每个边在节点之间传递数据。 为了添加一个新的节点类型,需要编写两个组件:
- 用于创建和配置节点的API (TICKscript),以及
- 数据处理步骤的实现。
在我们的例子中,数据处理步骤是将数据输出到HouseDB。
代码通过两个Go包来反映这些要求。
pipeline: 这个包定义了可用的节点类型及其配置方式。kapacitor: 该包提供了pipeline包中定义的每个节点的实现。
为了使API(即TICK脚本)干净易读,节点的定义与节点的实现相分离。
更新 TICKscript
首先,我们需要更新 TICKscript,以便用户可以定义我们的新节点。 TICKscript 应该是什么样子才能将数据发送到 HouseDB? 要连接到 HouseDB 实例,我们需要一个 URL 和一个数据库名称,因此我们需要一种提供该信息的方法。 这个怎么样?
node
|houseDBOut()
.url('house://housedb.example.com')
.database('metrics')
为了更新 TICKscript 以支持这些新方法,我们需要编写一个实现 pipeline.Node 接口的 Go 类型。
可以在 这里 找到该接口
以及通过 pipeline.node 类型的完整实现。
由于 Node 的实现已经为我们完成,我们只需使用它。
首先我们需要一个名字。HouseDBOutNode 遵循命名约定。
让我们定义一个 Go struct,通过组合实现该接口。
在 pipeline 目录中创建一个名为 housedb_out.go 的文件,内容如下:
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
}
就这样,我们在Go中有一个实现所需接口的类型。
为了允许使用 .url 和 .database 方法,我们只需在类型上定义同名字段。
第一个字母需要大写,以便被导出。
字段需要被导出非常重要,因为它们将被 kapacitor 包中的节点消费。
其余部分的名称应与方法名称保持相同的大小写。
TICKscript将在运行时处理大小写匹配。
更新 housedb_out.go 文件。
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}
接下来,我们需要一种一致的方法来创建我们节点的新实例。
但要做到这一点,我们需要考虑这个节点如何连接到其他节点。
由于在Kapacitor看来我们是一个输出节点,因此这是管道的终点。
我们将不提供任何出边,图形在这个节点结束。
我们想象中的HouseDB是灵活的,可以批量存储数据或作为单个数据点。
因此,我们不关心HouseDBOutNode节点接收哪种类型的数据。
考虑到这些事实,我们可以定义一个函数来创建一个新的HouseDBOutNode。
将这个函数添加到housedb_out.go文件的末尾:
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}
通过明确声明节点的边的类型 wants 和 provides,Kapacitor 将进行必要的类型检查,以防止无效的管道。
最后,我们需要添加一个新的 chaining method,以便用户可以将 HouseDBOutNodes 连接到他们现有的管道中。
一个 chaining method 是创建一个新节点并将其作为调用节点的子节点添加的方法。
实际上,该方法将节点连接在一起。
pipeline.chainnode 类型包含所有可以用于链接节点的方法集合。
一旦我们将我们的方法添加到该类型中,任何其他节点现在都可以与 HouseDBOutNode 链接。
将此函数添加到 pipeline/node.go 文件的末尾:
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}
我们现在已经定义了所有必要的组成部分,以便 TICKscripts 可以定义 HouseDBOutNodes:
node
|houseDBOut() // added as a method to the 'chainnode' type
.url('house://housedb.example.com') // added as a field to the HouseDBOutNode
.database('metrics') // added as a field to the HouseDBOutNode
实现 HouseDB 输出
现在,既然 TICKscript 可以定义我们的新输出节点,我们需要实际提供一个实现,以便 Kapacitor 知道如何处理该节点。
pipeline 包中的每个节点在 kapacitor 包中都有一个同名的节点。
创建一个名为 housedb_out.go 的文件,并将其放在仓库的根目录中。
将下面的内容放入该文件。
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
}
这个 kapacitor 包还定义了一个名为 Node 的接口,并通过 kapacitor.node 类型提供了默认实现。再次使用组合来实现接口。注意,我们还有一个字段将包含我们刚刚定义的 pipeline.HouseDBOutNode 的实例。这个 pipeline.HouseDBOutNode 像一个配置结构,告诉 kapacitor.HouseDBOutNode 它需要做什么才能完成工作。
现在我们有了一个结构体,让我们定义一个函数来创建我们新结构体的实例。new*Node 方法在 kapacitor 包中遵循以下约定:
func newNodeName(et *ExecutingTask, n *pipeline.NodeName) (*NodeName, error) {}
在我们的例子中,我们想要定义一个名为 newHouseDBOutNode 的函数。将以下方法添加到 housedb_out.go 文件中。
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
}
// Set the function to be called when running the node
// more on this in a bit.
h.node.runF = h.runOut
return h
}
为了创建我们的节点实例,我们需要将其与pipeline包中的节点关联起来。 这可以通过task.go文件中的createNode方法中的switch语句来完成。 继续我们的示例:
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}
现在我们已经关联了我们的两种类型,让我们回到实现输出代码上。注意在 newHouseDBOutNode 函数中的这一行 h.node.runF = h.runOut。这一行设置了在节点开始执行时将被调用的 kapacitor.HouseDBOutNode 的方法。现在我们需要定义 runOut 方法。在文件 housedb_out.go 中添加这个方法:
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
return nil
}
通过这个更改,HouseDBOutNode 在语法上是完整的,但尚未执行任何操作。
让我们给它一些任务!
正如我们之前所学,节点通过边进行通信。
有一个 Go 类型 edge.Edge 用于处理这种通信。
我们想要做的就是从边读取数据并将其发送到 HouseDB。
数据以 edge.Message 类型的形式表示。
节点使用 edge.Consumer 读取消息,节点通过实现 edge.Receiver 接口来处理消息。
Consumer 和 Receiver 接口都可以在 这里 找到。
我们通过组合在 HouseDBOutNode 中包含的 node 类型提供了一个名为 ins 的边列表。由于 HouseDBOutNode 只能有一个父节点,我们关心的边是第 0 条边。我们可以使用 NewConsumerWithReceiver 函数从边中消费和处理消息。
// NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewConsumerWithReceiver(e Edge, r Receiver) Consumer {
return &consumer{
edge: e,
r: r,
}
}
让我们更新 runOut 来使用这个函数读取和处理消息。
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}
剩下的就是让 HouseDBOutNode 实现 Receiver 接口,并编写一个接受一批点并将其写入 HouseDB 的函数。
为了方便起见,我们可以使用 edge.BatchBuffer 来接收批量消息。
我们还可以将单个点消息转换为只包含一个点的批量消息。
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}
一旦我们实现了 write 方法,我们就完成了。 当数据到达 HouseDBOutNode 时,它将被写入指定的 HouseDB 实例。
总结
我们首先在pipeline包中编写了一个节点(文件路径:pipeline/housedb_out.go),以定义用于向HouseDB实例发送数据的TICKscript API。
然后,我们在kapacitor包中编写了该节点的实现(文件路径:housedb_out.go)。
我们还更新了pipeline/node.go以添加一个新的链式方法,并更新了task.go以关联这两种类型。
以下是完整的文件内容:
管道/住房数据库输出.go:
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}
housedb_out.go
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
// Buffer for a batch of points
batchBuffer *edge.BatchBuffer
}
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
// Buffer for a batch of points
batchBuffer: new(edge.BatchBuffer),
}
// Set the function to be called when running the node
h.node.runF = h.runOut
return h
}
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}
pipeline/node.go(仅显示新的链式方法):
...
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}
...
任务.go(只显示新的案例):
...
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}
...
记录您的新节点
由于TICKscript是其自身的语言,我们构建了一个类似于 godoc 的小工具,名为 tickdoc。
tickdoc 从代码中的注释生成文档。
tickdoc 工具理解两个特殊注释,以帮助它生成干净的文档。
tick:ignore: 可以添加到任何字段、方法、函数或结构体。tickdoc将跳过它并不会为其生成任何文档。这对于忽略通过属性方法设置的字段最为有用。tick:property:仅添加到方法中。通知tickdoc该方法是property method而不是chaining method。
将这些注释单独放在一行上,tickdoc 将找到它并相应地执行。否则,请正常记录您的代码,tickdoc 将处理其余部分。
贡献非输出节点。
写入任何节点(不仅仅是输出节点)是一个非常类似的过程,留给读者作为练习。 有些事情可能会有所不同:
第一个区别是,如果您的新节点可以将数据发送到子节点,它将希望使用pipeline.chainnode 实现pipeline.Node 接口,位于pipeline 包中。
例如:
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}
第二个区别是可以定义一个方法,该方法在管道节点上设置字段并返回相同的实例,以创建一个 property method。 示例:
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
// Mark this field as ignored for docs
// Since it is set via the Names method below
// tick:ignore
NameList []string `tick:"Names"`
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}
// Set the NameList field on the node via this method.
//
// Example:
// node.names('name0', 'name1')
//
// Use the tickdoc comment 'tick:property' to mark this method
// as a 'property method'
// tick:property
func (m *MyCustomNode) Names(name ...string) *MyCustomNode {
m.NameList = name
return m
}