Date: 2021-07-01 10:21:17
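This article outlines the internal implementation of Flink SQL and the Table API, and connects the steps that lead "from a SQL statement to actual execution". It also includes as many call stacks as possible, so that when you run into a problem you know where to set breakpoints and gain a deeper understanding of the overall architecture.
The SQL flow passes through several important nodes, illustrated below: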
// NOTE: execution order is top to bottom; " -----> " marks the type of the instance produced
*
* +-----> "left outer JOIN" (SQL statement)
* |
* |
* SqlParser.parseQuery // SQL parsing: builds the AST (abstract syntax tree), i.e. SQL -> SqlNode
* |
* |
* +-----> SqlJoin (SqlNode)
* |
* |
* SqlToRelConverter.convertQuery // semantic analysis: builds the logical plan, i.e. SqlNode -> RelNode
* |
* |
* +-----> LogicalProject (RelNode) // initial logical plan, an unoptimized RelNode
* |
* |
* FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rule
* VolcanoRuleCall.onMatch // optimizes the logical plan with Flink-specific rules
* |
* |
* +-----> FlinkLogicalJoin (RelNode) // optimized logical plan
* |
* |
* StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
* VolcanoRuleCall.onMatch // converts the optimized logical plan into the Flink physical plan using Flink rules
* |
* |
* +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode: the physical plan
* |
* |
* StreamExecJoin.translateToPlanInternal // generates the StreamOperator, i.e. the Flink operator
* |
* |
* +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
* |
* |
* StreamTwoInputProcessor.processRecord1 // actual execution: TwoInputStreamTask invokes StreamingJoinOperator
* |
* |
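We will use this diagram as the thread running through the rest of the article.
The Flink Table API & SQL keeps a unified interface for relational queries over streaming and static data, and builds on Apache Calcite's query optimization framework and SQL parser.
Why does Flink provide a Table API? In short, because of the benefits that a relational API brings.
Calcite is the core component here. Apache Calcite is a SQL framework that grew out of the Hadoop ecosystem; it provides a standard SQL parser and validator, several kinds of query optimization, and the ability to connect to various data sources.
The key Calcite concepts are outlined next.
SQL execution is generally divided into four stages. Calcite is similar, but splits the process into five stages:
SQL parsing: build the AST (abstract syntax tree) (SQL -> SqlNode)
SqlNode validation (SqlNode -> SqlNode)
Semantic analysis: build the logical plan (SqlNode -> RelNode/RexNode)
Optimization: optimize according to the applicable rules (RelNode -> RelNode)
Execution plan generation: produce the physical plan (DataStream plan)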
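The order of the five stages, and the type each one hands to the next, can be pictured with a toy model. This is only a sketch: the class `PipelineSketch` and its stage names are made up for illustration, and each stage simply wraps its input in a tag instead of calling Calcite.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: every stage is a hypothetical stand-in that wraps its input in a tag,
// so the final string shows the order in which the stages ran.
final class PipelineSketch {

    // Stage name -> transformation, kept in insertion order.
    private static final Map<String, UnaryOperator<String>> STAGES = new LinkedHashMap<>();

    static {
        STAGES.put("parse", sql -> "SqlNode(" + sql + ")");          // SQL -> SqlNode
        STAGES.put("validate", ast -> "Validated(" + ast + ")");     // SqlNode -> SqlNode
        STAGES.put("convert", ast -> "RelNode(" + ast + ")");        // SqlNode -> RelNode
        STAGES.put("optimize", rel -> "Optimized(" + rel + ")");     // RelNode -> RelNode
        STAGES.put("translate", rel -> "PhysicalPlan(" + rel + ")"); // RelNode -> physical plan
    }

    // Runs all stages in order and returns the final artifact.
    static String run(String sql) {
        String artifact = sql;
        for (UnaryOperator<String> stage : STAGES.values()) {
            artifact = stage.apply(artifact);
        }
        return artifact;
    }

    public static void main(String[] args) {
        System.out.println(run("SELECT 1"));
    }
}
```

Reading the result from the inside out gives the stage order: parse, validate, convert, optimize, translate.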
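Flink supports two forms of expression, the Table API and the SQL API. It uses Apache Calcite as its SQL parser: a statement is first parsed into a SqlNode tree (the AST), then validated, and then converted into a Calcite logical plan (a RelNode tree). Calcite's optimizer then applies transformation rules to the logical plan, using different rule sets depending on the nature of the data source (stream or batch), and produces an optimized RelNode logical plan tree. Finally, the optimized plan is translated into a regular Flink DataSet or DataStream program. Any performance improvement in the DataStream API or the DataSet API therefore automatically improves the efficiency of Table API and SQL queries.
From submission, through Calcite parsing and optimization, to execution on the Flink engine, a stream SQL query generally passes through several stages.
A job submitted through the Table API also goes through Calcite optimization and the later stages; the basic flow is similar to running SQL directly.
As can be seen, the Table API and SQL follow the same flow once a RelNode has been obtained; they differ only in how the RelNode is obtained.
The TableEnvironment object is the central entry point for Table API and SQL integration.
A query can be bound to only one specific TableEnvironment. A TableEnvironment is configured through a TableConfig, which can be used to customize the query optimization and translation process.
A TableEnvironment executes a query as follows:
TableEnvironment.sqlQuery() is the entry point (older versions used sql());
Flink implements FlinkPlannerImpl, which performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations;
a Table is generated.
A summary of the relevant code follows:
package org.apache.flink.table.api.internal;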
@Internal
public class TableEnvironmentImpl implements TableEnvironment {
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
private final OperationTreeBuilder operationTreeBuilder;
private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
protected final TableConfig tableConfig;
protected final Executor execEnv;
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
protected final Parser parser;
}
// Contents of the class printed while debugging the program
this = {StreamTableEnvironmentImpl@4701}
functionCatalog = {FunctionCatalog@4702}
scalaExecutionEnvironment = {StreamExecutionEnvironment@4703}
planner = {StreamPlanner@4704}
config = {TableConfig@4708}
executor = {StreamExecutor@4709}
PlannerBase.config = {TableConfig@4708}
functionCatalog = {FunctionCatalog@4702}
catalogManager = {CatalogManager@1250}
isStreamingMode = true
plannerContext = {PlannerContext@4711}
parser = {ParserImpl@4696}
catalogManager = {CatalogManager@1250}
moduleManager = {ModuleManager@4705}
operationTreeBuilder = {OperationTreeBuilder@4706}
bufferedModifyOperations = {ArrayList@4707} size = 0
tableConfig = {TableConfig@4708}
execEnv = {StreamExecutor@4709}
TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702}
TableEnvironmentImpl.planner = {StreamPlanner@4704}
parser = {ParserImpl@4696}
registration = {TableEnvironmentImpl$1@4710}
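Catalog: defines metadata and namespaces, including Schema (database), Table and RelDataType (type information).
All metadata about databases and tables is stored in Flink's internal Catalog structure, which holds everything Flink knows about tables, including table schemas and data source information.
// TableEnvironment contains a CatalogManager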
public final class CatalogManager {
// A map between names and catalogs.
private Map<String, Catalog> catalogs;
}
// The Catalog interface
public interface Catalog {
......
default Optional<TableFactory> getTableFactory() {
return Optional.empty();
}
......
}
// When the data source is defined inside the program, the catalog is a GenericInMemoryCatalog
public class GenericInMemoryCatalog extends AbstractCatalog {
public static final String DEFAULT_DB = "default";
private final Map<String, CatalogDatabase> databases;
private final Map<ObjectPath, CatalogBaseTable> tables;
private final Map<ObjectPath, CatalogFunction> functions;
private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;
private final Map<ObjectPath, CatalogTableStatistics> tableStats;
private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
}
// Contents observed while debugging the program
catalogManager = {CatalogManager@4646}
catalogs = {LinkedHashMap@4652} size = 1
"default_catalog" -> {GenericInMemoryCatalog@4659}
key = "default_catalog"
value = {char[15]@4668}
hash = 552406043
value = {GenericInMemoryCatalog@4659}
databases = {LinkedHashMap@4660} size = 1
tables = {LinkedHashMap@4661} size = 0
functions = {LinkedHashMap@4662} size = 0
partitions = {LinkedHashMap@4663} size = 0
tableStats = {LinkedHashMap@4664} size = 0
tableColumnStats = {LinkedHashMap@4665} size = 0
partitionStats = {LinkedHashMap@4666} size = 0
partitionColumnStats = {LinkedHashMap@4667} size = 0
catalogName = "default_catalog"
defaultDatabase = "default_database"
temporaryTables = {HashMap@4653} size = 2
currentCatalogName = "default_catalog"
currentDatabaseName = "default_database"
builtInCatalogName = "default_catalog"
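The debugger output shows a current catalog name and a current database name. One way to picture what they are for is name completion: a short table name is expanded to a full catalog.database.table path. The sketch below is a hypothetical helper named `CatalogSketch`, not Flink's CatalogManager, and it assumes that path parts are separated by dots and never quoted.

```java
// Hypothetical helper, not Flink's CatalogManager: it only shows how a partial table
// path can be completed with the current catalog and database names.
final class CatalogSketch {
    private final String currentCatalog;
    private final String currentDatabase;

    CatalogSketch(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    // Expands "table" or "db.table" to "catalog.db.table"; full paths are returned unchanged.
    String qualify(String path) {
        String[] parts = path.split("\\.");
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return path;
            default:
                throw new IllegalArgumentException("Unexpected table path: " + path);
        }
    }

    public static void main(String[] args) {
        CatalogSketch manager = new CatalogSketch("default_catalog", "default_database");
        System.out.println(manager.qualify("OrderB"));
    }
}
```

With the names from the dump, the short name OrderB expands to default_catalog.default_database.OrderB.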
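StreamPlanner is one of the planners in the new Blink Planner.
The new architecture of Flink Table makes the query processor pluggable. The community kept the original Flink Planner (Old Planner) intact and introduced the new Blink Planner, so users can choose between the Old Planner and the Blink Planner.
In terms of the model, the Old Planner does not unify streaming and batch jobs: the two are implemented differently and are translated to the DataStream API and the DataSet API respectively. The Blink Planner treats a batch data set as a bounded DataStream, so both streaming and batch jobs are ultimately translated to the Transformation API. In terms of architecture, the Blink Planner implements BatchPlanner and StreamPlanner for batch and streaming respectively; the two share most of their code and much of the optimization logic. The Old Planner has two completely independent code bases for batch and streaming, with almost no reuse of implementation code or optimization logic.
Beyond these model and architecture advantages, the Blink Planner has also accumulated many practical features.
In the code, this shows up in StreamPlanner.translateToPlan, which dispatches to the StreamExecNode implementations that generate the corresponding StreamOperators.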
class StreamPlanner(
executor: Executor,
config: TableConfig,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
override protected def translateToPlan(
execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
execNodes.map {
case node: StreamExecNode[_] => node.translateToPlan(this)
case _ =>
throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
}
}
@Internal
public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment {
private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
// The planner is invoked to generate the plan when converting back to a DataStream.
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
Transformation<T> transformation = getTransformation(table, transformations);
executionEnvironment.addOperator(transformation);
return new DataStream<>(executionEnvironment, transformation);
}
}
// Call stack printed while debugging the program
translateToPlanInternal:85, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
Flink implements FlinkPlannerImpl as the bridge to Calcite; it performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations.
class FlinkPlannerImpl(
config: FrameworkConfig,
catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
typeFactory: FlinkTypeFactory,
cluster: RelOptCluster) {
val operatorTable: SqlOperatorTable = config.getOperatorTable
val parser: CalciteParser = new CalciteParser(config.getParserConfig)
val convertletTable: SqlRexConvertletTable = config.getConvertletTable
val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
}
// FlinkPlannerImpl is used here
public class ParserImpl implements Parser {
private final CatalogManager catalogManager;
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
// FlinkPlannerImpl is obtained here
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
}
// Contents observed while debugging the program
planner = {FlinkPlannerImpl@4659}
config = {Frameworks$StdFrameworkConfig@4685}
catalogReaderSupplier = {PlannerContext$lambda@4686}
typeFactory = {FlinkTypeFactory@4687}
cluster = {FlinkRelOptCluster@4688}
operatorTable = {ChainedSqlOperatorTable@4689}
parser = {CalciteParser@4690}
convertletTable = {StandardConvertletTable@4691}
sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692}
validator = null
// Call stack 1
validate:104, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:127, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
// Call stack 2
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
As the code shows, TableImpl is merely a class that wraps the related operations and information; it contains little real logic.
@Internal
public class TableImpl implements Table {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private final TableEnvironment tableEnvironment;
private final QueryOperation operationTree;
private final OperationTreeBuilder operationTreeBuilder;
private final LookupCallResolver lookupResolver;
private TableImpl joinInternal(
Table right,
Optional<Expression> joinPredicate,
JoinType joinType) {
verifyTableCompatible(right);
return createTable(operationTreeBuilder.join(
this.operationTree,
right.getQueryOperation(),
joinType,
joinPredicate,
false));
}
}
// Contents observed while debugging the program
view = {TableImpl@4583} "UnnamedTable$0"
tableEnvironment = {StreamTableEnvironmentImpl@4580}
functionCatalog = {FunctionCatalog@4646}
scalaExecutionEnvironment = {StreamExecutionEnvironment@4579}
planner = {StreamPlanner@4647}
catalogManager = {CatalogManager@4644}
moduleManager = {ModuleManager@4648}
operationTreeBuilder = {OperationTreeBuilder@4649}
bufferedModifyOperations = {ArrayList@4650} size = 0
tableConfig = {TableConfig@4651}
execEnv = {StreamExecutor@4652}
TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646}
TableEnvironmentImpl.planner = {StreamPlanner@4647}
parser = {ParserImpl@4653}
registration = {TableEnvironmentImpl$1@4654}
operationTree = {ScalaDataStreamQueryOperation@4665}
identifier = null
dataStream = {DataStreamSource@4676}
fieldIndices = {int[2]@4677}
tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n"
operationTreeBuilder = {OperationTreeBuilder@4649}
config = {TableConfig@4651}
functionCatalog = {FunctionCatalog@4646}
tableReferenceLookup = {TableEnvironmentImpl$lambda@4668}
lookupResolver = {LookupCallResolver@4669}
projectionOperationFactory = {ProjectionOperationFactory@4670}
sortOperationFactory = {SortOperationFactory@4671}
calculatedTableFactory = {CalculatedTableFactory@4672}
setOperationFactory = {SetOperationFactory@4673}
aggregateOperationFactory = {AggregateOperationFactory@4674}
joinOperationFactory = {JoinOperationFactory@4675}
lookupResolver = {LookupCallResolver@4666}
functionLookup = {FunctionCatalog@4646}
tableName = "UnnamedTable$0"
value = {char[14]@4667}
hash = 1355882650
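This corresponds to the following part of the overview diagram; its job is to produce a SqlNode such as SqlJoin.
// NOTE: execution order is top to bottom; " -----> " marks the type of the instance produced
*
* +-----> "left outer JOIN" (SQL statement)
* |
* |
* SqlParser.parseQuery // SQL parsing: builds the AST (abstract syntax tree), i.e. SQL -> SqlNode
* |
* |
* +-----> SqlJoin (SqlNode)
* |
* |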
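Calcite uses JavaCC for SQL parsing. From the Parser.jj file defined in Calcite, JavaCC generates a set of Java classes, and the generated code converts SQL into an AST data structure (of type SqlNode here).
In short: SQL is converted into an AST (abstract syntax tree), which Calcite represents as SqlNode.
package org.apache.flink.table.planner.delegation;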
public class ParserImpl implements Parser {
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
}
// Printing parsed after parsing shows the basic shape of a SqlNode.
parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
operator = {SqlSetOperator@4716} "UNION ALL"
all = true
name = "UNION ALL"
kind = {SqlKind@4742} "UNION"
leftPrec = 14
rightPrec = 15
returnTypeInference = {ReturnTypes$lambda@4743}
operandTypeInference = null
operandTypeChecker = {SetopOperandTypeChecker@4744}
operands = {SqlNode[2]@4717}
0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2"
1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
functionQuantifier = null
expanded = false
pos = {SqlParserPos@4719} "line 2, column 1"
// The debug stack below may help deepen understanding
SqlStmt:3208, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
parse:48, CalciteParser (org.apache.flink.table.planner.calcite)
parse:64, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
// Another reference, in FlinkSqlParserImpl.FromClause
e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`"
left = {SqlBasicCall@4676} "`Orders` AS `o`"
operator = {SqlAsOperator@4752} "AS"
operands = {SqlNode[2]@4753}
functionQuantifier = null
expanded = false
pos = {SqlParserPos@4755} "line 7, column 3"
natural = {SqlLiteral@4677} "FALSE"
typeName = {SqlTypeName@4775} "BOOLEAN"
value = {Boolean@4776} false
pos = {SqlParserPos@4777} "line 7, column 13"
joinType = {SqlLiteral@4678} "LEFT"
typeName = {SqlTypeName@4758} "SYMBOL"
value = {JoinType@4759} "LEFT"
pos = {SqlParserPos@4724} "line 7, column 26"
right = {SqlBasicCall@4679} "`Payment` AS `p`"
operator = {SqlAsOperator@4752} "AS"
operands = {SqlNode[2]@4763}
functionQuantifier = null
expanded = false
pos = {SqlParserPos@4764} "line 7, column 31"
conditionType = {SqlLiteral@4680} "ON"
typeName = {SqlTypeName@4758} "SYMBOL"
value = {JoinConditionType@4771} "ON"
pos = {SqlParserPos@4772} "line 7, column 44"
condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`"
operator = {SqlBinaryOperator@4766} "="
operands = {SqlNode[2]@4767}
functionQuantifier = null
expanded = false
pos = {SqlParserPos@4768} "line 7, column 47"
pos = {SqlParserPos@4724} "line 7, column 26"
// The debug stack below may help deepen understanding
FromClause:10192, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlSelect:5918, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQuery:630, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQueryOrExpr:15651, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
QueryOrExpr:15118, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
OrderedQueryOrExpr:504, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmt:3693, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
parse:48, CalciteParser (org.apache.flink.table.planner.calcite)
parse:64, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
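The first step produces a SqlNode object, which is an unvalidated abstract syntax tree. Next comes the validation stage. Validation needs metadata, and it checks table names, field names, function names and data types.
In short: the AST is validated against the metadata; the validated result is still an AST represented as SqlNode.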
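The idea of checking names against metadata can be sketched with a toy validator. This is an illustration under stated assumptions, not Calcite's SqlValidatorImpl: the class `ValidatorSketch` is hypothetical, the metadata is a plain map from table name to column names, and only table and column names are checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy validator: looks up the referenced table and columns in a metadata map
// and collects one error message per name that cannot be resolved.
final class ValidatorSketch {

    static List<String> validate(Map<String, Set<String>> schema, String table, List<String> columns) {
        List<String> errors = new ArrayList<>();
        Set<String> knownColumns = schema.get(table);
        if (knownColumns == null) {
            errors.add("Table '" + table + "' not found");
            return errors; // columns cannot be checked without the table
        }
        for (String column : columns) {
            if (!knownColumns.contains(column)) {
                errors.add("Column '" + column + "' not found in table '" + table + "'");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> schema = new HashMap<>();
        schema.put("OrderB", new HashSet<>(Arrays.asList("user", "product", "amount")));
        System.out.println(validate(schema, "OrderB", Arrays.asList("amount", "price")));
    }
}
```

An empty result means every name was resolved, which corresponds to a query that passes this part of validation.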
package org.apache.flink.table.planner.operations;
public class SqlToOperationConverter {
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner,
CatalogManager catalogManager,
SqlNode sqlNode) {
// validate() is invoked here
final SqlNode validated = flinkPlanner.validate(sqlNode);
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
......
}
}
// Contents of validated printed after validation.
validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
operator = {SqlSetOperator@5000} "UNION ALL"
all = true
name = "UNION ALL"
kind = {SqlKind@5029} "UNION"
leftPrec = 14
rightPrec = 15
returnTypeInference = {ReturnTypes$lambda@5030}
operandTypeInference = null
operandTypeChecker = {SetopOperandTypeChecker@5031}
operands = {SqlNode[2]@5001}
0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2"
1 = {SqlSelect@5026} "SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
functionQuantifier = null
expanded = false
pos = {SqlParserPos@5003} "line 2, column 1"
// The debug stack below may help deepen understanding
validate:81, AbstractNamespace (org.apache.calcite.sql.validate)
validateNamespace:1008, SqlValidatorImpl (org.apache.calcite.sql.validate)
validateQuery:968, SqlValidatorImpl (org.apache.calcite.sql.validate)
validateCall:90, SqlSetOperator (org.apache.calcite.sql)
validateCall:5304, SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:116, SqlCall (org.apache.calcite.sql)
validateScopedExpression:943, SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:650, SqlValidatorImpl (org.apache.calcite.sql.validate)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate:126, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
validate:105, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:127, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
In the overview diagram, we have now reached this point:
// NOTE: execution order is top to bottom; " -----> " marks the type of the instance produced
*
* +-----> "left outer JOIN" (SQL statement)
* |
* |
* SqlParser.parseQuery // SQL parsing: builds the AST (abstract syntax tree), i.e. SQL -> SqlNode
* |
* |
* +-----> SqlJoin (SqlNode)
* |
* |
* SqlToRelConverter.convertQuery // semantic analysis: builds the logical plan, i.e. SqlNode -> RelNode
* |
* |
* +-----> LogicalProject (RelNode) // initial logical plan, an unoptimized RelNode
* |
* |
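After the second step, the SqlNode tree has passed validation. The next step converts SqlNode into RelNode/RexNode, that is, it generates the corresponding logical plan.
In short: semantic analysis builds a RelNode tree from the SqlNode and the metadata; this is the initial version of the logical plan.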
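The shape of this conversion can be sketched for a single SELECT. The sketch is a toy under stated assumptions: `SqlToRelSketch` is a hypothetical class, the query is given as already separated parts instead of a SqlNode, and the operators are rendered as strings whose names merely echo Calcite's logical operators.

```java
import java.util.Arrays;
import java.util.List;

// Toy conversion of "SELECT <selectList> FROM <from> WHERE <where>" into an operator tree.
final class SqlToRelSketch {

    // Builds the tree bottom-up: scan first, then the optional filter, then the projection.
    static String convert(List<String> selectList, String from, String where) {
        String rel = "LogicalTableScan(" + from + ")";
        if (where != null) {
            rel = "LogicalFilter(" + where + ", " + rel + ")";
        }
        return "LogicalProject([" + String.join(", ", selectList) + "], " + rel + ")";
    }

    public static void main(String[] args) {
        System.out.println(convert(Arrays.asList("user", "amount"), "OrderB", "amount < 2"));
    }
}
```

The FROM clause ends up at the bottom of the tree and the SELECT list at the top, which is why a LogicalProject appears as the root in the debugger output of this section.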
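The logical plan that Flink has built is converted into a Calcite logical plan so that Calcite's powerful optimization rules can be applied.
Flink calls the construct method of each node from top to bottom to convert Flink nodes into Calcite RelNodes. The actual work is done in the convertQueryRecursive() method.
For example, the call chain that creates the LogicalJoin under the LogicalProject looks roughly like this: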
createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
The detailed source code is as follows:
convertQuery() in SqlToRelConverter converts a SqlNode into a RelRoot.
public class SqlToRelConverter {
public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
if (needsValidation) {
query = this.validator.validate(query);
}
RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider()));
RelNode result = this.convertQueryRecursive(query, top, (RelDataType)null).rel;
if (top && isStream(query)) {
result = new LogicalDelta(this.cluster, ((RelNode)result).getTraitSet(), (RelNode)result);
}
RelCollation collation = RelCollations.EMPTY;
if (!query.isA(SqlKind.DML) && isOrdered(query)) {
collation = this.requiredCollation((RelNode)result);
}
this.checkConvertedType(query, (RelNode)result);
RelDataType validatedRowType = this.validator.getValidatedNodeType(query);
// The root is set here
return RelRoot.of((RelNode)result, validatedRowType, query.getKind()).withCollation(collation);
}
}
// Printing at this point
toQueryOperation:523, SqlToOperationConverter (org.apache.flink.table.planner.operations)
// yields the following, which shows the actual structure of a RelRoot
relational = {RelRoot@5248} "Root {kind: UNION, rel: LogicalUnion#6, rowType: RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount), fields: [<0, user>, <1, product>, <2, amount>], collation: []}"
rel = {LogicalUnion@5227} "LogicalUnion#6"
inputs = {RegularImmutableList@5272} size = 2
kind = {SqlKind@5029} "UNION"
all = true
desc = "LogicalUnion#6"
rowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
digest = "LogicalUnion#6"
cluster = {FlinkRelOptCluster@4800}
id = 6
traitSet = {RelTraitSet@5273} size = 5
validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
kind = {StructKind@5268} "FULLY_QUALIFIED"
nullable = false
fieldList = {RegularImmutableList@5269} size = 3
digest = "RecordType(BIGINT user, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, INTEGER amount) NOT NULL"
kind = {SqlKind@5029} "UNION"
lowerName = "union"
sql = "UNION"
name = "UNION"
ordinal = 18
fields = {RegularImmutableList@5254} size = 3
{Integer@5261} 0 -> "user"
{Integer@5263} 1 -> "product"
{Integer@5265} 2 -> "amount"
collation = {RelCollationImpl@5237} "[]"
fieldCollations = {RegularImmutableList@5256} size = 0
// Call stack
convertQuery:561, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
// Another example: a LogicalProject has been generated
bb = {SqlToRelConverter$Blackboard@4978}
scope = {SelectScope@4977}
nameToNodeMap = null
root = {LogicalProject@5100} "LogicalProject#4"
exps = {RegularImmutableList@5105} size = 3
input = {LogicalJoin@5106} "LogicalJoin#3"
desc = "LogicalProject#4"
rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) payType)"
digest = "LogicalProject#4"
cluster = {FlinkRelOptCluster@4949}
id = 4
traitSet = {RelTraitSet@5108} size = 5
inputs = {Collections$SingletonList@5111} size = 1
mapCorrelateToRex = {HashMap@5112} size = 0
isPatternVarRef = false
cursors = {ArrayList@5113} size = 0
subQueryList = {LinkedHashSet@5114} size = 0
agg = null
window = null
mapRootRelToFieldProjection = {HashMap@5115} size = 0
columnMonotonicities = {ArrayList@5116} size = 3
systemFieldList = {ArrayList@5117} size = 0
top = true
initializerExpressionFactory = {NullInitializerExpressionFactory@5118}
this$0 = {SqlToRelConverter@4926}
// For example, the LogicalJoin that feeds the LogicalProject is created here.
protected void convertFrom(SqlToRelConverter.Blackboard bb, SqlNode from) {
switch (from.getKind()) {
......
case JOIN:
......
RelNode joinRel = this.createJoin(fromBlackboard, leftRel, rightRel, conditionExp, convertedJoinType);
bb.setRoot(joinRel, false);
return;
......
}
}
// Related call stack
createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
At this point we have reached this part of the overview diagram:
// NOTE: execution order is top to bottom; " -----> " marks the type of the instance produced
*
* +-----> "left outer JOIN" (SQL statement)
* |
* |
* SqlParser.parseQuery // SQL parsing: builds the AST (abstract syntax tree), i.e. SQL -> SqlNode
* |
* |
* +-----> SqlJoin (SqlNode)
* |
* |
* SqlToRelConverter.convertQuery // semantic analysis: builds the logical plan, i.e. SqlNode -> RelNode
* |
* |
* +-----> LogicalProject (RelNode) // initial logical plan, an unoptimized RelNode
* |
* |
* FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rule
* VolcanoRuleCall.onMatch // optimizes the logical plan with Flink-specific rules
* |
* |
* +-----> FlinkLogicalJoin (RelNode) // optimized logical plan
* |
* |
* StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
* VolcanoRuleCall.onMatch // converts the optimized logical plan into the Flink physical plan using Flink rules
* |
* |
* +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode: the physical plan
* |
* |
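The fourth stage is where the core of Calcite lies.
In short: logical plan optimization, the heart of the optimizer, optimizes the logical plan generated earlier according to the applicable rules.
Flink wraps this part in the optimize method. It involves several phases, each of which uses rules to optimize and improve the logical plan.
The most central part of the Calcite architecture is the optimizer; an optimization engine is made up of three components.
The optimizer turns the relational algebra expression produced by the parser into an execution plan for the execution engine, applying rule-based optimizations along the way to obtain a more efficient plan. A typical optimization is filter push-down: the filter is applied before the join, so that the join does not have to process its full inputs and less data takes part in the join.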
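Filter push-down can be sketched as a single rewrite on a small plan tree. The sketch below is a toy, not a Calcite RelOptRule: the classes `Scan`, `Filter` and `Join` are hypothetical, the join is assumed to be an inner join over distinct tables, and each filter is assumed to refer to exactly one table.

```java
// Toy rewrite rule: Filter(Join(left, right)) becomes Join(Filter(left), right)
// when the filter only refers to a table read by the left side (and likewise for the right).
final class PushDownSketch {

    abstract static class Rel {}

    static final class Scan extends Rel {
        final String table;
        Scan(String table) { this.table = table; }
        @Override public String toString() { return "Scan(" + table + ")"; }
    }

    static final class Filter extends Rel {
        final String table;      // the only table the condition refers to (assumption)
        final String condition;
        final Rel input;
        Filter(String table, String condition, Rel input) {
            this.table = table;
            this.condition = condition;
            this.input = input;
        }
        @Override public String toString() { return "Filter(" + condition + ", " + input + ")"; }
    }

    static final class Join extends Rel {
        final Rel left;
        final Rel right;
        Join(Rel left, Rel right) { this.left = left; this.right = right; }
        @Override public String toString() { return "Join(" + left + ", " + right + ")"; }
    }

    // True if the subtree reads the given table.
    static boolean reads(Rel rel, String table) {
        if (rel instanceof Scan) return ((Scan) rel).table.equals(table);
        if (rel instanceof Filter) return reads(((Filter) rel).input, table);
        Join join = (Join) rel;
        return reads(join.left, table) || reads(join.right, table);
    }

    // Applies the rule once at the root; plans that do not match are returned unchanged.
    static Rel pushFilterBelowJoin(Rel rel) {
        if (!(rel instanceof Filter) || !(((Filter) rel).input instanceof Join)) return rel;
        Filter filter = (Filter) rel;
        Join join = (Join) filter.input;
        if (reads(join.left, filter.table)) {
            return new Join(new Filter(filter.table, filter.condition, join.left), join.right);
        }
        if (reads(join.right, filter.table)) {
            return new Join(join.left, new Filter(filter.table, filter.condition, join.right));
        }
        return rel;
    }

    public static void main(String[] args) {
        Rel plan = new Filter("Orders", "amount > 2", new Join(new Scan("Orders"), new Scan("Payment")));
        System.out.println(pushFilterBelowJoin(plan));
    }
}
```

After the rewrite the join only sees the Orders rows that survive the filter. For outer joins the rule is not always valid, which is one reason real optimizers attach extra conditions to it.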
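RelOptPlanner is the base interface for optimizers in Calcite. Calcite provides two optimizer implementations: HepPlanner, a rule-based optimizer (RBO), and VolcanoPlanner, a cost-based optimizer (CBO).
A cost-based optimizer (CBO) transforms relational expressions according to optimization rules. Here a transformation means that applying a rule to a relational expression produces another relational expression while the original one is kept, so a series of transformations yields several execution plans. The CBO then uses statistics and a cost model to compute the cost of each plan and picks the plan with the lowest cost.
It follows that a CBO has two dependencies: statistics and the cost model. How accurate the statistics are and how reasonable the cost model is both affect whether the CBO picks the best plan. From this description, a CBO is generally preferable to an RBO, because an RBO only follows rules and is insensitive to the data, whereas real data keeps changing, so the plan an RBO produces may well not be the best one. In practice, most databases and big data engines lean towards CBO. For a stream processing engine, however, using a CBO is still hard, because the data volume and similar information cannot be known in advance, which greatly limits the optimization; CBO is therefore mainly used in offline scenarios.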
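The dependence on statistics can be made concrete with a toy cost comparison. The sketch is illustrative only: the class `CostSketch` and its cost model (2 units per row to build a hash table, 1 unit per row to probe it) are assumptions made for this example, not Calcite's or Flink's cost model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy cost-based choice between two equivalent hash join plans.
final class CostSketch {

    // Hypothetical cost model: building costs 2 units per row, probing costs 1 unit per row.
    static double hashJoinCost(long buildRows, long probeRows) {
        return 2.0 * buildRows + probeRows;
    }

    // Enumerates the candidate plans, costs each one from the row counts, and returns the cheapest.
    static String cheapestPlan(long leftRows, long rightRows) {
        Map<String, Double> candidates = new LinkedHashMap<>();
        candidates.put("build=left", hashJoinCost(leftRows, rightRows));
        candidates.put("build=right", hashJoinCost(rightRows, leftRows));

        String best = null;
        double bestCost = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> candidate : candidates.entrySet()) {
            if (candidate.getValue() < bestCost) {
                best = candidate.getKey();
                bestCost = candidate.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(cheapestPlan(1000, 10));
        System.out.println(cheapestPlan(10, 1000));
    }
}
```

The same query gets a different plan when the row counts are swapped. A fixed rule could not make that distinction, and an unbounded stream offers no reliable row counts to feed into such a model.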
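VolcanoPlanner is the CBO implementation. It keeps iterating over the rules until it finds the plan with the lowest cost. Some related concepts:
RelSet: a set of equivalent relational expressions, whose RelNodes are recorded in rels;
RelSubset: records its best plan (best) and the cost of that best plan (bestCost).
Using VolcanoPlanner involves four steps overall:
initialize the VolcanoPlanner and add the rules;
change the traits of the RelNode (here its Convention);
register the RelNode through the setRoot() method and perform the related initialization;
find the plan with the lowest cost.
The example below walks through the internal logic of VolcanoPlanner.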
//1. 初始化 VolcanoPlanner 对象,并添加相应的 Rule
VolcanoPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
// 添加相应的 rule
planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
// 添加相应的 ConverterRule
planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
//2. Changes a relational expression to an equivalent one with a different set of traits.
RelTraitSet desiredTraits =
relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
relNode = planner.changeTraits(relNode, desiredTraits);
//3. Register the RelNode through VolcanoPlanner's setRoot method and perform the corresponding initialization
planner.setRoot(relNode);
//4. Find the plan with the lowest cost through a dynamic programming algorithm
relNode = planner.findBestExp();
The related code in Flink is as follows:
public PlannerContext(
TableConfig tableConfig,
FunctionCatalog functionCatalog,
CatalogManager catalogManager,
CalciteSchema rootSchema,
List<RelTraitDef> traitDefs) {
this.tableConfig = tableConfig;
this.context = new FlinkContextImpl(
tableConfig,
functionCatalog,
catalogManager,
this::createSqlExprToRexConverter);
this.rootSchema = rootSchema;
this.traitDefs = traitDefs;
// Make a framework config to initialize the RelOptCluster instance,
// caution that we can only use the attributes that can not be overwrite/configured
// by user.
this.frameworkConfig = createFrameworkConfig();
// VolcanoPlanner is used here
RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
planner.setExecutor(frameworkConfig.getExecutor());
for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
planner.addRelTraitDef(traitDef);
}
this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
}
// Call stack for the initialization
<init>:119, PlannerContext (org.apache.flink.table.planner.delegation)
<init>:86, PlannerBase (org.apache.flink.table.planner.delegation)
<init>:44, StreamPlanner (org.apache.flink.table.planner.delegation)
create:50, BlinkPlannerFactory (org.apache.flink.table.planner.delegation)
create:325, StreamTableEnvironmentImpl$ (org.apache.flink.table.api.scala.internal)
create:425, StreamTableEnvironment$ (org.apache.flink.table.api.scala)
main:56, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
override def optimize(root: RelNode, context: OC): RelNode = {
val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
// VolcanoPlanner limits that the planer a RelNode tree belongs to and
// the VolcanoPlanner used to optimize the RelNode tree should be same instance.
// see: VolcanoPlanner#registerImpl
// here, use the planner in cluster directly
// VolcanoPlanner is used here as well
val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
val optProgram = Programs.ofRules(rules)
}
}
// Its call stack
optimize:60, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
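// Everything below is VolcanoPlanner code and the related call stacks
// VolcanoPlanner adds rules: the matched optimization rules are wrapped as VolcanoRuleMatch and put into the RuleQueue, which is the core class used by the dynamic programming algorithm that runs next.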
public class VolcanoPlanner extends AbstractRelOptPlanner {
public boolean addRule(RelOptRule rule) {
......
}
}
addRule:438, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:315, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
// VolcanoPlanner changes the traits
public class VolcanoPlanner extends AbstractRelOptPlanner {
public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) {
assert !rel.getTraitSet().equals(toTraits);
assert toTraits.allSimple();
RelSubset rel2 = this.ensureRegistered(rel, (RelNode)null);
return rel2.getTraitSet().equals(toTraits) ? rel2 : rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify());
}
}
changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
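The changeTraits code above ends with `rel2.set.getOrCreateSubset(...)`: within one group of equivalent expressions, there is one subset per requested set of traits. The following toy model sketches that relationship and how a subset could track its cheapest member. It is not Calcite's RelSet/RelSubset code; `ToySet`, `ToySubset`, the string-valued traits, and the costs are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EquivalenceSetSketch {

    // Toy subset: the expressions of one equivalence set that share a trait,
    // plus the cheapest expression registered so far.
    static class ToySubset {
        final String trait;
        String best = null;
        double bestCost = Double.POSITIVE_INFINITY;

        ToySubset(String trait) {
            this.trait = trait;
        }
    }

    // Toy equivalence set: all equivalent expressions, grouped into subsets by trait.
    static class ToySet {
        final List<String> rels = new ArrayList<>();
        final Map<String, ToySubset> subsets = new HashMap<>();

        // Returns the subset for the trait, creating it on the first request.
        ToySubset getOrCreateSubset(String trait) {
            return subsets.computeIfAbsent(trait, ToySubset::new);
        }

        // Registers an expression and updates the subset's best plan if this one is cheaper.
        void add(String rel, String trait, double cost) {
            rels.add(rel);
            ToySubset subset = getOrCreateSubset(trait);
            if (cost < subset.bestCost) {
                subset.best = rel;
                subset.bestCost = cost;
            }
        }
    }

    public static void main(String[] args) {
        ToySet set = new ToySet();
        // A logical expression cannot be executed, so it is given an infinite cost here.
        set.add("LogicalJoin", "NONE", Double.POSITIVE_INFINITY);
        set.add("MergeJoin", "ENUMERABLE", 120.0);
        set.add("HashJoin", "ENUMERABLE", 80.0);

        // Asking for the desired trait yields the same subset object every time.
        ToySubset target = set.getOrCreateSubset("ENUMERABLE");
        System.out.println(target.best + " with cost " + target.bestCost);
    }
}
```

In this model, requesting a trait that no registered expression has yet simply creates an empty subset with an infinite best cost, which loosely mirrors why changeTraits can return a subset before any rule has produced an expression for it.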
// VolcanoPlanner sets the root
public class VolcanoPlanner extends AbstractRelOptPlanner {
public void setRoot(RelNode rel) {
this.registerMetadataRels();
this.root = this.registerImpl(rel, (RelSet)null);
if (this.originalRoot == null) {
this.originalRoot = rel;
}
this.ruleQueue.recompute(this.root);
this.ensureRootConverters();
}
}
setRoot:294, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:326, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
// VolcanoPlanner finds the lowest cost; in essence this is an implementation of a dynamic programming algorithm.
public class VolcanoPlanner extends AbstractRelOptPlanner {
public RelNode findBestExp() {
this.ensureRootConverters();
this.registerMaterializations();
int cumulativeTicks = 0;
VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values();
int var3 = var2.length;
for(int var4 = 0; var4 < var3; ++var4) {
VolcanoPlannerPhase phase = var2[var4];
this.setInitialImportance();
RelOptCost targetCost = this.costFactory.makeHugeCost();
int tick = 0;
int firstFiniteTick = -1;
int splitCount = 0;
int giveUpTick = 2147483647;
while(true) {
++tick;
++cumulativeTicks;
if (this.root.bestCost.isLe(targetCost)) {
if (firstFiniteTick < 0) {
firstFiniteTick = cumulativeTicks;
this.clearImportanceBoost();
}
if (!this.ambitious) {
break;
}
targetCost = this.root.bestCost.multiplyBy(0.9D);
++splitCount;
if (this.impatient) {
if (firstFiniteTick < 10) {
giveUpTick = cumulativeTicks + 25;
} else {
giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10, 25);