使用InfluxDB中的数据发送警报
查询、分析并使用存储在InfluxDB中的时间序列数据发送警报。
本指南使用 Python、InfluxDB 3 Python client library 和 Python Slack SDK 来演示如何从 InfluxDB 查询数据并发送警报到 Slack,但您可以使用您选择的任何运行时和警报平台,以及任何可用的 InfluxDB 3 client libraries。无论您选择使用哪个客户端和平台,过程都是一样的:
警报过程
- 使用外部运行时和InfluxDB客户端从InfluxDB查询数据。
- 使用查询的数据和您运行时可用的工具来发送警报。
创建一个Slack应用
要向Slack发送警报,首先创建一个Slack应用程序并收集与您的应用程序互动所需的连接凭据。更多信息请参见Slack基本应用程序设置文档。
安装依赖
本指南假设您已经 设置了您的Python项目和虚拟环境。
使用 pip 安装以下依赖项:
influxdb_client_3pandasslack_sdk
pip install influxdb3-python pandas slack_sdk
创建一个 InfluxDB 客户端
在 influxdb_client_3 模块中使用 InfluxDBClient3 函数来
实例化一个 InfluxDB 客户端。
提供以下凭据:
- host: InfluxDB Cloud Serverless 区域 URL (不带协议)
- org: InfluxDB 组织名称
- token: InfluxDB API 令牌,具有对您想要查询的存储桶的读取权限
- database: InfluxDB 桶名称
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your bucket
influxdb = InfluxDBClient3(
host='cloud2.influxdata.com',
org='ORG_NAME',
token='API_TOKEN',
database='BUCKET_NAME'
)
创建一个 Slack 客户端
从
slack.sdk模块导入WebClient函数,以及从slack_sdk.errors模块导入SlackApiError函数。使用
WebClient函数来实例化一个 Slack 客户端。
提供以下凭据:- token: Slack bot token
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
slack = WebClient(token='SLACK_BOT_TOKEN')
查询 InfluxDB
定义一个SQL或InfluxQL查询以检索要警报的数据。根据您想要警报的数据,您可以:
- 在查询中包含逻辑,以便仅返回应警报的结果。
- 查询进一步处理所需的数据,然后根据在运行时执行的处理发送警报。
下面的示例查询仅返回超过应触发警报的阈值的值。
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
执行查询
将查询字符串分配给一个变量。
使用您实例化的客户端的
query方法从InfluxDB查询原始数据。提供以下参数。- query: 要执行的查询字符串
- 语言:
sql或influxql
使用
to_pandas方法将返回的 Arrow 表转化为 Pandas 数据框。
# ...
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
发送警报
遍历数据框,并为每一行发送警报到Slack。
在数据框上使用
reset_index函数以确保索引与数据框中的行数对齐。遍历每一行,并使用您的
chat_postMessage方法通过 Slack client 向Slack发送消息(每行一条)。提供以下参数:- channel: 发送警报的Slack频道。
- text: 要发送的消息文本。使用字符串插值将每行的列值插入消息文本中。
# ...
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
完整警报脚本
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cloud2.influxdata.com',
org='ORG_NAME',
token='API_TOKEN',
database='BUCKET_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cloud2.influxdata.com',
org='ORG_NAME',
token='API_TOKEN',
database='BUCKET_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)