Load the jobentry from repository
//
public void loadRep( Repository rep, IMetaStore metaStore, ObjectId id_jobentry, List<DatabaseMeta>
databases,
List<SlaveServer> slaveServers )
throws KettleException {
try {
//.................这里的源码与官方一致。
passingAllParameters = rep.getJobEntryAttributeBoolean( id_jobentry, "pass_all_parameters",
true );
if(transMetaMap.containsKey(getDirectory()+"/"+
getName())){
logBasic( "该转换已经缓存,马上移除缓存:" + getDirectory()+"/"+
getName() );
transMetaMap.remove(getDirectory()+"/"+
getName());
}
} catch ( KettleDatabaseException dbe ) {
throw new KettleException( "Unable to load job entry of type ‘trans‘ from the repository for id_jobentry="
+
id_jobentry, dbe );
}
}
public TransMeta getTransMeta( Repository rep, IMetaStore metaStore, VariableSpace space )
throws KettleException {
try {
TransMeta transMeta =
null;
switch( specificationMethod ) {
case FILENAME:
long start =
new Date().getTime();
String filename =
space.environmentSubstitute( getFilename() );
logBasic( "Loading transformation from XML file [" + filename + "]"
);
transMeta =
new TransMeta( filename, metaStore,
null,
true,
this,
null );
log.logBasic(transMeta.getName()+",从文件读取转换耗时:"+(
new Date().getTime()-
start));
break;
case REPOSITORY_BY_NAME:
if(transMetaMap.containsKey(getDirectory()+"/"+
getName())){
logBasic( "该转换已经缓存,直接使用缓存:" + getDirectory()+"/"+
getName() );
transMeta = transMetaMap.get(getDirectory()+"/"+
getName());
}else{
String transname =
space.environmentSubstitute( getTransname() );
String realDirectory =
space.environmentSubstitute( getDirectory() );
logBasic( BaseMessages.getString( PKG, "JobTrans.Log.LoadingTransRepDirec"
, transname, realDirectory ) );
if ( rep !=
null ) {
//
// It only makes sense to try to load from the repository when the
// repository is also filled in.
//
// It reads last the last revision from the repository.
//
RepositoryDirectoryInterface repositoryDirectory =
rep.findDirectory( realDirectory );
transMeta = rep.loadTransformation( transname, repositoryDirectory,
null,
true,
null );
transMetaMap.put(getDirectory()+"/"+
getName(), transMeta);
logBasic( "从资源库获取转换并缓存:" + getDirectory()+"/"+
getName() );
} else {
throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.NoRepDefined"
) );
}
}
break;
case REPOSITORY_BY_REFERENCE:
if ( transObjectId ==
null ) {
throw new KettleException( BaseMessages.getString( PKG,
"JobTrans.Exception.ReferencedTransformationIdIsNull"
) );
}
if ( rep !=
null ) {
// Load the last revision
//
transMeta = rep.loadTransformation( transObjectId,
null );
}
break;
default:
throw new KettleException( "The specified object location specification method ‘"
+ specificationMethod + "‘ is not yet supported in this job entry."
);
}
if ( transMeta !=
null ) {
// copy parent variables to this loaded variable space.
//
transMeta.copyVariablesFrom(
this );
// Pass repository and metastore references
//
transMeta.setRepository( rep );
transMeta.setMetaStore( metaStore );
}
return transMeta;
} catch ( Exception e ) {
throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.MetaDataLoad"
), e );
}
}
方案1总结:
1.该方案解决了转换重复加载的问题,在一次加载后,进行了缓存,并给出了清除缓存重新加载的机制,实际使用效果是:第一次运行仍然很慢,但之后就是飞一般的感觉,之前没有数据每次都要运行十来分钟,现在就只需几秒钟,证明这个问题就是导致kettle运行慢的唯一原因,这里我们不完美的解决了他。
2.该方案第一次运行job仍然很慢,日志记录可能也有问题,对时效性要求高的话,可能该方案还不能完全解决问题,因为重启后台程序时,job的延迟仍然可能达到一个多小时,当然也就刚重启那一下的事,就看你的业务能不能接受了。
方案2:
方案1中重启后台程序导致的高延迟,我们的业务上仍然不能忍受,于是开始思考其他解决方案。
通过尝试解决这个问题,认为想通过改进kettle从数据库读取转换的过程来优化是行不通的,因为kettle读取资源库的过程很复杂,涉及很多逻辑(仔细看过相关代码,并测试了各个关键操作的耗时,对测试出的耗时操作进行分析后认为很难优化),另外经测试,文件资源库读取转换要比数据库资源库快近100倍,于是决定采用如下方式解决:
1.创建job等过程不做任何改变,所有操作都对数据库资源库。
2.后台job在获取一些job信息时也使用数据库资源库,但在最后一步运行job时,获取文件资源库中的job并运行,这里就要求job启动时,两个资源库相关job要相同,特别是job名称和路径,这是确认同一个job的方式,实际运行的就是文件资源库中的job了。
3.这里就涉及到文件资源库与数据库资源库的同步问题了:
1)人工手动将数据库资源库导出到指定的文件资源库。
2)页面控制job的启动与停止,只在页面请求启动job时后台自动将该job相关信息同步到文件资源库(待实现),其他地方不做任何改变,即使后台程序重启,也不用同步job,这样重启后台程序时就不会出现几十分钟job都没有运行起来的情况了。
3)当我们修改了job,确认要运行了,这时就在页面先停止job,再启动就实现同步了。
以上只是问题解决思路,具体实现也实现了个大概,资源库同步还有问题,通过以上方式实际并没有解决数据库资源库读取慢的问题,这需要对kettle有更深入的了解的技术大牛来解决。
这个解决方案达到的效果:
1.解决了后台程序重启时高延迟的问题(主要问题)。
2.间接减轻了数据库资源库的压力,这样我们创建修改job时就会快一些。
3.算是备份了一下资源库。
4.给出了同步资源库的方案,其实这里的文件资源库与上面的缓存很相似,只是缓存得更彻底。
kettle系列-3.kettle读取数据库资源库很慢的优化
标签: