时间:2021-07-01 10:21:17 帮助过:35人阅读
编写一个连接提供器,用于获取mysql数据库连接:
需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar
package mystorm.wordcount; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import org.apache.storm.jdbc.common.ConnectionProvider; /** * storm集成jdbc的 连接提供者 * @author Administrator * */ //为jdbcBolt组件提供对应的数据连接 public class MyConnectionProvider implements ConnectionProvider { private static String driver = "com.mysql.jdbc.driver"; private static String url = "jdbc:mysql://192.168.2.1:3306/test"; private static String user = "root"; private static String password = "123456"; static{ try { Class.forName(driver); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block new ExceptionInInitializerError(e); // e.printStackTrace(); } } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public Connection getConnection() { // TODO Auto-generated method stub try { return DriverManager.getConnection(url,user,password); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } @Override public void prepare() { // TODO Auto-generated method stub } }View Code
然后在前面我们编写的Topology任务添加一个Storm提供的Bolt组件,用于把数据写入mysql
//创建一个新的jdbcbolt组件,把前一个bolt组件发送过来的数据插入到mysql数据库中 // storm集成各种框架的jar包路径 : /usr/local/apps/apache-storm-1.0.3/external //集成jdbc路径 :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc /** * * * jar包 引入: * 1.external/sql/storm-sql-core/*.jar * 2.external/storm-jdbc * 3.mysql驱动 * 4.commons-lang3-3.1.jar * * @return */ private static IRichBolt createJDBCBolt(){ //创建一个connectionProvider MyConnectionProvider conectionProvider = new MyConnectionProvider(); //创建一个mapper,填写表名 JdbcMapper mapper = new SimpleJdbcMapper("result",conectionProvider); //通过mapper创建一个bolt组件 return new JdbcInsertBolt(conectionProvider,mapper) .withTableName("result") .withQueryTimeoutSecs(30); }
然后把这个组件加到Topology任务的最后面 第三个Bolt组件:
/** * * 单词计数的Topology的入口,主程序 * * @author Administrator * */ public class WordCountTopology { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TopologyBuilder builder = new TopologyBuilder(); //设置任务的spout组件 builder.setSpout("wordcount_spout", new WordCountSpout()); //设置任务的第一个bolt组件 builder.setBolt("wordcount_bolt", new WordCountSplitBolt()) // (随机分配策略接收spout的任务) .shuffleGrouping("wordcount_spout"); //设置第二个bolt组件 builder.setBolt("wordcount_countbolt", new WordCountBoltCount()) //(接受第一个bolt组件的数据,按字段进行分组) .fieldsGrouping("wordcount_bolt", new Fields("word")); //设置第三个bolt组件(storm提供的),用与把记录保存到mysql数据库中。 builder.setBolt("wordcount_jdbcbolt", createJDBCBolt()) .shuffleGrouping("wordcount_countbolt"); //创建topology的任务 StormTopology wc = builder.createTopology(); //配置参数信息 Config conf = new Config(); //1.本地模式提交 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology( "mywordcount", conf, wc); /** * * 本地模式运行结果 * * spout采集的数据是:beijing is the capital of china 输出的结果:{beijing=1} 输出的结果:{is=1, beijing=1} 输出的结果:{the=1, is=1, beijing=1} 输出的结果:{the=1, capital=1, is=1, beijing=1} 输出的结果:{the=1, capital=1, of=1, is=1, beijing=1} 输出的结果:{the=1, capital=1, china=1, of=1, is=1, beijing=1} spout采集的数据是:I love beijing 输出的结果:{the=1, capital=1, china=1, of=1, I=1, is=1, beijing=1} 输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=1, is=1, beijing=1} 输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=1, is=1, beijing=2} spout采集的数据是:I love china 输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=2, is=1, beijing=2} 输出的结果:{the=1, love=2, capital=1, china=1, of=1, I=2, is=1, beijing=2} 输出的结果:{the=1, love=2, capital=1, china=2, of=1, I=2, is=1, beijing=2} * * * */ //2集群模式提交 // StormSubmitter.submitTopology(args[0], conf, wc); }View Code
打成jar包,放到storm集群中运行,会宝如下错误 :
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/lang/Validate at org.apache.storm.jdbc.mapper.SimpleJdbcMapper.<init>(SimpleJdbcMapper.java:39) at mystorm.wordcount.WordCountTopology.createJDBCBolt(WordCountTopology.java:95) at mystorm.wordcount.WordCountTopology.main(WordCountTopology.java:42) Caused by: java.lang.ClassNotFoundException: org.apache.commons.lang.Validate at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 3 more
还有一些jar没有集成进来 ,需要集成storm的jar ,放到storm集群HOME的lib下,复制到集群的其他机器:
* 1.external/sql/storm-sql-core/*.jar * 2.external/storm-jdbc * 3.mysql驱动 * 4.commons-lang3-3.1.jar
Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)
标签:shuffle new 输出 private 计算 ted 需要 tac auth