计算连接系列的比率 + 填充数据
收集一组时间序列数据,其中每个时间序列计数特定事件是一种常见的场景。使用Kapacitor,可以将一组中的多个时间序列连接在一起,并用于计算组合值,然后将其存储为一个新的时间序列。
本指南展示如何在python中使用准备好的数据生成器将两个生成的时间序列组合成一个新的计算测量值,然后使用Kapacitor将该测量值存储回InfluxDB。
它以一个假设的高流量网站为例,进行两项测量:
errors– 页面浏览中出现错误的数量。views– 没有错误的页面浏览次数。
数据生成器
这样的一个网站的数据可以通过Python 3脚本准备并生成到InfluxDB,脚本被打包到pages.zip (md5, sha256),并为此目的创建。它利用了InfluxDB-Python库。请查看那个Github项目,了解如何在Python中安装该库的说明。
解压缩后,可以使用此脚本创建一个名为 pages 的数据库,该数据库使用默认的保留策略 autogen。它可以用来创建数据的积压,然后设置生成器运行,沿着随机生成的 view 和 error 计数。
可以以两天的随机数据积压开始,如下所示:
$ ./pages_db.py --silent true pnr --start 2d
Created database pages
priming and running
data primed
generator now running. CTRL+C to stop
..........................................
准备两天的数据大约需要一分钟。
与批量数据连接
仅仅拥有简单的计数可能不足以满足网站管理员的需求。更重要的是要知道导致错误的页面浏览百分比。这个过程是选择现有的两个测量值,将它们连接起来并计算出错误的百分比。然后,错误百分比可以作为一个新的测量值存储在InfluxDB中。
这两个测量值, errors 和 views,需要被查询。
// Get errors batch data
var errors = batch
|query('SELECT sum(value) FROM "pages"."autogen".errors')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Get views batch data
var views = batch
|query('SELECT sum(value) FROM "pages"."autogen".views')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
连接过程跳过在另一来源中没有匹配时间点的点。因此,在连接批处理数据时,同时进行 groupBy 和 fill 数据是很重要的。按时间对数据进行分组确保每个来源在一致的时间段内都有数据点。填充数据确保每个点都有一个与合理默认值匹配的点。
在这个例子中,groupBy 方法使用通配符 * 来按所有标签对结果进行分组。通过声明单个标签可以使其更具体,并且由于生成的演示数据仅包含一个标签,page,因此 groupBy 语句可以写成如下: .groupBy(time(1m), 'page')。
对于每个测量,需要将两个批量源连接起来,方法如下。
// Join errors and views
errors
|join(views)
.as('errors', 'views')
这些数据是通过时间连接的,这意味着当每个源的批次对到达时,它们会被合并成一个单独的批次。因此,每个源的字段需要重命名以正确命名空间。这是通过.as('errors', 'views')行来实现的。在这个例子中,每个测量只有一个字段,名为sum。合并后的字段分别被称为errors.sum和views.sum。
现在数据已连接,可以计算百分比。使用字段的新名称,可以使用以下表达式来计算所需的百分比。
//Calculate percentage
|eval(lambda: "errors.sum" / ("views.sum" + "errors.sum"))
// Give the resulting field a name
.as('value')
最后,这些数据被存储回InfluxDB中。
|influxDBOut()
.database('pages')
.measurement('error_percent')
这里是批处理任务的完整 TICKscript:
dbrp "pages"."autogen"
// Get errors batch data
var errors = batch
|query('SELECT sum(value) FROM "pages"."autogen".errors')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Get views batch data
var views = batch
|query('SELECT sum(value) FROM "pages"."autogen".views')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Join errors and views
errors
|join(views)
.as('errors', 'views')
//Calculate percentage
|eval(lambda: ("errors.sum" / ("views.sum" + "errors.sum")) * 100)
// Give the resulting field a name
.as('value')
|influxDBOut()
.database('pages')
.measurement('error_percent')
回填
现在来一个有趣的小把戏。 使用Kapacitor的记录/回放操作,这个TICKscript可以在历史数据上运行。 首先,将上面的脚本保存为 error_percent.tick 并定义它。 然后,为我们想要填充的过去时间段创建一个录音。
$ kapacitor define error_percent -tick error_percent.tick
$ kapacitor record batch -task error_percent -past 1d
获取录音 ID 并对任务进行历史数据回放。
这里指定 -rec-time 标志,以指示 Kapacitor 在处理数据时使用录音中存储的实际时间,而不是调整为当前时间。
$ kapacitor replay -task error_percent -recording RECORDING_ID -rec-time
如果数据集太大,无法在一个录音中保存,请定义一个特定的时间范围进行录制,然后逐个回放每个范围。
rid=$(kapacitor record batch -task error_percent -start 2015-10-01 -stop 2015-10-02)
echo $rid
kapacitor replay -task error_percent -recording $rid -rec-time
kapacitor delete recordings $rid
只需对上述脚本进行循环,针对每个时间窗口重建所需的所有历史数据。这样,error_percent 在每分钟的历史数据将会被回填。
流方法
通过流处理案例可以实现类似的功能。请注意,命令
kapacitor record stream 并不包含相同的历史选项 -past,
因此直接在Kapacitor中使用stream任务进行回填是不可能的。如果
需要回填,可以使用下面呈现的命令 kapacitor record query。
然而,相同的 TICKscript 语义可以与 stream 任务一起使用,以实时计算和存储新的计算值,例如 error_percent。
以下就是这样一个TICKscript。
dbrp "pages"."autogen"
// Get errors stream data
var errors = stream
|from()
.measurement('errors')
.groupBy(*)
|window()
.period(1m)
.every(1m)
|sum('value')
// Get views stream data
var views = stream
|from()
.measurement('views')
.groupBy(*)
|window()
.period(1m)
.every(1m)
|sum('value')
// Join errors and views
errors
|join(views)
.as('errors', 'views')
// Calculate percentage
|eval(lambda: "errors.sum" / ("views.sum" + "errors.sum") * 100.0)
// Give the resulting field a name
.as('value')
|influxDBOut()
.database('pages')
.measurement('error_percent')
记录查询和流式回填
要为处理多个测量值的流任务提供历史数据,请在记录数据时使用 multiple statements。
第一次使用 record query 按照这个通用命令的模式:
kapacitor record query -query $'select field1,field2,field3 from "database_name"."autogen"."one" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *; select field1,field2,field3 from "database_name"."autogen"."two" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *' -type stream
例如:
$ kapacitor record query -query $'select value from "pages"."autogen"."errors" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-05-31T12:00:00Z\' GROUP BY *; select value from "pages"."autogen"."views" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-12-21T12:00:00Z\' GROUP BY *' -type stream
578bf299-3566-4813-b07b-744da6ab081a
返回的录音 ID 可以在 Kapacitor replay 命令中使用录制的时间。
$ kapacitor replay -task error_percent_s -recording 578bf299-3566-4813-b07b-744da6ab081a -rec-time
c623f73c-cf2a-4fce-be4c-9ab89f0c6045