To keep the MapReduce architecture clean, both the Map and the Reduce stages are kept in place so the job can easily be extended later. Note: when writing HFiles, the qualifiers within a row must be emitted in sorted order, because the HFile writer requires cells to arrive in ascending key order; the Reducer below enforces this with a TreeMap.
Mapper:
import com.google.common.base.Strings;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import yeepay.util.HBaseUtil;

public class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        try {
            String line = value.toString();
            if (Strings.isNullOrEmpty(line)) {
                return;
            }
            // Each input line is tab-separated and must contain exactly 9 fields.
            String[] arr = line.split("\t", 9);
            if (arr.length != 9) {
                throw new RuntimeException("line.split() did not yield 9 fields");
            }
            // The first field is the business key; HBaseUtil turns it into the HBase rowkey.
            String k1 = arr[0];
            ImmutableBytesWritable keyH = new ImmutableBytesWritable(HBaseUtil.getRowKey(k1));
            context.write(keyH, new Text(line));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
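The Mapper delegates rowkey construction to HBaseUtil.getRowKey, which the post does not show. As a rough sketch only, assuming the intent is to prefix the business key with part of its MD5 hash so rows spread evenly across regions (the class name comes from the post, the body below is a guess, not the project's actual code):

package yeepay.util;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class HBaseUtil {

    /**
     * Hypothetical implementation: prefix the business key with the first
     * 8 hex characters of its MD5 hash to avoid region hot-spotting.
     * The real project code may build the rowkey differently.
     */
    public static byte[] getRowKey(String key) {
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(key)).substring(0, 8);
        return Bytes.toBytes(prefix + "_" + key);
    }
}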
Reducer:
import com.google.common.base.Splitter;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class LoadReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

    // Column qualifiers, in the same order as the fields of the input line.
    final static String[] fields = new String[]{
            "ID",
            "A_ACCOUNT_ID",
            "A_TRX_ID",
            "P_ID",
            "P_TRXORDER_ID",
            "P_FRP_ID",
            "O_PRODUCTCAT",
            "O_RECEIVER_ID",
            "O_REQUESTID"
    };

    @Override
    public void reduce(ImmutableBytesWritable rowkey, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        try {
            // One rowkey corresponds to one input line; take the first value.
            Text vv = values.iterator().next();
            String vs = vv.toString();
            Splitter splitter = Splitter.on("\t").limit(9);
            Iterator<String> iterator = splitter.split(vs).iterator();
            int i = 0;
            /**
             * KeyValues must be written in sorted qualifier order,
             * so collect them in a TreeMap before emitting.
             */
            Map<String, String> map = new TreeMap<String, String>();
            while (iterator.hasNext()) {
                map.put(fields[i++], iterator.next());
            }
            for (Map.Entry<String, String> entry : map.entrySet()) {
                KeyValue kv = new KeyValue(rowkey.copyBytes(), Bytes.toBytes("f"),
                        entry.getKey().getBytes(), 0L, entry.getValue().getBytes());
                context.write(rowkey, kv);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Job & BulkLoad:
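Both the Reducer above (which writes every cell to column family "f") and the driver below (which fixes permissions on output + "/" + YeepayConstant.COMMON_FAMILY) refer to a single common column family. The YeepayConstant class is not included in the post; presumably it amounts to something like the following sketch (an assumption, not the project's actual source):

package yeepay.util;

public class YeepayConstant {
    // Assumed to match the column family the Reducer writes to ("f").
    public static final String COMMON_FAMILY = "f";
}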
package yeepay.load;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yeepay.util.HdfsUtil;
import yeepay.util.YeepayConstant;

import java.util.Date;

public abstract class AbstractJobBulkLoad {

    public static Configuration conf = HBaseConfiguration.create();

    public void run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: <input dir> <table name>");
            System.exit(-1);
            return;
        }
        String txtPath = args[0];
        String tableName = args[1];
        Job job = new Job(conf, "txt2HBase");
        HTable htable = null;
        try {
            htable = new HTable(conf, tableName); // target table
            // The number of reducers and the rowkey range each reducer covers
            // are derived from the table's regions.
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            htable.close();

            job.setJarByClass(AbstractJobBulkLoad.class);

            FileSystem fs = FileSystem.get(conf);
            System.out.println("input file: " + txtPath);
            Path inputFile = new Path(txtPath);
            if (!fs.exists(inputFile)) {
                System.err.println("inputFile " + txtPath + " not exist.");
                throw new RuntimeException("inputFile " + txtPath + " not exist.");
            }
            FileInputFormat.addInputPath(job, inputFile);

            job.setMapperClass(getMapperClass());
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);

            job.setReducerClass(getReducerClass());

            Date now = new Date();
            Path output = new Path("/output/" + tableName + "/" + now.getTime());
            System.out.println("output dir: " + output);
            FileOutputFormat.setOutputPath(job, output);

            job.waitForCompletion(true);

            // Run the BulkLoad: hand the generated HFiles over to the region servers.
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            htable = new HTable(conf, tableName);
            new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            htable.close();
            System.out.println("HFile data load success!");
            System.out.println(getJobName() + " end!");
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    protected abstract Class<? extends Mapper> getMapperClass();

    protected abstract Class<? extends Reducer> getReducerClass();

    protected abstract String getTableName();

    protected abstract String getJobName();
}
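AbstractJobBulkLoad leaves the Mapper/Reducer wiring to subclasses, but the post does not include a concrete job class. A minimal subclass tying LoadMapper and LoadReducer together might look like the sketch below; the class name, main method, and table name are illustrative, not taken from the original project.

package yeepay.load;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TrxOrderBulkLoadJob extends AbstractJobBulkLoad {

    @Override
    protected Class<? extends Mapper> getMapperClass() {
        return LoadMapper.class;
    }

    @Override
    protected Class<? extends Reducer> getReducerClass() {
        return LoadReducer.class;
    }

    @Override
    protected String getTableName() {
        // Illustrative only; run() actually reads the table name from args[1].
        return "TRX_ORDER";
    }

    @Override
    protected String getJobName() {
        return "TrxOrderBulkLoadJob";
    }

    public static void main(String[] args) throws Exception {
        // args[0] = input text file on HDFS, args[1] = target HBase table
        new TrxOrderBulkLoadJob().run(args);
    }
}

The job would then be submitted with something like "hadoop jar bulkload.jar yeepay.load.TrxOrderBulkLoadJob /input/trx_order.txt TRX_ORDER", where the jar name, class name, and paths are placeholders for the project's own artifacts.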