Skip to content

流数据分析

关于 Bytewax

Bytewax 是一个专为 Python 开发者设计的开源流处理框架。它允许用户构建类似于 Flink、Spark 和 Kafka Streams 的流数据管道和实时应用程序,同时提供友好且熟悉的接口,并完全兼容 Python 生态系统。

使用 Bytewax 和 ydata-profiling 进行流处理

数据分析是任何机器学习任务成功开始的关键步骤,指的是深入理解我们的数据:其结构、行为和质量。简而言之,数据分析涉及分析与数据格式和基本描述符(例如,样本数量、特征数量/类型、重复值)、其内在特征(如缺失数据或不平衡特征的存在)以及在数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致特征)相关的方面。

包版本

与 bytewax 的集成适用于 ydata-profiling 的任何版本 >=3.0.0

模拟流数据

以下代码用于模拟数据流。当有可用的流数据源时,这并不需要。

导入
1
2
3
4
5
6
from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main

然后,我们定义数据流对象。之后,我们将使用一个无状态的 map 方法,在其中传入一个函数将字符串转换为 datetime 对象,并将数据重新结构化为格式 (device_id, data)。map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是为了在下一步中可以轻松地按设备分组数据,以便分别为每个设备而不是所有设备同时进行数据分析。

设置数据流
flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# 解析时间戳
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)

# 重新映射格式为元组 (device_id, reading_data)
flow.map(lambda reading_data: (reading_data["device"], reading_data))

现在,我们将利用 bytewax 的有状态能力,在定义的时间段内为每个设备收集数据。ydata-profiling 期望在一段时间内的数据快照,这使得 window 操作符成为实现这一目标的完美方法。

在 ydata-profiling 中,我们能够为特定上下文指定的数据框生成汇总统计信息。例如,在这个示例中,我们可以生成引用每个 IoT 设备或特定时间框架的数据快照:

分析流数据快照

分析不同的数据快照
from bytewax.window import EventClockConfig, TumblingWindow

# 这是累加器函数,输出一个读数列表
def acc_values(acc, reading):
    acc.append(reading)
    return acc

# 此函数指示事件时钟如何从输入中检索事件的日期时间。
def get_time(reading):
    return reading["ts"]


# 配置 `fold_window` 操作符以使用事件时间。
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# 以及一个翻滚窗口
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

flow.inspect(print)

在定义了快照之后,利用 ydata-profiling 就像为每个我们希望分析的数据框调用 ProfileReport 一样简单:

import pandas as pd
from ydata_profiling import ProfileReport


def profile(device_id__readings):
    print(device_id__readings)
    device_id, readings = device_id__readings
    start_time = (
        readings[0]["ts"]
        .replace(minute=0, second=0, microsecond=0)
        .strftime("%Y-%m-%d %H:%M:%S")
    )
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df, tsmode=True, sortby="ts", title=f"传感器读数 - 设备: {device_id}"
    )

    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"设备 {device_id}{start_time} 时刻被分析"


flow.map(profile)
在这个示例中,我们将图像作为映射方法中函数的一部分写入本地文件。这些图像可以通过消息工具报告出来,或者我们可以在未来将它们保存到某些远程存储中。一旦分析完成,数据流期望有一些输出,因此我们可以使用内置的[StdOutput]{.title-ref}来打印被分析的设备以及在映射步骤中从分析函数传递出来的分析时间:

flow.output("out", StdOutput())

有多种方式可以执行Bytewax数据流。在这个示例中,我们使用同一台本地机器,但Bytewax也可以在多个Python进程中运行,跨越多台主机,在Docker容器中运行,使用Kubernetes集群,以及更多。在这个示例中,我们将继续使用本地设置,但我们鼓励您查看waxctl,它可以在您的管道准备好过渡到生产环境时管理Kubernetes数据流部署。

假设我们在包含数据流定义文件的同一目录中,我们可以使用以下命令运行它:

python -m bytewax.run ydata-profiling-streaming:flow

然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的变化,并比较不同设备或时间窗口之间的数据特征。

我们可以进一步利用比较报告功能,该功能以直接的方式突出显示两个数据分析之间的差异,使我们更容易检测需要调查的重要模式或需要解决的问题:

比较不同的流
1
2
3
4
5
6
7
#为每个流生成分析
snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

#比较生成的分析
comparison_report = snapshot_a_report.compare(snapshot_b_report)
comparison_report.to_file("comparison_report.html")

现在您已经准备好开始探索您的数据流了!Bytewax负责处理和将数据流结构化为快照所需的所有过程,这些快照可以通过ydata-profiling进行总结和比较,生成一份全面的数据特征报告。