DBOutputFormat in Hadoop
Table preparation: first create the target table in MySQL:

create table user(id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(32) NOT NULL);
Data preparation: upload the data file to HDFS (the original post showed the data in a screenshot).
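The screenshot is not reproduced here, but judging from the mapper below, which splits each line on a tab and parses the first field as an integer id and the second field as a name, the input file would look something like this (the values are made-up placeholders):

1	Tom
2	Jerry
3	Jack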
The Hadoop version used here is Hadoop 1.x. The code and the relevant points are explained in the comments; the code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyDBOutputFormat {

    // Input path of the data file on HDFS
    private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/user";

    public static void main(String[] args) {

        try {
            // Create the configuration
            Configuration conf = new Configuration();

            // Register the JDBC driver, connection URL, user name and password
            // on the configuration; DBOutputFormat reads these at write time
            DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB", "root", "134045");

            // Create the job
            Job job = new Job(conf, MyDBOutputFormat.class.getName());

            // Set the input format and input path
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, INPUT_PATH);

            // Set the mapper and its output key/value types
            job.setMapperClass(MyDBOutputFormatMapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(User.class);

            // Set the partitioner and use a single reduce task
            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);

            // Set the reducer
            job.setReducerClass(MyDBOutputFormatReducer.class);

            // Use DBOutputFormat and declare the target table and its columns
            job.setOutputFormatClass(DBOutputFormat.class);
            DBOutputFormat.setOutput(job, "user", new String[] { "id", "name" });

            // Submit the job and wait for completion
            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Mapper: parses each tab-separated line into a User object. The output
     * key is just the line offset; only the User value matters downstream.
     */
    public static class MyDBOutputFormatMapper extends Mapper<LongWritable, Text, LongWritable, User> {

        // Reused across map() calls to avoid allocating a new object per record
        private User user = new User();

        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, User>.Context context) throws IOException,
                InterruptedException {
            // Split the line on tabs: field 0 is the id, field 1 is the name
            String[] splits = value.toString().split("\t");

            user.setId(Integer.parseInt(splits[0]));
            user.setName(splits[1]);

            context.write(key, user);
        }
    }

    /**
     * Reducer: emits the User as the output key. DBOutputFormat writes the
     * key to the database and ignores the value.
     */
    public static class MyDBOutputFormatReducer extends Reducer<LongWritable, User, User, Text> {

        protected void reduce(LongWritable key, Iterable<User> values, Reducer<LongWritable, User, User, Text>.Context context) throws IOException,
                InterruptedException {
            for (User user : values) {
                context.write(user, new Text(user.getName()));
            }
        }
    }
}

/**
 * The record type written to the database. It must implement both Writable
 * (so it can be serialized during the shuffle) and DBWritable (so
 * DBOutputFormat can bind its fields to the INSERT statement).
 */
class User implements Writable, DBWritable {

    private int id;
    private String name;

    // A no-argument constructor is required for Hadoop serialization
    public User() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // DBWritable: read the fields from a JDBC ResultSet (used by DBInputFormat)
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getInt(1);
        this.name = resultSet.getString(2);
    }

    // DBWritable: bind the fields to the parameters of the prepared INSERT
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setInt(1, this.id);
        preparedStatement.setString(2, this.name);
    }

    // Writable: deserialize the fields during the shuffle
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readInt();
        this.name = Text.readString(dataInput);
    }

    // Writable: serialize the fields during the shuffle
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.id);
        Text.writeString(dataOutput, this.name);
    }

    @Override
    public String toString() {
        return "User [id=" + id + ", name=" + name + "]";
    }
}
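A note on what happens under the hood: from the table and column names passed to setOutput, DBOutputFormat prepares a parameterized INSERT statement, roughly:

INSERT INTO user (id, name) VALUES (?, ?);

Each call to User.write(PreparedStatement) binds one record's id and name to the two placeholders; the rows are accumulated in a JDBC batch and committed when the reduce task closes its record writer.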
The result of running the program: each record from the input file is inserted as a row into the user table in MySQL.
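A quick way to verify is to query the table from the MySQL client:

SELECT id, name FROM user ORDER BY id;

Each line of the input file on HDFS should come back as one row. Note that because id is the primary key, re-running the job against the same input will fail on duplicate keys unless the table is cleared first.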