Using DBInputFormat

package InputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Put the MySQL JDBC driver JAR under each TaskTracker's lib directory, then restart the cluster.
 * @author Administrator
 */
public class DBInputFormatApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop:3306/test", "root", "123456");
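        // configureDB stores the JDBC driver class, connection URL, user and password in the
        // job configuration; DBInputFormat reads them back when it opens the database connection.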

        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop:9000"), conf);
        boolean delete = fileSystem.delete(new Path("hdfs://hadoop:9000/out"), true);
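        // Remove the output directory up front; FileOutputFormat refuses to run if it already exists.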
        if (delete) {
            System.out.println("existing output directory deleted");
        }

        Job job = new Job(conf, DBInputFormatApp.class.getSimpleName());
        job.setJarByClass(DBInputFormatApp.class);

        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");
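        // setInput(job, inputClass, tableName, conditions, orderBy, fieldNames...): here it reads
        // the id and name columns of the myuser table with no WHERE clause and no ORDER BY.
        // The post does not show the table itself; it is assumed to be a MySQL table in the test
        // database with an INT id column and a VARCHAR name column, matching MyUser below.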
        job.setMapperClass(MyDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
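        // With zero reduce tasks this is a map-only job: each mapper writes its output
        // straight to HDFS, so the map output types are also the final output types.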

        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/out"));

        job.waitForCompletion(true);
    }

    public static class MyDBMapper extends Mapper<LongWritable, MyUser, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, MyUser value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(value.toString()), NullWritable.get());
        }
    }

    public static class MyUser implements Writable, DBWritable {
        int id;
        String name;
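
        // DBWritable maps the object to and from a JDBC row (PreparedStatement/ResultSet);
        // Writable is Hadoop's own serialization, used when records are moved between tasks.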
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, id);
            statement.setString(2, name);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            Text.writeString(out, name);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
        }

        @Override
        public String toString() {
            return id + ", \t" + name;
        }
    }
}
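
DBInputFormat.setInput also has a query-based overload that takes explicit SQL instead of a table name and column list. The fragment below is a minimal sketch, not part of the original post; it assumes the same job and the same myuser table as above and only swaps the setInput call:

// Hypothetical alternative to the table-based setInput call above (same MyUser class):
// the first query fetches the rows, the second counts them so DBInputFormat can compute splits.
DBInputFormat.setInput(job, MyUser.class,
        "SELECT id, name FROM myuser",
        "SELECT COUNT(*) FROM myuser");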