// 时间:2021-07-01 10:21:17 帮助过:42人阅读
package com.sjw.flink.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row
/**
 * Flink batch SQL exercise over a small, hard-coded student score data set.
 *
 * Each record is (student name, subject, score):
 * {{{
 * xiaoming english 90
 * xiaoming math    80
 * xiaohong english 98
 * xiaohong math    82
 * }}}
 * The records are registered as table `t_stu` (columns `name`, `subject`, `score`)
 * and the program answers the following questions with SQL:
 *  1. all rows;
 *  2. the total score of each subject;
 *  3. the average score of each student;
 *  4. the name of the top scorer in each subject;
 *  5. the name of the top scorer in english;
 *  6. the total score of each student;
 *  7. the name of the student with the highest total score;
 *  8. the number of distinct students.
 * Every result is printed to stdout under a short title.
 */
object StudentScoreSql {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Batch table environment built on top of the DataSet execution environment.
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Sample data: (name, subject, score).
    val score: List[(String, String, Int)] = List(
      ("xiaoming", "english", 90),
      ("xiaoming", "math", 80),
      ("xiaohong", "english", 98),
      ("xiaohong", "math", 82)
    )
    val stuDataDS: DataSet[(String, String, Int)] = env.fromCollection(score)
    // Wrap each tuple in the ScoreList case class so the registered table's
    // columns are named after its fields: name, subject, score.
    val stu1: DataSet[ScoreList] = stuDataDS.map(x => ScoreList(x._1, x._2, x._3))
    tEnv.registerDataSet("t_stu", stu1)

    // All queries are parsed and validated here, before any job is executed.
    val results: Seq[(String, Table)] = Seq(
      "1. All rows" -> tEnv.sqlQuery(
        "select name, subject, score from t_stu"),

      "2. Total score per subject" -> tEnv.sqlQuery(
        "select subject, sum(score) from t_stu group by subject"),

      // Flink's AVG returns the argument's type, so AVG over the INT column would
      // truncate e.g. 85.5 to 85; cast to DOUBLE to keep the fractional part.
      "3. Average score per student" -> tEnv.sqlQuery(
        "select name, avg(cast(score as double)) from t_stu group by name"),

      // Join every row against the per-subject maximum; ties yield several names.
      "4. Top scorer per subject" -> tEnv.sqlQuery(
        """
          |select s1.name, s2.subject, s2.maxscore from t_stu as s1 join
          |(select subject, max(score) maxscore from t_stu group by subject) as s2
          |on s1.subject = s2.subject and s1.score = s2.maxscore
          |""".stripMargin),

      // Plain ASCII quotes are required for SQL string literals. The outer filter on
      // s1.subject keeps a non-english score that happens to equal the english
      // maximum from matching.
      "5. Top scorer in english" -> tEnv.sqlQuery(
        """
          |select s1.name, s2.maxscore from t_stu as s1 join
          |(select max(score) maxscore from t_stu where subject = 'english') as s2
          |on s1.score = s2.maxscore
          |where s1.subject = 'english'
          |""".stripMargin),

      "6. Total score per student" -> tEnv.sqlQuery(
        "select name, sum(score) from t_stu group by name"),

      // Per-student totals joined against the largest total; ties yield several names.
      "7. Student with the highest total score" -> tEnv.sqlQuery(
        """
          |select s1.name, s1.sumscore from
          |(select name, sum(score) sumscore from t_stu group by name) as s1 join
          |(select max(sumscore) maxscore from
          |  (select name, sum(score) sumscore from t_stu group by name) as t) as s2
          |on s1.sumscore = s2.maxscore
          |""".stripMargin),

      "8. Number of students" -> tEnv.sqlQuery(
        "select count(distinct name) from t_stu")
    )

    // DataSet.print() triggers execution, so each query runs as its own job.
    results.foreach { case (title, table) =>
      println(s"== $title ==")
      tEnv.toDataSet[Row](table).print()
    }
  }
}
/**
 * One score record: a single student's score in a single subject.
 *
 * @param name    student name
 * @param subject subject name, e.g. "english" or "math"
 * @param score   the student's score in that subject
 */
final case class ScoreList(name: String, subject: String, score: Int)
// 大数据之SQL语句
// 标签:odata min main lis tab nts lin where order