Hadoop中的DBInputFormat
时间:2021-07-01 10:21:17
帮助过:7人阅读
- create table student(id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(32) NOT NULL);
(3):插入数据
[java] view plain
copy
- insert into student values(1,"lavimer");
(4)编写MapReduce程序,我这里使用的版本是hadoop1.2.1,相关知识点都写在注释中了,如下:
[java] view plain
copy
- public class MyDBInputFormat {
-
-
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
-
- public static void main(String[] args) {
-
- try {
-
- Configuration conf = new Configuration();
-
-
-
-
-
-
-
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB","root","134045");
-
-
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
-
-
- if (fileSystem.exists(new Path(OUT_PATH))){
- fileSystem.delete(new Path(OUT_PATH),true);
- }
-
-
- Job job = new Job(conf,MyDBInputFormat.class.getName());
-
-
- job.setInputFormatClass(DBInputFormat.class);
- DBInputFormat.setInput(job, Student.class, "student", null, null, new String[]{"id","name"});
-
-
- job.setMapperClass(MyDBInputFormatMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
-
-
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
-
-
-
-
-
-
- job.setReducerClass(MyDBInputFormatReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
-
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
-
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- public static class MyDBInputFormatMapper extends Mapper<LongWritable, Student, Text, Text>{
-
- private Text mapOutKey = new Text();
-
- private Text mapOutValue = new Text();
-
- @Override
- protected void map(LongWritable key, Student value, Mapper<LongWritable, Student, Text, Text>.Context context) throws IOException, InterruptedException {
-
-
- mapOutKey.set(String.valueOf(value.getId()));
-
- mapOutValue.set(value.getName());
-
-
- context.write(mapOutKey, mapOutValue);
- }
- }
-
-
- public static class MyDBInputFormatReducer extends Reducer<Text, Text, Text, Text>{
-
- @Override
- protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
-
-
- for (Text t : values){
-
- context.write(key, t);
- }
- }
- }
- }
-
- class Student implements Writable,DBWritable{
-
-
- private Integer id;
-
- private String name;
-
-
-
- public Student() {
- }
-
-
- public Student(Integer id, String name) {
- this.id = id;
- this.name = name;
- }
-
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
-
- public void readFields(ResultSet resultSet) throws SQLException {
- this.id = resultSet.getInt(1);
- this.name = resultSet.getString(2);
- }
-
-
- public void write(PreparedStatement preparedStatement) throws SQLException {
- preparedStatement.setInt(1, this.id);
- preparedStatement.setString(2, this.name);
- }
-
-
- public void readFields(DataInput dataInput) throws IOException {
- this.id = dataInput.readInt();
- this.name = Text.readString(dataInput);
- }
-
-
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeInt(this.id);
- Text.writeString(dataOutput, this.name);
- }
-
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((id == null) ? 0 : id.hashCode());
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Student other = (Student) obj;
- if (id == null) {
- if (other.id != null)
- return false;
- } else if (!id.equals(other.id))
- return false;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "Student [id=" + id + ", name=" + name + "]";
- }
-
-
- }
程序运行的结果是数据库中的数据成功导入到HDFS中,如下:
注:程序运行时,会碰到一个常见的数据库远程连接错误,大致如下:
[java] view plain
copy
- Access denied for user 'root'@'%' to database 'xxxx'
原因:创建完数据库后,需要进行授权(在本地访问一般不会出现这个问题)
解决方法就是进行授权:
[java] view plain
copy
- grant all on xxxx.* to 'root'@'%' identified by 'password' with grant option;
-
- xxxx代表创建的数据库;
- password为用户密码,在此为root的密码
另外一个常见的错误就是MYSQL驱动没有导入到hadoop/lib目录下,解决方案有两种,传统的方式我就不多说了,这里说另外一种方式:
(1):把包上传到集群上
[java] view plain
copy
- hadoop fs -put mysql-connector-java-5.1.0-bin.jar /lib
(2):在MR程序提交job前,添加语句:
[java] view plain
copy
- DistributedCache.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
Hadoop中的DBInputFormat
标签: