当前位置:Gxlcms > 数据库问题 > Spark-SparkSql

Spark-SparkSql

时间:2021-07-01 10:21:17 帮助过:2人阅读

.split(',').map(fieldName=>StructField(fieldName,StringType,true)))
val rowRdd = sc.textFile("文件地址").map(_.split(',')).map(p=>Row(p(0),p(1).trim))
val peopleSchemaRdd = sqlContext.applySchema(rowRdd, schema)
peopleSchemaRdd.registerTempTable("people")

或者

  DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
 schemaPeople.registerTempTable("people");
 Parquet是列式存储格式。binaryAsString让Spark SQL将二进制数据解释成字符串;cacheMetadata打开Parquet元数据缓存,可以提高静态数据的查询速度;compression.codec设置写文件时使用的压缩算法(snappy、gzip、lzo);filterPushdown开启Parquet过滤器下推(pushdown)优化。

对于SparkSql的性能调优可以通过缓存数据和打开一些设置选项来调优。
如cacheTable()以列式格式缓存表,Spark只会扫描需要的列,并自动调整压缩方式,以减少内存占用和垃圾回收的压力。uncacheTable()可以把表从内存缓存中移除。spark.sql.inMemoryColumnarStorage.compressed设为true时,会基于数据的统计信息为每一列自动选择压缩算法;
spark.sql.inMemoryColumnarStorage.batchSize控制列式缓存的批大小,较大的批可以提高内存利用率和压缩效率,但缓存数据时有OOM的风险。据说Spark 2.0的钨丝计划(Project Tungsten)会改进Spark的内存管理。
2. 实例

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * Spark SQL walkthrough using the Spark 1.x {@code DataFrame} API.
 *
 * <p>The program queries three data sources in turn and prints the results to stdout:
 * <ol>
 *   <li>a comma-separated text file mapped onto {@link Person} beans,</li>
 *   <li>a JSON file whose schema is inferred by Spark,</li>
 *   <li>an in-memory RDD of JSON strings containing a nested struct.</li>
 * </ol>
 * A fourth section (Parquet round trip) is kept but disabled, as in the original example.
 */
public class JavaSparkSQL {

  /**
   * JavaBean holding one record of {@code resources/people.txt}.
   *
   * <p>It is handed to {@code SQLContext.createDataFrame} as the bean class and is
   * {@link Serializable} because instances are produced inside an RDD transformation.
   */
  public static class Person implements Serializable {
    private String name;
    private int age;

    /** @return the person's name */
    public String getName() {
      return name;
    }

    /** @param name the person's name */
    public void setName(String name) {
      this.name = name;
    }

    /** @return the person's age in years */
    public int getAge() {
      return age;
    }

    /** @param age the person's age in years */
    public void setAge(int age) {
      this.age = age;
    }
  }

  /** Renders the first (string) column of a row as {@code "Name: <value>"}. */
  private static final Function<Row, String> NAME_LINE = new Function<Row, String>() {
    @Override
    public String call(Row row) {
      return "Name: " + row.getString(0);
    }
  };

  /**
   * Collects a single-column result set to the driver and prints one
   * {@code "Name: ..."} line per row.
   *
   * @param names a DataFrame whose column 0 is a string
   */
  private static void printNames(DataFrame names) {
    for (String line : names.toJavaRDD().map(NAME_LINE).collect()) {
      System.out.println(line);
    }
  }

  /**
   * Runs the three demo queries against a local Spark context.
   *
   * @param args unused
   * @throws Exception if reading the input files or running a query fails
   */
  public static void main(String[] args) throws Exception {
    // Initialization.
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[*]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    try {
      SQLContext sqlContext = new SQLContext(ctx);

      System.out.println("=== Data source: RDD ===");
      // Load the local text file once; the same RDD is used for the lineage dump and the query.
      JavaRDD<String> lines = ctx.textFile("resources/people.txt");
      // The debug string shows the RDD lineage, i.e. the chain of transformations behind it.
      System.out.println(lines.toDebugString());

      // Convert each "name, age" line into a Person bean.
      JavaRDD<Person> people = lines.map(
        new Function<String, Person>() {
          @Override
          public Person call(String line) {
            String[] parts = line.split(",");
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge(Integer.parseInt(parts[1].trim()));
            return person;
          }
        });

      // Build a DataFrame from the bean RDD and register it as a temporary table.
      DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
      schemaPeople.registerTempTable("people");

      // The result of a SQL query is itself a DataFrame; its row columns are read by ordinal.
      printNames(sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"));

      /* Disabled: Parquet round trip.
      System.out.println("=== Data source: Parquet File ===");
      // DataFrames can be saved as parquet files, maintaining the schema information.
      schemaPeople.write().parquet("testdata/people.parquet");

      // Parquet files are self-describing, so the schema is preserved when reading back.
      DataFrame parquetFile = sqlContext.read().parquet("testdata/people.parquet");

      // The table name must match the one used in the query below.
      parquetFile.registerTempTable("parquetFile");
      printNames(sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"));
      */

      // Read a local JSON file.
      System.out.println("=== Data source: JSON Dataset ===");
      String path = "resources/people.json";
      // The schema of a JSON dataset is inferred automatically, so print it before
      // writing queries against it.
      DataFrame peopleFromJsonFile = sqlContext.read().json(path);
      peopleFromJsonFile.printSchema();
      // root
      //  |-- age: IntegerType
      //  |-- name: StringType

      // Re-registering "people" replaces the table built from the text file above.
      peopleFromJsonFile.registerTempTable("people");
      printNames(sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"));

      // JSON held in an RDD of strings, including a nested "address" struct.
      List<String> jsonData = Arrays.asList(
          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
      JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
      DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());
      peopleFromJsonRDD.printSchema();
      // root
      //  |-- address: StructType
      //  |    |-- city: StringType
      //  |    |-- state: StringType
      //  |-- name: StringType
      peopleFromJsonRDD.registerTempTable("people2");

      // Nested fields are addressed with dot notation.
      DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
      List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
        @Override
        public String call(Row row) {
          return "Name: " + row.getString(0) + ", City: " + row.getString(1);
        }
      }).collect();
      for (String line : nameAndCity) {
        System.out.println(line);
      }
    } finally {
      // Always shut the context down, even when a step above throws.
      ctx.stop();
    }
  }
}

Spark-SparkSql

标签:

人气教程排行