Date: 2021-07-01 10:21:17
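When writing SQL for MySQL, we routinely rely on its built-in functions: numeric functions such as abs() (absolute value) and sqrt() (square root), as well as string, date, and aggregate functions. They let us implement business logic quickly. Spark SQL likewise ships roughly two hundred built-in functions. To use them from code, import the functions object:
- import org.apache.spark.sql.functions._
They can also be called directly inside SQL statements. Spark SQL groups its built-in functions into ten categories: aggregate, collection, date/time, math, miscellaneous, non-aggregate, sorting, string, UDF, and window functions.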
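To see which functions a given Spark version actually provides, the session itself can be queried. The following is a minimal sketch, assuming a local SparkSession; the object name `ListFunctionsSketch` and the helper `functionNames` are hypothetical names used only for illustration. It relies on the `SHOW FUNCTIONS` and `DESCRIBE FUNCTION` SQL statements, and the exact count it prints depends on the Spark version in use.

```scala
import org.apache.spark.sql.SparkSession

object ListFunctionsSketch {
  // Hypothetical helper: names reported by SHOW FUNCTIONS in the current session.
  // The first column of the result holds the function name.
  def functionNames(spark: SparkSession): Array[String] =
    spark.sql("SHOW FUNCTIONS").collect().map(_.getString(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ListFunctionsSketch")
      .master("local")
      .getOrCreate()

    val names = functionNames(spark)
    println(s"functions visible in this session: ${names.length}")

    // Print the usage text Spark stores for one built-in function
    spark.sql("DESCRIBE FUNCTION lower").show(truncate = false)

    spark.stop()
  }
}
```

This is handy when checking whether a function such as lower() is available before deciding to write a UDF.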
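1 Using built-in functions
There are two ways to use a built-in function: programmatically through the DataFrame API, or inside a SQL statement.
For example, given the data below, suppose we want to convert every name to lowercase with the built-in function lower():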
- +----+---+-----------+
- |name|age|      phone|
- +----+---+-----------+
- |Ming| 20|15552211521|
- |hong| 19|13287994007|
- | zhi| 21|15552211523|
- +----+---+-----------+
Using the built-in function programmatically:
- import org.apache.spark.sql.functions._
- df.select(lower(col("name")).as("name"), col("age"), col("phone")).show()
Using it from SQL:
- df.createOrReplaceTempView("people")
- spark.sql("select lower(name) as name,age,phone from people").show()
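The same two styles apply to functions from the other categories. The sketch below is an illustration rather than part of the original example: it assumes the same three-row data set, and the object name `BuiltinFunctionSketch` and the helpers `summarize` and `decorate` are hypothetical. It combines aggregate functions (count, max, avg) with string and math functions (upper, length, sqrt), then repeats the aggregation in SQL.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object BuiltinFunctionSketch {
  // Hypothetical helper: one summary row built only from aggregate functions.
  def summarize(df: DataFrame): DataFrame =
    df.agg(
      count(col("name")).as("people"),
      max(col("age")).as("max_age"),
      avg(col("age")).as("avg_age")
    )

  // Hypothetical helper: string and math functions applied row by row.
  def decorate(df: DataFrame): DataFrame =
    df.select(
      upper(col("name")).as("name_upper"),
      length(col("name")).as("name_len"),
      sqrt(col("age")).as("age_sqrt")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BuiltinFunctionSketch")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
      .toDF("name", "age", "phone")

    // Programmatic style
    summarize(df).show()
    decorate(df).show()

    // SQL style: the same aggregation expressed as a query
    df.createOrReplaceTempView("people")
    spark.sql("select count(name) as people, max(age) as max_age, avg(age) as avg_age from people").show()

    spark.stop()
  }
}
```

Keeping the transformations in small functions that take and return a DataFrame makes them easy to reuse from either style.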
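2 Using UDFs
When the built-in functions cannot express our business logic, we can implement it ourselves as a user-defined function (UDF). For example, suppose we need to add an id column to the data above, built as name + a randomly generated UUID + phone. A UDF can do this, as shown below: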
- import java.util.UUID
- import org.apache.spark.sql.functions.udf
- // Combine name and phone with a random UUID to form a unique id
- def idGenerator(name: String, phone: Long): String = {
-   name + "-" + UUID.randomUUID().toString + "-" + phone.toString
- }
- // Wrap the method as a UDF
- val idGeneratorUDF = udf(idGenerator _)
- // Bring in the implicit conversions (enables the $"..." column syntax)
- import spark.implicits._
- df.withColumn("id", idGeneratorUDF($"name", $"phone")).show()
Alternatively, register the function under a name and invoke it with callUDF:
- // Bring in the implicit conversions
- import spark.implicits._
- // Combine name and phone with a random UUID to form a unique id
- def idGenerator(name: String, phone: Long): String = {
-   name + "-" + UUID.randomUUID().toString + "-" + phone.toString
- }
- // Register the UDF under the name "idGenerator"
- spark.udf.register("idGenerator", idGenerator _)
- // Call the registered UDF by name (callUDF is deprecated since Spark 3.2 in favor of call_udf)
- df.withColumn("id", callUDF("idGenerator", $"name", $"phone")).show()
Both versions produce output of the same shape (the UUID part differs on every run):
- +----+---+-----------+--------------------+
- |name|age|      phone|                  id|
- +----+---+-----------+--------------------+
- |Ming| 20|15552211521|Ming-9b87d4d5-91d...|
- |hong| 19|13287994007|hong-7a91f7d8-66a...|
- | zhi| 21|15552211523|zhi-f005859c-4516...|
- +----+---+-----------+--------------------+
Likewise, we can register our UDF with Spark SQL and then use it from a SQL statement:
- // Register the UDF with Spark SQL
- spark.udf.register("idGeneratorUDF", idGeneratorUDF)
- // Create a temporary view
- df.createOrReplaceTempView("people")
- // Query it with SQL
- spark.sql("select idGeneratorUDF(name,phone) as id,name,age,phone from people").show()
Note: import spark.implicits._ is added above so that $"column" can be used as shorthand for col("column").
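One point worth checking with a UUID-based UDF: Spark treats a UDF as deterministic by default, while this one returns a different value on every call. The sketch below is a suggestion rather than part of the original example. It assumes Spark 2.3 or later, where `asNondeterministic()` and the SQL function `uuid()` are available; the object name `IdGeneratorSketch` and the helpers `withUdfId` and `withBuiltinId` are hypothetical. The first variant marks the UDF as nondeterministic, and the second builds the same id from built-in functions only.

```scala
import java.util.UUID
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object IdGeneratorSketch {
  // Variant 1: the same logic as idGenerator, wrapped as a UDF and
  // explicitly flagged as nondeterministic because of the random UUID.
  val idGeneratorUDF = udf((name: String, phone: Long) =>
    name + "-" + UUID.randomUUID().toString + "-" + phone.toString
  ).asNondeterministic()

  def withUdfId(df: DataFrame): DataFrame =
    df.withColumn("id", idGeneratorUDF(col("name"), col("phone")))

  // Variant 2: no UDF at all; concat_ws joins the pieces with "-" and
  // uuid() is Spark SQL's built-in random UUID generator.
  def withBuiltinId(df: DataFrame): DataFrame =
    df.withColumn("id",
      concat_ws("-", col("name"), expr("uuid()"), col("phone").cast("string")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IdGeneratorSketch")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
      .toDF("name", "age", "phone")

    withUdfId(df).show(truncate = false)
    withBuiltinId(df).show(truncate = false)

    spark.stop()
  }
}
```

When a built-in combination exists, it is usually worth considering first, since Spark can optimize built-in expressions but has to treat a UDF as a black box.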
Complete code:
- import java.util.UUID
- import org.apache.spark.sql.SparkSession
- /**
-  * Spark SQL built-in functions
-  */
- object SparkSQLFunctionApp {
-   def main(args: Array[String]): Unit = {
-     val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
-     import org.apache.spark.sql.functions._
-     // Implicit conversions: enable toDF and $"column" in place of col("column")
-     import spark.implicits._
-     val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
-     df.show()
-     /**
-      * +----+---+-----------+
-      * |name|age|      phone|
-      * +----+---+-----------+
-      * |Ming| 20|15552211521|
-      * |hong| 19|13287994007|
-      * | zhi| 21|15552211523|
-      * +----+---+-----------+
-      */
-     // 1 Use a built-in function to convert every name to lowercase
-     // 1.1 Programmatically:
-     df.select(lower($"name").as("name"), $"age", $"phone").show()
-     /**
-      * +----+---+-----------+
-      * |name|age|      phone|
-      * +----+---+-----------+
-      * |ming| 20|15552211521|
-      * |hong| 19|13287994007|
-      * | zhi| 21|15552211523|
-      * +----+---+-----------+
-      */
-     // 1.2 With SQL
-     // Register a temporary view
-     df.createOrReplaceTempView("people")
-     spark.sql("select lower(name) as name,age,phone from people").show()
-     /**
-      * +----+---+-----------+
-      * |name|age|      phone|
-      * +----+---+-----------+
-      * |ming| 20|15552211521|
-      * |hong| 19|13287994007|
-      * | zhi| 21|15552211523|
-      * +----+---+-----------+
-      */
-     // 2 Using UDFs
-     // 2.1 Direct use
-     // Combine name and phone with a random UUID to form a unique id
-     def idGenerator(name: String, phone: Long): String = {
-       name + "-" + UUID.randomUUID().toString + "-" + phone.toString
-     }
-     // Wrap the method as a UDF
-     val idGeneratorUDF = udf(idGenerator _)
-     df.withColumn("id", idGeneratorUDF($"name", $"phone")).show()
-     /**
-      * +----+---+-----------+--------------------+
-      * |name|age|      phone|                  id|
-      * +----+---+-----------+--------------------+
-      * |Ming| 20|15552211521|Ming-74338e40-548...|
-      * |hong| 19|13287994007|hong-4f058f2b-9d3...|
-      * | zhi| 21|15552211523|zhi-f42bea86-a9cf...|
-      * +----+---+-----------+--------------------+
-      */
-     // Register the UDF with Spark SQL
-     spark.udf.register("idGeneratorUDF", idGeneratorUDF)
-     // Create a temporary view
-     df.createOrReplaceTempView("people")
-     // Query it with SQL
-     spark.sql("select idGeneratorUDF(name,phone) as id,name,age,phone from people").show()
-     /**
-      * Column order follows the SELECT list, so id comes first.
-      * The UUID values are random and differ on every call.
-      * +--------------------+----+---+-----------+
-      * |                  id|name|age|      phone|
-      * +--------------------+----+---+-----------+
-      * |Ming-74338e40-548...|Ming| 20|15552211521|
-      * |hong-4f058f2b-9d3...|hong| 19|13287994007|
-      * |zhi-f42bea86-a9cf...| zhi| 21|15552211523|
-      * +--------------------+----+---+-----------+
-      */
-     // 2.2 Using callUDF
-     // Register the UDF under the name "idGenerator"
-     spark.udf.register("idGenerator", idGenerator _)
-     // Call the registered UDF by name
-     df.withColumn("id", callUDF("idGenerator", $"name", $"phone")).show()
-     /**
-      * +----+---+-----------+--------------------+
-      * |name|age|      phone|                  id|
-      * +----+---+-----------+--------------------+
-      * |Ming| 20|15552211521|Ming-74338e40-548...|
-      * |hong| 19|13287994007|hong-4f058f2b-9d3...|
-      * | zhi| 21|15552211523|zhi-f42bea86-a9cf...|
-      * +----+---+-----------+--------------------+
-      */
-     // Create a temporary view
-     df.createOrReplaceTempView("people")
-     // Query it with SQL
-     spark.sql("select idGenerator(name,phone) as id,name,age,phone from people").show()
-     /**
-      * +--------------------+----+---+-----------+
-      * |                  id|name|age|      phone|
-      * +--------------------+----+---+-----------+
-      * |Ming-d4236bac-e21...|Ming| 20|15552211521|
-      * |hong-bff84c0d-67d...|hong| 19|13287994007|
-      * |zhi-aa0174b0-c8b3...| zhi| 21|15552211523|
-      * +--------------------+----+---+-----------+
-      */
-   }
- }