当前位置:Gxlcms > 数据库问题 > 大数据之存mysql数据库

大数据之存mysql数据库

时间:2021-07-01 10:21:17 帮助过:44人阅读

package com.sjw.flink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

/**
 * Demo streaming job: reads comma-separated text lines from a socket, turns each
 * line into a `SensorReading`, and hands every reading to [[MyJdbcSink]].
 */
object JDBCSinkTest {

  /**
   * Builds the pipeline and starts it.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel instance for every operator, including the sink.
    environment.setParallelism(1)

    val lines: DataStream[String] = environment.socketTextStream("sunjunwei1.com", 6666)

    // Each line is expected to carry three comma-separated fields, passed
    // positionally to SensorReading. A line with fewer fields or non-numeric
    // second/third fields throws and fails the job.
    val readings: DataStream[SensorReading] = lines.map { line =>
      val fields: Array[String] = line.split(",")
      val first = fields(0).trim
      val second = fields(1).trim.toLong
      val third = fields(2).toDouble
      SensorReading(first, second, third)
    }

    readings.addSink(new MyJdbcSink())

    environment.execute()
  }
}

/**
 * Sink that upserts each `SensorReading` into the `temperature` table:
 * it first tries an UPDATE keyed on the reading's id and falls back to an
 * INSERT when the UPDATE touched no row.
 *
 * The connection settings default to the values that were previously hard-coded,
 * so `new MyJdbcSink()` behaves as before.
 * NOTE(review): the default password is a plaintext credential in source —
 * callers should pass real credentials from configuration instead.
 *
 * @param jdbcUrl  JDBC URL of the target database
 * @param user     database user name
 * @param password database password
 */
class MyJdbcSink(
    jdbcUrl: String = "jdbc:mysql://sunjunwei1.com:3306/1711F",
    user: String = "root",
    password: String = "986262"
) extends RichSinkFunction[SensorReading] {

  // JDBC handles are created in open(), never in the constructor; they are
  // marked transient because they must not travel with the serialized sink.

  /** Database connection, established in `open`. */
  @transient var conn: Connection = _

  /** Prepared INSERT statement (id, temp). */
  @transient var insertStmt: PreparedStatement = _

  /** Prepared UPDATE statement (temp by id). */
  @transient var updateStmt: PreparedStatement = _

  /**
   * Opens the connection and prepares both statements.
   *
   * @param parameters configuration passed in by the runtime (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(jdbcUrl, user, password)
    insertStmt = conn.prepareStatement("INSERT INTO temperature(id,temp) VALUE(?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperature SET temp = ? WHERE id = ?")
  }

  /**
   * Writes one reading: UPDATE first, INSERT if no row was updated.
   *
   * @param value   the reading to persist
   * @param context sink context supplied by the runtime (not read here)
   */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first.
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    val updatedRows = updateStmt.executeUpdate()

    // Nothing was updated, so the id is not in the table yet: insert it.
    if (updatedRows == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * Each handle is null-checked because `open` may have failed before assigning
   * it, and the nested `finally` blocks make sure a failure while closing one
   * handle does not prevent the remaining ones from being closed.
   */
  override def close(): Unit = {
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }

}

大数据之存mysql数据库

标签:override   com   UNC   rman   rri   environ   预编译   val   大数据   

人气教程排行