当前位置:Gxlcms > 数据库问题 > spark写入mysql

spark写入mysql

时间:2021-07-01 10:21:17 帮助过:4人阅读

/**
 * Runs `sql` on the Spark session, parses each (diagnosis, lab-report text) row
 * into a nested map diag -> item -> (indicator -> reference range), and inserts
 * one MySQL row per indicator into doc_diag_lises.
 *
 * Fixes vs. the original: the INSERT used mojibake smart quotes (`‘…‘`) and raw
 * string concatenation (broken SQL + injection risk) — replaced with a
 * parameterized PreparedStatement; a Statement was created per row and never
 * closed — now one PreparedStatement per partition, closed in `finally`; the
 * `diagLisMap.contains(diag)` guard on a freshly-created empty map was dead code.
 */
def singleDataSaveMysql(sql: String) = {
  val dataFrame2: DataFrame = ss.sql(sql)
  val resultRDD = df2rdd(dataFrame2)

  // Parse each record's free-text lab report into nested maps.
  val value: RDD[Map[String, Map[String, Map[String, String]]]] = resultRDD.map { diagLis =>
    val diag: String = diagLis._1
    val lisText: String = diagLis._2
    // Blank lines separate the individual lab items.
    val itemResults: Array[String] = lisText.split("(\n|\r\n)\\s+")
    var itemSpecial: Map[String, Map[String, String]] = Map()
    for (itemText <- itemResults) {
      val lines: Array[String] = itemText.split("\n")
      if (lines.length > 1) {
        // First line is the item name; strip the "【下沙】" site tag.
        val item: String = lines(0).replaceAll("【下沙】", "")
        var special: Map[String, String] = Map()
        for (j <- 1 until lines.length) {
          val cols: Array[String] = lines(j).split("\t")
          // Tab-separated detail line: col 0 = "indicator:…", col 1 = reference range.
          if (cols.length > 2) {
            val spell: String = cols(0).split(":")(0)
            val betw: String = cols(1)
            special += (spell -> betw)
          }
        }
        itemSpecial += (item -> special)
      }
    }
    Map(diag -> itemSpecial)
  }

  val driver = "com.mysql.jdbc.Driver"
  // NOTE(review): "192.168.21.2351" is not a valid IPv4 address — confirm the host.
  val url = "jdbc:mysql://192.168.21.2351:3306/diagbot-app?useSSL=false"
  val username = "root"
  val password = "diagbot@20180822kwz"
  Class.forName(driver)

  value.foreachPartition { part =>
    // One connection + one prepared statement per partition; parameters are
    // bound per row, so values need no quoting/escaping.
    val connection = DriverManager.getConnection(url, username, password)
    val insert: PreparedStatement = connection.prepareStatement(
      "insert into doc_diag_lises(diag,iteam,it,bet) values (?,?,?,?)")
    try {
      part.foreach { m =>
        for ((diag, iteamSpecal) <- m) {
          for ((rawIteam, specails) <- iteamSpecal) {
            val iteam: String = rawIteam.replace(":", "") // item name, trailing colon removed
            for ((it, bet) <- specails) {
              println(diag + "\t" + iteam + "\t" + it + "\t" + bet)
              insert.setString(1, diag)
              insert.setString(2, iteam)
              insert.setString(3, it)
              insert.setString(4, bet)
              insert.executeUpdate()
            }
          }
        }
      }
    } finally {
      insert.close()
      connection.close()
    }
  }
}

// Converts the two-column DataFrame (diagnosis, lab text) into a pair RDD,
// stringifying both columns.
private def df2rdd(dataFrame2: DataFrame) =
  dataFrame2.rdd.map { row =>
    (row.get(0).toString, row.get(1).toString)
  }
 

第二种方式:

// Map the result RDD onto Row objects: (count, date) plus a creation timestamp.
val resultRowRDD = arrayRDD.map { p =>
  Row(p._1.toInt, p._2.toString, new Timestamp(new java.util.Date().getTime))
}

// Declare each column's schema explicitly via StructType.
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),
    StructField("log_date", StringType, true),      // which day's logs produced this result
    StructField("create_time", TimestampType, true) // when the result row was created
  )
)

// Assemble the DataFrame and append it to MySQL over JDBC.
val DF = spark.createDataFrame(resultRowRDD, resultSchema)
DF.write
  .mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify") // target table name
  .option("user", "root")
  .option("password", "123456")
  .save()

第三种方式:

// Output row for the exceptionLis table: disease name, patient card id,
// lab test name, classified result, and the concatenated name+result string.
case class ManxingweiyanLis(diseaseName: String,cardId: String, lisName: String,lisResult:String,lisAndResult:String)
/**
 * Reads lab rows from the Hive table janggan.gaozhixuelis, classifies each
 * result (positive / negative / high / low / normal / bad data), and appends
 * the classification to the MySQL table exceptionLis.
 */
object jangganHive {
  val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName)
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  val url = "jdbc:mysql://192.168.2.232:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";

  def main(args: Array[String]): Unit = {
    assc
    sparkSession.stop()
  }

  def assc: Unit = {
    import sparkSession.implicits._
    import sparkSession.sql

    val df: DataFrame = sql("select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet !=\"\" and lisName !=\"清洁度\"")

    // Classify every lab row, keyed by "cardId&lisName".
    val classified: RDD[(String, String)] = df.rdd.map { row =>
      val cardID: String = row.get(0).toString
      val testName: String = row.get(1).toString
      val key = cardID + "&" + testName
      try {
        val result: String = row.get(2).toString
        val refRange: String = row.get(3).toString
        if (result.contains("+")) {
          (key, "阳性") // any '+' means positive
        } else if (result.contains("阴性") || result.contains("-")) {
          (key, "阴性") // explicit negative marker
        } else {
          // NOTE(review): the two regex alternatives below look identical —
          // likely a mojibake'd full-width dash; kept as-is to preserve behavior.
          val bounds: Array[String] = refRange.split("-|-")
          val verdict =
            if (result.toDouble > bounds(1).toDouble) "升高"
            else if (result.toDouble < bounds(0).toDouble) "降低"
            else "正常"
          (key, verdict)
        }
      } catch {
        // Unparseable numbers / missing bounds fall through to "bad data".
        case _: Exception => (key, "数据异常")
      }
    }

    val frame: DataFrame = classified.map { case (key, verdict) =>
      val parts = key.split("&")
      ManxingweiyanLis("高脂血症", parts(0), parts(1), verdict, parts(1) + verdict)
    }.toDF()

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.jdbc.Driver")
    frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", props)
  }
}

 

spark写入mysql

标签:ESS   mat   dia   operator   manage   sel   failover   split   cut   

人气教程排行