时间:2021-07-01 10:21:17 帮助过:2人阅读
步骤:
1、使用JavaBeans类定义schema
2、创建一个SQLContext
3、通过调用createDataFrame方法模式应用到所有现有的RDD,并为JavaBean提供class对象 达到将RDD转换成DataFrame
4、创建一个DataFrame,并将它注册成表。
5、使用sqlContext提供的sql方法,就可以使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持所有的RDD操作
首先写一个JavaBean类,实现序列化接口,并提供get和set方法
package com.tg.spark.sql; import scala.Serializable; public class Person implements Serializable { /** * */ private static final long serialVersionUID = 727694963564948838L; private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
package com.tg.spark.sql; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.storage.StorageLevel; import java.util.List; import org.apache.spark.SparkConf; public class CreateDataFrame1 { public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("hdfs://master:9000/testFile/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); //A schema can be applied to an existing RDD by calling createDataFrame and providing the Class object for the JavaBean. // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); teenagers.persist(StorageLevel.MEMORY_ONLY()); System.out.println(teenagerNames); } }
上面的这段代码
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回
不知道RDD的列和它的类型时
步骤:
1.从原有的RDD中创建包含行的RDD。
2.创建一个由StructType表示的模式,StructType符合由步骤1创建的RDD的行的结构。
3.通过SQLContext提供的createDataFrame方法,将模式应用于包含行的RDD。
package com.tg.spark.sql; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.storage.StorageLevel; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; public class CreateDataFrame2 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("hdfs://master:9000/testFile/people.txt"); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName : schemaString.split(" ")) { // true表示可以为空 fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal // RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); results.persist(StorageLevel.MEMORY_ONLY()); System.out.println(names); } }
在最简单的形式中,默认的数据源(parquet除非通过spark.sql.sources.default另外进行配置)将被用于所有的操作。
package com.tg.spark.sql; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import org.apache.spark.storage.StorageLevel; /** * 加载默认的数据源格式并保存 * //第一种读取方式xxxFile(path) * @author Administrator * */ public class DataSource { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); //指定保存模式 //df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet"); //第一种读取方式 DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet"); parquetFile.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people "); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); teenagers.persist(StorageLevel.MEMORY_ONLY()); System.out.println(teenagerNames); } }
package com.tg.spark.sql; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.storage.StorageLevel; /** * 加载指定的数据源格式并保存 * //第二种读取方式sqlContext.read().XXX(path) * @author Administrator * */ public class DataSource2 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().format("json").load("hdfs://master:9000/testFile/people.json"); df.select("name", "age").write().format("parquet").save("people.parquet"); DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); teenagers.persist(StorageLevel.MEMORY_ONLY()); System.out.println(teenagerNames); } }
df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
Scala/Java |
Python |
Meaning |
SaveMode.ErrorIfExists (default) |
"error" (default) |
When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 当往一个数据源中保存一个DataFrame,如果数据已经存在,会抛出一个异常。 |
SaveMode.Append |
"append" |
When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 当往一个数据源中保存一个DataFrame,如果data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。 |
SaveMode.Overwrite |
"overwrite" |
Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. Overwrite模式意味着当向数据源中保存一个DataFrame时,如果data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。 |
SaveMode.Ignore |
"ignore" |
Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。 |
package com.tg.spark.sql; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import org.apache.spark.storage.StorageLevel; /** * 加载默认的数据源格式并保存 * //第一种读取方式xxxFile(path) * @author Administrator * */ public class DataSource { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); //指定保存模式 //df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet"); //第一种读取方式 DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet"); parquetFile.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people "); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); teenagers.persist(StorageLevel.MEMORY_ONLY()); System.out.println(teenagerNames); } }
? jsonRDD - 从一个已经存在的RDD中加载数据,每一个RDD的元素是一个包含一个JSON对象的字符串。
代码前面都有涉及到
public class DataSource3 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源 JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf); System.out.println(sc); // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. DataFrame people = sqlContext.read().json("hdfs://master:9000/testFile/people.json"); //DataFrame people = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD); anotherPeople.show(); } }
至于怎么用spark操作hive和其他数据库,以后再做学习
码字不易,转载请指明出处http://blog.csdn.net/tanggao1314/article/details/51594942
更多内容,请参考官网 Spark sql 编程指南
Spark(1.6.1) Sql 编程指南+实战案例分析
标签: