This class holds the components needed at every stage of SQL parsing, validation, optimization, and execution: the metadata manager CatalogManager, the module manager moduleManager (a module bundles a set of functions, types, and rules), the user-defined function manager FunctionCatalog, the executor (thread pool), and the SQL parser/planner Planner.
<code>StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)

def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings,
    tableConfig: TableConfig)
  : StreamTableEnvironmentImpl = {

  val catalogManager = new CatalogManager(
    settings.getBuiltInCatalogName,
    new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))

  val moduleManager = new ModuleManager

  val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)

  val executorProperties = settings.toExecutorProperties
  val executor = lookupExecutor(executorProperties, executionEnvironment)

  val plannerProperties = settings.toPlannerProperties
  val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
    .create(
      plannerProperties,
      executor,
      tableConfig,
      functionCatalog,
      catalogManager)

  new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    executionEnvironment,
    planner,
    executor,
    settings.isStreamingMode
  )
}</code>
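The factory method above is normally reached through the public StreamTableEnvironment.create API. Below is a minimal usage sketch; the package path follows the Flink version this code is taken from (newer releases move the Scala bridge to org.apache.flink.table.api.bridge.scala), and everything else uses the public API.
<code>import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

// build a streaming TableEnvironment backed by the Blink planner;
// internally this ends up in StreamTableEnvironmentImpl.create shown above
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, settings)</code>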
Defines a logical type and hints at the actual physical type underneath it. Logical types resemble standard SQL data types; their subclasses add concrete constraints.
The table schema definition contains the name and type of each field.
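To illustrate how logical types and the table schema fit together, a TableSchema can be assembled from field names and DataTypes (each DataTypes call wraps a LogicalType). A minimal sketch using the public builder API; the field names are made up.
<code>import org.apache.flink.table.api.{DataTypes, TableSchema}

// hypothetical fields; DataTypes.BIGINT() etc. each produce a DataType backed by a LogicalType
val schema: TableSchema = TableSchema.builder()
  .field("user_id", DataTypes.BIGINT())
  .field("amount", DataTypes.DECIMAL(10, 2))
  .field("ts", DataTypes.TIMESTAMP(3))
  .build()</code>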
<code>override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
  val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
  createTable(queryOperation)
}</code>
<code>private final DataStream<E> dataStream;
private final int[] fieldIndices;
private final TableSchema tableSchema;</code>
The Table class is the core component of the SQL API. It defines methods for transforming data, such as filter, groupBy, and join. A TableEnvironment can convert a Table into a DataStream or a DataSet.
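A small sketch of the fluent API, assuming the tableEnv built earlier and a registered table named orders (both the table and its columns are made up). Note that a continuously aggregated table produces updates, so it has to be converted with toRetractStream rather than toAppendStream.
<code>import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// hypothetical table "orders" with columns user and amount
val filtered: Table = tableEnv
  .from("orders")
  .filter('amount > 100)
  .select('user, 'amount)

// grouping produces an updating result, hence the retract stream below
val totals: Table = filtered.groupBy('user).select('user, 'amount.sum as 'total)

val appendStream = tableEnv.toAppendStream[Row](filtered)
val retractStream = tableEnv.toRetractStream[Row](totals)</code>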
<code>private TableImpl(
    TableEnvironment tableEnvironment,
    QueryOperation operationTree,
    OperationTreeBuilder operationTreeBuilder,
    LookupCallResolver lookupResolver) {
  this.tableEnvironment = tableEnvironment;
  this.operationTree = operationTree;
  this.operationTreeBuilder = operationTreeBuilder;
  this.lookupResolver = lookupResolver;
}</code>
<code>private void createTemporaryView(UnresolvedIdentifier identifier, Table view) {
  if (((TableImpl) view).getTableEnvironment() != this) {
    throw new TableException(
      "Only table API objects that belong to this TableEnvironment can be registered.");
  }

  CatalogBaseTable tableTable = new QueryOperationCatalogView(view.getQueryOperation());

  ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
  catalogManager.createTemporaryTable(tableTable, tableIdentifier, false);
}</code>
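This private method backs the public registration API. A rough usage sketch, reusing the filtered table from the earlier example; the view name is made up.
<code>// register a Table under a temporary name so later SQL can reference it
tableEnv.createTemporaryView("filtered_orders", filtered)
val fromSql = tableEnv.sqlQuery("SELECT `user`, amount FROM filtered_orders")</code>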
An Expression represents a literal, a function call, or a field reference. Subclasses of ExpressionVisitor, the visitor that transforms expressions, resolve an Expression into a FieldInfo.
<code>@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
  String fieldName = unresolvedReference.getName();
  return new FieldInfo(fieldName, index, fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}</code>
An example of converting an Expression into a FieldInfo:
<code>private static List<FieldInfo> extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] exprs) {
  boolean isRefByPos = isReferenceByPosition(inputType, exprs);

  if (isRefByPos) {
    return IntStream.range(0, exprs.length)
      .mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(inputType, idx)))
      .collect(Collectors.toList());
  } else {
    return extractFieldInfosByNameReference(inputType, exprs);
  }
}</code>
<code>private final String fieldName;
private final int index;
private final DataType type;</code>
Represents a row of data. It can contain any number of columns, and the columns may have different data types. Row is not strongly typed, so it has to be paired with RowTypeInfo to obtain the concrete type of each column.
Row:
<code>/** The array to store actual values. */
private final Object[] fields;</code>
RowTypeInfo:
<code>protected final String[] fieldNames;
protected final TypeInformation<?>[] types;</code>
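A minimal sketch of how the two go together; the field names and values below are made up.
<code>import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// the Row itself only stores untyped values ...
val row = new Row(2)
row.setField(0, "alice")
row.setField(1, 42L)

// ... so a RowTypeInfo supplies the per-column names and types
val rowType: TypeInformation[Row] = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
  Array("name", "count"))</code>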
<code>override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
  val returnType = createTypeInformation[T]

  val modifyOperation = new OutputConversionModifyOperation(
    table.getQueryOperation,
    TypeConversions.fromLegacyInfoToDataType(returnType),
    OutputConversionModifyOperation.UpdateMode.APPEND)
  toDataStream[T](table, modifyOperation)
}</code>
The result returned by Parser.parse(sql).
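At the user level this parse step is hidden behind sqlQuery: the SQL text is handed to the planner's Parser, and the resulting QueryOperation is wrapped in a Table. A rough sketch; the query text is just an example.
<code>// sqlQuery -> Parser.parse -> QueryOperation -> Table
val table = tableEnv.sqlQuery("SELECT `string`, COUNT(1) FROM T1 GROUP BY `string`")</code>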
Defines the collection of SQL parsing and optimization rules, including rules that convert Calcite nodes into Flink nodes (for example FlinkLogicalTableSourceScan), rules that convert Flink logical nodes into physical execution nodes (for example StreamExecTableSourceScanRule), and predicate-pushdown rules such as PushFilterIntoTableSourceScanRule.
<code>/** Converts a relational expression to the target trait(s) of this rule.
 *
 * <p>Returns null if conversion is not possible. */
public abstract RelNode convert(RelNode rel);

public void onMatch(RelOptRuleCall call) {
  RelNode rel = call.rel(0);
  if (rel.getTraitSet().contains(inTrait)) {
    final RelNode converted = convert(rel);
    if (converted != null) {
      call.transformTo(converted);
    }
  }
}

class FlinkLogicalTableSourceScanConverter
  extends ConverterRule(
    classOf[LogicalTableScan],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalTableSourceScanConverter") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
  }
}</code>
The Flink RelNode base class not only carries the relational logic that a RelNode itself can express, but also the extra Flink-specific information each relation depends on. For example, FlinkLogicalTableSourceScan carries the TableSource information.
The base class for physical relational nodes. Its subclasses also implement the ExecNode interface, which is used to translate a physical node into a Transformation.
<code>/**
 * Internal method, translates this node into a Flink operator.
 *
 * @param planner The [[Planner]] of the translated Table.
 */
protected def translateToPlanInternal(planner: E): Transformation[T]

def translateToPlan(planner: E): Transformation[T] = {
  if (transformation == null) {
    transformation = translateToPlanInternal(planner)
  }
  transformation
}</code>
While translating an ExecNode into a Transformation, part of the logic is implemented through dynamically generated code. The generated code is stored in instances of GeneratedClass subclasses, shipped to each TaskManager node, and then compiled and executed with the Janino library. For example, an aggregate query generates a subclass of the aggregation handler NamespaceTableAggsHandleFunction.
<code>public T newInstance(ClassLoader classLoader, Object... args) {
  try {
    return (T) compile(classLoader).getConstructors()[0].newInstance(args);
  } catch (Exception e) {
    throw new RuntimeException(
      "Could not instantiate generated class '" + className + "'", e);
  }
}

/**
 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
 */
public Class<T> compile(ClassLoader classLoader) {
  if (compiledClass == null) {
    // cache the compiled class
    compiledClass = CompileUtils.compile(classLoader, className, code);
  }
  return compiledClass;
}</code>
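To make the Janino step concrete, here is a standalone sketch of compiling a generated source string at runtime, independent of Flink's CompileUtils. Janino's SimpleCompiler is already on Flink's classpath; the class name and source below are made up.
<code>import org.codehaus.janino.SimpleCompiler

// a tiny stand-in for generated code; real generated classes implement
// interfaces such as NamespaceAggsHandleFunction
val source =
  """
    |public class GeneratedHello {
    |  public String apply(String name) { return "hello, " + name; }
    |}
  """.stripMargin

val compiler = new SimpleCompiler()
compiler.setParentClassLoader(Thread.currentThread().getContextClassLoader)
compiler.cook(source)                                  // compile the source in memory
val clazz = compiler.getClassLoader.loadClass("GeneratedHello")
val instance = clazz.getConstructor().newInstance()
val result = clazz.getMethod("apply", classOf[String]).invoke(instance, "flink")</code>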
<code>val sql =
  """
    |SELECT
    |  `string`,
    |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    |  COUNT(1),
    |  SUM(1),
    |  COUNT(`int`),
    |  COUNT(DISTINCT `float`),
    |  concat_distinct_agg(name)
    |FROM T1
    |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
  """.stripMargin</code>
<code>LogicalProject#3
  LogicalAggregate#2
    LogicalProject#1
      LogicalTableScan#0</code>
<code>rel#271:StreamExecSink.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecCalc#269,name=DataStreamTableSink,fields=string, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#269:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecGroupWindowAggregate#267,select=string, w$start AS EXPR$1, w$rowtime AS EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#267:StreamExecGroupWindowAggregate.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecExchange#265,groupBy=string,window=SlidingGroupWindow('w$, rowtime, 5, 4),properties=w$start, w$end, w$rowtime, w$proctime,select=string, COUNT(*) AS EXPR$3, $SUM0($f2) AS EXPR$4, COUNT(int) AS EXPR$5, COUNT(DISTINCT float) AS EXPR$6, concat_distinct_agg(name) AS EXPR$7, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
rel#265:StreamExecExchange.STREAM_PHYSICAL.hash[0]true.None: -1.true.Acc(input=StreamExecCalc#263,distribution=hash[string])
rel#263:StreamExecCalc.STREAM_PHYSICAL.any.None: -1.true.Acc(input=StreamExecDataStreamScan#257,select=string, rowtime, 1 AS $f2, int, float, name)
rel#257:StreamExecDataStreamScan.STREAM_PHYSICAL.any.None: -1.true.Acc(table=[Unregistered_DataStream_2],fields=rowtime, int, double, float, bigdec, string, name)</code>
Code generation:
<code>StreamExecGroupWindowAggregateBase -> translateToPlanInternal
StreamExecGroupWindowAggregateBase -> createAggsHandler
AggsHandlerCodeGenerator -> generateNamespaceAggsHandler
new OneInputTransformation
during job submission: OneInputTransformation -> OneInputStreamTask
Task -> run
Task -> doRun
StreamTask -> invoke
StreamTask -> openAllOperators
AggregateWindowOperator -> open
WindowOperator -> compileGeneratedCode
GeneratedNamespaceAggsHandleFunction -> newInstance
SimpleCompiler -> cook</code>