当前位置:Gxlcms > 数据库问题 > spark_sql

spark_sql

时间:2021-07-01 10:21:17 帮助过:31人阅读

package com.spark_sql 2 3 import java.util.Properties 4 import org.apache.spark.sql.{DataFrame, SparkSession} 5 6 object DataFromMysql { 7 def main(args: Array[String]): Unit = { 8 //todo:1、创建sparkSession对象 9 val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() 10 //todo:2、创建Properties对象,设置连接mysql的用户名和密码 11 val properties: Properties = new Properties() 12 properties.setProperty("user", "root") 13 properties.setProperty("password", "123") 14 //todo:3、读取mysql中的数据 15 val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties) 16 //todo:4、显示mysql中表的数据 17 mysqlDF.show() 18 spark.stop() 19 } 20 21 }
 1 package com.spark_sql
 2 
 3 import org.apache.spark.SparkContext
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.{DataFrame, SparkSession}
 6 
 7 object InferringSchema {
 8   def main(args: Array[String]): Unit = {
 9     //todo:1、构建sparkSession 指定appName和master的地址
10     val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
11     //todo:2、从sparkSession获取sparkContext对象
12     val sc: SparkContext = spark.sparkContext
13     sc.setLogLevel("WARN") //设置日志输出级别
14     //todo:3、加载数据
15     val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
16     //todo:4、切分每一行记录
17     val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
18     //todo:5、将RDD与Person类关联
19     val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
20     //todo:6、创建dataFrame,需要导入隐式转换
21     import spark.implicits._
22     val personDF: DataFrame = personRDD.toDF()
23 
24     //todo-------------------DSL语法操作 start--------------
25     //1、显示DataFrame的数据,默认显示20行
26     personDF.show()
27     //2、显示DataFrame的schema信息
28     personDF.printSchema()
29     //3、显示DataFrame记录数
30     println(personDF.count())
31     //4、显示DataFrame的所有字段
32     personDF.columns.foreach(println)
33     //5、取出DataFrame的第一行记录
34     println(personDF.head())
35     //6、显示DataFrame中name字段的所有值
36     personDF.select("name").show()
37     //7、过滤出DataFrame中年龄大于30的记录
38     personDF.filter($"age" > 30).show()
39     //8、统计DataFrame中年龄大于30的人数
40     println(personDF.filter($"age" > 30).count())
41     //9、统计DataFrame中按照年龄进行分组,求每个组的人数
42     personDF.groupBy("age").count().show()
43     //todo-------------------DSL语法操作 end-------------
44 
45     //todo--------------------SQL操作风格 start-----------
46     //todo:将DataFrame注册成表
47     personDF.createOrReplaceTempView("t_person")
48     //todo:传入sql语句,进行操作
49 
50     spark.sql("select * from t_person").show()
51 
52     spark.sql("select * from t_person where name=‘zhangsan‘").show()
53 
54     spark.sql("select * from t_person order by age desc").show()
55     //todo--------------------SQL操作风格 end-------------
56 
57 
58     sc.stop()
59   }
60 }
61 
62 case class Person (val id:Int,val name: String, val age: Int)
 1 package com.spark_sql
 2 
 3 import org.apache.spark.SparkContext
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 6 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 7 
 8 object SparkSqlSchema {
 9   def main(args: Array[String]): Unit = {
10     //todo:1、创建SparkSession,指定appName和master
11     val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
12     //todo:2、获取sparkContext对象
13     val sc: SparkContext = spark.sparkContext
14     //todo:3、加载数据
15     val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
16     //todo:4、切分每一行
17     val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
18     //todo:5、加载数据到Row对象中
19     val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
20     //todo:6、创建schema
21     val schema: StructType = StructType(Seq(
22       StructField("id", IntegerType, false),
23       StructField("name", StringType, false),
24       StructField("age", IntegerType, false)
25     ))
26 
27     //todo:7、利用personRDD与schema创建DataFrame
28     val personDF: DataFrame = spark.createDataFrame(personRDD, schema)
29 
30     //todo:8、DSL操作显示DataFrame的数据结果
31     personDF.show()
32 
33     //todo:9、将DataFrame注册成表
34     personDF.createOrReplaceTempView("t_person")
35 
36     //todo:10、sql语句操作
37     spark.sql("select * from t_person").show()
38 
39     spark.sql("select count(*) from t_person").show()
40 
41 
42     sc.stop()
43   }
44 }
 1 package com.spark_sql
 2 
 3 import java.util.Properties
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
 6 
 7 object SparkSqlToMysql {
 8   def main(args: Array[String]): Unit = {
 9     //val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
10     //todo:1、创建sparkSession对象
11     val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
12     //todo:2、读取数据
13     val data: RDD[String] = spark.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
14     //todo:3、切分每一行,
15     val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
16     //todo:4、RDD关联Student
17     val studentRDD: RDD[student01] = arrRDD.map(x => student01(x(0).toInt, x(1), x(2).toInt))
18     //todo:导入隐式转换
19     import spark.implicits._
20     //todo:5、将RDD转换成DataFrame
21     val studentDF: DataFrame = studentRDD.toDF()
22     //todo:6、将DataFrame注册成表
23     studentDF.createOrReplaceTempView("student")
24     //todo:7、操作student表 ,按照年龄进行降序排列
25     val resultDF: DataFrame = spark.sql("select * from student order by age desc")
26 
27     //todo:8、把结果保存在mysql表中
28     //todo:创建Properties对象,配置连接mysql的用户名和密码
29     val prop = new Properties()
30     prop.setProperty("user", "root")
31     prop.setProperty("password", "123")
32 
33     resultDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)
34 
35     //todo:写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错
36     //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
37     spark.stop()
38   }
39 }
40 
41 //todo:创建样例类Student
42 case class student01(id: Int, name: String, age: Int)

 

spark_sql

标签:each   bsp   年龄   code   ast   show   加载   int   stop   

人气教程排行