
Using DBInputFormat
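DBInputFormat lets a MapReduce job read its input records directly from a relational database over JDBC instead of from files in HDFS. The example below reads every row of a MySQL table named myuser (assumed here to have two columns, an integer id and a string name; the exact schema is not given in the original) and writes each row out as one line of text in HDFS. It is a map-only job (zero reducers), so the results land directly in files such as /out/part-m-00000.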


package InputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Place the MySQL driver JAR under the TaskTracker's lib directory
 * and restart the cluster before running this job.
 */
public class DBInputFormatApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Register the JDBC driver class and connection info for the job.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop:3306/test", "root", "123456");

        // Delete the output directory if it already exists, so the job can run repeatedly.
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop:9000"), conf);
        boolean deleted = fileSystem.delete(new Path("hdfs://hadoop:9000/out"), true);
        if (deleted) {
            System.out.println("Deleted existing output directory /out");
        }

        Job job = Job.getInstance(conf, DBInputFormatApp.class.getSimpleName());
        job.setJarByClass(DBInputFormatApp.class);

        // Read from the myuser table, selecting the id and name columns.
        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");

        job.setMapperClass(MyDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Map-only job: no reduce phase, mapper output goes straight to HDFS.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/out"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyDBMapper extends Mapper<LongWritable, MyUser, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, MyUser value, Context context)
                throws IOException, InterruptedException {
            // The key is the record offset; the value is one row of the table.
            context.write(new Text(value.toString()), NullWritable.get());
        }
    }

    /**
     * The value class must implement both Writable (so Hadoop can serialize it
     * between tasks) and DBWritable (so it can be read from / written to JDBC).
     */
    public static class MyUser implements Writable, DBWritable {

        int id;
        String name;

        // DBWritable: write this record's fields into an INSERT statement.
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, id);
            statement.setString(2, name);
        }

        // DBWritable: populate this record from one row of a ResultSet.
        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
        }

        // Writable: Hadoop's own binary serialization.
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            Text.writeString(out, name);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
        }

        @Override
        public String toString() {
            return id + ",\t" + name;
        }
    }
}
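The setInput call above names a table and its columns and lets DBInputFormat build the SELECT statement itself. DBInputFormat also has an overload that takes a full input query plus a count query, which it runs once to decide how to divide the records into splits. A minimal sketch, assuming the same job and MyUser class as above; the WHERE clause is an illustrative assumption, not part of the original example:

        // Sketch only: replaces the table/field form of setInput above.
        // The count query should cover the same rows as the input query,
        // since DBInputFormat uses its result to size the input splits.
        DBInputFormat.setInput(job, MyUser.class,
                "SELECT id, name FROM myuser WHERE id > 0",   // inputQuery (condition is hypothetical)
                "SELECT COUNT(id) FROM myuser WHERE id > 0"); // inputCountQuery

As a deployment alternative to copying the MySQL driver JAR into every node's lib directory, the JAR can also be shipped with the job itself, for example via the -libjars generic option when the job is launched through ToolRunner.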

 
