当前位置:Gxlcms > 数据库问题 > 大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

时间:2021-07-01 10:21:17 帮助过:18人阅读

0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD、DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD、DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dstream transformation 算子概览0.4.2 Dstream updataStateByKey 算子概览0.4.3 窗口操作0.4.4 Receiver 与 Direct0.5 Java0.5.1 对象池


第0章 预备知识

0.1 Scala

0.1.1 Scala 操作符

技术图片
List 元素的追加
方式1-在列表的最后增加数据
方式2-在列表的最前面增加数据
技术图片
方式3-在列表的最后增加数据
技术图片
示例代码如下:
object ListDemo01 {
  def main(args: Array[String]): Unit = {
    // 说明
    // 1. 在默认情况下 List 是 scala.collection.immutable.List 即不可变
    // 2. 在 scala 中,List 就是不可变的,如需要使用可变的 List,则需要使用 ListBuffer
    // 3. List 在 package object scala 中做了声明 val List = scala.collection.immutable.List
    // 4. val Nil = scala.collection.immutable.Nil // List()

    val list01 = List(123"Hello"// 创建时,直接分配元素
    println(list01) // List(1, 2, 3, Hello)

    val list02 = Nil // 空集合
    println(list02) // List()

    // 访问 List 的元素
    val value1 = list01(1// 1是索引,表示取出第2个元素
    println("value1=" + value1) // 2


    println("====================list追加元素后的效果====================")
    // 通过 :+ 和 +: 给 list 追加元素(本身的集合并没有变化)
    val list1 = List(123"abc")
    // :+ 运算符表示在列表的最后增加数据
    val list2 = list1 :+ 4 // (1,2,3,"abc", 4)
    println(list1) // list1 没有变化 (1, 2, 3, "abc"),说明 list1 还是不可变
    println(list2) // 新的列表结果是 [1, 2, 3, "abc", 4]

    val list3 = 10 +: list1 // (10, 1, 2, 3, "abc")
    println("list3=" + list3)

    // :: 符号的使用
    val list4 = List(123"abc")
    // 说明 val list5 = 4 :: 5 :: 6 :: list4 :: Nil 步骤:
    // 1. List()
    // 2. List(List(1, 2, 3, "abc"))
    // 3. List(6, List(1, 2, 3, "abc"))
    // 4. List(5, 6, List(1, 2, 3, "abc"))
    // 5. List(4, 5, 6, List(1, 2, 3, "abc"))
    val list5 = 4 :: 5 :: 6 :: list4 :: Nil
    println("list5=" + list5)

    // ::: 符号的使用
    // 说明 val list6 = 4 :: 5 :: 6 :: list4 ::: Nil 步骤:
    // 1. List()
    // 2. List(1, 2, 3, "abc")
    // 3. List(6, 1, 2, 3, "abc")
    // 4. List(5, 6, 1, 2, 3, "abc")
    // 5. List(4, 5, 6, 1, 2, 3, "abc")
    // 下面等价 4 :: 5 :: 6 :: list4
    val list6 
4 :: 5 :: 6 :: list4 ::: Nil
    println("list6=" + list6)
  }
}

输出结果如下:

List(123, Hello)
List()
value1=2
====================list追加元素后的效果====================
List(123, abc)
List(123, abc, 4)
list3=List(10123, abc)
list5=List(456List(123, abc))
list6=List(456123, abc)

0.1.2 拉链操作

把一对集合 A 和 B 的包含的元素合成到一个集合中:

object zipTest01 {
  def main(args: Array[String]): Unit = {
    val prices1 = List(5.020.09.95)
    val quantities1 = List(1021)
    println(prices1.zip(quantities1))

    println("----------------------------------")

    val prices2 = List(5.020.09.95)
    val quantities2 = List(102)
    println(prices2.zip(quantities2))

    println("----------------------------------")

    val prices3 = List(5.020.09.95)
    val quantities3 = List(102)
    println(prices3.zipAll(quantities3, 9.951))

    println("----------------------------------")

    val prices4 = List(5.020.09.95)
    val quantities4 = List(1021)
    println(prices4.zipWithIndex)
    println(quantities4.zipWithIndex)
  }
}

运行结果:

List((5.0,10), (20.0,2), (9.95,1))
----------------------------------
List((5.0,10), (20.0,2))
----------------------------------
List((5.0,10), (20.0,2), (9.95,1))
----------------------------------
List((5.0,0), (20.0,1), (9.95,2))
List((10,0), (2,1), (1,2))

这个方法之所以叫“拉链(zip)”,是因为它就像拉链的齿状结构一样将两个集合结合在一起。
注意:如果一个集合比另一个集合短, 那么结果中的对偶数量和较短的那个集合的元素数量相同。

zipAll 方法可以让你指定较短列表的缺省值。
zipWithIndex 方法返回对偶的列表,其中每个对偶中第二个组成部分是每个元素的下标。

0.2 Spark Core

0.2.1 Spark RDD 持久化

  Spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中,当对 RDD 执行持久化操作时,每个节点都会将自己操作的 RDD 的 partition 持久化到内存中,并且在之后对该 RDD 的反复使用中,直接使用内存的 partition。这样的话,对于针对一个 RDD 反复执行多个操作的场景, 就只要对 RDD 计算一次即可,后面直接使用该 RDD,而不需要反复计算多次该 RDD
  巧妙使用 RDD 持久化,甚至在某些场景下,可以将 Spark 应用程序的性能提高 10 倍。对于迭代式算法和快速交互式应用来说,RDD 持久化是非常重要的。
  例如,读取一个有着数十万行数据的 HDFS 文件,形成 linesRDD,这一读取过程会消耗大量时间,在 count 操作结束后,linesRDD 会被丢弃,会被后续的数据覆盖,当第二次再次使用 count 时,又需要重新读取 HDFS 文件数据,再次形成新的 linesRDD,这会导致反复消耗大量时间,会严重降低系统性能。
  如果在读取完成后将 linesRDD 缓存起来,那么下一次执行 count 操作时将会直接使用缓存起来的 linesRDD,这会节省大量的时间。
  要持久化一个 RDD,只要调用其 cache() 或者 persist() 方法即可。在该 RDD 第一次被计算出来时,就会直接缓存在每个节点中,而且 Spark 的持久化机制还是自动容错的,如果持久化的 RDD 的任何 partition 丢失了,那么 Spark 会自动通过其源 RDD,使用 transformation 操作重新计算该 partition。
  cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用的 persist() 的无参版本,同时就是调用 persist(MEMORY_ONLY),将输入持久化到内存中。如果需要从内存中清除缓存,那么可以使用 unpersist() 方法。
  Spark 自己也会在 shuffle 操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。
  

技术图片
  以下为对一个 156 万行大小为 168MB 的文本文件进行处理, textFile 后只进行 count 操作,持久化与不持久化的结果如下:
  技术图片

0.2.2 Spark 共享变量

  Spark 一个非常重要的特性就是共享变量
  默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个 task 中,此时每个 task 只能操作自己的那份变量副本。如果多个 task 想要共享某个变量,那么这种方式是做不到的。
  Spark 为此提供了两种共享变量,一种是 Broadcast Variable(广播变量),另一种是 Accumulator(累加变量)。Broadcast Variable 会将用到的变量,仅仅为每个节点拷贝一份,更大的用途是优化性能,减少网络传输以及内存损耗。Accumulator 则可以让多个 task 共同操作一份变量,主要可以进行累加操作。Broadcast Variable 是共享读变量,task 不能去修改它,而 Accumulator 可以让多个 task 操作一个变量。

1.广播变量
  广播变量允许程序员在每个机器上保留缓存的只读变量,而不是给每个任务发送一个副本。例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据集的副本。Spark 还尝试使用高效的广播算法分发广播变量,以降低通信成本。
  Spark action 被划分为多个 Stages,被多个 “shuffle” 操作(宽依赖)所分割。Spark 自动广播每个阶段任务所需的公共数据(一个 Stage 中多个 task 使用的数据),以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着,显式创建广播变量仅在跨多个阶段的任务需要相同数据或者以反序列化格式缓存数据很重要时才有用。
  Spark 提供的 Broadcast Variable 是只读的,并且在每个节点上只会有一个副本,而不会为每个 task 都拷贝一份副本,因此,它的最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,Spark 内部也使用了高效的广播算法来减少网络消耗。
  可以通过调用 SparkContext 的 broadcast() 方法来针对每个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了,每个节点可以使用广播变量的 value() 方法获取值。
2.累加器
  累加器(accumulator):Accumulator 是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它们可用于实现计数器(如 MapReduce)或总和计数
  Accumulator 是存在于 Driver 端的,从节点不断把值发到 Driver 端,在 Driver端计数(Spark UI 在 SparkContext 创建时被创建, 即在 Driver 端被创建,因此它可以读取 Accumulator 的数值),存在于 Driver 端的一个值,从节点是读取不到的
  Spark 提供的 Accumulator 主要用于多个节点对一个变量进行共享性的操作
  Accumulator 只提供了累加的功能,但是却给我们提供了多个 task 对于同一个变量并行操作的功能,但是 task 只能对 Accumulator 进行累加操作,不能读取它的值,只有 Driver 程序可以读取 Accumulator 的值。
  自定义累加器类型的功能在 1.X 版本中就已经提供了,但是使用起来比较麻烦,在 2.0 版本后, 累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2 来提供更加友好的自定义类型累加器的实现方式。
  官方同时给出了一个实现的示例: CollectionAccumulator 类, 这个类允许以集合的形式收集 spark 应用执行过程中的一些信息。例如,我们可以用这个类收集 Spark 处理数据时的一些细节,当然,由于累加器的值最终要汇聚到 driver 端,为了避免 driver 端的 outofmemory 问题,需要对收集的信息的规模要加以控制,不宜过大。

package com.atguigu.session

人气教程排行