游戏得分的实时排行榜
如果您没有正在运行的 Kapacitor 实例,请查看入门指南以在本地主机上启动和运行 Kapacitor。
今天我们是游戏开发者。 我们托管几个游戏服务器,每个服务器运行一个游戏代码的实例,每个游戏大约有一百名玩家。
我们需要建立一个排行榜,以便观众可以实时查看玩家的得分。我们还希望拥有历史数据,以便进行赛后分析,了解谁在领先以及领先了多久等。
我们将使用Kapacitor的流处理来为我们做繁重的工作。 游戏服务器可以在玩家的分数变化时随时发送一个UDP数据包,或者如果分数没有变化,至少每10秒钟发送一次。
设置
以下所有片段可以在 这里 找到
我们首先要做的就是配置Kapacitor以接收分数流。在这种情况下,分数更新得太频繁,无法将所有分数存储在InfluxDB中,所以我们将直接将它们发送到Kapacitor。像InfluxDB一样,您可以配置一个UDP侦听器。将此配置部分添加到您的Kapacitor配置的末尾。
[[udp]]
enabled = true
bind-address = ":9100"
database = "game"
retention-policy = "autogen"
此配置告诉Kapacitor监听端口 9100 接收以线性协议格式的UDP数据包。
它将限制传入的数据在 game.autogen 数据库和保留策略中。
启动Kapacitor时将其添加到配置中。
这里是一个简单的bash脚本,用于生成随机的分数数据,以便我们可以测试它而不干扰真实的游戏服务器。
#!/bin/bash
# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}
games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
for game in $games
do
game="g$game"
for player in $players
do
player="p$player"
score=$(($RANDOM % 1000))
echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
done
done
sleep 0.1
done
将上述脚本放入文件 scores.sh 中并运行:
chmod +x ./scores.sh
./scores.sh
现在我们正在用假分数数据攻击Kapacitor。 我们可以让它继续运行,因为Kapacitor会丢弃 输入的数据,直到它有一个需要这些数据的任务。
定义Kapacitor任务
排行榜需要做什么?
- 获取每个玩家每场比赛的最新得分。
- 计算每场比赛的前 X 名玩家得分。
- 发布结果。
- 存储结果。
要完成第一步,我们需要缓存传入的流,并返回每个玩家每场比赛的最新得分更新。 我们的 TICKscript 将如下所示:
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')
将这个脚本放在一个名为 top_scores.tick 的文件中。
现在我们的 topPlayerScores 变量包含每个玩家最近的得分。
接下来要计算每场比赛的最高得分,我们只需要按游戏分组并运行另一个 map reduce 作业。
让我们保留每场比赛的前 15 分数。
将这些行添加到 top_scores.tick 文件中。
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')
现在,topScores 变量包含每个游戏的前 15 位玩家的得分。 我们所需要的只是能够构建我们的排行榜。 Kapacitor 可以通过 HTTPOutNode 通过 HTTP 发布得分。 我们将我们的任务命名为 top_scores;通过以下补充,最近的得分将可在 http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores 获得。
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')
最后,我们希望存储随时间变化的最高分数,以便进行深入分析,确保最佳游戏体验。
但我们不想每秒都存储分数,因为这仍然是太多的数据。
首先,我们将对数据进行抽样,每10秒仅存储一次分数。
此外,既然我们已经拥有所有数据流,我们还可以提前做一些基本分析。
目前,我们将仅进行基本的间隔分析,存储第一名玩家和第15名玩家之间的间隔。
将这些行添加到 top_scores.tick 以完成我们的任务。
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// Calculate the difference between the max and min scores.
// Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// Store the fields: gap, topFirst and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')
由于我们要将数据写回到 InfluxDB,为我们的结果创建一个数据库 game。
curl -G 'http://localhost:8086/query?' --data-urlencode 'q=CREATE DATABASE game'
如果你不想复制粘贴这么多,这里是完整的任务 TICKscript :)
dbrp "game"."autogen"
// Define a result that contains the most recent score per player.
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// calculate the difference between the max and min scores.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// store the fields: gap, topFirst, and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')
定义并启用我们的任务以查看其实际效果:
kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores
首先让我们检查HTTP输出是否正常工作。
curl 'http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores'
您应该有一个包含前15名玩家及其每场得分的JSON结果。多次调用该端点,以查看得分每秒更新一次。
现在,让我们检查InfluxDB以查看我们的历史数据。
curl \
-G 'http://localhost:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores WHERE time > now() - 5m GROUP BY game'
curl \
-G 'http://localhost:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'
太好了! 艰苦的工作已经完成。 剩下的就是配置游戏服务器,以便将分数更新发送到Kapacitor,并更新观众仪表板,以从Kapacitor获取分数。