当前位置:Gxlcms > 数据库问题 > spark sql架构和原理——和Hive类似 dataframe无非是内存中的table而已 底层原始数据存储可以是parquet hive json avro等

spark sql架构和原理——和Hive类似 dataframe无非是内存中的table而已 底层原始数据存储可以是parquet hive json avro等

时间:2021-07-01 10:21:17 帮助过:20人阅读

  Spark SQL对SQL语句的处理和关系型数据库采用了类似的方法,sparksql先会将SQL语句进行解析(parse)形成一个Tree,然后使用Rule对Tree进行绑定,优化等处理过程,通过模式匹配对不同类型的节点采用不同操作。而sparksql的查询优化器是catalyst,它负责处理查询语句的解析,绑定,优化和生成物理执行计划等过程,catalyst是sparksql最核心部分。

  Spark SQL由core,catalyst,hive和hive-thriftserver4个部分组成。

  •   core: 负责处理数据的输入/输出,从不同的数据源获取数据(如RDD,Parquet文件和JSON文件等),然后将结果查询结果输出成Data Frame。
  •        catalyst: 负责处理查询语句的整个处理过程,包括解析,绑定,优化,生成物理计划等。
  •        hive: 负责对hive数据的处理。
  •        hive-thriftserver:提供client和JDBC/ODBC等接口。

  运行原理原理分析: 

  1.使用SesstionCatalog保存元数据

  在解析sql语句前需要初始化sqlcontext,它定义sparksql上下文,在输入sql语句前会加载SesstionCatalog,初始化sqlcontext时会把元数据保存在SesstionCatalog中,包括库名,表名,字段,字段类型等。这些数据将在解析未绑定的逻辑计划上使用。

  2.使用Antlr生成未绑定的逻辑计划

  Spark2.0版本起使用Antlr进行词法和语法解析,Antlr会构建一个按照关键字生成的语法树,也就是生成的未绑定的逻辑计划。

  3.使用Analyzer绑定逻辑计划

  在这个阶段Analyzer 使用Analysis Rules,结合SessionCatalog元数据,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。

  4.使用Optimizer优化逻辑计划

  Opetimize(优化器)的实现和处理方式同Analyzer类似,在该类中定义一系列Rule,利用这些Rule对逻辑计划和Expression进行迭代处理,达到树的节点的合并和优化。

  5.使用SparkPlanner生成可执行计划的物理计划

   SparkPlanner使用Planning Strategies对优化的逻辑计划进行转化,生成可执行的物理计划。

  6.使用QueryExecution执行物理计划

 

Spark SQL原理详解及优化器

一.简介

  从Spark 1.3开始,Spark SQL正式发布。而之前的另一个基于Spark的SQL开源项目Shark随之停止更新,基于Spark的最佳SQL计算就是Spark SQL。Spark SQL是Spark的一个模块,专门用于处理结构化数据。Spark SQL与Spark核心及其他模块之间的关系如下:

  技术图片

  Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有很多种,包括SQL和Dataset API。计算结果时,将使用相同的执行引擎,而与用的表达计算API或语言无关。这种同一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式表达给定的转换。

二.Dataset & DataFrame

    数据集时数据的分布式集合。数据集是Spark 1.6中添加的新接口,它具有RDD的优点【强类型输入,使用强大的Lambda函数的能力】和Spark SQL的优化执行引擎的优点。数据集可以从JVM对象中构造,然后使用功能性的转换【操作map、flatMap、filter等】。Dataset API在Scala和Java中都可使用。Python不支持Dataset API。但是由于Python的动态特性,Dataset API的许多优点已经可用。R语言与之类似。

    DataFrame从概念上讲,它等效于关系数据库中的表或R/Python中的数据框,但是在后台进行了更丰富的优化,可以从多种来源构造DataFrame。例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中都可以使用。在Scala和Java中,DataFrame表示由Row构成的数据集。在Scala API中,DataFrame只是类型Dataset[Row]的别名。而在Java API中,用户需要使用Dataset<Row>来代表DataFrame。

 

官方解释:

Datasets and DataFrames

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。当然主要对类SQL的支持。

DataFrame 的函数
Action 操作
1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
3、 count() 返回一个number类型的,返回dataframe集合的行数
4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()
5、 first() 返回第一行 ,类型是row类型
6、 head() 返回第一行 ,类型是row类型
7、 head(n:Int)返回n行  ,类型是row 类型
8、 show()返回dataframe集合的值 默认是20行,返回类型是unit
9、 show(n:Int)返回n行,,返回值类型是unit
10、 table(n:Int) 返回n行  ,类型是row 类型

dataframe的基本操作
1、 cache()同步数据的内存
2、 columns 返回一个string类型的数组,返回值是所有列的名字
3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
4、 explan()打印执行计划  物理的
5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的
6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
8、 printSchema() 打印出字段名称和类型 按照树状结构来打印
9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回
11、 toDF()返回一个新的dataframe类型的
12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据
14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

DataFrame它不是spark sql提出来的,而是早期在R、Pandas语言就已经有了的
DataSet: A DataSet is a distributed collection of data. (分布式的数据集)
DataFrame:A DataFrame is a DataSet organized into named columns.
以列(列名,列类型,列值)的形式构成的分布式的数据集,按照列赋予不同的名称

三.整体架构

  技术图片

  注意:Spark SQL是Spark Core之上的一个模块,所有SQL操作最终都通过Catalyst翻译成类似的Spark程序代码被Spark Core调度执行,其过程也有Job、Stage、Task的概念。

四.全局临时视图

  Spark SQL中的临时视图是有会话作用域的,如果创建它的会话终止,它将消失。如果要在所有会话中共享一个临时视图并保存活动状态,直到Spark应用程序终止,则需要创建全局临时视图。全局临时视图与系统保留的数据库global_temp相关联,必须使用限定名称来引用它,代码例子如下:

df.createGlobalTempView("people")
// 全局临时视图与系统保留的数据库global_temp spark.sql("select * from global_temp.people").show()
// 全局临时视图垮会话
spark.newSession().sql("select * from global_temp.people").show()

五.创建数据集

  数据集与RDD相似,但是它们不是使用Java或Kryo进行序列化,而是使用专门的Encoder对对象进行序列化以进行网络处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但是编码器是动态生成的代码,并使用一种格式,该格式允许Spark执行许多操作,例如过滤,排序和哈希处理,而无需将字节反序列化为对象。

技术图片
object DataSetDeml {

  //设置日志级别
  Logger.getLogger("org").setLevel(Level.WARN)
  // 放在引用的函数外部
  case class Person(name : String, age : Long)
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("Spark SQL").master("local[2]").getOrCreate()

    // 数据集直接的转换
    import spark.implicits._

    // 使用样例类创建数据集
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()

    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).show()
  }
}
技术图片

  执行结果:

  技术图片

  技术图片

六.与RDD互操作

  Spark SQL支持两种将现有RDD转换为数据集的方法。第一种方法使用反射来推断包含特定对象类型的RDD的架构。这种基于反射的方法可以使代码更简洁,并且当编写Spark应用程序时已经了解架构时,可以很好地工作。

  创建数据集的第二种方法是通过编程界面,该界面允许构造模式,然后将其应用于现有的RDD。尽管此方法较为冗长,但可以在运行时才知道列及其类型的情况下构造数据集。

  1.使用反射

技术图片
object DataFrameDeml {

  //设置日志级别
  Logger.getLogger("org").setLevel(Level.WARN)
  // 放在引用的函数外部
  case class Technology(name : String, level : Long, age : Long)
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("Spark SQL").master("local[2]").getOrCreate()

    // 数据集直接的转换
    import spark.implicits._

    val technology = spark.sparkContext
      .textFile("D:\\software\\spark-2.4.4\\data\\sql\\dataframe.txt")
      .map(_.split(","))
      .map(row => Technology(row(0), row(1).toLong, row(2)toLong))
      .toDF()

    technology.show()
    // 注册临时视图
    technology.createOrReplaceTempView("technology")

    // SQL查询
    val level_2 = spark.sql("select name,age from technology where level = 2")
    level_2.show()

    // 指定编码器
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    val result = level_2.map(row => row.getValuesMap[Any](List("name", "age")))
    result.show(false)
  }
}
技术图片

  执行结果:

  技术图片

  技术图片

  技术图片

  2.使用模式

技术图片
val userData = Array(
      "2015,11,www.baidu.com", "2016,14,www.google.com",
      "2017,13,www.apache.com", "2015,21,www.spark.com",
      "2016,32,www.hadoop.com", "2017,18,www.solr.com",
      "2017,14,www.hive.com"
    )

    val userDataRDD = sc.parallelize(userData) // 转化为RDD
    val userDataType = userDataRDD.map(line => {
        val Array(age, id, url) = line.split(",")
        Row(age, id.toInt, url)
      })
    val structTypes = StructType(Array(
      StructField("age", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true)
    ))
    // RDD转化为DataFrame
    val userDataFrame = sqlContext.createDataFrame(userDataType,structTypes)
技术图片

七.Catalyst执行优化器

  1 Catalyst最主要的数据结构是树,所有的SQL语句都会用树结构来存储,树中的每个节点都有一个类,以及0或多个子节点。Scala中定义的新的节点类型都是TreeNode这个类的子类,这些对象是不可变的。

  2 Catalyst另外一个重要的概念是规则,基本上,所有的优化都是基于规则的。

  3 执行过程

    1 分析阶段

      分析逻辑树,解决引用。

      使用Catalyst规则和Catalog对象来跟踪所有数据源中的表,以解决所有未辨识的属性。

    2 逻辑优化

    3 物理计划

      Catalyst会生成很多计划,并基于成本进行对比。接受一个逻辑计划作为输入,生产一个或多个物理计划。

    4 代码生成

      将Spark SQL代码编译成Java字节码。

spark sql架构和原理——和Hive类似 dataframe无非是内存中的table而已 底层原始数据存储可以是parquet hive json avro等

标签:并保存   class   latest   代码生成   tdd   exist   开发人员   特定   evel   

人气教程排行