当前位置:Gxlcms > 数据库问题 > spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll

spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll

时间:2021-07-01 10:21:17 帮助过:29人阅读

package com.spark_sql 2 3 import java.util.Properties 4 import org.apache.spark.sql.{DataFrame, SparkSession} 5 6 object DataFromMysql { 7 def main(args: Array[String]): Unit = { 8 //todo:1、创建sparkSession对象 9 val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() 10 //todo:2、创建Properties对象,设置连接mysql的用户名和密码 11 val properties: Properties = new Properties() 12 properties.setProperty("user", "root") 13 properties.setProperty("password", "123") 14 //todo:3、读取mysql中的数据 15 val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties) 16 //todo:4、显示mysql中表的数据 17 mysqlDF.show() 18 spark.stop() 19 } 20 21 } View Code
 1 package com.spark_sql
 2 
 3 import java.util.Properties
 4 import org.apache.spark.sql.{DataFrame, SparkSession}
 5 
 6 object DataFromMysql {
 7   def main(args: Array[String]): Unit = {
 8     //todo:1、创建sparkSession对象
 9     val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
10     //todo:2、创建Properties对象,设置连接mysql的用户名和密码
11     val properties: Properties = new Properties()
12     properties.setProperty("user", "root")
13     properties.setProperty("password", "123")
14     //todo:3、读取mysql中的数据
15     val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties)
16     //todo:4、显示mysql中表的数据
17     mysqlDF.show()
18     spark.stop()
19   }
20 
21 }
 1 package com.spark_sql
 2 
 3 import org.apache.spark.SparkContext
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.{DataFrame, SparkSession}
 6 
 7 object InferringSchema {
 8   def main(args: Array[String]): Unit = {
 9     //todo:1、构建sparkSession 指定appName和master的地址
10     val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
11     //todo:2、从sparkSession获取sparkContext对象
12     val sc: SparkContext = spark.sparkContext
13     sc.setLogLevel("WARN") //设置日志输出级别
14     //todo:3、加载数据
15     val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
16     //todo:4、切分每一行记录
17     val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
18     //todo:5、将RDD与Person类关联
19     val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
20     //todo:6、创建dataFrame,需要导入隐式转换
21     import spark.implicits._
22     val personDF: DataFrame = personRDD.toDF()
23 
24     //todo-------------------DSL语法操作 start--------------
25     //1、显示DataFrame的数据,默认显示20行
26     personDF.show()
27     //2、显示DataFrame的schema信息
28     personDF.printSchema()
29     //3、显示DataFrame记录数
30     println(personDF.count())
31     //4、显示DataFrame的所有字段
32     personDF.columns.foreach(println)
33     //5、取出DataFrame的第一行记录
34     println(personDF.head())
35     //6、显示DataFrame中name字段的所有值
36     personDF.select("name").show()
37     //7、过滤出DataFrame中年龄大于30的记录
38     personDF.filter($"age" > 30).show()
39     //8、统计DataFrame中年龄大于30的人数
40     println(personDF.filter($"age" > 30).count())
41     //9、统计DataFrame中按照年龄进行分组,求每个组的人数
42     personDF.groupBy("age").count().show()
43     //todo-------------------DSL语法操作 end-------------
44 
45     //todo--------------------SQL操作风格 start-----------
46     //todo:将DataFrame注册成表
47     personDF.createOrReplaceTempView("t_person")
48     //todo:传入sql语句,进行操作
49 
50     spark.sql("select * from t_person").show()
51 
52     spark.sql("select * from t_person where name=‘zhangsan‘").show()
53 
54     spark.sql("select * from t_person order by age desc").show()
55     //todo--------------------SQL操作风格 end-------------
56 
57 
58     sc.stop()
59   }
60 }
61 
/**
 * One record of the person data set.
 *
 * @param id   numeric identifier of the person
 * @param name name of the person
 * @param age  age of the person
 */
case class Person(
  id: Int,
  name: String,
  age: Int
)
 1 package com.spark_sql
 2 
 3 import org.apache.spark.SparkContext
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 6 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 7 
 8 object SparkSqlSchema {
 9   def main(args: Array[String]): Unit = {
10     //todo:1、创建SparkSession,指定appName和master
11     val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
12     //todo:2、获取sparkContext对象
13     val sc: SparkContext = spark.sparkContext
14     //todo:3、加载数据
15     val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
16     //todo:4、切分每一行
17     val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
18     //todo:5、加载数据到Row对象中
19     val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
20     //todo:6、创建schema
21     val schema: StructType = StructType(Seq(
22       StructField("id", IntegerType, false),
23       StructField("name", StringType, false),
24       StructField("age", IntegerType, false)
25     ))
26 
27     //todo:7、利用personRDD与schema创建DataFrame
28     val personDF: DataFrame = spark.createDataFrame(personRDD, schema)
29 
30     //todo:8、DSL操作显示DataFrame的数据结果
31     personDF.show()
32 
33     //todo:9、将DataFrame注册成表
34     personDF.createOrReplaceTempView("t_person")
35 
36     //todo:10、sql语句操作
37     spark.sql("select * from t_person").show()
38 
39     spark.sql("select count(*) from t_person").show()
40 
41 
42     sc.stop()
43   }
44 }
 1 package com.spark_sql
 2 
 3 import java.util.Properties
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
 6 
 7 object SparkSqlToMysql {
 8   def main(args: Array[String]): Unit = {
 9     //val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
10     //todo:1、创建sparkSession对象
11     val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
12     //todo:2、读取数据
13     val data: RDD[String] = spark.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
14     //todo:3、切分每一行,
15     val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
16     //todo:4、RDD关联Student
17     val studentRDD: RDD[student01] = arrRDD.map(x => student01(x(0).toInt, x(1), x(2).toInt))
18     //todo:导入隐式转换
19     import spark.implicits._
20     //todo:5、将RDD转换成DataFrame
21     val studentDF: DataFrame = studentRDD.toDF()
22     //todo:6、将DataFrame注册成表
23     studentDF.createOrReplaceTempView("student")
24     //todo:7、操作student表 ,按照年龄进行降序排列
25     val resultDF: DataFrame = spark.sql("select * from student order by age desc")
26 
27     //todo:8、把结果保存在mysql表中
28     //todo:创建Properties对象,配置连接mysql的用户名和密码
29     val prop = new Properties()
30     prop.setProperty("user", "root")
31     prop.setProperty("password", "123")
32 
33     resultDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)
34 
35     //todo:写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错
36     //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
37     spark.stop()
38   }
39 }
40 
/**
 * Case class describing one student record.
 *
 * @param id   numeric identifier of the student
 * @param name name of the student
 * @param age  age of the student
 */
case class student01(
  id: Int,
  name: String,
  age: Int
)
 1 package com.SparkStreaming_Flume_Poll
 2 
 3 import java.net.InetSocketAddress
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 6 import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
 7 import org.apache.spark.streaming.{Seconds, StreamingContext}
 8 import org.apache.spark.{SparkConf, SparkContext}
 9 
/**
 * Demo entry point: a Spark Streaming word count that pulls events from a Flume
 * agent (polling mode) and keeps a running total per word across batches.
 */
object SparkStreaming_Flume_Poll {

  import java.nio.charset.StandardCharsets

  /**
   * State-update function for `updateStateByKey`.
   *
   * @param newValues    all the 1s emitted for one word in the current batch
   * @param runningCount accumulated total for the same word from earlier batches, if any
   * @return the new accumulated total for the word
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }


  def main(args: Array[String]): Unit = {
    // Configure the SparkConf.
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // Build the SparkContext.
    val sc: SparkContext = new SparkContext(sparkConf)
    // Set the log level.
    sc.setLogLevel("WARN")
    // Build the StreamingContext with a 5-second batch interval.
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set the checkpoint directory (needed because updateStateByKey keeps state).
    scc.checkpoint("./")
    // Flume address(es) to poll; more than one can be listed.
    val address = Seq(new InetSocketAddress("192.168.107.144", 8888))
    // Pull data from Flume.
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the event body; convert it to a String.
    // Decode with an explicit charset: new String(bytes) without one uses the JVM's
    // platform default, so the same events could decode differently per machine.
    // NOTE(review): assumes the Flume source emits UTF-8 text — confirm against the agent config.
    val lineStream: DStream[String] =
      flumeStream.map(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8))
    // Word count with a running total across batches.
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }


}

 

spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll

标签:evel   func   input   --   lines   隐式   tap   top   time   

人气教程排行