当前位置:Gxlcms > 数据库问题 > spark连数据库

spark连数据库

时间:2021-07-01 10:21:17 帮助过:11人阅读

第一种方式,会告知无法识别SID,其实在连接时将orcl&user=kang&password=123456都当做其SID,其实就接近了。一般平时用jdbc连接数据库,url user password都分开,学习一下这种方式^^

Oracle的JDBC url三种方式:这

1 2 3 4 5 6 1.普通SID方式 jdbc:oracle:thin:username/password@x.x.x.1:1521:SID 2.普通ServerName方式 jdbc:oracle:thin:username/password@//x.x.x.1:1522/ABCD 3.RAC方式 jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.1)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.2)(PORT=1521)))(LOAD_BALANCE=yes)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=xxrac)))

具体参看这里

 

3.访问hive  

hive和spark sql的关系,参见

其实spark sql从一开始就支持hive。Spark提供了一个HiveContext的上下文,其实是SQLContext的一个子类,但从作用上来说,sqlContext也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件挪到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了。

1.Spark-sql方式

spark-sql是Spark bin目录下的一个可执行脚本,它的目的是通过这个脚本执行Hive的命令,即原来通过

hive>输入的指令可以通过spark-sql>输入的指令来完成。

spark-sql可以使用内置的Hive metadata-store,也可以使用已经独立安装的Hive的metadata store

配置步骤:

1. 将Hive的conf目录的hive-site.xml拷贝到Spark的conf目录

2. 将hive-site.xml中关于时间的配置的时间单位,比如ms,s全部删除掉

错误信息:Exception in thread "main" java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "5s" 一直以为是输入格式的问题。。

3. 将mysql jdbc的驱动添加到Spark的Classpath上

1 export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/software/spark-1.2.0-bin-hadoop2.4/lib/mysql-connector-java-5.1.34.jar
1 2 3 [hadoop@hadoop bin]$ ./spark-sql  Spark assembly has been built with Hive, including Datanucleus jars on classpath  SET spark.sql.hive.version=0.13.1

提示编译的时候要带2个参数

重新编译:./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests -Dhadoop.version=2.4.1 -Phive -Phive-thriftserver

在Spark-default中已经指定

 

创建表

1 2 3 spark-sql> create table word6 (id int,word string) row format delimited fields terminated by ‘,‘ stored as textfile ;   OK  Time taken: 10.852 seconds 

 导入数据

1 2 3 4 5 6 7 spark-sql> load data local inpath ‘/home/hadoop/word.txt‘ into table word6 Copying data from file:/home/hadoop/word.txt  Copying file: file:/home/hadoop/word.txt  Loading data to table default.word6  Table default.word6 stats: [numFiles=1, numRows=0, totalSize=31, rawDataSize=0 OK  Time taken: 2.307 seconds

 与其他数据源联合查询

1 select * from src join jdbcmysql on (src.key=jdbcmysql.id);

2.Spark-shell方式 

1 sqlContext.sql("select count(*) from hive_people").show()

  

4.将dataframe数据写入Hive分区表

DataFrame将数据写入hive中时,默认的是hive默认数据库,insertInto没有指定数据库的参数,使用下面方式将数据写入hive表或者hive表的分区中。这

1、将DataFrame数据写入到Hive表中

从DataFrame类中可以看到与hive表有关的写入Api有以下几个:

1 2 3 4 registerTempTable(tableName: String): Unit, insertInto(tableName: String): Unit insertInto(tableName: String, overwrite: Boolean): Unit saveAsTable(tableName: String, source: String, mode: [size=13.3333320617676px]SaveMode, options: Map[String, String]): Unit

还有很多重载函数,不一一列举

registerTempTable函数是创建spark临时表

insertInto函数是向表中写入数据,可以看出此函数不能指定数据库和分区等信息,不可以直接进行写入。

向hive数据仓库写入数据必须指定数据库,hive数据表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")

 

下面语句是向指定数据库数据表中写入数据:

1 2 3 4 5 6 7 case class Person(name:String,col1:Int,col2:String)  val sc = new org.apache.spark.SparkContext     val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  import hiveContext.implicits._  hiveContext.sql("use DataBaseName" val data=sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))<br>data.toDF() insertInto("tableName")

创建一个case类将RDD中数据类型转为case类型,然后通过toDF转换为DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName")语句,就可以将DataFrame数据写入hive数据表中了

 

2、将DataFrame数据写入hive指定数据表的分区中

hive数据表建立可以在hive上建立,或者使用hiveContext.sql(“create table ...."),使用saveAsTable时数据存储格式有限,默认格式为parquet,可以指定为json,如果有其他格式指定,尽量使用语句来建立hive表。

将数据写入分区表的思路是:首先将DataFrame数据写入临时表,之后是由hiveContext.sql语句将数据写入hive分区表中。具体操作如下:

1 2 3 4 5 6 7 8 case class Person(name:String,col1:Int,col2:String)  val sc = new org.apache.spark.SparkContext     val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  import hiveContext.implicits._  hiveContext.sql("use DataBaseName" val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))  data.toDF().registerTempTable("table1" hiveContext.sql("insert into table2 partition(date=‘2015-04-02‘) select name,col1,col2 from table1")


使用以上方式就可以将dataframe数据写入hive分区表了。

spark连数据库

标签:copy   loading   val   远程   cin   def   awd   oop   联合查询   

人气教程排行