时间:2021-07-01 10:21:17 帮助过:9人阅读
map
map
可以将数据集中每条数据转为另一种形式
import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.map( person => Person(person.name, person.age * 2) ).show()
mapPartitions
mapPartitions
和 map
一样, 但是 map
的处理单位是每条数据, mapPartitions
的处理单位是每个分区
import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.mapPartitions( iter => {
val returnValue = iter.map(
item => Person(item.name, item.age * 2)
)
returnValue
} )
.show()
transform
map
和 mapPartitions
以及 transform
都是转换, map
和 mapPartitions
是针对数据, 而 transform
是针对整个数据集, 这种方式最大的区别就是 transform
可以直接拿到 Dataset
进行操作
import spark.implicits._
val ds = spark.range(5)
ds.transform( dataset => dataset.withColumn("doubled", ‘id * 2) )
as
as[Type]
算子的主要作用是将弱类型的 Dataset
转为强类型的 Dataset
, 它有很多适用场景, 但是最常见的还是在读取数据的时候, 因为 DataFrameReader
体系大部分情况下是将读出来的数据转换为 DataFrame
的形式, 如果后续需要使用 Dataset
的强类型 API
, 则需要将 DataFrame
转为 Dataset
. 可以使用 as[Type]
算子完成这种操作
import spark.implicits._
val structType = StructType(
Seq(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("gpa", FloatType)
)
)
val sourceDF = spark.read
.schema(structType)
.option("delimiter", "\t")
.csv("dataset/studenttab10k")
val dataset = sourceDF.as[Student]
dataset.show()
过滤
filter
filter
用来按照条件过滤数据集
import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.filter( person => person.name == "lisi" ).show()
聚合
groupByKey
grouByKey
算子的返回结果是 KeyValueGroupedDataset
, 而不是一个 Dataset
, 所以必须要先经过 KeyValueGroupedDataset
中的方法进行聚合, 再转回 Dataset
, 才能使用 Action
得出结果
其实这也印证了分组后必须聚合的道理
import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.groupByKey( person => person.name ).count().show()
切分
randomSplit
randomSplit
会按照传入的权重随机将一个 Dataset
分为多个 Dataset
, 传入 randomSplit
的数组有多少个权重, 最终就会生成多少个 Dataset
, 这些权重的加倍和应该为 1, 否则将被标准化
val ds = spark.range(15)
val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array[Double](2, 3))
datasets.foreach(dataset => dataset.show())
sample
sample
会随机在 Dataset
中抽样
val ds = spark.range(15)
ds.sample(withReplacement = false, fraction = 0.4).show()
排序
orderBy
orderBy
配合 Column
的 API
, 可以实现正反序排列
import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.orderBy("age").show()
ds.orderBy(‘age.desc).show()
sort
其实 orderBy
是 sort
的别名, 所以它们所实现的功能是一样的
import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.sort(‘age.desc).show()
分区
coalesce
减少分区, 此算子和 RDD
中的 coalesce
不同, Dataset
中的 coalesce
只能减少分区数, coalesce
会直接创建一个逻辑操作, 并且设置 Shuffle
为 false
val ds = spark.range(15)
ds.coalesce(1).explain(true)
repartitions
repartitions
有两个作用, 一个是重分区到特定的分区数, 另一个是按照某一列来分区, 类似于 SQL
中的 DISTRIBUTE BY
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.repartition(4)
ds.repartition(‘name)
去重
dropDuplicates
使用 dropDuplicates
可以去掉某一些列中重复的行
import spark.implicits._
val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
ds.dropDuplicates("age").show()
distinct
当 dropDuplicates
中没有传入列名的时候, 其含义是根据所有列去重, dropDuplicates()
方法还有一个别名, 叫做 distinct
所以, 使用 distinct
也可以去重, 并且只能根据所有的列来去重
import spark.implicits._
val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
ds.distinct().show()
集合操作
except
except
和 SQL
语句中的 except
一个意思, 是求得 ds1
中不存在于 ds2
中的数据, 其实就是差集
val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)
ds1.except(ds2).show()
intersect
求得两个集合的交集
val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)
ds1.intersect(ds2).show()
union
求得两个集合的并集
val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)
ds1.union(ds2).show()
limit
限制结果集数量
val ds = spark.range(1, 10)
ds.limit(3).show()
分类 | 算子 | 解释 |
---|---|---|
选择 |
|
|
|
在
|
|
|
通过
|
|
|
修改列名
|
|
剪除 |
drop |
剪掉某个列
|
聚合 |
groupBy |
按照给定的行进行分组
|
Column 表示了 Dataset 中的一个列, 并且可以持有一个表达式, 这个表达式作用于每一条数据, 对每条数据都生成一个值, 之所以有单独这样的一个章节是因为列的操作属于细节, 但是又比较常见, 会在很多算子中配合出现
分类 | 操作 | 解释 |
---|---|---|
创建 |
|
单引号
|
|
同理,
|
|
|
|
|
|
|
|
|
前面的
|
|
|
可以通过
|
|
别名和转换 |
|
|
|
通过
|
|
添加列 |
|
通过
|
操作 |
|
通过
|
|
通过
|
|
|
在排序的时候, 可以通过
|
DataFrame
中什么时候会有无效值
DataFrame
如何处理无效的值
DataFrame
如何处理 null
如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值
一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说 null
, 比如说空字符串
关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响
Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据
例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改 MySQL
表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事.
常见的缺失值有两种
null
, NaN
等特殊类型的值, 某些语言中 null
可以理解是一个对象, 但是代表没有对象, NaN
是一个数字, 可以代表不是数字
针对这一类的缺失值, Spark
提供了一个名为 DataFrameNaFunctions
特殊类型来操作和处理
"Null"
, "NA"
, " "
等解析为字符串的类型, 但是其实并不是常规字符串数据
针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破
DataFrameNaFunctions
DataFrameNaFunctions
使用 Dataset
的 na
函数来获取
val df = ...
val naFunc: DataFrameNaFunctions = df.na
当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值, DataFrameNaFunctions
中包含一系列针对空值数据的方案
DataFrameNaFunctions.drop
可以在当某行中包含 null
或 NaN
的时候丢弃此行
DataFrameNaFunctions.fill
可以在将 null
和 NaN
充为其它值
DataFrameNaFunctions.replace
可以把 null
或 NaN
替换为其它值, 但是和 fill
略有一些不同, 这个方法针对值来进行替换
SparkSQL
处理 null
和 NaN
?首先要将数据读取出来, 此次使用的数据集直接存在 NaN
, 在指定 Schema
后, 可直接被转为 Double.NaN
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("year", IntegerType),
StructField("month", IntegerType),
StructField("day", IntegerType),
StructField("hour", IntegerType),
StructField("season", IntegerType),
StructField("pm", DoubleType)
)
)
val df = spark.read
.option("header", value = true)
.schema(schema)
.csv("dataset/beijingpm_with_nan.csv")
对于缺失值的处理一般就是丢弃和填充
null
和 NaN
的行当某行数据所有值都是 null
或者 NaN
的时候丢弃此行
df.na.drop("all").show()
当某行中特定列所有值都是 null
或者 NaN
的时候丢弃此行
df.na.drop("all", List("pm", "id")).show()
当某行数据任意一个字段为 null
或者 NaN
的时候丢弃此行
df.na.drop().show()
df.na.drop("any").show()
当某行中特定列任意一个字段为 null
或者 NaN
的时候丢弃此行
df.na.drop(List("pm", "id")).show()
df.na.drop("any", List("pm", "id")).show()
null
和 NaN
的列填充所有包含 null
和 NaN
的列
df.na.fill(0).show()
填充特定包含 null
和 NaN
的列
df.na.fill(0, List("pm")).show()
根据包含 null
和 NaN
的列的不同来填充
import scala.collection.JavaConverters._
df.na.fill(Map[String, Any]("pm" -> 0).asJava).show
SparkSQL
处理异常字符串 ?读取数据集, 这次读取的是最原始的那个 PM
数据集
val df = spark.read
.option("header", value = true)
.csv("dataset/BeijingPM20100101_20151231.csv")
使用函数直接转换非法的字符串
df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season,
when(‘PM_Dongsi === "NA", 0)
.otherwise(‘PM_Dongsi cast DoubleType)
.as("pm"))
.show()
使用 where
直接过滤
df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi)
.where(‘PM_Dongsi =!= "NA")
.show()
使用 DataFrameNaFunctions
替换, 但是这种方式被替换的值和新值必须是同类型
df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi)
.na.replace("PM_Dongsi", Map("NA" -> "NaN"))
.show()
groupBy
rollup
cube
pivot
RelationalGroupedDataset
上的聚合操作
groupBy
groupBy
算子会按照列将 Dataset
分组, 并返回一个 RelationalGroupedDataset
对象, 通过 RelationalGroupedDataset
可以对分组进行聚合
private val spark = SparkSession.builder()
.master("local[6]")
.appName("aggregation")
.getOrCreate()
import spark.implicits._
private val schema = StructType(
List(
StructField("id", IntegerType),
StructField("year", IntegerType),
StructField("month", IntegerType),
StructField("day", IntegerType),
StructField("hour", IntegerType),
StructField("season", IntegerType),
StructField("pm", DoubleType)
)
)
private val pmDF = spark.read
.schema(schema)
.option("header", value = true)
.csv("dataset/pm_without_null.csv")
functions
函数进行聚合import org.apache.spark.sql.functions._
val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year)
groupedDF.agg(avg(‘pm) as "pm_avg")
.orderBy(‘pm_avg)
.show()
functions
进行聚合, 还可以直接使用 RelationalGroupedDataset
的 API
进行聚合groupedDF.avg("pm")
.orderBy(‘pm_avg)
.show()
groupedDF.max("pm")
.orderBy(‘pm_avg)
.show()
我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下
private val spark = SparkSession.builder()
.master("local[6]")
.appName("aggregation")
.getOrCreate()
import spark.implicits._
private val schemaFinal = StructType(
List(
StructField("source", StringType),
StructField("year", IntegerType),
StructField("month", IntegerType),
StructField("day", IntegerType),
StructField("hour", IntegerType),
StructField("season", IntegerType),
StructField("pm", DoubleType)
)
)
private val pmFinal = spark.read
.schema(schemaFinal)
.option("header", value = true)
.csv("dataset/pm_final.csv")
import org.apache.spark.sql.functions._
val groupPostAndYear = pmFinal.groupBy(‘source, ‘year)
.agg(sum("pm") as "pm")
val groupPost = pmFinal.groupBy(‘source)
.agg(sum("pm") as "pm")
.select(‘source, lit(null) as "year", ‘pm)
groupPostAndYear.union(groupPost)
.sort(‘source, ‘year asc_nulls_last, ‘pm)
.show()
大家其实也能看出来, 在一个数据集中又小计又总计, 可能需要多个操作符, 如何简化呢? 请看下面
rollup
操作符rollup
操作符其实就是 groupBy
的一个扩展, rollup
会对传入的列进行滚动 groupBy
, groupBy
的次数为列数量 + 1
, 最后一次是对整个数据集进行聚合
import org.apache.spark.sql.functions._
val sales = Seq(
("Beijing", 2016, 100),
("Beijing", 2017, 200),
("Shanghai", 2015, 50),
("Shanghai", 2016, 150),
("Guangzhou", 2017, 50)
).toDF("city", "year", "amount")
rollup
的操作sales.rollup("city", "year")
.agg(sum("amount") as "amount")
.sort($"city".desc_nulls_last, $"year".asc_nulls_last)
.show()
/**
* 结果集:
* +---------+----+------+
* | city|year|amount|
* +---------+----+------+
* | Shanghai|2015| 50| <-- 上海 2015 的小计
* | Shanghai|2016| 150|
* | Shanghai|null| 200| <-- 上海的总计
* |Guangzhou|2017| 50|
* |Guangzhou|null| 50|
* | Beijing|2016| 100|
* | Beijing|2017| 200|
* | Beijing|null| 300|
* | null|null| 550| <-- 整个数据集的总计
* +---------+----+------+
*/
val cityAndYear = sales
.groupBy("city", "year") // 按照 city 和 year 聚合
.agg(sum("amount") as "amount")
val city = sales
.groupBy("city") // 按照 city 进行聚合
.agg(sum("amount") as "amount")
.select($"city", lit(null) as "year", $"amount")
val all = sales
.groupBy() // 全局聚合
.agg(sum("amount") as "amount")
.select(lit(null) as "city", lit(null) as "year", $"amount")
cityAndYear
.union(city)
.union(all)
.sort($"city".desc_nulls_last, $"year".asc_nulls_last)
.show()
/**
* 统计结果:
* +---------+----+------+
* | city|year|amount|
* +---------+----+------+
* | Shanghai|2015| 50|
* | Shanghai|2016| 150|
* | Shanghai|null| 200|
* |Guangzhou|2017| 50|
* |Guangzhou|null| 50|
* | Beijing|2016| 100|
* | Beijing|2017| 200|
* | Beijing|null| 300|
* | null|null| 550|
* +---------+----+------+
*/
很明显可以看到, 在上述案例中, rollup
就相当于先按照 city
, year
进行聚合, 后按照 city
进行聚合, 最后对整个数据集进行聚合, 在按照 city
聚合时, year
列值为 null
, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null
rollup
完成 pm
值的统计上面的案例使用 rollup
来实现会非常的简单
import org.apache.spark.sql.functions._
pmFinal.rollup(‘source, ‘year)
.agg(sum("pm") as "pm_total")
.sort(‘source.asc_nulls_last, ‘year.asc_nulls_last)
.show()
cube
cube
的功能和 rollup
是一样的, 但也有区别, 区别如下
rollup(A, B).sum©
其结果集中会有三种数据形式: A B C
, A null C
, null null C
不知道大家发现没, 结果集中没有对 B
列的聚合结果
cube(A, B).sum©
其结果集中会有四种数据形式: A B C
, A null C
, null null C
, null B C
不知道大家发现没, 比 rollup
的结果集中多了一个 null B C
, 也就是说, rollup
只会按照第一个列来进行组合聚合, 但是 cube
会将全部列组合聚合
import org.apache.spark.sql.functions._
pmFinal.cube(‘source, ‘year)
.agg(sum("pm") as "pm_total")
.sort(‘source.asc_nulls_last, ‘year.asc_nulls_last)
.show()
/**
* 结果集为
*
* +-------+----+---------+
* | source|year| pm_total|
* +-------+----+---------+
* | dongsi|2013| 735606.0|
* | dongsi|2014| 745808.0|
* | dongsi|2015| 752083.0|
* | dongsi|null|2233497.0|
* |us_post|2010| 841834.0|
* |us_post|2011| 796016.0|
* |us_post|2012| 750838.0|
* |us_post|2013| 882649.0|
* |us_post|2014| 846475.0|
* |us_post|2015| 714515.0|
* |us_post|null|4832327.0|
* | null|2010| 841834.0| <-- 新增
* | null|2011| 796016.0| <-- 新增
* | null|2012| 750838.0| <-- 新增
* | null|2013|1618255.0| <-- 新增
* | null|2014|1592283.0| <-- 新增
* | null|2015|1466598.0| <-- 新增
* | null|null|7065824.0|
* +-------+----+---------+
*/
SparkSQL
中支持的 SQL
语句实现 cube
功能SparkSQL
支持 GROUPING SETS
语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现 cube
也可以实现 rollup
的功能
pmFinal.createOrReplaceTempView("pm_final")
spark.sql(
"""
|select source, year, sum(pm)
|from pm_final
|group by source, year
|grouping sets((source, year), (source), (year), ())
|order by source asc nulls last, year asc nulls last
""".stripMargin)
.show()
RelationalGroupedDataset
常见的 RelationalGroupedDataset
获取方式有三种
groupBy
rollup
cube
无论通过任何一种方式获取了 RelationalGroupedDataset
对象, 其所表示的都是是一个被分组的 DataFrame
, 通过这个对象, 可以对数据集的分组结果进行聚合
val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year)
需要注意的是, RelationalGroupedDataset
并不是 DataFrame
, 所以其中并没有 DataFrame
的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成 DataFrame
对象, 然后就可以再次使用 DataFrame
的算子进行操作了