当前位置:Gxlcms > 数据库问题 > Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

时间:2021-07-01 10:21:17 帮助过:13人阅读

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark 2.0 中的SparkSession 为 Hive 特性提供了内嵌的支持, 包括使用 HiveQL 编写查询的能力, 访问 Hive UDF,以及从 Hive 表中读取数据的能力.为了使用这些特性, 你不需要去有一个已存在的 Hive 设置.

创建 DataFrames

在一个 SparkSession中, 应用程序可以从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源中创建一个DataFrames.

举个例子, 下面就是基于一个JSON文件创建一个DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

无类型的Dataset操作 (aka DataFrame 操作)

DataFrames 提供了一个特定的语法用在 Scala, Java, Python and R中机构化数据的操作.

正如上面提到的一样, Spark 2.0中, DataFrames在Scala 和 Java API中, 仅仅是多个 Rows的Dataset. 这些操作也参考了与强类型的Scala/Java Datasets中的”类型转换” 对应的”无类型转换” .

这里包括一些使用 Dataset 进行结构化数据处理的示例 :

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档.

除了简单的列引用和表达式之外, DataFrame 也有丰富的函数库, 包括 string 操作, date 算术, 常见的 math 操作以及更多.可用的完整列表请参考  DataFrame 函数指南.

Running SQL Queries Programmatically

SparkSession 的 sql 函数可以让应用程序以编程的方式运行 SQL 查询, 并将结果作为一个 DataFrame 返回.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

全局临时视图

Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 如果你想让一个临时视图在所有session中相互传递并且可用, 直到Spark 应用退出, 你可以建立一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp中, 我们必须加上库名去引用它, 比如. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

创建Datasets

Dataset 与 RDD 相似, 然而, 并不是使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者通过网络进行传输的对象. 虽然编码器和标准的序列化都负责将一个对象序列化成字节, 编码器是动态生成的代码, 并且使用了一种允许 Spark 去执行许多像 filtering, sorting 以及 hashing 这样的操作, 不需要将字节反序列化成对象的格式.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

RDD的互操作性

Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset.第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可以让你的代码更简洁.

第二种用于创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它允许你去构造 Dataset.

使用反射推断Schema

Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取并且成为了列名.Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型.这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表.表可以用于后续的 SQL 语句.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

以编程的方式指定Schema

当 case class 不能够在执行之前被定义(例如, records 记录的结构在一个 string 字符串中被编码了, 或者一个 text 文本 dataset 将被解析并且不同的用户投影的字段是不一样的).一个 DataFrame 可以使用下面的三步以编程的方式来创建.

  1. 从原始的 RDD 创建 RDD 的 Row(行);
  2. Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的 Row(行)的结构.
  3. 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行).

例如:

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count()countDistinct()avg()max()min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources (数据源)

Spark SQL 支持通过 DataFrame 接口对各种 data sources (数据源)进行操作. DataFrame 可以使用 relational transformations (关系转换)操作, 也可用于创建 temporary view (临时视图). 将 DataFrame 注册为 temporary view (临时视图)允许您对其数据运行 SQL 查询. 本节 描述了使用 Spark Data Sources 加载和保存数据的一般方法, 然后涉及可用于 built-in data sources (内置数据源)的 specific options (特定选项).

Generic Load/Save Functions (通用 加载/保存 功能)

在最简单的形式中, 默认数据源(parquet, 除非另有配置 spark.sql.sources.default )将用于所有操作.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Manually Specifying Options (手动指定选项)

您还可以 manually specify (手动指定)将与任何你想传递给 data source 的其他选项一起使用的 data source . Data sources 由其 fully qualified name (完全限定名称)(即 org.apache.spark.sql.parquet ), 但是对于 built-in sources (内置的源), 你也可以使用它们的 shortnames (短名称)(jsonparquetjdbcorclibsvmcsvtext).从任何 data source type (数据源类型)加载 DataFrames 可以使用此 syntax (语法)转换为其他类型.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Run SQL on files directly (直接在文件上运行 SQL)

不使用读取 API 将文件加载到 DataFrame 并进行查询, 也可以直接用 SQL 查询该文件.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Save Modes (保存模式)

Save operations (保存操作)可以选择使用 SaveMode , 它指定如何处理现有数据如果存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出之前被删除.

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default) "error"(default) 将 DataFrame 保存到 data source (数据源)时, 如果数据已经存在, 则会抛出异常.
SaveMode.Append "append" 将 DataFrame 保存到 data source (数据源)时, 如果 data/table 已存在, 则 DataFrame 的内容将被 append (附加)到现有数据中.
SaveMode.Overwrite "overwrite" Overwrite mode (覆盖模式)意味着将 DataFrame 保存到 data source (数据源)时, 如果 data/table 已经存在, 则预期 DataFrame 的内容将 overwritten (覆盖)现有数据.
SaveMode.Ignore "ignore" Ignore mode (忽略模式)意味着当将 DataFrame 保存到 data source (数据源)时, 如果数据已经存在, 则保存操作预期不会保存 DataFrame 的内容, 并且不更改现有数据. 这与 SQL 中的 CREATE TABLE IF NOT EXISTS 类似.

Saving to Persistent Tables (保存到持久表)

DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

  • 由于 metastore 只能返回查询的必要 partitions (分区), 因此不再需要将第一个查询上的所有 partitions discovering 到表中.
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 现在可用于使用 Datasource API 创建的表.

请注意, 创建 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认情况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 可以调用 MSCK REPAIR TABLE .

Bucketing, Sorting and Partitioning (分桶, 排序和分区)

对于 file-based data source (基于文件的数据源), 也可以对 output (输出)进行 bucket 和 sort 或者 partition . Bucketing 和 sorting 仅适用于 persistent tables :

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

在使用 Dataset API 时, partitioning 可以同时与 save 和 saveAsTable 一起使用.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

可以为 single table (单个表)使用 partitioning 和 bucketing:

peopleDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.

Parquet Files

Parquet 是许多其他数据处理系统支持的 columnar format (柱状格式). Spark SQL 支持读写 Parquet 文件, 可自动保留 schema of the original data (原始数据的模式). 当编写 Parquet 文件时, 出于兼容性原因, 所有 columns 都将自动转换为可空.

Loading Data Programmatically (以编程的方式加载数据)

使用上面例子中的数据:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Partition Discovery (分区发现)

Table partitioning (表分区)是在像 Hive 这样的系统中使用的常见的优化方法. 在 partitioned table (分区表)中, 数据通常存储在不同的目录中, partitioning column values encoded (分区列值编码)在每个 partition directory (分区目录)的路径中. Parquet data source (Parquet 数据源)现在可以自动 discover (发现)和 infer (推断)分区信息. 例如, 我们可以使用以下 directory structure (目录结构)将所有以前使用的 population data (人口数据)存储到 partitioned table (分区表)中, 其中有两个额外的列 gender 和 country 作为 partitioning columns (分区列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 将自动从路径中提取 partitioning information (分区信息). 现在返回的 DataFrame 的 schema (模式)变成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意, 会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前, 支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例, automatic type inference (自动类型推断)可以由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默认为 true .当禁用 type inference (类型推断)时, string type (字符串类型)将用于 partitioning columns (分区列).

从 Spark 1.6.0 开始, 默认情况下, partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例, 如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load , 则 gender 将不被视为 partitioning column (分区列).如果用户需要指定 partition discovery (分区发现)应该开始的基本路径, 则可以在数据源选项中设置 basePath.例如, 当 path/to/table/gender=male 是数据的路径并且用户将 basePath 设置为 path/to/table/gender 将是一个 partitioning column (分区列).

Schema Merging (模式合并)

像 ProtocolBuffer , Avro 和 Thrift 一样, Parquet 也支持 schema evolution (模式演进). 用户可以从一个 simple schema (简单的架构)开始, 并根据需要逐渐向 schema 添加更多的 columns (列). 以这种方式, 用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)现在能够自动检测这种情况并 merge (合并)所有这些文件的 schemas .

由于 schema merging (模式合并)是一个 expensive operation (相对昂贵的操作), 并且在大多数情况下不是必需的, 所以默认情况下从 1.5.0 开始. 你可以按照如下的方式启用它:

  1. 读取 Parquet 文件时, 将 data source option (数据源选项) mergeSchema 设置为 true (如下面的例子所示), 或
  2. 将 global SQL option (全局 SQL 选项) spark.sql.parquet.mergeSchema 设置为 true .
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = 

                  

	 	
                    
                    
                    
                    
                    
                

人气教程排行