时间:2021-07-01 10:21:17 帮助过:2人阅读
1、注意,需要声明为静态内部类,否则会报java.lang.NoSuchMethodException...<init>的错误
public static class MySqlWritable implements Writable, DBWritable {
2、如果输出目录存在,需要先删除
3、由于需要从mysql数据取值,则需要有mysql数据库驱动包,hadoop classpath查看hadoop类加载路径,将驱动包拷贝到其中一个目录下即可;
4、解决mysql "Access denied for user 'root'@'IP地址'" 问题
a、登录mysql
mysql -u username -p （回车后输入密码；若写在命令行，-p 与密码之间不能有空格，如 mysql -u username -ppasswd）
b、然后执行如下命令:
b-1、 GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
b-2、FLUSH PRIVILEGES;
Java代码:
package cn.sniper.hadoop.inputformat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import cn.sniper.hadoop.util.HdfsUtil; public class MyDBInputFormat { public static String IN_PATH = "hdfs://192.168.1.231:9000/README.txt"; public static String OUT_PATH = "hdfs://192.168.1.231:9000/out"; public static class MyMap extends Mapper<LongWritable, MySqlWritable, Text, IntWritable> { protected void map(LongWritable key, MySqlWritable value, org.apache.hadoop.mapreduce.Mapper<LongWritable,MySqlWritable,Text,IntWritable>.Context context) throws java.io.IOException ,InterruptedException { System.out.println(value); context.write(new Text(value.toString()), new IntWritable(1)); }; } /** * 注意,需要声明为静态内部类,否则会报java.lang.NoSuchMethodException...<init>的错误 * @author sniper * */ public static class MySqlWritable implements Writable, DBWritable { private String id; private String name; private String nickName; @Override public void readFields(ResultSet resultSet) throws SQLException { id = resultSet.getString(1); name = resultSet.getString(2); nickName = resultSet.getString(3); } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, id); statement.setString(2, name); statement.setString(3, nickName); } 
@Override public void readFields(DataInput dataInput) throws IOException { id = dataInput.readUTF(); name = dataInput.readUTF(); nickName = dataInput.readUTF(); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(id); dataOutput.writeUTF(name); dataOutput.writeUTF(nickName); } @Override public String toString() { return "MySqlWritable [id=" + id + ", name=" + name + ", nickName=" + nickName + "]"; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String driverClass = "com.mysql.jdbc.Driver"; String dbUrl = "jdbc:mysql://192.168.1.105:3306/my"; String userName = "root"; String passwd = "123456"; DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd); Job job = Job.getInstance(conf, MyDBInputFormat.class.getName()); job.setJarByClass(MyDBInputFormat.class); //默认是TextInputFormat job.setInputFormatClass(DBInputFormat.class); String inputQuery = "select c_id, c_name, c_nickName from t_test"; String inputCountQuery = "select count(*) from t_test"; DBInputFormat.setInput(job, MySqlWritable.class, inputQuery, inputCountQuery); /*String tableName = "t_test"; String conditions = ""; String orderBy = ""; String[] fieldNames = new String[]{"c_id", "c_name", "c_nickName"}; DBInputFormat.setInput(job, MySqlWritable.class, tableName, conditions, orderBy, fieldNames);*/ job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setMapperClass(MyMap.class); //map输出的key如果不做设置,默认和reduce的key、value一致 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.waitForCompletion(true); } }
关于hadoop中的DBInputFormat试验
标签: