Preparation

Prepare some input files. They can be uploaded with hdfs dfs -put xxx/* /user/fatkun/input.
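For example, a minimal sequence might look like this (the xxx directory and the sample file contents are only placeholders; use whatever local text files you want to count):

mkdir xxx
echo "Hello hadoop hello hdfs" > xxx/file1.txt
hdfs dfs -mkdir -p /user/fatkun/input
hdfs dfs -put xxx/* /user/fatkun/input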
Code

package com.fatkun;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
    static enum Counters {
        INPUT_WORDS // counter for the total number of input words
    }

    static Log logger = LogFactory.getLog(WordCount.class);

    public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private boolean caseSensitive = true;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the flag from the job configuration
            Configuration conf = context.getConfiguration();
            caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                if (caseSensitive) { // case-sensitive or not
                    word.set(itr.nextToken());
                } else {
                    word.set(itr.nextToken().toLowerCase());
                }
                context.write(word, one);
                context.getCounter(Counters.INPUT_WORDS).increment(1);
            }
        }
    }

    public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text text, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(text, new IntWritable(sum));
        }

    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration(getConf());
        Job job = Job.getInstance(conf, "Example Hadoop WordCount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(CountMapper.class);
        job.setCombinerClass(CountReducer.class);
        job.setReducerClass(CountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            other_args.add(args[i]);
        }

        FileInputFormat.addInputPath(job, new Path(other_args.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
        int ret = job.waitForCompletion(true) ? 0 : 1;

        long inputWord = job.getCounters().findCounter(Counters.INPUT_WORDS)
                .getValue();
        System.out.println("INPUT_WORDS: " + inputWord);
        logger.info("test log: " + inputWord);
        return ret;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);

        System.exit(res);
    }

}
Export the jar from Eclipse, then run the following command:
hadoop jar wordcount.jar com.fatkun.WordCount -Dwordcount.case.sensitive=false /user/fatkun/input /user/fatkun/output
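The -Dwordcount.case.sensitive=false option is not parsed by the WordCount class itself: because the job is launched through ToolRunner, GenericOptionsParser strips the -D pair out of the arguments and places it into the job Configuration, where CountMapper.setup() reads it. As a minimal sketch (the RunWordCount class name and the hard-coded paths are only illustrative), the same switch can be set programmatically instead of on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import com.fatkun.WordCount;

public class RunWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent of passing -Dwordcount.case.sensitive=false on the command line.
        conf.setBoolean("wordcount.case.sensitive", false);
        int res = ToolRunner.run(conf, new WordCount(),
                new String[] { "/user/fatkun/input", "/user/fatkun/output" });
        System.exit(res);
    }
}

After the job completes, the counts can be inspected with hdfs dfs -cat /user/fatkun/output/part-r-00000 (the part-r-00000 name assumes the default single reducer).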
References:
http://cxwangyi.blogspot.com/2009/12/wordcount-tutorial-for-hadoop-0201.html
http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Example%3A+WordCount+v2.0