开始使用Kapacitor
使用Kapacitor导入(流式或批量)时间序列数据,然后对数据进行转换、分析和处理。要开始使用Kapacitor,请使用Telegraf在本地机器上收集系统指标并将其存储在InfluxDB中。然后,使用Kapacitor处理您的系统数据。
- 概述
- 启动InfluxDB并收集Telegraf数据
- 开始 Kapacitor
- Kapacitor 任务
概述
Kapacitor 任务定义了对一组数据使用 TICKscript 语法执行的工作。Kapacitor 任务包括:
stream任务。流任务复制写入到 InfluxDB 的数据到 Kapacitor。减轻查询负担,并要求 Kapacitor 将数据存储在磁盘上。batch任务。批处理任务在指定的时间间隔内查询和处理数据。
要开始,请执行以下操作:
- 如果您还没有,下载并安装 InfluxData 1.x TICK 堆栈 (OSS)。
- 启动 InfluxDB 并启动 Telegraf。默认情况下,Telegraf 开始将系统指标发送到 InfluxDB 并创建一个名为 'telegraf' 的数据库。
- 启动 Kapacitor。
注意:以下过程中的示例命令是为Linux编写的。
启动 InfluxDB 并收集 Telegraf 数据
通过运行以下命令启动 InfluxDB:
$ sudo systemctl start influxdb在Telegraf配置文件(
/etc/telegraf/telegraf.conf)中,配置[[outputs.influxd]]以指定如何连接到InfluxDB和目标数据库。[[outputs.influxdb]] ## InfluxDB url is required and must be in the following form: http/udp "://" host [ ":" port] ## Multiple urls can be specified as part of the same cluster; only ONE url is written to each interval. ## InfluxDB url urls = ["http://localhost:8086"] ## The target database for metrics is required (Telegraf creates if one doesn't exist). database = "telegraf"运行以下命令以启动 Telegraf:
$ sudo systemctl start telegrafInfluxDB 和 Telegraf 现在正在 localhost 上运行。
一分钟后,运行以下命令以使用InfluxDB API查询Telegraf数据:
$ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SELECT mean(usage_idle) FROM cpu'出现类似以下的结果:
{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",91.82304336748372]]}]}]}
开始 Kapacitor
运行以下命令以生成 Kapacitor 配置文件:
kapacitord config > kapacitor.conf默认情况下,Kapacitor 配置文件保存在
/etc/kapacitor/kapacitor.conf。如果您将文件保存到其他位置, 启动 Kapacitor 时请指定该位置。Kapacitor 配置是一个 TOML 文件。配置为 InfluxDB 的输入也适用于 Kapacitor。
启动Kapacitor服务:
$ sudo systemctl start kapacitor因为 InfluxDB 运行在
http://localhost:8086,Kapacitor 在启动时找到它并在 InfluxDB 上创建了几个 subscriptions。订阅告诉 InfluxDB 将数据发送到 Kapacitor。(可选) 要查看日志数据,请运行以下命令:
$ sudo tail -f -n 128 /var/log/kapacitor/kapacitor.logKapacitor在一个HTTP端口上侦听并将数据发送到InfluxDB。现在,InfluxDB将数据从Telegraf流传输到Kapacitor。
执行任务
在TICKscript的开始,指定包含数据的数据库和保留策略:
dbrp "telegraf"."autogen"
// ...
当Kapacitor从匹配指定的数据库和保留策略接收数据时,Kapacitor将执行TICKscript。
Kapacitor支持基于数据库和保留策略(没有其他条件)执行任务。
从流数据触发警报
触发警报是Kapacitor的一个常见用例。必须定义要警报的数据库和保留策略。
CPU使用率示例警报
将以下 TICKscript 复制到一个名为
cpu_alert.tick的文件中:dbrp "telegraf"."autogen" stream // Select the CPU measurement from the `telegraf` database. |from() .measurement('cpu') // Triggers a critical alert when the CPU idle usage drops below 70% |alert() .crit(lambda: int("usage_idle") < 70) // Write each alert to a file. .log('/tmp/alerts.log')在命令行中,使用
kapacitorCLI 通过cpu_alert.tickTICKscript 定义任务:kapacitor define cpu_alert -tick cpu_alert.tick如果数据库和保留策略没有包含在 TICKscript 中(例如,
dbrp "telegraf"."autogen"),使用kapacitor define命令,带上-dbrp标志,后面跟着"来指定它们在添加任务时。"." " (可选) 使用
list命令来验证警报是否已创建:$ kapacitor list tasks ID Type Status Executing Databases and Retention Policies cpu_alert stream disabled false ["telegraf"."autogen"](可选) 使用
show命令查看有关任务的详细信息:$ kapacitor show cpu_alert ID: cpu_alert Error: Template: Type: stream Status: disabled Executing: false ...为了确保日志文件和通信通道不会被警报淹没,测试任务。
启用任务以开始处理实时数据流:
kapacitor enable cpu_alert警报实时写入日志。
运行
show命令以验证任务是否正在接收数据并按预期行为:$ kapacitor show cpu_alert |from() // Information about the state of the task and any error it may have encountered. ID: cpu_alert Error: Type: stream Status: Enabled Executing: true Created: 04 May 16 21:01 MDT Modified: 04 May 16 21:04 MDT LastEnabled: 04 May 16 21:03 MDT Databases Retention Policies: [""."autogen"] // Displays the version of the TICKscript that Kapacitor has stored in its local database. TICKscript: stream // Select just the cpu me .measurement('cpu') |alert() .crit(lambda: "usage_idle" < 70) // Whenever we get an alert write it to a file. .log('/tmp/alerts.log') DOT: digraph asdf { graph [throughput="0.00 points/s"]; stream0 [avg_exec_time_ns="0" ]; stream0 -> from1 [processed="12"]; from1 [avg_exec_time_ns="0" ]; from1 -> alert2 [processed="12"]; alert2 [alerts_triggered="0" avg_exec_time_ns="0" ]; }
返回一个 graphviz dot 格式的树,显示由 TICKscript 定义的数据处理管道和带有每个节点统计信息的键值关联数组条目,以及沿着边到下一个节点的链接,包括关联数组的统计信息。链接/边成员中的 processed 键表示通过图形指定边的数据信息点的数量。
在上面的例子中,stream0 节点(也称为 stream 变量来自 TICKscript)已向 from1 节点发送了 12 个数据点。from1 节点也向 alert2 节点发送了 12 个数据点。由于 Telegraf 被配置为发送 cpu 数据,因此所有 12 个数据点都符合 from1 节点的数据库/计量标准,并被传递。
如果有必要,在Debian或RedHat上使用操作系统提供的包安装graphviz。graphviz网站上提供的包不是最新的。
现在任务正在使用实时数据运行,这里有一个快速的技巧,使用100%的一个核心来产生一些人工的CPU活动:
while true; do i=0; done
测试任务
完成以下步骤以确保日志文件和通信通道不会被警报垃圾邮件。
记录数据流:
kapacitor record stream -task cpu_alert -duration 60s如果出现连接错误,例如:
getsockopt: connection refused(Linux) 或connectex: No connection could be made...(Windows), 请确认Kapacitor服务正在运行(请参见 安装和启动Kapacitor)。 如果Kapacitor正在运行,请检查主机的防火墙设置,确保端口9092是可访问的。 同时,检查/var/log/kapacitor/kapacitor.log中的消息。如果/etc/kapacitor/kapacitor.conf中的http或其他配置存在问题,该问题会在日志中显示。 如果Kapacitor服务在另一台主机上运行,请在本地shell中设置KAPACITOR_URL环境变量,指向远程主机上的Kapacitor端点。获取返回的 ID 并将其分配给 bash 变量以便后续使用(返回的实际 UUID 是不同的):
rid=cd158f21-02e6-405c-8527-261ae6f26153通过运行确认录制捕获了一些数据:
kapacitor list recordings $rid输出应该如下所示:
ID Type Status Size Date cd158f21-02e6-405c-8527-261ae6f26153 stream finished 2.2 kB 04 May 16 11:44 MDT如果大小超过几个字节,则已捕获数据。 如果 Kapacitor 没有接收数据,请检查每一层:Telegraf → InfluxDB → Kapacitor。 如果 Telegraf 无法与 InfluxDB 通信,它会记录错误。 如果 InfluxDB 无法将数据发送到 Kapacitor,它会记录关于
connection refused的错误。 运行查询SHOW SUBSCRIPTIONS以查找 InfluxDB 正在使用的将数据发送到 Kapacitor 的端点。在下面的示例中,InfluxDB 必须在
localhost:8086上运行:$ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SHOW SUBSCRIPTIONS' {"results":[{"statement_id":0,"series":[{"name":"_internal","columns":["retention_policy","name","mode","destinations"],"values":[["monitor","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]},{"name":"telegraf","columns":["retention_policy","name","mode","destinations"],"values":[["autogen","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]}]}]}使用
replay来测试特定任务的录制数据:kapacitor replay -recording $rid -task cpu_alert使用标志
-real-clock根据时间戳之间的增量设置回放时间。时间在每个节点上由它接收到的数据点测量。检查日志以获取警报:
sudo cat /tmp/alerts.log每个 JSON 行表示一个警报,包括警报级别和触发警报的数据。
如果主机繁忙,记录警报可能需要一段时间。
(可选) 修改任务以提高敏感性,以确保警报能正常工作。 在 TICKscript 中,将 lambda 函数
.crit(lambda: "usage_idle" < 70)改为.crit(lambda: "usage_idle" < 100),然后使用仅包含TASK_NAME和-tick参数的define命令运行:kapacitor define cpu_alert -tick cpu_alert.tick在录制过程中接收到的每一个数据点都会触发警报。
重复修改后的任务以验证结果。
kapacitor replay -recording $rid -task cpu_alert一旦
alerts.log的结果验证任务正常工作,请将usage_idle阈值更改回一个更合理的水平,并使用define命令重新定义任务,如第6步所示。
注意 - 单引号与双引号
TICK脚本中的单引号和双引号有很不同的作用:
请注意以下示例:
var data = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('cpu')
// NOTE: Double quotes on server1
.where(lambda: "host" == "server1")
此搜索的结果将始终为空,因为在“server1”周围使用了双引号。这意味着Kapacitor将搜索“host”字段等于字段“server1”的序列。这可能不是预期的结果。更可能的意图是搜索标签“host”具有值‘server1’的序列,因此应该使用单引号。双引号表示数据字段,单引号表示字符串值。为了匹配值,上面的tick脚本应如下所示:
var data = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('cpu')
// NOTE: Single quotes on server1
.where(lambda: "host" == 'server1')
扩展 TICKscripts
下面的 TICKscript 将计算运行平均值,并将当前值与其进行比较。
如果这些值与平均值相差超过 3 个标准差,则会触发警报。
用下面的 TICKscript 替换 cpu_alert.tick 脚本:
stream
|from()
.measurement('cpu')
|alert()
// Compare values to running mean and standard deviation
.crit(lambda: sigma("usage_idle") > 3)
.log('/tmp/alerts.log')
就这样,可以创建一个动态阈值,如果白天 CPU 使用率下降或晚上激增,将会发出警报。
试试看。
使用 define 来更新任务 TICKscript。
kapacitor define cpu_alert -tick cpu_alert.tick
注意: 如果任务已经启用,用 define 命令重新定义该任务将自动重载 (reload) 该任务。要定义一个不重载的任务,请使用 -no-reload
现在跟踪警报日志:
sudo tail -f /tmp/alerts.log
现在不应该触发任何警报。 接下来,开始一个while循环以增加一些负载:
while true; do i=0; done
一旦创建了足够的人为负载,警报触发器应该很快写入日志。
让循环运行几分钟。
在取消循环后,应发出另一个警报,指示CPU使用率又发生了变化。
使用这种技术,可以生成CPU使用率上升和下降边缘的警报,以及任何异常值。
一个现实世界的例子
现在基础内容已经涵盖,这里是一个更真实的例子。一旦来自几个主机的指标流向Kapacitor,就可以进行类似于:聚合并分组每个数据中心中每个服务的cpu使用情况,然后根据95百分位触发警报。除了只是将警报写入日志,Kapacitor还可以与第三方工具集成:目前支持Slack、PagerDuty、HipChat、VictorOps等。警报还可以通过电子邮件发送,可以发布到自定义端点或可以触发自定义脚本的执行。还可以定义自定义消息格式,以便警报具有正确的上下文和含义。这个的TICKscript如下例所示。
示例 - 针对多个服务CPU的流的TICK脚本,并在第95百分位上发出警报
stream
|from()
.measurement('cpu')
// create a new field called 'used' which inverts the idle cpu.
|eval(lambda: 100.0 - "usage_idle")
.as('used')
|groupBy('service', 'datacenter')
|window()
.period(1m)
.every(1m)
// calculate the 95th percentile of the used cpu.
|percentile('used', 95.0)
|eval(lambda: sigma("percentile"))
.as('sigma')
.keep('percentile', 'sigma')
|alert()
.id('{{ .Name }}/{{ index .Tags "service" }}/{{ index .Tags "datacenter"}}')
.message('{{ .ID }} is {{ .Level }} cpu-95th:{{ index .Fields "percentile" }}')
// Compare values to running mean and standard deviation
.warn(lambda: "sigma" > 2.5)
.crit(lambda: "sigma" > 3.0)
.log('/tmp/alerts.log')
// Post data to custom endpoint
.post('https://alerthandler.example.com')
// Execute custom alert handler script
.exec('/bin/custom_alert_handler.sh')
// Send alerts to slack
.slack()
.channel('#alerts')
// Sends alerts to PagerDuty
.pagerDuty()
// Send alerts to VictorOps
.victorOps()
.routingKey('team_rocket')
像定义一个警报这样简单的事情,可以迅速扩展到更大的范围。通过上述脚本,如果任何数据中心中的任何服务偏离正常行为(由历史95百分位数的cpu使用率定义)超过3个标准偏差,就会触发警报,并且会在1分钟内完成!
有关警报工作原理的更多信息,请参见AlertNode文档。
从批量数据触发警报
除了流式处理数据外,Kapacitor 还可以定期查询 InfluxDB 并批量处理数据。
虽然基于 CPU 使用率触发警报更适合流处理情况,但这里通过遵循相同的用例演示了 batch 任务的基本工作原理。
批量数据示例警报
这个TICKscript大致执行与之前的流任务相同的功能,但作为一个批处理任务:
dbrp "telegraf"."autogen"
batch
|query('''
SELECT mean(usage_idle)
FROM "telegraf"."autogen"."cpu"
''')
.period(5m)
.every(5m)
.groupBy(time(1m), 'cpu')
|alert()
.crit(lambda: "mean" < 70)
.log('/tmp/batch_alerts.log')
将上面的脚本复制到文件
batch_cpu_alert.tick中。定义任务:
kapacitor define batch_cpu_alert -tick batch_cpu_alert.tick验证其创建:
$ kapacitor list tasks ID Type Status Executing Databases and Retention Policies batch_cpu_alert batch disabled false ["telegraf"."autogen"] cpu_alert stream enabled true ["telegraf"."autogen"]记录查询结果在任务中(注意,实际的UUID不同):
kapacitor record batch -task batch_cpu_alert -past 20m # Save the id again rid=b82d4034-7d5c-4d59-a252-16604f902832这记录了使用
batch_cpu_alert任务中的查询的最后20分钟的批次。 在这种情况下,由于period为5分钟,因此记录并保存了最后4个批次。以相同的方式重放批量录音:
kapacitor replay -recording $rid -task batch_cpu_alert检查警报日志以确保警报按预期生成。 上述基于
sigma的警报也可以调整以处理批量数据。 尝试一下,让自己熟悉在Kapacitor中更新、测试和运行任务。
使用Kapacitor加载任务
要使用Kapacitor加载任务,请将TICKscript保存在load目录中,该目录在kapacitor.conf中指定。TICKscripts必须包含数据库和保留策略声明dbrp。
加载目录中的TICKscripts在Kapacitor启动时会自动加载,无需通过kapacitor define命令添加。
欲了解更多信息,请参阅 Load Directory。