
Integrating Spark/Hadoop with MongoDB


Reading from MongoDB with Spark:
Approach 1:

import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://master:20000,slave1:20000,slave2:20000/yang.relation2")
mongoConfig.set("mongo.input.split_size", "32")                 // size of each input split
mongoConfig.set("mongo.input.split.read_shard_chunks", "true")  // read shard chunks
// read only the columns you need: 1 means include, 0 means exclude, like a MongoDB projection
mongoConfig.set("mongo.input.fields", "{\"srcid\":\"1\",\"dstid\":\"1\"}")
mongoConfig.set("mongo.input.query", "{\"dstid\":{\"$gt\":\"0\"}}")
val readfile = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
readfile.count()

Approach 2 (uses the same Stratio imports as the write example further below):

val sqlContext = new SQLContext(sc)
val builder = MongodbConfigBuilder(Map(
  Host -> List("master:27017", "slave1:27017", "slave2:27017"),
  Database -> "graphdb",
  Collection -> "mongo",
  SamplingRatio -> 1.0,
  WriteConcern -> MongodbWriteConcern.Normal))
val mconf = builder.build()
val readfile2 = sqlContext.fromMongoDB(mconf)
readfile2.count()
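With approach 1, newAPIHadoopRDD hands back an RDD of (Object, BSONObject) pairs, so the fields still have to be pulled out of each BSONObject by hand. A minimal sketch, assuming the srcid/dstid projection above (the toString conversion is a placeholder for however the collection actually stores those fields):

// extract the projected fields from each document
val edges = readfile.map { case (_, doc) =>
  (doc.get("srcid").toString, doc.get("dstid").toString)
}
edges.take(5).foreach(println)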

Writing to MongoDB with Spark:
Approach 1:

val mongoConfig = new Configuration()
// authentication URI (userName, pwd, and hosts are defined elsewhere)
mongoConfig.set("mongo.auth.uri", "mongodb://" + userName + ":" + pwd + "@" + hosts + "/admin")
// target database.collection for the output
mongoConfig.set("mongo.output.uri", "mongodb://" + hosts + "/GRAPHDB.DB_GRAPH")
saveRdd.saveAsNewAPIHadoopFile("", classOf[Object], classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]], mongoConfig)
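Here saveRdd has to be an RDD of (key, BSONObject) pairs; MongoOutputFormat ignores the path argument, which is why it can be an empty string. A minimal sketch of building such an RDD (the name/age fields and the sample data are made up for illustration):

import org.bson.BasicBSONObject

// hypothetical source data; replace with your own RDD
val saveRdd = sc.parallelize(Seq(("u1", 25), ("u2", 30))).map { case (name, age) =>
  val doc = new BasicBSONObject()
  doc.put("name", name)
  doc.put("age", age)
  (null, doc)  // key left null: MongoDB generates the _id
}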


Approach 2:

import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.provider.mongodb._
import MongodbConfig._

val sqlContext = new SQLContext(sc)
val property = Array("id", "name", "age", "sex", "info")
// data is an existing RDD whose rows match the five column names above
val dataFrame = sqlContext.createDataFrame(data).toDF(property: _*)
val builder = MongodbConfigBuilder(Map(
  Host -> List("master:27017", "slave1:27017", "slave2:27017"),
  Database -> "test",
  Collection -> "test",
  SamplingRatio -> 1.0,
  WriteConcern -> MongodbWriteConcern.Normal))
val mongoConf = builder.build()
dataFrame.saveToMongodb(mongoConf, true)
Approach 3:
Use the RDD's foreachPartition to open one connection per partition and insert the data from there (see the sketch below). When there are many partitions and Spark is given many CPU cores, this can go wrong in several ways: the MongoDB logs show the mongos process occasionally dying, because MongoDB runs into trouble handing out read/write locks, and OOM errors of the "unable to create new native thread" kind can appear (this novice is still working on that one). Be sure to create the connection inside foreachPartition; creating it on the driver and capturing it in the closure causes serialization problems.
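A minimal sketch of approach 3 using the Casbah client already imported above, assuming an RDD of (srcid, dstid) pairs; the database and collection names are placeholders:

import com.mongodb.casbah.Imports._

rdd.foreachPartition { iter =>
  // create the client inside the partition: MongoClient is not serializable
  val client = MongoClient("master", 27017)
  val coll = client("graphdb")("relation")
  iter.foreach { case (srcid, dstid) =>
    coll.insert(MongoDBObject("srcid" -> srcid, "dstid" -> dstid))
  }
  client.close()
}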

Updating MongoDB from Hadoop:
val mongoConfig = new Configuration()
mongoConfig.set("mongo.output.uri", "mongodb://master:27017/db.table")
saveRdd.saveAsNewAPIHadoopFile("", classOf[Object], classOf[MongoUpdateWritable],
  classOf[MongoOutputFormat[Object, MongoUpdateWritable]], mongoConfig)
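For updates, the value side of each pair is a MongoUpdateWritable instead of a plain BSONObject. A minimal sketch of building the records, assuming an existing rdd of (id, count) pairs and a $inc value modifier; note that the exact MongoUpdateWritable constructor varies between mongo-hadoop versions (some add a fifth replace flag):

import org.bson.BasicBSONObject
import com.mongodb.hadoop.io.MongoUpdateWritable

val saveRdd = rdd.map { case (id, cnt) =>
  val query  = new BasicBSONObject("_id", id)                                  // which document to update
  val update = new BasicBSONObject("$inc", new BasicBSONObject("count", cnt))  // value modifier
  (null, new MongoUpdateWritable(query, update, true, false))                  // upsert = true, multi = false
}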

Updates can be combined with MongoDB's value modifiers; I'll share how the modifiers work when I have time. I'm just a beginner, so if you spot any problems, please point them out; Xiao Yang thanks all you experts in advance.
