当前位置:Gxlcms > 数据库问题 > Spark如何写入HBase/Redis/MySQL/Kafka

Spark如何写入HBase/Redis/MySQL/Kafka

时间:2021-07-01 10:21:17 帮助过:33人阅读

private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181" private lazy val (table, conn) = createConnection def bulk(items:Iterator) = { items.foreach(conn.put(_)) conn.flush.... } ...... }

然后保证这个类在map,foreachRDD等函数下使用,譬如:

dstream.foreachRDD{ rdd =>
    rdd.foreachPartition{iter=>
        SimpleHBaseClient.bulk(iter)  
    }
}

为什么要保证放到foreachRDD/map 等这些函数里呢?
Spark的机制是先将用户的程序作为一个单机运行(运行者是Driver),Driver通过序列化机制,将对应算子规定的函数发送到Executor进行执行。这里,foreachRDD/map 等函数都是会发送到Executor执行的,Driver端并不会执行。里面引用的object 类 会作为一个stub 被序列化过去,object内部属性的初始化其实是在Executor端完成的,所以可以避过序列化的问题。

Pool也是类似的做法。然而我们并不建议使用pool,因为Spark 本身已经是分布式的,举个例子可能有100个executor,如果每个executor再搞10个connection
的pool,则会有100*10 个链接,Kafka也受不了。一个Executor 维持一个connection就好。

关于Executor挂掉丢数据的问题,其实就看你什么时候flush,这是一个性能的权衡。

Spark如何写入HBase/Redis/MySQL/Kafka

标签:producer   函数   data   redis   flush   mysq   executor   hba   ...   

人气教程排行