
Spark SQL Operations


Register the DataFrame as a temporary table so that it can be queried with SQL (df1 is built in the code further below):

df1.registerTempTable("t_person")

 

1. Query the two oldest people

scala> sqlContext.sql("select * from t_person order by age desc limit 2").show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|xiaofang| 22|
|  3|  wangwu| 21|
+---+--------+---+
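
The same result can also be obtained through the DataFrame API rather than SQL; a minimal sketch, assuming the df1 from the code further below:

df1.orderBy(df1.col("age").desc).limit(2).show()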

  

2. Show the table's schema

scala> sqlContext.sql("desc t_person").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|      int|       |
|    name|   string|       |
|     age|      int|       |
+--------+---------+-------+
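
The schema can also be inspected directly through the DataFrame API; a minimal sketch:

// prints the column names, types, and nullability as a tree
df1.printSchema()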

  

DataFrame API Operations

 

package bigdata.spark.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    // Import the implicit conversions, which include the implicit conversion from RDD to DataFrame
    import sqlContext.implicits._
    // df1 is now a DataFrame
    val df1 = rdd2.toDF
    df1.show


    // several equivalent ways to select the "age" column
    df1.select("age").show()
    // the same call, passing the first parameter as a named argument
    df1.select(col = "age").show
    // using the Column object returned by df1.col
    df1.select(df1.col("age")).show

    // importing the members of df1 brings its col method into scope
    import df1._
    df1.select(col("age")).show

    // evaluates the predicate, producing a column of booleans
    df1.select(col("age") > 20).show
    // column arithmetic: every age plus one
    df1.select(col("age") + 1).show
    // keeps only the rows whose age is greater than 20
    df1.filter(col("age") > 20).show()


    // register the DataFrame as a temporary table so it can be queried with SQL
    df1.registerTempTable("t_person")

    sqlContext.sql("select * from t_person").show()

    sqlContext.sql("select * from t_person order by age desc limit 2").show()

    sc.stop()

  }

  // This case class must be defined outside the main method, otherwise an error occurs
  case class Person(id:Int, name:String, age:Int)

}
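
For reference, this code assumes persons.txt holds one person per line as space-separated id, name, and age fields; judging from the query output above, it contains at least lines such as:

3 wangwu 21
4 xiaofang 22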

  

Specifying the Schema with StructType

package bigdata.spark.sql

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // create the schema
    val schema = StructType(
      List(
        // field name, data type, nullable
        StructField("id", IntegerType, false),
        StructField("name", StringType, false),
        StructField("age", IntegerType, false)
      )
    )

    // create the DataFrame
    val df1 = sqlContext.createDataFrame(rdd2, schema)

    df1.registerTempTable("t_person")

    sqlContext.sql("select * from t_person").show()

    sc.stop()

  }

}
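
The code above targets the Spark 1.x API. In Spark 2.x and later, SQLContext is superseded by SparkSession and registerTempTable by createOrReplaceTempView; below is a minimal sketch of the same program against that newer API (the object name SparkSqlDemoV2 is hypothetical, and the persons.txt layout is assumed unchanged):

package bigdata.spark.sql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkSqlDemoV2 {

  def main(args: Array[String]): Unit = {
    // SparkSession replaces SQLContext (and SparkContext) as the entry point
    val spark = SparkSession.builder()
      .appName("SparkSqlDemoV2")
      .master("local")
      .getOrCreate()

    val schema = StructType(
      List(
        StructField("id", IntegerType, false),
        StructField("name", StringType, false),
        StructField("age", IntegerType, false)
      )
    )

    val rowRdd = spark.sparkContext
      .textFile("hdfs://m1:9000/persons.txt")
      .map(_.split(" "))
      .map(x => Row(x(0).toInt, x(1), x(2).toInt))

    val df = spark.createDataFrame(rowRdd, schema)

    // createOrReplaceTempView replaces the deprecated registerTempTable
    df.createOrReplaceTempView("t_person")
    spark.sql("select * from t_person").show()

    spark.stop()
  }

}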

  
