Documentation

使用Python和Quix Streams对数据进行降采样

使用 Quix Streams 查询存储在 InfluxDB 中的时间序列数据,并定期写入 Kafka,持续对其进行降采样,然后将降采样的数据写回 InfluxDB。Quix Streams 是一个开源的 Python 库,用于构建基于 Apache Kafka 的容器化流处理应用程序。它设计为一个服务,持续处理数据流,同时将结果流式传输到 Kafka 主题。您可以在本地尝试,使用本地 Kafka 安装,或在 Quix Cloud 上免费试用。处理高容量数据时,一个常见的做法是在将其提交到 InfluxDB 之前进行降采样,以减少随着时间推移而积累的总体磁盘使用量。

本指南介绍了创建一系列Python服务的过程,这些服务从InfluxDB v2存储桶中获取数据,然后对数据进行降采样并将其发布到另一个InfluxDB v2存储桶。通过在时间窗口内聚合数据,然后将聚合值存储回InfluxDB,您可以减少磁盘使用和长期成本。

本指南使用 InfluxDB v2 和 Quix Streams Python 客户端库,可以在本地运行或在 Quix Cloud 中部署,并提供免费试用。假设您已设置好 Python 项目和虚拟环境。

管道架构

下图展示了数据在被降采样时如何在进程之间传递:

InfluxDB v2 源生产者

下采样过程

InfluxDB v2 接收消费端

通常直接将原始数据写入Kafka比先将原始数据写入InfluxDB更高效(本质上是用“influxv2-data”主题启动Quix Streams管道)。然而,本指南假设您已经在InfluxDB中拥有了您想要降采样的原始数据。


  1. 设置前提条件
  2. 安装依赖
  3. 准备InfluxDB桶
  4. 创建下采样逻辑
  5. 创建生产者和消费者客户端
    1. 创建生产者
    2. 创建消费者
  6. 运行机器数据生成器
  7. 获取完整的下采样代码文件

设置前提条件

本指南中描述的过程需要以下内容:

安装依赖

使用 pip 安装以下依赖项:

  • influxdb-client(InfluxDB v2 客户端库)
  • quixstreams<2.5 (Quixstreams 客户端库)
  • pandas(数据分析和处理工具)
pip install influxdb-client pandas quixstreams<2.5 

准备InfluxDB桶

降采样过程涉及两个 InfluxDB 存储桶。 每个存储桶都有一个 保留期,指定数据在过期并被删除之前持续多久。 通过使用两个存储桶,您可以在保留期较短的存储桶中存储未修改的高分辨率数据,然后在保留期较长的存储桶中存储降采样的低分辨率数据。

确保您为以下每一项准备一个桶:

  • 从您的 InfluxDB v2 集群查询未修改的数据
  • 另一个用于写入降采样数据的地方

创建下采样逻辑

该过程读取来自输入Kafka主题的原始数据,该主题存储从InfluxDB v2桶中流式传输的数据,进行降采样,然后将其发送到输出主题,该主题稍后将写回到另一个桶中。

  1. 使用 Quix Streams 库的 Application 类来初始化与 Kafka 主题的连接。

    from quixstreams import Application
    
    app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest")
    input_topic = app.topic("input")
    output_topic = app.topic("output")
    # ...
    
  2. 配置Quix Streams内置窗口函数,以创建一个持续将数据下采样到1分钟桶的翻滚窗口。

    # ...
    target_field = "temperature" # The field that you want to downsample.
    
    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the "time_recorded"
        # field as the timestamp to use for windowing...
    
    topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
    
    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))        # 1-minute tumbling windows
        .mean()                                       # Calculate average temperature
        .final()                                      # Emit results at window completion
    )
    
    sdf = sdf.apply(
        lambda value: {
            "time": value["end"],                  # End of the window
            "temperature_avg": value["value"],     # Average temperature
        }
    )
    
    sdf.to_topic(output_topic) # Output results to the "downsampled" topic
    # ...
    

结果被流式传输到 Kafka 主题, downsampled

注意:“sdf”代表“流式数据框”。

您可以在Quix GitHub repository中找到此过程的完整代码。

创建生产者和消费者客户端

使用 influxdb_clientquixstreams 模块来实例化两个与 InfluxDB 和 Kafka 交互的客户端:

  • A producer client configured to read from your InfluxDB bucket with 未修改的 data and 生成 that data to Kafka.
  • 一个消费者客户端,被配置为消费来自Kafka的数据并将降采样的数据写入相应的InfluxDB桶。

创建生产者

提供以下制片人的凭证:

  • INFLUXDB_HOST: InfluxDB云(TSM)区域URL (不带协议)
  • INFLUXDB_ORG: InfluxDB 组织名称
  • INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您想要查询和写入的存储桶的读取和写入权限。
  • INFLUXDB_BUCKET: InfluxDB 存储桶名称

生产者按特定间隔从InfluxDB查询新数据。它将原始数据写入名为 influxv2-data 的Kafka主题。

from quixstreams import Application
import influxdb_client
# Create a Quix Application
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
# Define the topic using the "output" environment variable
topic = app.topic(os.getenv("output", "influxv2-data"))
# Create an InfluxDB v2 client
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
                        org=os.environ["INFLUXDB_ORG"],
                        url=os.environ["INFLUXDB_HOST"])

## ... remaining code trunctated for brevity ...

# Function to fetch data from InfluxDB
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:            
            # Query InfluxDB 2.0 using flux
            flux_query = f'''
            from(bucket: "{bucket}")
                |> range(start: -{interval})
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
            '''
            logger.info(f"Sending query: {flux_query}")

## ... remaining code trunctated for brevity ...

# Create a pre-configured Producer object.
with app.get_producer() as producer:
    for res in get_data():
        # Get the data from InfluxDB
        records = json.loads(res)
        for index, obj in enumerate(records):
            logger.info(f"Produced message with key:{message_key}, value:{obj}")
            # Publish the data to the Kafka topic
            producer.produce(
                topic=topic.name,
                key=message_key,
                value=obj,
            ) 

您可以在Quix GitHub repository中找到此过程的完整代码。

创建消费者

与之前一样,为消费者提供以下凭据:

  • INFLUXDB_HOST: InfluxDB Cloud (TSM) 区域 URL (不带协议)
  • INFLUXDB_ORG: InfluxDB 组织名称
  • INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您想要查询和写入的存储桶的读取和写入权限。
  • INFLUXDB_BUCKET: InfluxDB 存储桶名称

注意:这些将是您的 InfluxDB v2 凭据。

该过程从Kafka主题 downsampled-data 读取消息,并将每条消息作为点字典写回InfluxDB。

from quixstreams import Application, State
from influxdb_client import InfluxDBClient, Point

# Create a Quix platform-specific application instead
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)

input_topic = app.topic(os.getenv("input", "input-data"))

# Initialize InfluxDB v2 client
influx2_client = InfluxDBClient(url=influx_host,
                                token=influx_token,
                                org=influx_org)

## ... remaining code trunctated for brevity ...

def send_data_to_influx(message: dict, state: State):
    global last_write_time_ns, points_buffer, service_start_state

    try:
        ## ... code trunctated for brevity ...

        # Check if it's time to write the batch
        # 10k records have accumulated or 15 seconds have passed
        if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
            with influx2_client.write_api() as write_api:
                logger.info(f"Writing batch of {len(points_buffer)} points written to InfluxDB.")
                write_api.write(influx_bucket, influx_org, points_buffer)

            # Clear the buffer and update the last write time
            points_buffer = []
            last_write_time_ns = int(time() * 1e9)
        
            ## ... code trunctated for brevity ...

    except Exception as e:
        logger.info(f"{str(datetime.utcnow())}: Write failed")
        logger.info(e)

## ... code trunctated for brevity ...

# We use Quix Streams StreamingDataframe (SDF) to handle every message
# in the Kafka topic by writing it to InfluxDB
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx, stateful=True)

if __name__ == "__main__":
    logger.info("Starting application")
    app.run(sdf)

您可以在Quix GitHub repository中找到此过程的完整代码。

运行机器数据生成器

现在是时候运行机器数据生成器代码了,该代码将用数据填充您的源 存储桶,这些数据将被生产者读取。

从GitHub仓库中的 Machine data to InfluxDB 文件夹运行 main.py

获取完整的下采样代码文件

要获取本教程中引用的完整文件集,请克隆Quix的“downsampling”存储库。

克隆降采样模板库

要克隆下采样模板,请在命令行中输入以下命令:

git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git

该仓库包含以下文件夹,存储整个流程的不同部分:

  • 机器数据到 InfluxDB: 一个生成合成机器数据并将其写入 InfluxDB 的脚本。这在您还没有自己的数据,或者只想先使用测试数据时非常有用。

    • It produces a reading every 250 milliseconds.
    • This script originally comes from the InfluxCommunity repository but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
  • InfluxDB v2 数据源: 一项服务,定期从InfluxDB中查询新数据。它配置为查找由之前提到的合成机器数据生成器生成的测量值。它将原始数据写入名为“influxv2-data”的Kafka主题。

  • 降采样器:一个对来自InfluxDB的数据执行1分钟滚动窗口操作的服务,并每分钟发出“温度”读数的平均值。它将输出写入一个“降采样”Kafka主题。

  • InfluxDB v2 数据接收器: 一种从“下采样”的主题中读取并将下采样记录作为点写回到 InfluxDB 的服务。



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看:

由TSM驱动的InfluxDB Cloud