// 时间:2021-07-01 10:21:17 帮助过:11人阅读
package dev.spark.sql
import java.util.Properties
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Small Spark SQL sample: reads JSON / Parquet / JDBC sources, queries them
 * through `SQLContext` and `HiveContext`, and converts RDDs to DataFrames.
 *
 * NOTE: this object shares its simple name with `org.apache.spark.sql.DataFrame`.
 * The name is kept so existing launch configurations keep working; import the
 * Spark type under an alias if it is ever needed in this file.
 */
object DataFrame {

  /** Upper bound used by the sample filters (`col <= num`). */
  val num: Int = 0

  /** JDBC connection options (url, table and credentials) for the MySQL sample table. */
  val map: Map[String, String] = Map(
    "url" -> "jdbc:mysql://192.168.0.1:3306/spark",
    "dbtable" -> "tmp_table3",
    "user" -> "spark",
    "password" -> "spark")

  /**
   * Row shape used for the reflection-based RDD -> DataFrame conversion.
   * It must live outside `main`: `toDF()` needs a `TypeTag`, which the compiler
   * cannot provide for a case class declared inside a method.
   */
  final case class RowKeyClass(rowkey: Int)

  /**
   * Runs the sample end to end against a local Spark master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("dataFrame")
    val sc = new SparkContext(conf)
    try {
      val ssc = new SQLContext(sc)

      // Load the JSON source. The original called `read.json()` without a path
      // and threw away the result of the real load.
      val df = ssc.read.format("json").load(".json")

      // Inspect the data set directly, then with a filter condition.
      df.show()
      df.filter(df.col("col") <= num).show()

      // Register the DataFrame as a temporary table so it can be queried with SQL.
      df.registerTempTable("tmp_table0")

      // The result wraps every record in a Row. The query must name the table
      // registered above (`tmp_table0`), not `tmp_table`.
      val dataSet0 = ssc.sql(s"SELECT col FROM tmp_table0 WHERE col <=$num")

      // A few of the many operations available on the result.
      dataSet0.collect().foreach(println)
      dataSet0.columns.foreach(println)
      dataSet0.rdd.foreach(println)
      dataSet0.explain()
      dataSet0.alias("")
      dataSet0.cache()
      dataSet0.na

      // Format-driven reads through SQLContext.
      // parquet
      val pssc = new SQLContext(sc)
      pssc.read.format("parquet").load(".parquet")

      // jdbc
      val dataSet3 = ssc.read.format("jdbc").options(map).load()
      // The writer needs credentials too; an empty Properties object cannot authenticate.
      val jdbcProps = new Properties()
      jdbcProps.setProperty("user", map("user"))
      jdbcProps.setProperty("password", map("password"))
      // Append is required because the target table already exists (it was just
      // read from); the default ErrorIfExists mode would always fail here.
      dataSet3.write.mode(SaveMode.Append).jdbc(map("url"), map("dbtable"), jdbcProps)

      // HiveContext queries the Hive warehouse once hive-site.xml is on the
      // classpath (resources). Temporary tables take precedence during lookup;
      // qualify names as database.table to avoid ambiguity. The default
      // database is `default`.
      val hssc = new HiveContext(sc)
      val dataSet1 = hssc.sql("SELECT col FROM database.table")
      dataSet1.registerTempTable("tmp_table1")

      // Temporary tables are scoped to the SQLContext that registered them, not
      // to the SparkContext. Re-register the JSON data with the Hive context so
      // the join below can resolve `tmp_table0`.
      hssc.createDataFrame(df.rdd, df.schema).registerTempTable("tmp_table0")
      hssc.sql("SELECT * FROM tmp_table0 t0 inner join tmp_table1 t1 on t0.col = t1.col")

      // Map data onto an explicit schema. The mapped RDD (one Int per row) is
      // what matches the schema, so it must be the one handed to createDataFrame.
      val rowKeyRdd = dataSet0.rdd.map(line => Row(line.size))
      val rowkeyStructField = StructField("rowkey", IntegerType, nullable = true)
      val tableStructType = StructType(Array(rowkeyStructField))
      val dataSet2 = hssc.createDataFrame(rowKeyRdd, tableStructType)
      dataSet2.registerTempTable("tmp_table2")
      dataSet2.write.mode(SaveMode.Append).saveAsTable("hive_spark.tmp_table2")

      // RDD -> DataFrame via reflection requires the context's implicit conversions.
      import ssc.implicits._
      dataSet0.rdd.map(x => RowKeyClass(x.size)).toDF()
    } finally {
      // Always release the SparkContext, even when one of the sample steps fails.
      sc.stop()
    }
  }
}
// spark sql 小样
// 标签:方式 mys tap for ack user 表名 select 数据集