当前位置:Gxlcms > mysql > 深入解析MapReduce架构设计与实现原理–读书笔记(5)hadoop工作流

深入解析MapReduce架构设计与实现原理–读书笔记(5)hadoop工作流

时间:2021-07-01 10:21:17 帮助过:25人阅读

用户编写的作业比较复杂,相互之间存在依赖关系,这种依赖关系可以用有向图表示,我们称之为 工作流 。 1.JobControl实现原理: 传统做法:为每个作业创建相应的JobConf对象,并按照依赖关系依次(串行)提交各个作业。 //创建Job对象JobConf extractJobConf =

用户编写的作业比较复杂,相互之间存在依赖关系,这种依赖关系可以用有向图表示,我们称之为工作流
1.JobControl实现原理:
传统做法:为每个作业创建相应的JobConf对象,并按照依赖关系依次(串行)提交各个作业。

//创建Job对象JobConf extractJobConf = new JobConf(ExtractJob.class);JobConf classPriorJobConf = new JobConf(classPriorJob.class);JobConf conditionalProbilityJobConf = new JobConf(conditionalProbilityJobConf.class);JobConf predictJobConf = new JobConf(predictJobConf.class);//配置JobConf//按照依赖关系依次提交作业JobClient.runJob(extractJobConf);JobClient.runJob(classPriorJobConf);JobClient.runJob(conditionalProbilityJobConf);JobClient.runJob(predictJobConf);

如果使用JobControl,则用户只需使用addDepending()函数加作业依赖关系接口,JobControl会按照依赖关系调度各个作业。
Configuration extractJobConf = new Configuration();Configuration classPriorJobConf = new Configuration();Configuration conditionalProbilityJobConf = new Configuration();Configuration predictJobConf = new Configuration();。。。设置各个Conf//创建Job对象Job extractJob = new Job(extractJobConf);Job classPriorJob = new Job(classPriorJobConf);Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);Job predictJob = new Job(predictJobConf);//设置依赖关系,构造一个DAG作业classPriorJob.addDepending(extractJob);conditionalProbilityJob.addDepending(extractJob);predictJob.addDepending(classPriorJob);predictJob.addDepending(conditionalProbilityJob);//创建JobControl对象,由它对作业进行监控和调度JobControl jc = new JobControl("test");jc.addJob(extractJob);jc.addJob(classPriorJob);jc.addJob(conditionalProbilityJob);jc.addJob(predictJob);jc.run();

JobControl设计原理分析
由两个类组成。Job和JobControl。其中Job封装了一个MapReduce作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新最忌的状态。
开始--》waitingJobs--》readyJobs--》runningJobs--》successfulJobs--》结束						??????--》failedJobs????--》结束

JobControl封装了一系列MapReduce作业及其对应的依赖关系。它将处于不同状态的作业放到不同的哈希表中。并按照Job的状态转移作业,直至所有作业完成。

2.ChainMapper/ChainReducer的实现原理
CMR主要为了解决线性链式Mapper而提出的,在MR中存在多个Mapper,这些Mapper像管道一样,前一个Mapper的输出结果会被重定向到下一个Mapper的输入,形成
一个流水线,形式类似于[Map+Reduce Map*]
对于任意一个MR作业 ,MR阶段可以有无限个Mapper,但Reducer只能有一个。
实例:

conf.setJobName("chain");conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);JobConf mapper1Conf = new JobConf(false);JobConf mapper2Conf = new JobConf(false);JobConf reduce1Conf = new JobConf(false);JobConf mapper3Conf = new JobConf(false);?ChainMapper.addMapper(conf,Mapper1.class,LongWritable.class,Text.class,Text.class,Text.class,true,mapper1Conf);//第七个字段是:key/value是否按值传递,如果为true:则为按值传递;如果为false:则为按引用传递。如果设置为true,需要保证key/value不会被修改ChainMapper.addMapper(conf,Mapper2.class,Text.class,Text.class,LongWritable.class,Text.class,false,mapper2Conf);ChainMapper.setReducer(conf,Reducer.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduce1Conf);ChainMapper.addMapper(conf,Mapper3.class,Text.class,Text.class,LongWritable.class,Text.class,false,mapper1Conf);JobClient.runJob(conf);

3.hadoop工作流引擎
隐式工作流引擎:hive,pig,Cascading
显式工作流引擎:Oozie,Azkaban

人气教程排行