First method:

private def df2rdd(dataFrame2: DataFrame): RDD[(String, String)] = {
  // Drop down from the DataFrame to its underlying RDD[Row]
  val rowRdd: RDD[Row] = dataFrame2.rdd
  // Pull the two columns out of each Row as a (diag, liss) string pair
  val resultRDD: RDD[(String, String)] = rowRdd.map(row => {
    val diag: String = row.get(0).toString
    val liss: String = row.get(1).toString
    (diag, liss)
  })
  resultRDD
}
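The helper above only converts the DataFrame into a pair RDD; the write itself is not shown for this first method. Below is a minimal sketch of how such a pair RDD is commonly pushed to MySQL at the RDD level, with one plain JDBC connection per partition; the table name diag_lis, its two columns, and the connection details are assumptions, not taken from the original code.

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

// Hypothetical completion of the first method: batch-insert the pair RDD with plain JDBC.
// Table diag_lis(diag, liss) and the connection settings are illustrative assumptions.
def rdd2mysql(resultRDD: RDD[(String, String)]): Unit = {
  resultRDD.foreachPartition { iter =>
    // Open one connection per partition so nothing is serialized from the driver
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.1.97:3306/xiang_log", "root", "123456")
    val stmt = conn.prepareStatement("insert into diag_lis(diag, liss) values (?, ?)")
    try {
      iter.foreach { case (diag, liss) =>
        stmt.setString(1, diag)
        stmt.setString(2, liss)
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally {
      stmt.close()
      conn.close()
    }
  }
}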
Second method:
import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}

// Map the result RDD (verify-count and log-date pairs computed earlier) to an RDD[Row]
val resultRowRDD = arrayRDD.map(p => Row(
  p._1.toInt,
  p._2.toString,
  new Timestamp(new java.util.Date().getTime)
))
// Specify the schema of each field directly with StructType
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),
    StructField("log_date", StringType, true),       // which day's logs the result was computed from
    StructField("create_time", TimestampType, true)  // creation time of the analysis result
  )
)
// Assemble a new DataFrame from the row RDD and the schema
val DF = spark.createDataFrame(resultRowRDD, resultSchema)
// Write the result to MySQL
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")   // table name
  .option("user", "root")
  .option("password", "123456")
  .save()
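The jdbc data source also accepts a few extra options that are often useful with this second write path; the variant below is only a sketch, adding an explicit driver class and batch size on top of the original call, with values chosen for illustration.

// Same write as above, with two optional jdbc options added for illustration
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")  // load this JDBC driver class explicitly
  .option("batchsize", "1000")                // rows per JDBC batch insert
  .save()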
Third method:
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

case class ManxingweiyanLis(diseaseName: String, cardId: String, lisName: String, lisResult: String, lisAndResult: String)

object jangganHive {
  val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName)
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  val url = "jdbc:mysql://192.168.2.232:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  def main(args: Array[String]): Unit = {
    assc
    sparkSession.stop()
  }

  def assc: Unit = {
    import sparkSession.implicits._
    import sparkSession.sql

    // Pull the lab (lis) records out of Hive
    val df: DataFrame = sql("select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet != \"\" and lisName != \"清洁度\"")
    val rdd: RDD[Row] = df.rdd

    // Classify each lab result as positive / negative / elevated / decreased / normal
    val operatorLis: RDD[(String, String)] = rdd.map(row => {
      var i = ""
      val cardID: String = row.get(0).toString
      val lisName: String = row.get(1).toString
      try {
        val lisResult: String = row.get(2).toString
        val lisBet: String = row.get(3).toString
        if (lisResult.contains("+")) {
          (cardID + "&" + lisName, "阳性")
        } else if (lisResult.contains("阴性") || lisResult.contains("-")) {
          (cardID + "&" + lisName, "阴性")
        } else {
          // lisBet holds the reference range, e.g. "3.9-6.1"
          val splits: Array[String] = lisBet.split("-|-")
          if (lisResult.toDouble > splits(1).toDouble) {
            i = "升高"
          } else if (lisResult.toDouble < splits(0).toDouble) {
            i = "降低"
          } else {
            i = "正常"
          }
          (cardID + "&" + lisName, i)
        }
      } catch {
        case e: Exception =>
          // malformed numeric values or ranges are flagged as abnormal data
          (cardID + "&" + lisName, "数据异常")
      }
    })

    // Wrap each (cardId&lisName, result) pair in the case class and lift it back to a DataFrame
    val frame: DataFrame = operatorLis.map(x => {
      ManxingweiyanLis("高脂血症", x._1.split("&")(0), x._1.split("&")(1), x._2, x._1.split("&")(1) + x._2)
    }).toDF()

    // Write the result to MySQL through the Properties-based jdbc() writer
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", properties)
  }
}
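As a quick sanity check for this third method, the same url and properties object can be reused to read the table back through sparkSession.read.jdbc; this verification step is not in the original post and would sit at the end of assc.

// Optional check (not in the original post), appended at the end of assc:
// read the freshly written table back and print a few rows
val written: DataFrame = sparkSession.read.jdbc(url, "exceptionLis", properties)
written.show(10, truncate = false)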
Writing from Spark to MySQL