
Day 61: Spark SQL Data Loading and Saving Internals in Practice


("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path:String): DataFrame = {
  read.load(path)
}

/**
 * Returns the dataset stored at path as a DataFrame, using the given data source.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
 *             This will be removed in Spark 2.0.
 */
@deprecated("Use read.format(source).load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String, source: String): DataFrame = {
  read.format(source).load(path)
}
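Both deprecated overloads simply delegate to the new DataFrameReader API, so migration is mechanical. A minimal sketch of the replacement call, assuming a Spark 1.x SQLContext named sqlContext and reusing the person.json path from the example later in this post:

// Deprecated (removed in Spark 2.0):
//   DataFrame df = sqlContext.load("D://person.json", "json");
// Replacement through DataFrameReader:
DataFrame df = sqlContext.read().format("json").load("D://person.json");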

 

DataFrameReader source:

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
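format() only records the source name and returns the reader itself, so configuration calls chain fluently. A sketch, assuming Spark 1.x, where samplingRatio is an option of the built-in JSON source (the value 0.5 is illustrative):

// format() and option() each return the same DataFrameReader, so calls chain:
DataFrame df = sqlContext.read()
    .format("json")
    .option("samplingRatio", "0.5") // sample half the input when inferring the schema
    .load("D://person.json");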

 

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
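The no-argument load() is how sources that take all of their configuration through options, such as JDBC, are read. A sketch of reading a table over JDBC, assuming Spark 1.x; the MySQL URL and table name are illustrative:

import java.util.HashMap;
import java.util.Map;

Map<String, String> jdbcOptions = new HashMap<String, String>();
jdbcOptions.put("url", "jdbc:mysql://localhost:3306/test"); // illustrative URL
jdbcOptions.put("dbtable", "people");                       // illustrative table

// No path is passed to load(); the source reads everything from the options map.
DataFrame jdbcDF = sqlContext.read().format("jdbc").options(jdbcOptions).load();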

 

ResolvedDataSource source:

object ResolvedDataSource extends Logging {

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap = Map(
    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
  )

Formats that can be read directly: jdbc, json, and parquet.
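Thanks to this map, old fully qualified provider names keep working even though the classes have moved, while short names like json are resolved separately through the data source registry. A sketch, assuming the Spark 1.5 resolution logic quoted above, where both calls end up at the same json.DefaultSource:

// Short name, resolved through the data source registry:
DataFrame a = sqlContext.read().format("json").load("D://person.json");
// Old fully qualified name, rewritten by backwardCompatibilityMap:
DataFrame b = sqlContext.read().format("org.apache.spark.sql.json").load("D://person.json");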

def apply(
    sqlContext: SQLContext,
    provider: String,
    partitionColumns: Array[String],
    mode: SaveMode,
    options: Map[String, String],
    data: DataFrame): ResolvedDataSource = {
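This overload, the one taking a SaveMode and the DataFrame itself, is the write-side entry point: DataFrameWriter.save() collects the builder's settings and passes them through here, with the output path carried inside the options map. A sketch of how a write call maps onto the parameters, assuming Spark 1.x; the format, partition column, and output path are illustrative:

df.write()                    // data = df
  .format("parquet")          // provider = "parquet"
  .partitionBy("year")        // partitionColumns = ["year"]
  .mode(SaveMode.Append)      // mode = SaveMode.Append
  .save("D://logs//out");     // path goes into options; ends in ResolvedDataSource.apply(...)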

DataFrameWriter source:

/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
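Since SaveMode.ErrorIfExists is the default, writing to a location that already exists throws unless a different mode is chosen. A sketch, assuming the peopleDF DataFrame from the example below (mode() takes org.apache.spark.sql.SaveMode):

// ErrorIfExists is the default: a second run against the same path throws.
peopleDF.write().format("json").save("D://logs//personName.json");
// Overwrite replaces whatever is already at the target:
peopleDF.write().format("json").mode(SaveMode.Overwrite).save("D://logs//personName.json");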

 

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * @author E-mail:
 * @version Created: 2016-05-08 07:54:28. Class description.
 */
public class SparkSQLLoadSaveOps {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("rdd2d");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Load the JSON file as a DataFrame via the generic read path.
        DataFrame peopleDF = sqlContext.read().format("json").load("D://person.json");
        // Project the name column and save it back out as JSON.
        peopleDF.select("name").write().format("json").save("D://logs//personName.json");

        sc.stop();
    }
}

File append behavior: does saving again create new files, or append to the existing output? This is controlled by the SaveMode passed to mode(), as the sketch below shows.
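A sketch, reusing peopleDF from the example above: with SaveMode.Append each run adds new part-files under the output directory rather than appending lines to one existing file.

// D://logs//personName.json is a directory of part-files, not a single file;
// Append leaves previously written part-files in place and adds new ones:
peopleDF.select("name").write().format("json").mode(SaveMode.Append).save("D://logs//personName.json");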

