当前位置:Gxlcms > mysql > HBase之普通BulkLoad

HBase之普通BulkLoad

时间:2021-07-01 10:21:17 帮助过:32人阅读

为了保持MapReduce架构清晰,同时保留Map和Reduce结构。以便后续扩展。PS:写入HFile的时候,qualifier必须有序。 Mapper: import com.google.common.base.Strings;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.L

为了保持MapReduce架构清晰,同时保留Map和Reduce结构。以便后续扩展。PS:写入HFile的时候,qualifier必须有序。

Mapper:

import com.google.common.base.Strings;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import yeepay.util.HBaseUtil;

public class LoadMapper extends Mapper {

    protected void map(LongWritable key, Text value, Context context) {

        try {

            String line = value.toString();

            if (Strings.isNullOrEmpty(line)) {
                return;
            }

            String[] arr = line.split("\t", 9);

            if (arr.length != 9) {
                throw new RuntimeException("line.splite() not == 9");
            }

            if (arr.length < 1) {
                return;
            }
            String k1 = arr[0];
            ImmutableBytesWritable keyH = new ImmutableBytesWritable(HBaseUtil.getRowKey(k1));
            context.write(keyH, new Text(line));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }


}

Reducer

import com.google.common.base.Splitter;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class LoadReducer extends Reducer {

    final static String[] fileds = new String[]{
            "ID",
            "A_ACCOUNT_ID",
            "A_TRX_ID",
            "P_ID",
            "P_TRXORDER_ID",
            "P_FRP_ID",
            "O_PRODUCTCAT",
            "O_RECEIVER_ID",
            "O_REQUESTID"
    };

    @Override
    public void reduce(ImmutableBytesWritable rowkey, Iterable values, Context context) throws java.io.IOException, InterruptedException {

//        super.setID(stringArray[0]);
//        this.A_ACCOUNT_ID = stringArray[1];
//        this.A_TRX_ID = stringArray[2];
//        this.P_ID = stringArray[3];
//        this.P_TRXORDER_ID = stringArray[4];
//        this.P_FRP_ID = stringArray[5];
//        this.O_PRODUCTCAT = stringArray[6];
//        this.O_RECEIVER_ID = stringArray[7];
//        this.O_REQUESTID = stringArray[8];

        try {
            Text vv = values.iterator().next();
            String vs = vv.toString();

            Splitter splitter = Splitter.on("\t").limit(9);

            Iterable iterable = splitter.split(vs);
            Iterator iterator = iterable.iterator();
//            String[] arr = vs.split("\\t", 9);

            int i = 0;
//            Put put = new Put(rowkey.get());

            /**
             *       值的写入必须按照顺序。
             */
            Map map = new TreeMap();
            while (iterator.hasNext()) {
                map.put(fileds[i++], iterator.next());
            }

            for (Map.Entry entry : map.entrySet()) {

                KeyValue kv = new KeyValue(rowkey.copyBytes(), Bytes.toBytes("f"), entry.getKey().getBytes(), 0L, entry.getValue().getBytes());
                context.write(rowkey, kv);


            }

        } catch (Exception e) {
            new RuntimeException(e);
        }


    }

}
Job&BulkLoad
package yeepay.load;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yeepay.util.HdfsUtil;
import yeepay.util.YeepayConstant;

import java.util.Date;

public abstract class AbstractJobBulkLoad {
    public static Configuration conf = HBaseConfiguration.create();

    public void run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("please set input dir");
            System.exit(-1);
            return;
        }
        String txtPath = args[0];
        String tableName = args[1];
        Job job = new Job(conf, "txt2HBase");
        HTable htable = null;
        try {
            htable = new HTable(conf, tableName); //set table name
            // 根据region的数量来决定reduce的数量以及每个reduce覆盖的rowkey范围
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            htable.close();
            job.setJarByClass(AbstractJobBulkLoad.class);
            FileSystem fs = FileSystem.get(conf);

            System.out.println("input file :" + txtPath);
            Path inputFile = new Path(txtPath);
            if (!fs.exists(inputFile)) {
                System.err.println("inputFile " + txtPath + " not exist.");
                throw new RuntimeException("inputFile " + txtPath + " not exist.");
            }
            FileInputFormat.addInputPath(job, inputFile);
//
            job.setMapperClass(getMapperClass());
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
//
            job.setReducerClass(getReducerClass());
            Date now = new Date();
            Path output = new Path("/output/" + tableName + "/" + now.getTime());
            System.out.println("/output/" + tableName + "/" + now.getTime());
            FileOutputFormat.setOutputPath(job, output);
            job.waitForCompletion(true);
            //执行BulkLoad
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            htable = new HTable(conf, tableName);
            new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            htable.close();
            System.out.println("HFile data load success!");
            System.out.println(getJobName() + " end!");

        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    protected abstract Class getMapperClass();

    protected abstract Class getReducerClass();


    protected abstract String getTableName();

    protected abstract String getJobName();
}

人气教程排行