pyspark.sql.DataFrame ¶
-
class
pyspark.sql.DataFrame( jdf : py4j.java_gateway.JavaObject , sql_ctx : Union [ SQLContext , SparkSession ] ) [source] ¶ -
一个分布式的数据集合,按命名列分组。
新增于版本 1.3.0。
在版本 3.4.0 中更改: 支持 Spark Connect。
注释
DataFrame 应仅如上所述创建。不应通过使用构造函数直接创建。
示例
一个
DataFrame等同于 Spark SQL 中的一个关系表, 并且可以通过SparkSession中的各种函数来创建:>>> people = spark.createDataFrame([ ... {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50}, ... {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100}, ... {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150}, ... {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200} ... ])
一旦创建,可以使用在以下定义的各种领域特定语言(DSL)函数进行操作:
DataFrame、Column。要从
DataFrame中选择一列,请使用 apply 方法:>>> age_col = people.age
一个更具体的例子:
>>> # To create DataFrame using SparkSession ... department = spark.createDataFrame([ ... {"id": 1, "name": "PySpark"}, ... {"id": 2, "name": "ML"}, ... {"id": 3, "name": "Spark SQL"} ... ])
>>> people.filter(people.age > 30).join( ... department, people.deptId == department.id).groupBy( ... department.name, "gender").agg({"salary": "avg", "age": "max"}).show() +-------+------+-----------+--------+ | name|gender|avg(salary)|max(age)| +-------+------+-----------+--------+ | ML| F| 150.0| 60| |PySpark| M| 75.0| 50| +-------+------+-----------+--------+
方法
agg(*exprs)在整个
DataFrame上进行聚合,不进行分组(df.groupBy().agg()的简写)。alias(别名)返回一个设置了别名的新
DataFrame。approxQuantile(列, 概率, 相对误差)计算
DataFrame数值列的近似分位数。cache()使用默认存储级别( MEMORY_AND_DISK_DESER )持久化
DataFrame。checkpoint([eager])返回此
DataFrame的检查点版本。coalesce(numPartitions)返回一个具有确切 numPartitions 分区的新
DataFrame。colRegex(colName)根据作为正则表达式指定的列名选择列,并将其返回为
Column。collect()返回所有记录作为
Row的列表。corr(col1, col2[, method])计算
DataFrame中两列的相关性,返回一个双精度值。count()返回此
DataFrame中的行数。cov(col1, col2)计算给定列的样本协方差,这些列由它们的名称指定,结果为双精度值。
createGlobalTempView(名称)使用此
DataFrame创建一个全局临时视图。使用给定的名称创建或替换一个全局临时视图。
创建或替换一个本地临时视图,使用这个
DataFrame。createTempView(名称)使用此
DataFrame创建一个本地临时视图。crossJoin(其他)返回与另一个
DataFrame的笛卡尔积。crosstab(列1, 列2)计算给定列的对频率表。
cube(*cols)为当前的
DataFrame使用指定的列创建一个多维立方体,以便我们可以对它们运行聚合操作。describe(*cols)计算数值和字符串列的基本统计信息。
distinct()drop(*cols)返回一个新的
DataFrame,不包含指定的列。dropDuplicates([subset])返回一个新的
DataFrame,其中删除了重复的行,可以选择仅考虑某些列。dropDuplicatesWithinWatermark([subset])返回一个新的
DataFrame,其中删除了重复的行,drop_duplicates([subset])dropna([how, thresh, subset])返回一个新的
DataFrame,省略包含空值的行。exceptAll(other)返回一个新的
DataFrame,包含在此DataFrame中但不在另一个DataFrame中的行,同时保留重复项。explain([extended, mode])打印逻辑和物理计划到控制台以进行调试。
fillna(值[, 子集])替换空值,
na.fill()的别名。filter(条件)使用给定的条件过滤行。
first()返回第一行作为
Row。foreach(f)foreachPartition(f)将
f函数应用于这个DataFrame的每个分区。freqItems(cols[, support])查找列中的频繁项,可能带有误报。
groupBy(*cols)使用指定的列对
DataFrame进行分组,以便我们可以对其运行聚合操作。groupby(*cols)groupby()是groupBy()的别名。head([n])返回前
n行。hint(名称, *参数)指定当前
DataFrame的一些提示。inputFiles()返回一个尽力而为的快照,该快照由组成此
DataFrame的文件组成。intersect(其他)intersectAll(其他)返回一个新的
DataFrame,包含此DataFrame和另一个DataFrame中的所有行,同时保留重复项。isEmpty()检查
DataFrame是否为空并返回一个布尔值。isLocal()join(其他[, on, how])与另一个
DataFrame进行连接,使用给定的连接表达式。limit(数量)将结果数量限制为指定的数量。
localCheckpoint([eager])返回此
DataFrame的本地检查点版本。mapInArrow(函数, 模式[, 屏障])使用一个Python原生函数映射当前
DataFrame中的批次迭代器,该函数接受并输出PyArrow的 RecordBatch ,并返回结果作为DataFrame。mapInPandas(func, schema[, barrier])使用一个Python原生函数映射当前
DataFrame中的批次迭代器,该函数接受并输出一个pandas DataFrame,并返回结果作为DataFrame。melt(ids, values, variableColumnName, …)将DataFrame从宽格式透视为长格式,可以选择保留标识符列集。
observe(观察, *表达式)定义(命名)要在 DataFrame 上观察的指标。
offset(num)返回一个新的 :class: DataFrame ,通过跳过前 n 行。
orderBy(*cols, **kwargs)返回一个新的
DataFrame,按指定的列排序。pandas_api([index_col])将现有的 DataFrame 转换为 pandas-on-Spark DataFrame。
persist([storageLevel])设置存储级别以在第一次计算后跨操作持久化
DataFrame的内容。printSchema([level])以树形格式打印出模式。
randomSplit(权重[, 种子])随机拆分此
DataFrame并提供权重。registerTempTable(名称)将此
DataFrame使用给定的名称注册为临时表。repartition(numPartitions, *cols)返回一个新的
DataFrame,由给定的分区表达式进行分区。repartitionByRange(numPartitions, *cols)返回一个新的
DataFrame,由给定的分区表达式进行分区。replace(要替换的值[, 替换值, 子集])返回一个新的
DataFrame,用另一个值替换某个值。rollup(*cols)为当前的
DataFrame创建一个多维度的汇总,使用指定的列,以便我们可以对它们进行聚合。sameSemantics(其他)当两个
DataFrame中的逻辑查询计划相等时,返回 True ,因此它们返回相同的结果。sample([withReplacement, fraction, seed])返回此
DataFrame的采样子集。sampleBy(列, 比例[, 种子])返回基于每个层级给定比例的分层抽样,且不进行替换。
select(*cols)将一组表达式投影并返回一个新的
DataFrame。selectExpr(*expr)将一组SQL表达式投影并返回一个新的
DataFrame。semanticHash()返回对此
DataFrame的逻辑查询计划的哈希码。show([n, truncate, vertical])打印前
n行到控制台。sort(*cols, **kwargs)返回一个新的
DataFrame,按指定的列排序。sortWithinPartitions(*cols, **kwargs)返回一个新的
DataFrame,其中每个分区按指定的列进行排序。subtract(其他)summary(*统计数据)计算数值和字符串列的指定统计信息。
tail(数量)返回最后
num行作为list的Row。take(数量)返回前
num行作为一个list的Row。to(schema)返回一个新的
DataFrame,其中每一行都经过调整以匹配指定的模式。toDF(*cols)返回一个新的
DataFrame,其中包含指定的新列名toJSON([use_unicode])将一个
DataFrame转换为RDD的字符串。toLocalIterator([prefetchPartitions])返回一个包含此
DataFrame中所有行的迭代器。toPandas()返回此
DataFrame作为Pandaspandas.DataFrame。to_koalas([index_col])to_pandas_on_spark([index_col])transform(func, *args, **kwargs)返回一个新的
DataFrame。union(其他)unionAll(其他)unionByName(其他[, 允许缺失列])unpersist([blocking])将
DataFrame标记为非持久化,并从内存和磁盘中移除其所有块。unpivot(ids, values, variableColumnName, …)将DataFrame从宽格式透视为长格式,可以选择保留标识符列集。
where(条件)withColumn(列名, 列)通过添加一列或替换具有相同名称的现有列,返回一个新的
DataFrame。withColumnRenamed(现有列名, 新列名)通过重命名现有列返回一个新的
DataFrame。withColumns(*colsMap)通过添加多个列或替换具有相同名称的现有列,返回一个新的
DataFrame。withColumnsRenamed(colsMap)通过重命名多个列返回一个新的
DataFrame。withMetadata(columnName, metadata)通过更新现有列的元数据返回一个新的
DataFrame。withWatermark(eventTime, delayThreshold)为此
DataFrame定义一个事件时间水印。writeTo(表)创建一个用于v2源的写配置构建器。
属性
检索
DataFrame中所有列的名称,并将其作为列表返回。返回所有列名及其数据类型作为列表。
如果此
DataFrame包含一个或多个持续返回数据的数据源,则返回True。返回一个用于处理缺失值的
DataFrameNaFunctions。返回内容作为
pyspark.RDD的Row。返回此
DataFrame的架构为pyspark.sql.types.StructType。返回创建此
DataFrame的Spark会话。sql_ctx返回一个用于统计函数的
DataFrameStatFunctions。获取
DataFrame的当前存储级别。用于将非流式
DataFrame的内容保存到外部存储的接口。用于将流式
DataFrame的内容保存到外部存储的接口。