使用Python和Quix Streams对数据进行降采样
使用 Quix Streams 查询存储在 InfluxDB 中的时间序列数据,并定期写入 Kafka,持续对其进行降采样,然后将降采样的数据写回 InfluxDB。Quix Streams 是一个开源的 Python 库,用于构建基于 Apache Kafka 的容器化流处理应用程序。它设计为一个服务,持续处理数据流,同时将结果流式传输到 Kafka 主题。您可以在本地尝试,使用本地 Kafka 安装,或在 Quix Cloud 上免费试用。处理高容量数据时,一个常见的做法是在将其提交到 InfluxDB 之前进行降采样,以减少随着时间推移而积累的总体磁盘使用量。
本指南介绍了创建一系列Python服务的过程,这些服务从InfluxDB v2存储桶中获取数据,然后对数据进行降采样并将其发布到另一个InfluxDB v2存储桶。通过在时间窗口内聚合数据,然后将聚合值存储回InfluxDB,您可以减少磁盘使用和长期成本。
本指南使用 InfluxDB v2 和 Quix Streams Python 客户端库,可以在本地运行或在 Quix Cloud 中部署,并提供免费试用。假设您已设置好 Python 项目和虚拟环境。
管道架构
下图展示了数据在被降采样时如何在进程之间传递:
InfluxDB v2 源生产者
下采样过程
InfluxDB v2 接收消费端
通常直接将原始数据写入Kafka比先将原始数据写入InfluxDB更高效(本质上是用“influxv2-data”主题启动Quix Streams管道)。然而,本指南假设您已经在InfluxDB中拥有了您想要降采样的原始数据。
设置前提条件
本指南中描述的过程需要以下内容:
- InfluxDB v2,数据准备好进行下采样。使用下面的机器数据生成器代码.
- 一个 Quix Cloud 账户或本地的 Apache Kafka 或 Red Panda 安装。
- 熟悉基本的Python和Docker概念。
安装依赖
使用 pip 安装以下依赖项:
influxdb-client(InfluxDB v2 客户端库)quixstreams<2.5(Quixstreams 客户端库)pandas(数据分析和处理工具)
pip install influxdb-client pandas quixstreams<2.5
准备InfluxDB桶
降采样过程涉及两个 InfluxDB 存储桶。 每个存储桶都有一个 保留期,指定数据在过期并被删除之前持续多久。 通过使用两个存储桶,您可以在保留期较短的存储桶中存储未修改的高分辨率数据,然后在保留期较长的存储桶中存储降采样的低分辨率数据。
确保您为以下每一项准备一个桶:
- 从您的 InfluxDB v2 集群查询未修改的数据
- 另一个用于写入降采样数据的地方
创建下采样逻辑
该过程读取来自输入Kafka主题的原始数据,该主题存储从InfluxDB v2桶中流式传输的数据,进行降采样,然后将其发送到输出主题,该主题稍后将写回到另一个桶中。
使用 Quix Streams 库的
Application类来初始化与 Kafka 主题的连接。from quixstreams import Application app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest") input_topic = app.topic("input") output_topic = app.topic("output") # ...配置Quix Streams内置窗口函数,以创建一个持续将数据下采样到1分钟桶的翻滚窗口。
# ... target_field = "temperature" # The field that you want to downsample. def custom_ts_extractor(value): # ... # truncated for brevity - custom code that defines the "time_recorded" # field as the timestamp to use for windowing... topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) sdf = ( sdf.apply(lambda value: value[target_field]) # Extract temperature values .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows .mean() # Calculate average temperature .final() # Emit results at window completion ) sdf = sdf.apply( lambda value: { "time": value["end"], # End of the window "temperature_avg": value["value"], # Average temperature } ) sdf.to_topic(output_topic) # Output results to the "downsampled" topic # ...
结果被流式传输到 Kafka 主题, downsampled。
注意:“sdf”代表“流式数据框”。
您可以在Quix GitHub repository中找到此过程的完整代码。
创建生产者和消费者客户端
使用 influxdb_client 和 quixstreams 模块来实例化两个与 InfluxDB 和 Kafka 交互的客户端:
- A producer client configured to read from your InfluxDB bucket with 未修改的 data and 生成 that data to Kafka.
- 一个消费者客户端,被配置为消费来自Kafka的数据并将降采样的数据写入相应的InfluxDB桶。
创建生产者
提供以下制片人的凭证:
- INFLUXDB_HOST: InfluxDB云(TSM)区域URL (不带协议)
- INFLUXDB_ORG: InfluxDB 组织名称
- INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您想要查询和写入的存储桶的读取和写入权限。
- INFLUXDB_BUCKET: InfluxDB 存储桶名称
生产者按特定间隔从InfluxDB查询新数据。它将原始数据写入名为 influxv2-data 的Kafka主题。
from quixstreams import Application
import influxdb_client
# Create a Quix Application
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
# Define the topic using the "output" environment variable
topic = app.topic(os.getenv("output", "influxv2-data"))
# Create an InfluxDB v2 client
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
org=os.environ["INFLUXDB_ORG"],
url=os.environ["INFLUXDB_HOST"])
## ... remaining code trunctated for brevity ...
# Function to fetch data from InfluxDB
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
# Run in a loop until the main thread is terminated
while run:
try:
# Query InfluxDB 2.0 using flux
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: -{interval})
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
logger.info(f"Sending query: {flux_query}")
## ... remaining code trunctated for brevity ...
# Create a pre-configured Producer object.
with app.get_producer() as producer:
for res in get_data():
# Get the data from InfluxDB
records = json.loads(res)
for index, obj in enumerate(records):
logger.info(f"Produced message with key:{message_key}, value:{obj}")
# Publish the data to the Kafka topic
producer.produce(
topic=topic.name,
key=message_key,
value=obj,
)
您可以在Quix GitHub repository中找到此过程的完整代码。
创建消费者
与之前一样,为消费者提供以下凭据:
- INFLUXDB_HOST: InfluxDB Cloud (TSM) 区域 URL (不带协议)
- INFLUXDB_ORG: InfluxDB 组织名称
- INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您想要查询和写入的存储桶的读取和写入权限。
- INFLUXDB_BUCKET: InfluxDB 存储桶名称
注意:这些将是您的 InfluxDB v2 凭据。
该过程从Kafka主题 downsampled-data 读取消息,并将每条消息作为点字典写回InfluxDB。
from quixstreams import Application, State
from influxdb_client import InfluxDBClient, Point
# Create a Quix platform-specific application instead
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)
input_topic = app.topic(os.getenv("input", "input-data"))
# Initialize InfluxDB v2 client
influx2_client = InfluxDBClient(url=influx_host,
token=influx_token,
org=influx_org)
## ... remaining code trunctated for brevity ...
def send_data_to_influx(message: dict, state: State):
global last_write_time_ns, points_buffer, service_start_state
try:
## ... code trunctated for brevity ...
# Check if it's time to write the batch
# 10k records have accumulated or 15 seconds have passed
if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
with influx2_client.write_api() as write_api:
logger.info(f"Writing batch of {len(points_buffer)} points written to InfluxDB.")
write_api.write(influx_bucket, influx_org, points_buffer)
# Clear the buffer and update the last write time
points_buffer = []
last_write_time_ns = int(time() * 1e9)
## ... code trunctated for brevity ...
except Exception as e:
logger.info(f"{str(datetime.utcnow())}: Write failed")
logger.info(e)
## ... code trunctated for brevity ...
# We use Quix Streams StreamingDataframe (SDF) to handle every message
# in the Kafka topic by writing it to InfluxDB
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx, stateful=True)
if __name__ == "__main__":
logger.info("Starting application")
app.run(sdf)
您可以在Quix GitHub repository中找到此过程的完整代码。
运行机器数据生成器
现在是时候运行机器数据生成器代码了,该代码将用数据填充您的源 存储桶,这些数据将被生产者读取。
从GitHub仓库中的 Machine data to InfluxDB 文件夹运行 main.py。
获取完整的下采样代码文件
要获取本教程中引用的完整文件集,请克隆Quix的“downsampling”存储库。
克隆降采样模板库
要克隆下采样模板,请在命令行中输入以下命令:
git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
该仓库包含以下文件夹,存储整个流程的不同部分:
机器数据到 InfluxDB: 一个生成合成机器数据并将其写入 InfluxDB 的脚本。这在您还没有自己的数据,或者只想先使用测试数据时非常有用。
- It produces a reading every 250 milliseconds.
- This script originally comes from the InfluxCommunity repository but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
InfluxDB v2 数据源: 一项服务,定期从InfluxDB中查询新数据。它配置为查找由之前提到的合成机器数据生成器生成的测量值。它将原始数据写入名为“influxv2-data”的Kafka主题。
降采样器:一个对来自InfluxDB的数据执行1分钟滚动窗口操作的服务,并每分钟发出“温度”读数的平均值。它将输出写入一个“降采样”Kafka主题。
InfluxDB v2 数据接收器: 一种从“下采样”的主题中读取并将下采样记录作为点写回到 InfluxDB 的服务。