Documentation

游戏得分的实时排行榜

如果您没有正在运行的 Kapacitor 实例,请查看入门指南以在本地主机上启动和运行 Kapacitor。

今天我们是游戏开发者。 我们托管几个游戏服务器,每个服务器运行一个游戏代码的实例,每个游戏大约有一百名玩家。

我们需要建立一个排行榜,以便观众可以实时查看玩家的得分。我们还希望拥有历史数据,以便进行赛后分析,了解谁在领先以及领先了多久等。

我们将使用Kapacitor的流处理来为我们做繁重的工作。 游戏服务器可以在玩家的分数变化时随时发送一个UDP数据包,或者如果分数没有变化,至少每10秒钟发送一次。

设置

以下所有片段可以在 这里 找到

我们首先要做的就是配置Kapacitor以接收分数流。在这种情况下,分数更新得太频繁,无法将所有分数存储在InfluxDB中,所以我们将直接将它们发送到Kapacitor。像InfluxDB一样,您可以配置一个UDP侦听器。将此配置部分添加到您的Kapacitor配置的末尾。

[[udp]]
    enabled = true
    bind-address = ":9100"
    database = "game"
    retention-policy = "autogen"

此配置告诉Kapacitor监听端口 9100 接收以线性协议格式的UDP数据包。 它将限制传入的数据在 game.autogen 数据库和保留策略中。 启动Kapacitor时将其添加到配置中。

这里是一个简单的bash脚本,用于生成随机的分数数据,以便我们可以测试它而不干扰真实的游戏服务器。

#!/bin/bash

# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}

games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
    for game in $games
    do
        game="g$game"
        for player in $players
        do
            player="p$player"
            score=$(($RANDOM % 1000))
            echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
        done
    done
    sleep 0.1
done

将上述脚本放入文件 scores.sh 中并运行:

chmod +x ./scores.sh
./scores.sh

现在我们正在用假分数数据攻击Kapacitor。 我们可以让它继续运行,因为Kapacitor会丢弃 输入的数据,直到它有一个需要这些数据的任务。

定义Kapacitor任务

排行榜需要做什么?

  1. 获取每个玩家每场比赛的最新得分。
  2. 计算每场比赛的前 X 名玩家得分。
  3. 发布结果。
  4. 存储结果。

要完成第一步,我们需要缓存传入的流,并返回每个玩家每场比赛的最新得分更新。 我们的 TICKscript 将如下所示:

var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

将这个脚本放在一个名为 top_scores.tick 的文件中。

现在我们的 topPlayerScores 变量包含每个玩家最近的得分。 接下来要计算每场比赛的最高得分,我们只需要按游戏分组并运行另一个 map reduce 作业。 让我们保留每场比赛的前 15 分数。 将这些行添加到 top_scores.tick 文件中。

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

现在,topScores 变量包含每个游戏的前 15 位玩家的得分。 我们所需要的只是能够构建我们的排行榜。 Kapacitor 可以通过 HTTPOutNode 通过 HTTP 发布得分。 我们将我们的任务命名为 top_scores;通过以下补充,最近的得分将可在 http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores 获得。

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

最后,我们希望存储随时间变化的最高分数,以便进行深入分析,确保最佳游戏体验。 但我们不想每秒都存储分数,因为这仍然是太多的数据。 首先,我们将对数据进行抽样,每10秒仅存储一次分数。 此外,既然我们已经拥有所有数据流,我们还可以提前做一些基本分析。 目前,我们将仅进行基本的间隔分析,存储第一名玩家和第15名玩家之间的间隔。 将这些行添加到 top_scores.tick 以完成我们的任务。

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // Calculate the difference between the max and min scores.
    // Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // Store the fields: gap, topFirst and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

由于我们要将数据写回到 InfluxDB,为我们的结果创建一个数据库 game

curl -G 'http://localhost:8086/query?' --data-urlencode 'q=CREATE DATABASE game'

如果你不想复制粘贴这么多,这里是完整的任务 TICKscript :)

dbrp "game"."autogen"

// Define a result that contains the most recent score per player.
var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // calculate the difference between the max and min scores.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // store the fields: gap, topFirst, and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

定义并启用我们的任务以查看其实际效果:

kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores

首先让我们检查HTTP输出是否正常工作。

curl 'http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores'

您应该有一个包含前15名玩家及其每场得分的JSON结果。多次调用该端点,以查看得分每秒更新一次。

现在,让我们检查InfluxDB以查看我们的历史数据。

curl \
    -G 'http://localhost:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores  WHERE time > now() - 5m GROUP BY game'

curl \
    -G 'http://localhost:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'

太好了! 艰苦的工作已经完成。 剩下的就是配置游戏服务器,以便将分数更新发送到Kapacitor,并更新观众仪表板,以从Kapacitor获取分数。



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: