
Analyzing MongoDB data with Hadoop MapReduce (1)

Time: 2021-07-01 10:21:17

  • Get the mongo-hadoop connector source from github.com/mongodb/mongo-hadoop and build it:
    $ cd mongo-hadoop
    $ ./gradlew jar
  • The build takes quite a while. After it succeeds, mongo-hadoop-core-1.5.2.jar is located in core/build/libs
  • Download mongo-java-driver-3.0.4.jar
    from http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.0.4/
    choose mongo-java-driver-3.0.4.jar
  • Data

    • Sample data (the in collection of the testmr database)
    • > db.in.find({})
      { "_id" : ObjectId("5758db95ab12e17a067fbb6f"), "x" : "hello world" }
      { "_id" : ObjectId("5758db95ab12e17a067fbb70"), "x" : "nice to meet you" }
      { "_id" : ObjectId("5758db95ab12e17a067fbb71"), "x" : "good to see you" }
      { "_id" : ObjectId("5758db95ab12e17a067fbb72"), "x" : "world war 2" }
      { "_id" : ObjectId("5758db95ab12e17a067fbb73"), "x" : "see you again" }
      { "_id" : ObjectId("5758db95ab12e17a067fbb74"), "x" : "bye bye" }
    • Final result
    • > db.out.find({})
      { "_id" : "2", "value" : 1 }
      { "_id" : "again", "value" : 1 }
      { "_id" : "bye", "value" : 2 }
      { "_id" : "good", "value" : 1 }
      { "_id" : "hello", "value" : 1 }
      { "_id" : "meet", "value" : 1 }
      { "_id" : "nice", "value" : 1 }
      { "_id" : "see", "value" : 2 }
      { "_id" : "to", "value" : 2 }
      { "_id" : "war", "value" : 1 }
      { "_id" : "world", "value" : 2 }
      { "_id" : "you", "value" : 3 }
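    • The goal is to count how many times each word occurs across all the documents and store the result in MongoDB, with the word as the key (_id) and its count as the value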
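As a sanity check on the expected result above, the same counts can be computed in memory with plain Java. ExpectedWordCount is a hypothetical helper, not part of the original job. It splits each "x" string on whitespace with StringTokenizer, as the mapper does, and stores the counts in a TreeMap so the words come out in the same order as the db.out listing above.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical helper: computes in memory the word counts the MapReduce job
// is expected to write to testmr.out for the six sample documents.
public class ExpectedWordCount {

    // Splits each text on whitespace (like the job's StringTokenizer) and sums
    // occurrences per word. TreeMap keeps the words in String order.
    static Map<String, Integer> count(String[] texts) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String text : texts) {
            StringTokenizer itr = new StringTokenizer(text);
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sample = {
            "hello world", "nice to meet you", "good to see you",
            "world war 2", "see you again", "bye bye"
        };
        // Print in the same shape as the mongo shell output.
        for (Map.Entry<String, Integer> e : count(sample).entrySet()) {
            System.out.println("{ \"_id\" : \"" + e.getKey() + "\", \"value\" : " + e.getValue() + " }");
        }
    }
}
```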

    Hadoop MapReduce code

    • MapReduce code
        import java.io.IOException;
        import java.util.StringTokenizer;

        import org.bson.BSONObject;

        import com.mongodb.hadoop.MongoInputFormat;
        import com.mongodb.hadoop.MongoOutputFormat;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.Reducer;

        public class WordCount {

            // Mapper: the input key is the document's _id, the input value is the
            // whole BSON document; emits (word, 1) for every token in field "x".
            public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
                private final static IntWritable one = new IntWritable(1);
                private final Text word = new Text();

                @Override
                public void map(Object key, BSONObject value, Context context)
                        throws IOException, InterruptedException {
                    // Debug output; shows up in the map task logs.
                    System.out.println("key: " + key);
                    System.out.println("value: " + value);
                    Object x = value.get("x");
                    if (x == null) {
                        return; // skip documents that have no "x" field
                    }
                    StringTokenizer itr = new StringTokenizer(x.toString());
                    while (itr.hasMoreTokens()) {
                        word.set(itr.nextToken());
                        context.write(word, one);
                    }
                }
            }

            // Reducer (also registered as the combiner): sums the counts for each word.
            public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                private final IntWritable result = new IntWritable();

                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context)
                        throws IOException, InterruptedException {
                    int sum = 0;
                    for (IntWritable val : values) {
                        sum += val.get();
                    }
                    result.set(sum);
                    context.write(key, result);
                }
            }

            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // Read from collection "in" and write to collection "out" of database "testmr".
                conf.set("mongo.input.uri", "mongodb://localhost/testmr.in");
                conf.set("mongo.output.uri", "mongodb://localhost/testmr.out");

                // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
                Job job = Job.getInstance(conf, "word count");
                job.setJarByClass(WordCount.class);
                job.setMapperClass(TokenizerMapper.class);
                job.setCombinerClass(IntSumReducer.class);
                job.setReducerClass(IntSumReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                // Use MongoDB collections instead of HDFS files for input and output.
                job.setInputFormatClass(MongoInputFormat.class);
                job.setOutputFormatClass(MongoOutputFormat.class);
                System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
        }
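      • Note: mongo.input.uri and mongo.output.uri must be set. The path has the form database.collection, so the job reads from collection in and writes to collection out of database testmr:
        conf.set("mongo.input.uri", "mongodb://localhost/testmr.in");
        conf.set("mongo.output.uri", "mongodb://localhost/testmr.out");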
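    • Compile
      • Compile the source (mongo-hadoop-core-1.5.2.jar and mongo-java-driver-3.0.4.jar must be on the classpath for this to succeed, for example by adding them to HADOOP_CLASSPATH)
        $ hadoop com.sun.tools.javac.Main WordCount.java -Xlint:deprecation
      • Package the classes into a jar
        $ jar cf wc.jar WordCount*.class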
    • Run
      • Start Hadoop first. It must be running before any MapReduce job can run
        $ start-all.sh
      • Run the program
        $ hadoop jar wc.jar WordCount
    • Check the result
      $ mongo
      MongoDB shell version: 2.4.9
      connecting to: test
      > use testmr;
      switched to db testmr
      > db.out.find({})
      { "_id" : "2", "value" : 1 }
      { "_id" : "again", "value" : 1 }
      { "_id" : "bye", "value" : 2 }
      { "_id" : "good", "value" : 1 }
      { "_id" : "hello", "value" : 1 }
      { "_id" : "meet", "value" : 1 }
      { "_id" : "nice", "value" : 1 }
      { "_id" : "see", "value" : 2 }
      { "_id" : "to", "value" : 2 }
      { "_id" : "war", "value" : 1 }
      { "_id" : "world", "value" : 2 }
      { "_id" : "you", "value" : 3 }
      > 
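The job registers IntSumReducer as both the combiner and the reducer. The sketch below is a hypothetical, in-memory simulation of that data flow, showing why this is safe. LocalWordCountSimulation is not Hadoop code. The two "splits" stand in for input splits handled by separate map tasks, and a TreeMap stands in for the shuffle/sort step. Hadoop may apply a combiner zero, one or several times, so the combiner must not change the final result. Summation is associative and commutative, so pre-summing each split's (word, 1) pairs gives the same totals as summing everything in the reducer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the WordCount data flow (not Hadoop code).
public class LocalWordCountSimulation {

    // Map phase: emit (word, 1) for every whitespace-separated token.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            out.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
        }
        return out;
    }

    // Sum logic shared by the combiner and the reducer, like IntSumReducer.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return sums;
    }

    // Map each split, optionally combine per split, then "shuffle" and reduce.
    static Map<String, Integer> run(List<List<String>> splits, boolean useCombiner) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (List<String> split : splits) {
            List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
            for (String doc : split) {
                mapOutput.addAll(map(doc));
            }
            if (useCombiner) {
                // Partial sums per split replace many (word, 1) pairs.
                shuffled.addAll(sumByKey(mapOutput).entrySet());
            } else {
                shuffled.addAll(mapOutput);
            }
        }
        return sumByKey(shuffled);
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("hello world", "nice to meet you", "good to see you"),
            Arrays.asList("world war 2", "see you again", "bye bye"));
        System.out.println(run(splits, false));
        System.out.println(run(splits, true));
    }
}
```

Both calls in main print the same counts. The combiner only reduces how many pairs cross the shuffle, which is the saving it is meant to provide on a real cluster.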
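The mongo.input.uri and mongo.output.uri values in the note above follow the pattern mongodb://host/database.collection. The hypothetical MongoUriParts class below illustrates that convention with java.net.URI. It is not how the connector parses the string, and it handles only the single-host form used here. A multi-host replica-set URI would need a real MongoDB URI parser, such as the Java driver's MongoClientURI.

```java
import java.net.URI;

// Hypothetical helper, for illustration only: splits a single-host connection
// string such as mongodb://localhost/testmr.in into host, database and collection.
public class MongoUriParts {
    final String host;
    final String database;
    final String collection;

    MongoUriParts(String uri) {
        URI parsed = URI.create(uri);
        this.host = parsed.getHost();
        String path = parsed.getPath();
        if (path == null || path.length() < 2) {
            throw new IllegalArgumentException("no database.collection path in " + uri);
        }
        path = path.substring(1); // drop the leading "/"
        // MongoDB database names cannot contain '.', so the first dot separates
        // the database from the collection (collection names may contain dots).
        int dot = path.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("expected database.collection in " + uri);
        }
        this.database = path.substring(0, dot);
        this.collection = path.substring(dot + 1);
    }

    public static void main(String[] args) {
        MongoUriParts input = new MongoUriParts("mongodb://localhost/testmr.in");
        MongoUriParts output = new MongoUriParts("mongodb://localhost/testmr.out");
        System.out.println(input.host + " " + input.database + " " + input.collection);
        System.out.println(output.host + " " + output.database + " " + output.collection);
    }
}
```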


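    That was a simple example. Next, I will use Hadoop MapReduce to process more complex data stored in MongoDB. Stay tuned, and if you have any questions, please leave them in the comments ^_^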



