Writing to MongoDB from Spark:
Option 1: the mongo-hadoop connector, writing an RDD of (key, BSONObject) pairs through saveAsNewAPIHadoopFile:
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.MongoOutputFormat

val mongoConfig = new Configuration()
mongoConfig.set("mongo.auth.uri", "mongodb://" + userName + ":" + pwd + "@" + hosts + "/admin")
mongoConfig.set("mongo.output.uri", "mongodb://" + hosts + "/GRAPHDB.DB_GRAPH")
// The path argument is not used as an output location by MongoOutputFormat, so an empty string is fine.
saveRdd.saveAsNewAPIHadoopFile("", classOf[Object], classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]], mongoConfig)
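As a minimal sketch of how saveRdd might be built (the input RDD records and its (id, name) field layout are assumptions for illustration), each element pairs an optional key with a BSON document:
import org.bson.{BasicBSONObject, BSONObject}

// Hypothetical input: an RDD[(String, String)] of (id, name) pairs.
val saveRdd = records.map { case (id, name) =>
  val doc = new BasicBSONObject()
  doc.put("_id", id)
  doc.put("name", name)
  (null.asInstanceOf[Object], doc.asInstanceOf[BSONObject])
}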
Option 2: the Stratio spark-mongodb connector, saving a DataFrame with saveToMongodb:
import org.apache.spark.sql.{DataFrame, SQLContext}
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.provider.mongodb._
import MongodbConfig._

val sqlContext = new SQLContext(sc)
val property = Array("id", "name", "age", "sex", "info")
val dataFrame: DataFrame = sqlContext.createDataFrame(data).toDF(property: _*)
val builder = MongodbConfigBuilder(Map(
  Host -> List("master:27017", "slave1:27017", "slave2:27017"),
  Database -> "test",
  Collection -> "test",
  SamplingRatio -> 1.0,
  WriteConcern -> MongodbWriteConcern.Normal))
val mongoConf = builder.build()
dataFrame.saveToMongodb(mongoConf, true)
Option 3: use the RDD's foreachPartition to open a connection inside each partition and write the data from there. If there are many partitions and Spark has been given many CPU cores, this approach can hit several problems: looking at the MongoDB logs, the mongos process sometimes dies because MongoDB runs into trouble handing out read/write locks, and OOM errors ("unable to create new native thread") can also show up (I am still working through that one). Be sure to create the connection inside the partition closure, otherwise you will run into serialization problems; a sketch of the pattern follows.
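A minimal sketch of that pattern, using the Casbah driver already referenced in Option 2 (the URI, database/collection names, and the (id, name) shape of records are assumptions for illustration):
import com.mongodb.casbah.Imports._

records.foreachPartition { partition =>
  // Create the client inside the closure so it is never serialized to the executors.
  val client = MongoClient(MongoClientURI("mongodb://master:27017"))
  val coll = client("test")("test")
  partition.foreach { case (id, name) =>
    coll.insert(MongoDBObject("_id" -> id, "name" -> name))
  }
  client.close()
}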
Updating MongoDB through the Hadoop connector:
import com.mongodb.hadoop.io.MongoUpdateWritable

val mongoConfig = new Configuration()
mongoConfig.set("mongo.output.uri", "mongodb://master:27017/db.table")
saveRdd.saveAsNewAPIHadoopFile("", classOf[Object], classOf[MongoUpdateWritable],
  classOf[MongoOutputFormat[Object, MongoUpdateWritable]], mongoConfig)
When updating you can combine this with MongoDB's update modifiers ($set, $inc, and so on); I'll share more about using the modifiers when I find the time. I'm still a beginner, so if anything here is wrong please point it out, and my thanks in advance to the experts reading this. A rough sketch of pairing a modifier with MongoUpdateWritable follows.
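As a minimal sketch (the (id, delta) input, the count field, and the 4-argument MongoUpdateWritable constructor taking query, modifiers, upsert, and multiUpdate are assumptions to verify against your mongo-hadoop version), each record wraps a query plus a $inc modifier:
import org.bson.BasicBSONObject
import com.mongodb.hadoop.io.MongoUpdateWritable

// Hypothetical input: an RDD[(String, Int)] of (id, delta) pairs.
val saveRdd = deltas.map { case (id, delta) =>
  val query  = new BasicBSONObject("_id", id)                                   // which document to update
  val update = new BasicBSONObject("$inc", new BasicBSONObject("count", delta)) // the modifier to apply
  (null.asInstanceOf[Object], new MongoUpdateWritable(query, update, true, false)) // upsert = true, multi = false
}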