package com.shenyuchong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
public class App {
    /**
     * Purpose:
     *   Import data from a relational database into HDFS via a SQL query.
     *   Supports MySQL and Oracle.
     *   Supports overwrite and append write modes.
     *   Supports incremental import (based on the maximum value of checkColumn).
     *   Supports de-duplicated import (source primary key inKey, HDFS table primary key outKey;
     *   for multi-column keys use concat, or whatever string-concatenation function
     *   the actual data source provides).
     */
    public static String ip = "127.0.0.1";
    public static String port = "3306";
    public static String baseType = "mysql";
    public static String inBase = "in_base";
    public static String userName = "un";
    public static String password = "pas";
    public static String sql = "select 1";
    public static String hdfs = "hdfs://127.0.0.1:9000";
    public static String outBase = "base";
    public static String outTable = "table";
    public static String noticeUrl = "http://127.0.0.1:6009/schedule/schedule/donothing";
    public static String writeMode = "append";
    public static String checkColumn = "";
    public static String inKey = "";
    public static String outKey = "";
    public static void main(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals("-ip"))           ip = args[i + 1];          // data source host
            if (args[i].equals("-port"))         port = args[i + 1];        // data source port
            if (args[i].equals("-base_type"))    baseType = args[i + 1];    // data source type (mysql|oracle)
            if (args[i].equals("-in_base"))      inBase = args[i + 1];      // source database name
            if (args[i].equals("-in_key"))       inKey = args[i + 1];       // source primary key
            if (args[i].equals("-out_key"))      outKey = args[i + 1];      // HDFS table primary key
            if (args[i].equals("-user_name"))    userName = args[i + 1];    // data source user name
            if (args[i].equals("-password"))     password = args[i + 1];    // data source password
            if (args[i].equals("-sql"))          sql = args[i + 1];         // export statement (a plain SELECT)
            if (args[i].equals("-hdfs"))         hdfs = args[i + 1];        // HDFS address
            if (args[i].equals("-out_base"))     outBase = args[i + 1];     // output database name
            if (args[i].equals("-out_table"))    outTable = args[i + 1];    // output table name
            if (args[i].equals("-notice_url"))   noticeUrl = args[i + 1];   // completion notification URL
            if (args[i].equals("-write_mode"))   writeMode = args[i + 1];   // write mode: overwrite|append
            if (args[i].equals("-check_column")) checkColumn = args[i + 1]; // incremental check column
        }
        /**
         * Working variables.
         */
        SparkSession spark = SparkSession.builder().getOrCreate();
        String tmpTable = outBase + "_" + outTable;
        String condition = "";
        String driver = "";
        String url = "";
        /**
         * Pick the JDBC driver and URL according to the data source type;
         * the URL points at the source database (inBase).
         */
        if ("mysql".equals(baseType.toLowerCase())) {
            driver = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://" + ip + ":" + port + "/" + inBase
                    + "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        } else if ("oracle".equals(baseType.toLowerCase())) {
            driver = "oracle.jdbc.driver.OracleDriver";
            url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" + inBase;
        }
        /**
         * Write mode: append or overwrite.
         */
        SaveMode saveMode = SaveMode.Append;
        if ("overwrite".equals(writeMode))
            saveMode = SaveMode.Overwrite;
        String outSql = "select * from rdbTmpTable ";
        try {
            FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");
            /**
             * Check whether the target table path already exists on HDFS.
             * Only when it does (and a _SUCCESS marker is present) do the incremental
             * check column (checkColumn) and the inKey/outKey keys take effect and get
             * spliced into the export statement.
             */
            boolean targetExists = fs.exists(new Path("/user/" + outBase + "/" + outTable))
                    && fs.exists(new Path("/user/" + outBase + "/" + outTable + "/_SUCCESS"));
            if (targetExists) {
                /**
                 * Register the existing parquet data as a temp view.
                 */
                spark.read().parquet(hdfs + "/user/" + outBase + "/" + outTable + "/*")
                        .createOrReplaceTempView(tmpTable);
                /**
                 * Build the incremental condition from the largest value already imported.
                 */
                if (checkColumn != null && !"".equals(checkColumn)) {
                    String lastValue = spark.sql("select max(" + checkColumn + ") from " + tmpTable)
                            .collectAsList().get(0).get(0).toString();
                    condition = " where " + checkColumn + " > '" + lastValue + "'";
                }
                /**
                 * If both inKey and outKey are set, exclude rows whose key already exists
                 * in the HDFS table (de-duplicated import).
                 */
                if (!"".equals(inKey) && !"".equals(outKey))
                    outSql = "select * from rdbTmpTable where " + inKey
                            + " not in (select " + outKey + " from " + tmpTable + ")";
            }
            /**
             * Load the remote data source and register it as a temp view. This is done
             * unconditionally so that rdbTmpTable also exists on the first run, when the
             * target path is not there yet.
             */
            spark.read().format("jdbc")
                    .option("driver", driver)
                    .option("url", url)
                    .option("user", userName)
                    .option("password", password)
                    .option("dbtable", "(select * from (" + sql + ") tmp_table1 " + condition + ") tmp_table2 ")
                    .load().createOrReplaceTempView("rdbTmpTable");
            /**
             * Debug output.
             */
            spark.sql("select * from rdbTmpTable").show();
            if (targetExists && !"".equals(outKey))
                spark.sql("select " + outKey + " from " + tmpTable).show();
            spark.sql(outSql).show();
            /**
             * Write the result to HDFS as parquet.
             */
            spark.sql(outSql).write().format("parquet").mode(saveMode)
                    .save(hdfs + "/user/" + outBase + "/" + outTable);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * Notify the downstream service, retrying until it accepts the request.
         */
        boolean noticed = false;
        try {
            while (!noticed) {
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        spark.log().info("---------------: success!");
    }
    /**
     * Request the given URL; return true if the service responds with HTTP 200.
     */
    public static boolean connectSuccess(String path) {
        try {
            URL url = new URL(path);
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            if (con.getResponseCode() == 200)
                return true;
        } catch (Exception e) {
            return false;
        }
        return false;
    }
}
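To make the incremental and de-duplication logic concrete: with hypothetical parameters -check_column datetime, -in_key/-out_key "concat(id,datetime)", -out_base od, -out_table table1, a source query of select t.* from table1 t, and a previously imported maximum datetime of 2019-01-01 00:00:00 (illustrative value only), the program would build roughly the following statements:
(select * from (select t.* from table1 t) tmp_table1 where datetime > '2019-01-01 00:00:00') tmp_table2   -- dbtable subquery pushed down to the source database
select * from rdbTmpTable where concat(id,datetime) not in (select concat(id,datetime) from od_table1)    -- final export statement, de-duplicated against the existing HDFS table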
Usage after packaging with Maven:
sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs --class com.shenyuchong.App --master spark://127.0.0.1:7077 --deploy-mode client --executor-memory 8G --total-executor-cores 4 /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" -in_base ulanqab -sql "select t.* from table1 t where datetime >=CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <=CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00') " -notice_url http://127.0.0.1:6009/schedule/schedule/donothing
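For a plain incremental append without de-duplication, -in_key/-out_key can be omitted and -check_column passed instead; a sketch with placeholder host, credentials and column name:
sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs --class com.shenyuchong.App --master spark://127.0.0.1:7077 --deploy-mode client /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root -in_base ulanqab -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 -write_mode append -check_column datetime -sql "select t.* from table1 t" -notice_url http://127.0.0.1:6009/schedule/schedule/donothing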
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>gbd</groupId>
<artifactId>sparkrdbms2hdfs</artifactId>
<version>2.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version><!--$NO-MVN-MAN-VER$ -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.shenyuchong.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
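With appendAssemblyId set to false, the assembly keeps the plain artifact name, so a standard build produces the jar referenced by the spark-submit command above:
mvn clean package
# output: target/sparkrdbms2hdfs-2.0.jar (Spark and the JDBC drivers are scope "provided" and must already be on the cluster's classpath)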
Spark-based data import from a relational database to HDFS