
Kafka -> Structured Streaming reads Kafka logs -> custom output to MySQL


import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

/**
 * @author gaowei
 * @date 2020-04-13 17:59
 */
object kafkaToMysqlTest {

  // Custom sink that writes each row of the streaming result to MySQL.
  class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
    var conn: Connection = _

    override def open(partitionId: Long, epochId: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, pwd)
      true
    }

    override def process(value: Row): Unit = {
      // REPLACE INTO overwrites the row for an existing pid, which suits
      // complete output mode: each trigger rewrites the full aggregate.
      val p = conn.prepareStatement("replace into test(pid, pv) values(?, ?)")
      p.setString(1, value(0).toString)
      p.setLong(2, value(1).toString.toLong)
      p.execute()
      p.close()
    }

    override def close(errorOrNull: Throwable): Unit = {
      conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate()
    val brokers = "localhost:9092"
    val topics = "test1"

    // Subscribe to the Kafka topic as a streaming DataFrame.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .load()

    import spark.implicits._
    // Kafka's value column is binary; cast it to a string.
    val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String]

    // Count occurrences of each distinct message value.
    val dataFrame = kafkaDf.groupBy("value").count().toDF("pid", "pv")

    // Write the running counts to MySQL through the custom sink.
    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
    val query = dataFrame.writeStream
      .outputMode("complete")
      .foreach(mysqlSink)
      .start()

    query.awaitTermination()
  }
}
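Note that REPLACE INTO only behaves as an upsert if pid is a PRIMARY KEY or UNIQUE column; otherwise every trigger appends duplicate rows. The article does not show the table definition, so the following one-off setup is a sketch under that assumption (connection settings, table name, and columns are taken from the code above; the column types are assumed):

import java.sql.DriverManager

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    // Same connection settings as the MysqlSink above.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
    try {
      // Assumed schema: pid is the PRIMARY KEY so that REPLACE INTO
      // overwrites the existing count instead of inserting a duplicate.
      conn.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS test (
          |  pid VARCHAR(255) NOT NULL PRIMARY KEY,
          |  pv  BIGINT
          |)""".stripMargin)
    } finally conn.close()
  }
}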

 
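To try the pipeline end to end, the test1 topic needs some messages. Here is a minimal producer sketch using the standard kafka-clients API (broker and topic are taken from the code above; the object name and sample values are made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestMessages {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Arbitrary sample values; the streaming query counts occurrences
    // per value, so the repeated "p1" should show pv = 2 in MySQL.
    Seq("p1", "p2", "p1").foreach { v =>
      producer.send(new ProducerRecord[String, String]("test1", v))
    }
    producer.flush()
    producer.close()
  }
}

Running the streaming job itself also requires the spark-sql-kafka-0-10 connector and a MySQL JDBC driver on the classpath (for example via spark-submit --packages); the exact artifact coordinates depend on your Spark and Scala versions.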
