时间:2021-07-01 10:21:17 帮助过:16人阅读
有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在文
有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。
刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。
或者是把目录写在文件里面,作为输入:
/path/to/directory1
/path/to/directory2
/path/to/directory3
代码里面按行读取:
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(context.getConfiguration()); for (FileStatus status : fs.listStatus(new Path(value.toString()))) { // process file } }
都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:
import java.io.IOException; import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; /** * 一个map处理一个目录的数据 */ public abstract class OneMapOneDirectoryInputFormat extends CombineFileInputFormat { private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class); @Override protected boolean isSplitable(JobContext context, Path file) { return false; } @Override public List getSplits(JobContext job) throws IOException { // get all the files in input path List stats = listStatus(job); List splits = new ArrayList(); if (stats.size() == 0) { return splits; } LOG.info("fileNums=" + stats.size()); Map> map = new HashMap>(); for (FileStatus stat : stats) { String directory = stat.getPath().getParent().toString(); if (map.containsKey(directory)) { map.get(directory).add(stat); } else { List fileList = new ArrayList(); fileList.add(stat); map.put(directory, fileList); } } // 设置inputSplit long currentLen = 0; List pathLst = new ArrayList(); List offsetLst = new ArrayList(); List lengthLst = new ArrayList(); Iterator itr = map.keySet().iterator(); while (itr.hasNext()) { String dir = itr.next(); List fileList = map.get(dir); for (int i = 0; i < fileList.size(); i++) { FileStatus stat = fileList.get(i); pathLst.add(stat.getPath()); offsetLst.add(0L); lengthLst.add(stat.getLen()); currentLen += stat.getLen(); } Path[] pathArray = new Path[pathLst.size()]; CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst), getLongArray(lengthLst), new String[0]); LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size() + ") length(" + currentLen + ")"); for (int i = 0; i < pathArray.length; i++) { LOG.info(" -> path[" + i + "]=" + pathArray[i].toString()); } splits.add(thissplit); pathLst.clear(); offsetLst.clear(); lengthLst.clear(); currentLen = 0; } return splits; } private long[] getLongArray(List lst) { long[] rst = new long[lst.size()]; for (int i = 0; i < lst.size(); i++) { rst[i] = lst.get(i); } return rst; } }
这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。
原文地址:Hadoop : 一个目录下的数据只由一个map处理, 感谢原作者分享。