Kafka -> Structured Streaming reads Kafka logs -> custom output to MySQL
The example below reads a Kafka topic with Structured Streaming, counts occurrences of each message value, and writes the running counts to MySQL through a custom ForeachWriter sink.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

/**
 * @author gaowei
 * @Date 2020-04-13 17:59
 */
object kafkaToMysqlTest {

  // Custom sink: opens one JDBC connection per partition/epoch and upserts rows via "replace into".
  class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
    var conn: Connection = _

    override def open(partitionId: Long, epochId: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, pwd)
      true
    }

    override def process(value: Row): Unit = {
      // "replace into" upserts by key, so test(pid) must be a primary key or unique column.
      val p: PreparedStatement = conn.prepareStatement("replace into test(pid, pv) values(?, ?)")
      p.setString(1, value(0).toString)
      p.setLong(2, value(1).toString.toLong)
      p.execute()
      p.close()
    }

    override def close(errorOrNull: Throwable): Unit = {
      conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate()
    import spark.implicits._

    val brokers = "localhost:9092"
    val topics = "test1"

    // Read the raw Kafka stream and keep only the message value as a string.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .load()
    val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String]

    // Count occurrences of each distinct value; rename columns to match the MySQL table.
    val dataFrame = kafkaDf.groupBy("value").count().toDF("pid", "pv")

    // Write the running counts to MySQL through the custom sink.
    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
    val query = dataFrame.writeStream
      .outputMode("complete")
      .foreach(mysqlSink)
      .start()
    query.awaitTermination()
  }
}
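Because the sink relies on "replace into", the test table needs a primary key (or unique index) on pid; without one, "replace into" behaves like a plain insert and the counts accumulate duplicate rows. A one-time setup sketch, assuming a hypothetical string/long schema:

import java.sql.DriverManager

// Hypothetical schema: adjust the column types to match your data.
val setupConn = DriverManager.getConnection(
  "jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
setupConn.createStatement().execute(
  "CREATE TABLE IF NOT EXISTS test (pid VARCHAR(255) PRIMARY KEY, pv BIGINT)")
setupConn.close()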
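On Spark 2.4+, the same write can be done without a hand-rolled ForeachWriter by using foreachBatch with the built-in JDBC writer. A minimal sketch, assuming the same dataFrame, table, and credentials as in the listing above; since complete output mode emits the full result table each trigger, overwriting the table per micro-batch plays the role of the "replace into" upsert:

import org.apache.spark.sql.DataFrame

// Write each micro-batch with the built-in JDBC data source instead of a custom sink.
val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) =>
  batchDf.write
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/warehouse")
    .option("dbtable", "test")
    .option("user", "root")
    .option("password", "410410410")
    .mode("overwrite") // complete mode: each batch carries the full result table
    .save()

val query = dataFrame.writeStream
  .outputMode("complete")
  .foreachBatch(writeBatch)
  .start()
query.awaitTermination()

Note that overwrite mode drops and recreates the table by default; adding option("truncate", "true") keeps the existing table schema and only clears the rows.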