Documentation

使用InfluxDB中的数据发送警报

查询、分析并使用存储在InfluxDB中的时间序列数据发送警报。

本指南使用 PythonInfluxDB 3 Python client libraryPython Slack SDK 来演示如何从 InfluxDB 查询数据并发送警报到 Slack,但您可以使用您选择的任何运行时和警报平台,以及任何可用的 InfluxDB 3 client libraries。无论您选择使用哪个客户端和平台,过程都是一样的:

警报过程

  1. 使用外部运行时和InfluxDB客户端从InfluxDB查询数据。
  2. 使用查询的数据和您运行时可用的工具来发送警报。

创建一个Slack应用

要向Slack发送警报,首先创建一个Slack应用程序并收集与您的应用程序互动所需的连接凭据。更多信息请参见Slack基本应用程序设置文档

安装依赖

本指南假设您已经 设置了您的Python项目和虚拟环境

使用 pip 安装以下依赖项:

  • influxdb_client_3
  • pandas
  • slack_sdk
pip install influxdb3-python pandas slack_sdk

创建一个 InfluxDB 客户端

influxdb_client_3 模块中使用 InfluxDBClient3 函数来 实例化一个 InfluxDB 客户端。 提供以下凭据:

from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your bucket
influxdb = InfluxDBClient3(
    host='cloud2.influxdata.com',
    org='
ORG_NAME
'
,
token='
API_TOKEN
'
,
database='
BUCKET_NAME
'
)

创建一个 Slack 客户端

  1. slack.sdk 模块导入 WebClient 函数,以及从 slack_sdk.errors 模块导入 SlackApiError 函数。

  2. 使用 WebClient 函数来实例化一个 Slack 客户端。
    提供以下凭据:

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

slack = WebClient(token='
SLACK_BOT_TOKEN
'
)

查询 InfluxDB

定义一个SQL或InfluxQL查询以检索要警报的数据。根据您想要警报的数据,您可以:

  • 在查询中包含逻辑,以便仅返回应警报的结果。
  • 查询进一步处理所需的数据,然后根据在运行时执行的处理发送警报。

下面的示例查询仅返回超过应触发警报的阈值的值。

SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room

执行查询

  1. 将查询字符串分配给一个变量。

  2. 使用您实例化的客户端query方法从InfluxDB查询原始数据。提供以下参数。

    • query: 要执行的查询字符串
    • 语言: sqlinfluxql
  3. 使用 to_pandas 方法将返回的 Arrow 表转化为 Pandas 数据框。

# ...

query = '''
SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

发送警报

遍历数据框,并为每一行发送警报到Slack。

  1. 在数据框上使用 reset_index 函数以确保索引与数据框中的行数对齐。

  2. 遍历每一行,并使用您的 chat_postMessage 方法通过 Slack client 向Slack发送消息(每行一条)。提供以下参数:

    • channel: 发送警报的Slack频道。
    • text: 要发送的消息文本。使用字符串插值将每行的列值插入消息文本中。
# ...

data_frame = data_frame.reset_index()

for index, row in data_frame.iterrows():
    slack.chat_postMessage(
        channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )

完整警报脚本

from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cloud2.influxdata.com',
    org='
ORG_NAME
'
,
token='
API_TOKEN
'
,
database='
BUCKET_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT selector_last(co, time)['time'] AS time, selector_last(co, time)['value'] AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cloud2.influxdata.com',
    org='
ORG_NAME
'
,
token='
API_TOKEN
'
,
database='
BUCKET_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT LAST(co) AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看:

InfluxDB 云端无服务器