Spark-Milvus 连接器用户指南
Spark-Milvus 连接器 (https://github.com/zilliztech/spark-milvus) 实现了 Apache Spark 和 Milvus 之间的无缝集成,将 Apache Spark 的数据处理和机器学习功能与 Milvus 的向量数据存储和搜索能力结合起来。这种集成实现了各种有趣的应用,包括:
- 高效地将向量数据批量加载到 Milvus 中,
- 在 Milvus 和其他存储系统或数据库之间移动数据,
- 利用 Spark MLlib 和其他人工智能工具分析 Milvus 中的数据。
快速开始
准备工作
Spark-Milvus 连接器支持 Scala 和 Python 编程语言。用户可以在 Pyspark 或 Spark-shell 中使用它。要运行此演示,请按照以下步骤设置包含 Spark-Milvus 连接器依赖项的 Spark 环境:
-
安装 Apache Spark (版本 >= 3.3.0)
您可以参考 官方文档 安装 Apache Spark。
-
下载 spark-milvus jar 文件。
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
-
使用 spark-milvus jar 作为依赖项启动 Spark 运行时。
要启动带有 Spark-Milvus 连接器的 Spark 运行时,请将下载的 spark-milvus 添加为依赖项。
-
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
-
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
-
演示
在此演示中,我们创建一个带有向量数据的示例 Spark DataFrame,并通过 Spark-Milvus 连接器将其写入 Milvus。将根据模式和指定的选项自动在 Milvus 中创建一个集合。
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// 创建 DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// 设置 Milvus 选项
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
在执行上述代码后,您可以使用 SDK 或 Attu(一个 Milvus 仪表板)查看 Milvus 中插入的数据。您可以找到一个名为 hello_spark_milvus
的集合,其中已经插入了 4 个实体。