On the mapRow parameter when instantiating JdbcRDD
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
  override def index: Int = idx
}
// TODO: Expose a jdbcRDD function in SparkContext and mark this as semi-private
/**
* An RDD that executes an SQL query on a JDBC connection and reads results.
* For usage example, see test case JdbcRDDSuite.
*
* @param getConnection a function that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* E.g. "select title, author from books where ? <= id and id <= ?"
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
* @param mapRow a function from a ResultSet to a single row of the desired result type(s).
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging
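To make the parameters above concrete, here is a minimal instantiation sketch. The JDBC URL, credentials, and the books table are hypothetical placeholders; the bounds match the example in the comment (1 to 20 over 2 partitions).

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("JdbcRDDExample").setMaster("local[2]"))

val booksRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password"),
  "select title, author from books where ? <= id and id <= ?",  // must contain two ? placeholders
  1,   // lowerBound
  20,  // upperBound
  2,   // numPartitions: one query runs with (1, 10), the other with (11, 20)
  JdbcRDD.resultSetToObjectArray _)  // the default mapRow, discussed below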
The main question is what to pass for the mapRow parameter. The comment above says: "mapRow a function from a ResultSet to a single row of the desired result type(s). This should only call getInt, getString, etc; the RDD takes care of calling next. The default maps a ResultSet to an array of Object." In other words, mapRow takes a ResultSet and produces a single row of the desired result type; it should only read column values (getInt, getString, and so on), since the RDD itself takes care of calling next, and the default implementation maps each ResultSet row to an Array[Object].
First, look at the default argument, JdbcRDD.resultSetToObjectArray _, where the trailing underscore turns the method into a function value so it can be passed as the parameter:
def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
  Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
With this default, each row becomes an Array[Object], so when the RDD is saved with saveAsTextFile the file contains the Java object's name (the array's toString) rather than the actual values. Emitting the rows through a for/yield before calling saveAsTextFile does not help either: every result ends up wrapped in a Vector. Either way, the output is stored as Java object representations. Other commonly seen mapRow examples, such as r => (r.getString(6), r.getString(11)) that pick out specific column numbers, also fall short of the goal of writing every field out as a string. To fix this, I decided to write my own mapRow function, shown after the short sketch below.
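As a quick illustration of why the default output looks like object names rather than data: saveAsTextFile writes the toString of each element, and a plain Array inherits java.lang.Object's toString, which prints only the array's class name and hash code. A minimal sketch (the values are made up):

val row = Array[Object]("Spark", "Matei")
println(row.toString)        // prints something like [Ljava.lang.Object;@1b6d3586
println(row.mkString(", "))  // prints the actual values: Spark, Matei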
def maprow(rs: ResultSet): String = {
  val cc = rs.getMetaData.getColumnCount
  var res: String = ""
  var i = 1
  // JDBC columns are 1-based; loop while i <= cc so the last column is not dropped.
  while (i <= cc) {
    if (i != 1) res = res + ", " + rs.getString(i) else res = rs.getString(i)
    i = i + 1
  }
  res
}
A while loop walks the columns from 1 through getColumnCount, concatenating the string returned by each getString call, and the accumulated string is returned.
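Plugged into JdbcRDD, this maprow turns each row into one comma-separated String, so saveAsTextFile writes the real field values. A sketch using the same hypothetical connection and table as above (the output path is also a placeholder):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rowsRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password"),
  "select * from books where ? <= id and id <= ?",
  1, 20, 2,
  maprow _)  // custom mapRow: all columns of a row as one comma-separated String

rowsRdd.saveAsTextFile("/tmp/jdbcrdd_output")  // each line now holds the actual column values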
For more on using JdbcRDD, see:
http://www.cnblogs.com/yuananyun/p/4281597.html
http://blog.csdn.net/book_mmicky/article/details/38066067