Spark Scala: Saving Data to MySQL
Date: 2021-07-01 10:21:17
<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass></mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Scala code
package com.ScalcForSpark.Core

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ToMysql {
  // Spark recommends defining main() rather than extending scala.App
  def main(args: Array[String]): Unit = {
    // Create the Spark context
    val sparkConf = new SparkConf().setAppName("ToMysql").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // Read the local file and split each line on commas
    val rdd: RDD[Array[String]] = sc.textFile("F:\\Work\\demodata\\person.txt").map(_.split(","))
    // Iterate per partition so that each partition opens a single connection
    rdd.foreachPartition { rows =>
      // Open the MySQL connection
      val connection = DriverManager.getConnection("jdbc:mysql://localhost/spark?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC", "root", "Aa123!@#")
      try {
        val sql = "insert into user(id,name,age) values(?,?,?)"
        val ps = connection.prepareStatement(sql)
        try {
          for (a <- rows) {
            ps.setInt(1, a(0).toInt)
            ps.setString(2, a(1))
            ps.setInt(3, a(2).toInt)
            ps.addBatch()
          }
          // Send all queued rows to MySQL in one batch
          ps.executeBatch()
        } finally ps.close()
      } finally connection.close() // Always release the connection, even on failure
    }
    sc.stop()
  }
}
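The insert loop above assumes every line of person.txt has exactly three comma-separated fields, with a numeric id and age; a single malformed line would make toInt throw and fail the whole partition. As a minimal sketch of one way to guard against that, the snippet below parses a line into an Option so bad rows can be skipped. The names Person, PersonParser and parsePerson are hypothetical and not part of the original code.

```scala
import scala.util.Try

// Hypothetical record type mirroring the user(id, name, age) table
final case class Person(id: Int, name: String, age: Int)

object PersonParser {
  // Returns Some(Person) for a well-formed "id,name,age" line, None otherwise
  def parsePerson(line: String): Option[Person] =
    line.split(",", -1).map(_.trim) match {
      // Exactly three fields and a non-empty name are required
      case Array(id, name, age) if name.nonEmpty =>
        // toInt throws on non-numeric input; Try turns that into None
        Try(Person(id.toInt, name, age.toInt)).toOption
      case _ => None
    }
}
```

In the Spark job this could replace the plain split, for example with something like sc.textFile(path).flatMap(line => PersonParser.parsePerson(line)), so that only valid rows reach the PreparedStatement. Whether to skip, log, or reject invalid rows is a design choice the original post does not address.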