DataFrameReader source code:
/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path
 * (e.g. external key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
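Because this load() overload takes no path, it fits sources that locate their data entirely through options, such as JDBC. Below is a minimal sketch of such a read using the Java API; the connection URL, table name, and the class name JdbcLoadExample are placeholders of mine, not anything from the original post:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class JdbcLoadExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("jdbcLoad");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // No path is passed to load(); the JDBC source finds its data
        // through options alone. URL and table name are placeholders.
        DataFrame jdbcDF = sqlContext.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://localhost:3306/test")
                .option("dbtable", "person")
                .load();

        jdbcDF.show();
        sc.stop();
    }
}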
ResolvedDataSource source code:
object ResolvedDataSource extends Logging {

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap = Map(
    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
  )
As the map shows, the built-in formats jdbc, json, and parquet can be read directly, either by short name or by their legacy fully-qualified package name.
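A consequence of backwardCompatibilityMap is that a legacy package name and the short format name resolve to the same DefaultSource class. A small sketch of that equivalence, reusing the D://person.json input from the example later in this post (the class name FormatAliasExample is a placeholder):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class FormatAliasExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("formatAlias");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Short name: resolved to the built-in JSON DefaultSource.
        DataFrame viaShortName = sqlContext.read().format("json").load("D://person.json");

        // Legacy package name: rewritten by backwardCompatibilityMap to
        // org.apache.spark.sql.json.DefaultSource, so it behaves identically.
        DataFrame viaLegacyName = sqlContext.read()
                .format("org.apache.spark.sql.json")
                .load("D://person.json");

        sc.stop();
    }
}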
def apply(
    sqlContext: SQLContext,
    provider: String,
    partitionColumns: Array[String],
    mode: SaveMode,
    options: Map[String, String],
    data: DataFrame): ResolvedDataSource = {
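This apply overload sits on the save path: when user code calls DataFrameWriter.save(), the writer ends up constructing a ResolvedDataSource with the chosen provider, save mode, and options. A minimal sketch of a call that exercises it, assuming the same local person.json input; the parquet output path and the class name ParquetSaveExample are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class ParquetSaveExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("parquetSave");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame peopleDF = sqlContext.read().format("json").load("D://person.json");

        // save() hands provider = "parquet", the SaveMode, and the options
        // map to ResolvedDataSource.apply, which instantiates the relation.
        peopleDF.write()
                .format("parquet")
                .mode(SaveMode.ErrorIfExists)
                .save("D://logs//person.parquet");

        sc.stop();
    }
}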
DataFrameWriter source code:
/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Created 2016-05-08. Loads a JSON file into a DataFrame,
 * then saves one projected column back out as JSON.
 */
public class SparkSQLLoadSaveOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("rdd2d");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read: format("json") selects the data source; load() resolves it
        // through ResolvedDataSource and returns a DataFrame.
        DataFrame peopleDF = sqlContext.read().format("json").load("D://person.json");

        // Write: project the "name" column and save it as JSON.
        peopleDF.select("name").write().format("json").save("D://logs//personName.json");

        sc.stop();
    }
}
File append semantics: does saving again create a new file, or append to the existing output? See the sketch below.
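As a sketch of an answer: whether Append is supported at all depends on the data source, and for the file-based sources it is not a byte-level append; each Append run writes additional part files under the target output directory rather than growing a single file. A minimal example using the parquet format (the output path and the class name AppendModeExample are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class AppendModeExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("appendMode");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame peopleDF = sqlContext.read().format("json").load("D://person.json");

        // Running this twice does not grow a single file; the second run
        // adds new part-xxxxx files inside the existing output directory.
        peopleDF.select("name").write()
                .format("parquet")
                .mode(SaveMode.Append)
                .save("D://logs//personName.parquet");

        sc.stop();
    }
}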
day61 - Spark SQL Data Loading and Saving Internals, Demystified in Practice