import java.util.Properties
2 import org.apache.spark.sql.types._
3 import org.apache.spark.sql.Row
4 import org.apache.spark.SparkConf
5 import org.apache.spark.SparkContext
6 import org.apache.spark.sql.SQLContext
7 object TestMySQL {
8 def main(args: Array[String]) {
9 val conf =
new SparkConf()
10 conf.setMaster("local"
)
11 .setAppName("TestMySQL")
//设置运行方式为本地
12 val sc =
new SparkContext(conf)
13 var sqlContext =
new SQLContext(sc)
14 val employeeRDD = sqlContext.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "
))
15 val schema = StructType(List(StructField("id", IntegerType,
true),StructField("name", StringType,
true),StructField("gender", StringType,
true),StructField("age", IntegerType,
true)))
16 val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim, p(2).trim,p(3
).toInt))
17 val employeeDF =
sqlContext.createDataFrame(rowRDD, schema)
18 val prop =
new Properties()
19 prop.put("user", "root"
)
20 prop.put("password", "1"
)
21 prop.put("driver","com.mysql.jdbc.Driver"
)
22 prop.put("url","jdbc:mysql://192.168.80.128:3306/sparktest"
)
23 employeeDF.write.mode("append").jdbc("jdbc:mysql://192.168.80.128:3306/sparktest", "sparktest.employee"
, prop)
24 val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://192.168.80.128:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "1"
).load()
25 jdbcDF.agg("age" -> "max", "age" -> "sum"
)
26 jdbcDF.show()
27
28 }
29 }
运行截图:
测试中遇到的问题:
1:mysql服务器拒绝远程连接
解决方法:
授权法
(1)例如,你想myuser使用mypassword从任何主机连接到mysql服务器的话。
第一步:root用户登录;mysql>mysql -u root -p rootpassword;
第二步:赋予权限;mysql>GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘%‘ IDENTIFIED BY ‘mypassword‘ WITH GRANT OPTION;
第三步:mysql>FLUSH PRIVILEGES;
(2)如果你想允许用户myuser从ip为192.168.1.3的主机连接到mysql服务器,并使用mypassword作为密码
mysql>GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘192.168.xxx.xxx‘ IDENTIFIED BY ‘mypassword‘ WITH GRANT OPTION;
mysql>FLUSH PRIVILEGES;
(3)如果你想允许用户myuser从ip为192.168.1.3的主机连接到mysql服务器的dk数据库,并使用mypassword作为密码
mysql>GRANT ALL PRIVILEGES ON dk.* TO ‘root‘@‘192.168.xxx.xxx‘ IDENTIFIED BY ‘mypassword‘ WITH GRANT OPTION;
mysql>FLUSH PRIVILEGES;
编程实现利用 DataFrame 读写 MySQL 的数据
标签:load object 利用 load() mysql服务器 ctf alt 运行 types