当前位置:Gxlcms > 数据库问题 > Spark(1.6.1) Sql 编程指南+实战案例分析

Spark(1.6.1) Sql 编程指南+实战案例分析

时间:2021-07-01 10:21:17 帮助过:2人阅读




JavaBeans类定义了表的模式,JavaBeans类的参数的名称使用反射来读取,然后称为列的名称。
JavaBeans类还可以嵌套或者包含复杂的类型,例如Sequences或者Arrays。
这个RDD可以隐式地转换为DataFrame,然后注册成表,
表可以在后续SQL语句中使用Spark SQL中的Scala接口支持自动地将包含JavaBeans类的RDD转换成DataFrame。



步骤:

1、使用JavaBeans类定义schema
2、创建一个SQLContext
3、通过调用createDataFrame方法模式应用到所有现有的RDD,并为JavaBean提供class对象  达到将RDD转换成DataFrame
4、创建一个DataFrame,并将它注册成表。
5、使用sqlContext提供的sql方法,就可以使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持所有的RDD操作


首先写一个JavaBean类,实现序列化接口,并提供get和set方法

package com.tg.spark.sql;

import scala.Serializable;

public class Person implements Serializable {
	  /**
	 * 
	 */
	private static final long serialVersionUID = 727694963564948838L;
	private String name;
	private int age;

	  public String getName() {
	    return name;
	  }

	  public void setName(String name) {
	    this.name = name;
	  }

	  public int getAge() {
	    return age;
	  }

	  public void setAge(int age) {
	    this.age = age;
	  }
	}


package com.tg.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;

import java.util.List;

import org.apache.spark.SparkConf;
public class CreateDataFrame1 {
	
	public static void main(String[] args) {
		SparkConf conf=new SparkConf();
		conf.set("spark.testing.memory", "2147480000");		//因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("hdfs://master:9000/testFile/people.txt").map(
		  new Function<String, Person>() {
		    public Person call(String line) throws Exception {
		      String[] parts = line.split(",");

		      Person person = new Person();
		      person.setName(parts[0]);
		      person.setAge(Integer.parseInt(parts[1].trim()));

		      return person;
		    }
		  });

		//A schema can be applied to an existing RDD by calling createDataFrame and providing the Class object for the JavaBean.
		
		
		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

		// The results of SQL queries are DataFrames and support all the normal RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
		  public String call(Row row) {
		    return "Name: " + row.getString(0);
		  }
		}).collect();
		
		teenagers.persist(StorageLevel.MEMORY_ONLY());
		System.out.println(teenagerNames);
	}

}


上面的这段代码

DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回

以编程方式指定模式(Programmatically Specifying the Schema)

不知道RDD的列和它的类型时

步骤:

1.从原有的RDD中创建包含行的RDD。
2.创建一个由StructType表示的模式,StructType符合由步骤1创建的RDD的行的结构。
3.通过SQLContext提供的createDataFrame方法,将模式应用于包含行的RDD。

package com.tg.spark.sql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;

public class CreateDataFrame2 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("hdfs://master:9000/testFile/people.txt");

		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
			public Row call(String record) throws Exception {
				String[] fields = record.split(",");
				return RowFactory.create(fields[0], fields[1].trim());
			}
		});

		// The schema is encoded in a string
		String schemaString = "name age";

		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			// true表示可以为空
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);

		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");

		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		results.persist(StorageLevel.MEMORY_ONLY());
		System.out.println(names);
	}

}



数据源(Data Sources)

Spark SQL支持通过DataFrame接口在多种数据源上进行操作。一个DataFrame可以如同一个标准的RDDs那样进行操作,还可以注册成临时的表。将一个DataFrame注册成临时表允许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的常用方法,使用Spark数据源保存数据。然后进入可用于内置数据源的特定选项。

通用的加载/保存功能(Generic Load/Save Functions)

在最简单的形式中,默认的数据源(parquet除非通过spark.sql.sources.default另外进行配置)将被用于所有的操作。


package com.tg.spark.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.storage.StorageLevel;
/**
 * 加载默认的数据源格式并保存
 * //第一种读取方式xxxFile(path)
 * @author Administrator
 *
 */
public class DataSource {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		
		DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
		
		df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
		//指定保存模式
		//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
		//第一种读取方式
		DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
		
		parquetFile.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");

		// The results of SQL queries are DataFrames and support all the normal RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
		  public String call(Row row) {
		    return "Name: " + row.getString(0);
		  }
		}).collect();
		
		teenagers.persist(StorageLevel.MEMORY_ONLY());
		System.out.println(teenagerNames);
	}
		
	
}


手动指定选项(Manually Specifying Options)

你还可以手动指定数据源,这些数据源将与任何额外的选项一同使用,你希望将这些选项传入到数据源中。数据源是通过它们的全名来指定的(如org.apache.spark.sql.parquet),但是对于内置的数据源,你也可以使用简短的名称(json, parquet, jdbc)。任何类型的DataFrames使用这些语法可以转化成其他的数据源:

package com.tg.spark.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
/**
 * 加载指定的数据源格式并保存
 * //第二种读取方式sqlContext.read().XXX(path)
 * @author Administrator
 *
 */
public class DataSource2 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		DataFrame df = sqlContext.read().format("json").load("hdfs://master:9000/testFile/people.json");
		df.select("name", "age").write().format("parquet").save("people.parquet");
		DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

		// Parquet files can also be registered as tables and then used in SQL statements.
		parquetFile.registerTempTable("parquetFile");
		DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
		  public String call(Row row) {
		    return "Name: " + row.getString(0);
		  }
		}).collect();
	
		
		teenagers.persist(StorageLevel.MEMORY_ONLY());
		System.out.println(teenagerNames);
	}
}

保存模式(Save Modes)

Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。

df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");


Scala/Java

Python

Meaning

SaveMode.ErrorIfExists (default)

"error" (default)

When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 

当往一个数据源中保存一个DataFrame,如果数据已经存在,会抛出一个异常。

SaveMode.Append

"append"

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 

当往一个数据源中保存一个DataFrame,如果data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。

SaveMode.Overwrite

"overwrite"

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. 

Overwrite模式意味着当向数据源中保存一个DataFrame时,如果data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。

SaveMode.Ignore

"ignore"

Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. 

Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。

Parquet 文件

Parquet是一种列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。


代码上面用过一次
package com.tg.spark.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.storage.StorageLevel;
/**
 * 加载默认的数据源格式并保存
 * //第一种读取方式xxxFile(path)
 * @author Administrator
 *
 */
public class DataSource {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		
		DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
		
		df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
		//指定保存模式
		//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
		//第一种读取方式
		DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
		
		parquetFile.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");

		// The results of SQL queries are DataFrames and support all the normal RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
		  public String call(Row row) {
		    return "Name: " + row.getString(0);
		  }
		}).collect();
		
		teenagers.persist(StorageLevel.MEMORY_ONLY());
		System.out.println(teenagerNames);
	}
		
	
}

JSON数据集(JSON Datasets)

Spark SQL可以自动推断出JSON数据集的模式,将它作为DataFrame进行加载。这个转换可以通过使用SQLContext中的下面两个方法中的任意一个来完成。
? jsonFile - 从一个JSON文件的目录中加载数据,文件中的每一个行都是一个JSON对象。

? jsonRDD - 从一个已经存在的RDD中加载数据,每一个RDD的元素是一个包含一个JSON对象的字符串。

代码前面都有涉及到

public class DataSource3 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
		JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
		System.out.println(sc);
		// sc is an existing JavaSparkContext.
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

		// A JSON dataset is pointed to by path.
		// The path can be either a single text file or a directory storing text files.
		DataFrame people = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
		//DataFrame people = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json");

		// The inferred schema can be visualized using the printSchema() method.
		people.printSchema();
		// root
		//  |-- age: integer (nullable = true)
		//  |-- name: string (nullable = true)

		// Register this DataFrame as a table.
		people.registerTempTable("people");

		// SQL statements can be run by using the sql methods provided by sqlContext.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

		// Alternatively, a DataFrame can be created for a JSON dataset represented by
		// an RDD[String] storing one JSON object per string.
		List<String> jsonData = Arrays.asList(
		  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
		JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
		DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
		anotherPeople.show();
	}
}

Datasets

Datasets是新出的接口在1.6版本,为了使RDDS更便利(强类型,能使用强大的lambda函数),可以通过JVM对象构建或者通过熟练使用函数化转换得到(map, flatMap, filter, etc)
The unified Dataset API can be used both in Scala and Java. Python does not yet have support for the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e. you can access the field of a row by name naturally row.columnName). Full python support will be added in a future release.

至于怎么用spark操作hive和其他数据库,以后再做学习

码字不易,转载请指明出处http://blog.csdn.net/tanggao1314/article/details/51594942

更多内容,请参考官网 Spark sql 编程指南


Spark(1.6.1) Sql 编程指南+实战案例分析

标签:

人气教程排行