Preface: HBase extends the MapReduce API so that MapReduce jobs can conveniently read from and write to HTable data. The full example below uses HBase as the job's source and aggregates the scanned rows into MySQL; the write direction (HBase as the sink) is only sketched, right after this paragraph.
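The write direction goes through the same utility class: TableMapReduceUtil.initTableReducerJob wires up a TableReducer whose output values are Put mutations. A minimal sketch, assuming an output table "out_table" with column family "cf" and qualifier "AH" (these names are illustrative, not part of the original example):

package hbase;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

// Sketch only: a reducer that writes one Put per key back to HBase.
// Wire it into a job with:
//   TableMapReduceUtil.initTableReducerJob("out_table", WriteBackReducer.class, job);
public class WriteBackReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all values for this key into a single cell value
        StringBuilder sb = new StringBuilder();
        for (Text v : values) {
            sb.append(v.toString());
        }
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("AH"), Bytes.toBytes(sb.toString()));
        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}

The full HBase-to-MySQL read example: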
package hbase;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class ExampleHbaseToMysqlMapreduce {

    public static void main(String[] args) throws Exception {
        // HBase configuration (picks up hbase-site.xml from the classpath)
        Configuration config = HBaseConfiguration.create();
        String tableName = "flws";

        // Scan only the row-key range and the single column we need
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("5768014"));
        scan.setStopRow(Bytes.toBytes("5768888"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH"));
        scan.setCaching(500);       // fetch 500 rows per RPC to the region server
        scan.setCacheBlocks(false); // don't fill the block cache during a one-off scan

        // Job definition (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(config, "ExampleHbaseMapreduce");
        job.setJarByClass(ExampleHbaseToMysqlMapreduce.class);

        // Configure the map phase to read from the HBase table
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
                Text.class, Text.class, job);

        // Reduce phase: rows go to MySQL, so the job itself writes no output
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(5);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new Exception("error with job!");
        }
    }
    public static class MyMapper extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable row, Result value,
                Context context) throws IOException, InterruptedException {
            // Emit (row key, value of cf:AH) for every scanned row
            context.write(
                    new Text(row.get()),
                    new Text(value.getValue(Bytes.toBytes("cf"),
                            Bytes.toBytes("AH"))));
        }
    }
    public static class MyReducer extends TableReducer<Text, Text, Text> {

        private Connection conn = null;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // Open one JDBC connection per reduce task; fail the task fast
            // instead of continuing with a null connection
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull";
            try {
                Class.forName(driver);
                conn = DriverManager.getConnection(url, "root", "root");
            } catch (ClassNotFoundException | SQLException e) {
                throw new IOException("could not open MySQL connection", e);
            }
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all values for this row key, then insert one row into MySQL
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text.toString());
            }
            try (Statement st = conn.createStatement()) {
                st.executeUpdate("insert into test_mapreduce (id,ah) values ("
                        + Integer.valueOf(key.toString()) + ",'"
                        + sb.toString() + "')");
            } catch (SQLException e) {
                throw new IOException("insert failed for key " + key, e);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Release the connection when the task finishes
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
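One caveat on MyReducer above: building the INSERT by string concatenation is fragile (quoting breaks on values containing apostrophes, and it is open to SQL injection) and pays one network round trip per key. A hedged alternative sketch using a batched PreparedStatement; it reuses the table, URL, and credentials from the example, but the batch size of 100 and the fail-fast error handling are assumptions, not from the original post:

package hbase;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

// Sketch only: a drop-in variant of MyReducer that batches inserts.
public class BatchingMysqlReducer extends TableReducer<Text, Text, Text> {
    private Connection conn;
    private PreparedStatement ps;
    private int pending = 0;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull",
                    "root", "root");
            conn.setAutoCommit(false);
            // Placeholders avoid quoting problems and SQL injection
            ps = conn.prepareStatement("insert into test_mapreduce (id,ah) values (?,?)");
        } catch (ClassNotFoundException | SQLException e) {
            throw new IOException("could not open MySQL connection", e);
        }
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text t : values) {
            sb.append(t.toString());
        }
        try {
            ps.setInt(1, Integer.parseInt(key.toString()));
            ps.setString(2, sb.toString());
            ps.addBatch();
            if (++pending >= 100) { // flush every 100 rows (arbitrary batch size)
                ps.executeBatch();
                conn.commit();
                pending = 0;
            }
        } catch (SQLException e) {
            throw new IOException("batched insert failed at key " + key, e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (pending > 0) { // flush the tail of the batch
                ps.executeBatch();
                conn.commit();
            }
            ps.close();
            conn.close();
        } catch (SQLException e) {
            throw new IOException("could not flush/close MySQL connection", e);
        }
    }
}

To use it, swap job.setReducerClass(MyReducer.class) for job.setReducerClass(BatchingMysqlReducer.class); everything else in the job setup stays the same.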
Original post: Mapreduce读取hbase汇总到RDBMS (MapReduce reads HBase and aggregates into an RDBMS). Thanks to the original author for sharing.