Date: 2021-07-01 10:21:17
SparkSQL programming model:
Step 1:
Create a SQLContext object, which is the entry point for SparkSQL operations.
Building a SQLContext requires a SparkContext.
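Step 2:
Once the entry object is built, import its implicit conversions (import sqlContext.implicits._). They let an RDD of case-class objects, such as one built from a text file, be converted into a DataFrame with toDF(), and they also enable the 'name column syntax used in Step 5. The DataFrame is the unified data type that all SparkSQL operations work on.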
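For readers new to Scala implicits, the mechanism behind Step 2 can be sketched without Spark. This is a toy analogue under stated assumptions, not Spark's implementation: `ToyImplicits`, `Table` and `toTable` are hypothetical names. It only shows that importing an implicit class into scope adds a method to an existing type, much as the Spark import lets an RDD of case classes call `toDF()`.

```scala
// Toy analogue of `import sqlContext.implicits._`; this is NOT Spark code.
// All names here (ToyImplicits, Table, toTable) are hypothetical.
object ToyImplicits {
  // Hypothetical stand-in for a DataFrame: column names plus rows of values
  final case class Table(columns: Seq[String], rows: Seq[Seq[Any]])

  // While this implicit class is imported, any Seq of case-class instances
  // (case classes are Products) gains a toTable method
  implicit class ProductSeqOps[T <: Product](val data: Seq[T]) extends AnyVal {
    def toTable(columns: String*): Table =
      Table(columns.toList, data.map(_.productIterator.toList))
  }
}

object ImplicitsDemo {
  final case class Person(name: String, age: Int)

  def demo(): ToyImplicits.Table = {
    import ToyImplicits._ // without this import, toTable does not compile
    Seq(Person("Andy", 30), Person("Justin", 19)).toTable("name", "age")
  }

  def main(args: Array[String]): Unit = println(demo())
}
```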
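Step 3:
Define a case class that matches the format of the data. It maps the variously shaped raw records onto one uniform structure: the case class fields become the DataFrame's column names and types, which makes the data convenient to program against.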
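Step 4:
Read the file and convert it into a DataFrame. For a plain text file, SparkContext.textFile reads the lines, each line is mapped to a case-class object, and toDF() produces the DataFrame.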
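The parsing part of Steps 3 and 4 is plain Scala and can be tried without a cluster. The sketch below assumes each input line has the form `name, age`; `ParsePeople` and `parse` are hypothetical names. The key detail is that `split(",")` returns an `Array[String]`, so the pattern must be `Array(...)`, not a tuple.

```scala
import scala.util.Try

// Plain-Scala sketch of the Step 3/4 parsing logic; runs without Spark.
// Assumes each input line has the form "name, age".
object ParsePeople {
  // Step 3: one case class gives every record the same shape
  final case class Person(name: String, age: Int)

  // Step 4: split(",") returns an Array[String], so match on Array(...).
  // Lines that do not split into exactly two fields, or whose age is not
  // an Int, become None instead of throwing
  def parse(line: String): Option[Person] =
    line.split(",").map(_.trim) match {
      case Array(name, age) => Try(age.toInt).toOption.map(a => Person(name, a))
      case _                => None
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Michael, 29", "Andy, 30", "not a record")
    lines.flatMap(line => parse(line)).foreach(println)
  }
}
```

In a Spark job, a function like `parse` could take the place of the two `map` calls (for example via `flatMap`, which also drops malformed lines), assuming the case class is still defined outside any method so that `toDF()` can derive its schema.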
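Step 5:
Operate on the data. There are three ways:
1. DataFrame methods. The DataFrame provides many methods for working with data, such as where and select.
2. DSL style. The DSL also uses the DataFrame methods, but a column can be referred to conveniently as ' followed by the column name (for example 'age, a Scala Symbol).
3. Register the data as a table and query it with SQL statements.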
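All three styles express the same query. As a hedged illustration, here is that query's logic on a local Scala collection; the Spark forms appear only in comments, and `QueryReference`/`namesOlderThan` are hypothetical names. This is not how Spark executes the query (Spark plans and distributes it); it only shows what the result means: `where` keeps rows, `select` keeps columns.

```scala
// Plain-Scala reference for what the three Step 5 styles compute.
// Spark forms, for comparison only:
//   people.where(people("age") > 10).select(people("name"))   // DataFrame methods
//   people.where('age > 10).select('name)                     // DSL
//   sqlContext.sql("select name from people where age > 10")  // SQL
object QueryReference {
  final case class Person(name: String, age: Int)

  // where(age > minAge) corresponds to filter; select(name) corresponds to map
  def namesOlderThan(people: Seq[Person], minAge: Int): Seq[String] =
    people.filter(_.age > minAge).map(_.name)

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("Michael", 29), Person("Justin", 9), Person("Andy", 30))
    namesOlderThan(people, 10).foreach(println)
  }
}
```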
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Step 3: the case class describing one record. It must be defined outside main
// (here at the top level); toDF() cannot derive a schema for a case class declared inside a method
case class Person(name: String, age: Int)

object TextFile {
  def main(args: Array[String]): Unit = {
    // Step 1
    // Build the SparkContext. Note the `new`: SparkContext is an ordinary class, not a case class,
    // and its companion object has no apply method, so SparkContext(...) without `new` does not compile.
    // The master URL is expected to be supplied by spark-submit (--master)
    val sc = new SparkContext(new SparkConf().setAppName("TextFile"))
    // Build the SQLContext on top of the SparkContext
    val sqlContext = new SQLContext(sc)
    // Step 2: bring the implicit conversions into scope (toDF() on RDDs, 'name columns)
    import sqlContext.implicits._
    // Step 4: textFile reads the file at the given path (in cluster mode, give an HDFS path).
    // split(",") yields an Array[String]; the second map turns each line into one Person
    // (lines of the form "name, age"); toDF() converts the RDD into a DataFrame
    val people = sc.textFile("<file path>")
      .map(_.split(","))
      .map(p => Person(p(0).trim, p(1).trim.toInt))
      .toDF()
    // Step 5
    // DataFrame methods
    println("------------------------DataFrame------------------------------------")
    // Keep the records with age > 10, select only the name column, and print them with show()
    people.where(people("age") > 10).select(people("name")).show()
    // DSL: 'age and 'name are Scala Symbols that the implicits turn into columns
    println("---------------------------DSL---------------------------------")
    people.where('age > 10).select('name).show()
    // SQL
    println("-----------------------------SQL-------------------------------")
    // Register people as a temporary table named "people"
    people.registerTempTable("people")
    // sqlContext.sql runs the SQL statement and returns a DataFrame (a SchemaRDD before Spark 1.3);
    // collect() brings the rows to the driver, and each Row is then printed
    sqlContext.sql("select name from people where age > 10").collect().foreach(println)
    // Save as a Parquet file for the Parquet demo below (Spark 1.3 API; from Spark 1.4 on,
    // people.write.parquet(...) is the preferred form)
    people.saveAsParquetFile("<output path>")
    sc.stop()
  }
}
```
Parquet file test:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object ParquetFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ParquetFile"))
    val sql = new SQLContext(sc)
    import sql.implicits._
    // args(0): path of the Parquet file saved by the previous program
    val parquet = sql.parquetFile(args(0))
    println("------------------------DataFrame------------------------------------")
    // show() prints the rows itself and returns Unit, so it is not wrapped in println
    parquet.where(parquet("age") > 10).select(parquet("name")).show()
    println("---------------------------DSL---------------------------------")
    parquet.where('age > 10).select('name).show()
    println("-----------------------------SQL-------------------------------")
    parquet.registerTempTable("parquet")
    // Prefix each name, collect the results to the driver, and print them
    sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
    sc.stop()
  }
}
```
JSON format test:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object JsonFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JsonFile"))
    val sql = new SQLContext(sc)
    import sql.implicits._
    // args(0): path of a JSON file with one JSON object per line; the schema is inferred
    val json = sql.jsonFile(args(0))
    println("------------------------DataFrame------------------------------------")
    // show() prints the rows itself and returns Unit, so it is not wrapped in println
    json.where(json("age") > 10).select(json("name")).show()
    println("---------------------------DSL---------------------------------")
    json.where('age > 10).select('name).show()
    println("-----------------------------SQL-------------------------------")
    json.registerTempTable("json")
    // Prefix each name, collect the results to the driver, and print them
    sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
    sc.stop()
  }
}
```
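As you can see, this code is almost identical to the text-file version. The only difference is that the file is read through the SQLContext's parquetFile/jsonFile methods instead of sc.textFile; everything after that is exactly the same. (Both methods were deprecated in Spark 1.4 in favor of sqlContext.read.parquet and sqlContext.read.json.)
Parquet and JSON data already carry their structure (Parquet files store their schema, and Spark SQL infers a schema from JSON records), so they are loaded directly as DataFrames. That is why the case class definition and the toDF() call can be skipped.
That covers the basic use of the SparkSQL API.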
Spark (9) -- SparkSQL API Programming
Tags: sparksql