Big Data: Writing Stream Data to MySQL

The example below uses Flink to read comma-separated sensor readings from a socket and write them to MySQL through a custom RichSinkFunction.
package com.sjw.flink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

// Sensor reading record. Assumed to be defined elsewhere in the project; repeated here so the example is self-contained.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object JDBCSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read raw lines such as "sensor_1,1547718199,35.8" from a socket source
    val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com", 6666)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = stream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // Write the parsed readings to MySQL through the custom sink
    dataStream.addSink(new MyJdbcSink())

    env.execute()
  }
}
// Custom sink that upserts each reading into MySQL: try an UPDATE first,
// and fall back to an INSERT when no row with that id exists yet.
class MyJdbcSink() extends RichSinkFunction[SensorReading] {

  // JDBC connection
  var conn: Connection = _
  // Prepared statement for inserts
  var insertStmt: PreparedStatement = _
  // Prepared statement for updates
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://sunjunwei1.com:3306/1711F", "root", "986262")
    insertStmt = conn.prepareStatement("INSERT INTO temperature(id, temp) VALUES (?, ?)")
    updateStmt = conn.prepareStatement("UPDATE temperature SET temp = ? WHERE id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try to update the existing row for this sensor id
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()

    // If the update touched no rows, the id is new, so insert it instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
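The sink assumes a temperature table with an id column and a temp column already exists in the 1711F database. The post does not show the table definition, so the sketch below creates it over the same JDBC connection; the column types (VARCHAR id as primary key, DOUBLE temp) are assumptions and should be adjusted to the real data.

// One-off setup sketch: create the table the sink writes to.
// The schema is assumed, not taken from the original post.
object CreateTemperatureTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://sunjunwei1.com:3306/1711F", "root", "986262")
    try {
      val stmt = conn.createStatement()
      stmt.execute(
        """CREATE TABLE IF NOT EXISTS temperature (
          |  id   VARCHAR(64) PRIMARY KEY,
          |  temp DOUBLE
          |)""".stripMargin)
      stmt.close()
    } finally {
      conn.close()
    }
  }
}

With id declared as a primary (or unique) key, the update-then-insert logic in invoke could also be collapsed into a single MySQL INSERT ... ON DUPLICATE KEY UPDATE statement, which saves one round trip per record.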