当前位置:Gxlcms > 数据库问题 > Spark SQL和DataFrame的学习总结

Spark SQL和DataFrame的学习总结

时间:2021-07-01 10:21:17 帮助过:2人阅读

.spark.sql.SQLContext(sc) import sqlContext.implicits._ case class Person(name:String,age:Int) val people = sc.textFile("file:///home/hdfs/people.txt").map(_.split(",")).map(p => Person(p(0),p(1).trim.toInt)).toDF() people.registerTempTable("people") val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age>= 19 AND age <=30") teenagers.map(t => "Name:"+t(0)).collect().foreach(println) teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println) teenagers.map(_.getValueMap[Any](List("name","age"))).collect().foreach(println)

(2)编程指定模式
通过一个编程接口构造模式来实现,然后可在存在的RDDs上使用它。适用于当前样本模式未知
一个SchemaRDD可以通过三步来创建。

从原来的RDD创建一个行的RDD
创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
在行RDD上通过applySchema方法应用模式

val people = sc.textFile("file:///home/hdfs/people.txt")
val schemaString = "name age"

import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

val rowRDD = people.map(_.split(",")).map(p => Row(p(0),p(1).trim))

val peopleSchemaRDD = sqlContext.applySchema(rowRDD,schema)
peopleSchemaRDD.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people")  //DataFrame and support all the normal RDD operations
results.map(t => "Name:"+t(0)).collect().foreach(println)

结果输出

Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob

3、性能调优
主要通过在内存中缓存数据或者设置实验选项来提高性能,降低工作负载
(1)在内存中缓存数据
Spark SQL可以通过调用sqlContext.cacheTable(“tableName”)方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。
也可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。
(2)配置选项
可以通过spark.sql.shuffle.partitions、spark.sql.codegen等选项来调整查询执行的性能。

4、其他
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。在Spark目录运行下面的命令可以启动Spark SQL CLI。

./bin/spark-sql

Spark SQL和DataFrame的学习总结

标签:

人气教程排行