
Importing Data from a Relational Database into HDFS with Spark


package com.shenyuchong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;

public class App {
    /*
     * Purpose:
     *   Import relational-database data into HDFS, driven by a SQL statement.
     *   Supports MySQL and Oracle.
     *   Supports overwrite and append write modes.
     *   Supports incremental import (keeps rows beyond the current max of checkColumn).
     *   Supports de-duplicated import (source key inKey, HDFS-table key outKey;
     *   for composite keys use a concat() expression matching the source
     *   database's string-concatenation function).
     */
    public static String ip = "127.0.0.1";
    public static String port = "3306";
    public static String baseType = "mysql";
    public static String inBase = "in_base";
    public static String userName = "un";
    public static String password = "pas";
    public static String sql = "select 1";
    public static String hdfs = "hdfs://127.0.0.1:9000";
    public static String outBase = "base";
    public static String outTable = "table";
    public static String noticeUrl = "http://127.0.0.1:6009/schedule/schedule/donothing";
    public static String writeMode = "append";
    public static String checkColumn = "";
    public static String inKey = "";
    public static String outKey = "";

    public static void main(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals("-ip"))           ip = args[i + 1];          // source host
            if (args[i].equals("-port"))         port = args[i + 1];        // source port
            if (args[i].equals("-base_type"))    baseType = args[i + 1];    // source type: mysql|oracle
            if (args[i].equals("-in_base"))      inBase = args[i + 1];      // source database name
            if (args[i].equals("-in_key"))       inKey = args[i + 1];       // source primary key
            if (args[i].equals("-out_key"))      outKey = args[i + 1];      // HDFS-table primary key
            if (args[i].equals("-user_name"))    userName = args[i + 1];    // source user name
            if (args[i].equals("-password"))     password = args[i + 1];    // source password
            if (args[i].equals("-sql"))          sql = args[i + 1];         // export statement (plain SELECT)
            if (args[i].equals("-hdfs"))         hdfs = args[i + 1];        // HDFS address
            if (args[i].equals("-out_base"))     outBase = args[i + 1];     // output database name
            if (args[i].equals("-out_table"))    outTable = args[i + 1];    // output table name
            if (args[i].equals("-notice_url"))   noticeUrl = args[i + 1];   // completion-notification URL
            if (args[i].equals("-write_mode"))   writeMode = args[i + 1];   // write mode: overwrite|append
            if (args[i].equals("-check_column")) checkColumn = args[i + 1]; // incremental check column
        }

        // Required temporaries.
        SparkSession spark = SparkSession.builder().getOrCreate();
        String tmpTable = outBase + "_" + outTable;
        String condition = "";
        String driver = "";
        String url = "";

        // Pick the JDBC driver and URL for the source type. Note the URL must
        // point at the source database (inBase), not the output name.
        if ("mysql".equals(baseType.toLowerCase())) {
            driver = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://" + ip + ":" + port + "/" + inBase
                    + "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        } else if ("oracle".equals(baseType.toLowerCase())) {
            driver = "oracle.jdbc.driver.OracleDriver";
            url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" + inBase;
        }

        // Write mode: append | overwrite.
        SaveMode saveMode = SaveMode.Append;
        if ("overwrite".equals(writeMode)) saveMode = SaveMode.Overwrite;

        String outSql = "select * from rdbTmpTable";
        try {
            FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");

            /*
             * If the target path already exists (and its last write succeeded),
             * register it as a temp view. Only in that case do the checkColumn
             * incremental filter and the inKey/outKey de-duplication take
             * effect in the export statement.
             */
            boolean targetExists = fs.exists(new Path("/user/" + outBase + "/" + outTable))
                    && fs.exists(new Path("/user/" + outBase + "/" + outTable + "/_SUCCESS"));
            if (targetExists) {
                spark.read().parquet(hdfs + "/user/" + outBase + "/" + outTable + "/*")
                        .createOrReplaceTempView(tmpTable);
                // Assemble the incremental condition from the max of checkColumn.
                if (checkColumn != null && !"".equals(checkColumn)) {
                    String lastValue = spark.sql("select max(" + checkColumn + ") from " + tmpTable)
                            .collectAsList().get(0).get(0).toString();
                    condition = " where " + checkColumn + " > '" + lastValue + "'";
                }
            }

            // Load the remote source and register it as a temp view; the
            // condition is pushed down to the source inside the dbtable subquery.
            spark.read().format("jdbc")
                    .option("driver", driver).option("url", url)
                    .option("user", userName).option("password", password)
                    .option("dbtable", "(select * from (" + sql + ") tmp_table1 " + condition + ") tmp_table2")
                    .load().createOrReplaceTempView("rdbTmpTable");

            // If both inKey and outKey are given, enforce the key constraint:
            // keep only rows whose key is not already present in the HDFS table.
            if (targetExists && !"".equals(inKey) && !"".equals(outKey))
                outSql = "select * from rdbTmpTable where " + inKey
                        + " not in (select " + outKey + " from " + tmpTable + ")";

            // Debug output.
            spark.sql("select * from rdbTmpTable").show();
            if (targetExists && !"".equals(outKey))
                spark.sql("select " + outKey + " from " + tmpTable).show();
            spark.sql(outSql).show();

            // Write the result to HDFS as parquet.
            spark.sql(outSql).write().format("parquet").mode(saveMode)
                    .save(hdfs + "/user/" + outBase + "/" + outTable);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Notify the downstream service, retrying until it accepts the request.
        boolean noticed = false;
        try {
            while (!noticed) {
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        spark.log().info("---------------: success!!");
    }

    /** Request the given URL; return true if it responds with HTTP 200. */
    public static boolean connectSuccess(String path) {
        try {
            URL url = new URL(path);
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            if (con.getResponseCode() == 200) return true;
        } catch (Exception e) {
            return false;
        }
        return false;
    }
}
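To make the assembled statements concrete: with `-check_column datetime` and `-in_key`/`-out_key` both set to `concat(id,datetime)` (as in the example command below), the JDBC `dbtable` subquery becomes `(select * from (<sql>) tmp_table1 where datetime > '<last max>') tmp_table2`, which the source database evaluates, and the final export statement becomes `select * from rdbTmpTable where concat(id,datetime) not in (select concat(id,datetime) from od_table1)`, which Spark evaluates against the existing parquet data.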

After packaging with Maven, submit the job with spark-submit:

sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs --class com.shenyuchong.App --master spark://127.0.0.1:7077 --deploy-mode client --executor-memory 8G --total-executor-cores 4 /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" -in_base ulanqab -sql "select t.* from table1 t where datetime >= CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <= CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00')" -notice_url http://127.0.0.1:6009/schedule/schedule/donothing
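Once the job finishes, a quick spot check is to read the parquet output back and count rows. The following is a minimal sketch assuming the od/table1 output names and namenode address from the command above; the Verify class name is only for illustration, and it can be submitted the same way as the main job:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Verify {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Path layout matches the importer: <hdfs>/user/<out_base>/<out_table>
        Dataset<Row> df = spark.read().parquet("hdfs://127.0.0.1:9000/user/od/table1/*");
        df.printSchema();                          // confirm the imported columns
        System.out.println("rows: " + df.count()); // total rows after the import
        spark.stop();
    }
}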

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gbd</groupId>
    <artifactId>sparkrdbms2hdfs</artifactId>
    <version>2.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>12.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.shenyuchong.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
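Note that all three dependencies are provided scope, so the jar-with-dependencies assembly will not actually bundle the JDBC drivers; they must already be on the Spark classpath (for example, dropped into Spark's jars directory or passed with --jars). Since the assembly execution is bound to the package phase and appendAssemblyId is false, running `mvn clean package` produces target/sparkrdbms2hdfs-2.0.jar, the path used in the spark-submit command above.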

 
