flink sql

Date: 2021-07-01 10:21:17

StreamTableEnvironment

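This class holds the components that every stage of SQL processing (parsing, validation, optimization, execution) depends on:
- CatalogManager, the metadata manager.
- ModuleManager, the module manager. A module bundles functions, types and rules.
- FunctionCatalog, the function manager, which also holds user-defined functions.
- The Executor, which wraps the StreamExecutionEnvironment.
- The Planner, which parses and translates SQL.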

<code>StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)

def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings,
    tableConfig: TableConfig)
  : StreamTableEnvironmentImpl = {
  // Metadata: starts with one built-in in-memory catalog and database
  val catalogManager = new CatalogManager(
    settings.getBuiltInCatalogName,
    new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
  val moduleManager = new ModuleManager
  // Function lookup across catalogs and modules
  val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
  // Executor: wraps the StreamExecutionEnvironment that runs the job
  val executorProperties = settings.toExecutorProperties
  val executor = lookupExecutor(executorProperties, executionEnvironment)
  // Planner: found via the planner properties derived from the settings
  val plannerProperties = settings.toPlannerProperties
  val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
    .create(
      plannerProperties,
      executor,
      tableConfig,
      functionCatalog,
      catalogManager)
  new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    executionEnvironment,
    planner,
    executor,
    settings.isStreamingMode
  )
}</code>
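The CatalogManager created above is later asked to turn partial table paths into fully qualified ones. The sketch below is a simplified model of that step. It starts from a built-in catalog and database, which in Flink default to default_catalog and default_database. It then fills in whatever path segments are missing. MiniCatalogManager and QualifiedName are hypothetical names. The real method is CatalogManager.qualifyIdentifier, which takes an UnresolvedIdentifier and returns an ObjectIdentifier.

```java
import java.util.Arrays;

// Hypothetical stand-in for ObjectIdentifier: a fully qualified catalog.database.object path.
final class QualifiedName {
    final String catalog;
    final String database;
    final String object;

    QualifiedName(String catalog, String database, String object) {
        this.catalog = catalog;
        this.database = database;
        this.object = object;
    }

    @Override
    public String toString() {
        return catalog + "." + database + "." + object;
    }
}

// Hypothetical, simplified model of identifier qualification in a catalog manager.
final class MiniCatalogManager {
    private String currentCatalog;
    private String currentDatabase;

    MiniCatalogManager(String builtInCatalog, String builtInDatabase) {
        // The built-in catalog and database are the initial "current" ones.
        this.currentCatalog = builtInCatalog;
        this.currentDatabase = builtInDatabase;
    }

    void useDatabase(String database) {
        this.currentDatabase = database;
    }

    // Missing leading segments are taken from the current catalog and database.
    QualifiedName qualify(String... path) {
        switch (path.length) {
            case 1:
                return new QualifiedName(currentCatalog, currentDatabase, path[0]);
            case 2:
                return new QualifiedName(currentCatalog, path[0], path[1]);
            case 3:
                return new QualifiedName(path[0], path[1], path[2]);
            default:
                throw new IllegalArgumentException("Expected 1 to 3 path segments: " + Arrays.toString(path));
        }
    }
}
```

Here is an example using the defaults. qualify("T1") yields default_catalog.default_database.T1, and qualify("db2", "T1") yields default_catalog.db2.T1.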

DataType

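Describes the logical type of a value. It also hints at the physical representation (the conversion class) used for that value at the API boundary.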

LogicalType

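A logical type roughly corresponds to a standard SQL data type. Each subclass describes one concrete type together with its constraints, such as the length of a VARCHAR.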

TableSchema

The table schema: the name and data type of every field.
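The toy model below separates the three concepts. It is not Flink's classes.
- ToyLogicalType carries only SQL-level information: a type name and nullability.
- ToyDataType pairs a logical type with the Java class used to represent it.
- ToyTableSchema is an ordered list of field names and data types.

In real Flink the same idea appears as, for example, DataTypes.INT().notNull().bridgedTo(int.class). This model rejects a primitive conversion class for a nullable type, because a primitive cannot hold SQL NULL. That rule is the model's own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy logical type: SQL-level meaning only (type name + nullability), no Java class.
final class ToyLogicalType {
    final String root;        // e.g. "INT", "VARCHAR(10)"
    final boolean nullable;

    ToyLogicalType(String root, boolean nullable) {
        this.root = root;
        this.nullable = nullable;
    }

    @Override
    public String toString() {
        return nullable ? root : root + " NOT NULL";
    }
}

// Toy data type: a logical type plus a hint about the physical (conversion) class.
final class ToyDataType {
    final ToyLogicalType logicalType;
    final Class<?> conversionClass;

    ToyDataType(ToyLogicalType logicalType, Class<?> conversionClass) {
        // Rule of this toy model: a primitive class cannot hold SQL NULL.
        if (conversionClass.isPrimitive() && logicalType.nullable) {
            throw new IllegalArgumentException(
                "Nullable " + logicalType + " cannot be represented by " + conversionClass);
        }
        this.logicalType = logicalType;
        this.conversionClass = conversionClass;
    }
}

// Toy table schema: ordered field names and their data types.
final class ToyTableSchema {
    private final List<String> names = new ArrayList<>();
    private final List<ToyDataType> types = new ArrayList<>();

    ToyTableSchema field(String name, ToyDataType type) {
        if (names.contains(name)) {
            throw new IllegalArgumentException("Duplicate field: " + name);
        }
        names.add(name);
        types.add(type);
        return this;
    }

    List<String> getFieldNames() {
        return Collections.unmodifiableList(names);
    }

    ToyDataType getFieldType(String name) {
        int i = names.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return types.get(i);
    }
}
```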

DataStream -> Table

<code>override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
  val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
  createTable(queryOperation)
}</code>

ScalaDataStreamQueryOperation

<code>private final DataStream<E> dataStream;   // the wrapped stream
private final int[] fieldIndices;          // positions of the table fields in the input type
private final TableSchema tableSchema;     // schema of the resulting table</code>
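For each column of the resulting table, fieldIndices records the position of the input field the column is read from. The illustration below is self-contained. An Object[] stands in for one DataStream element, and project is a hypothetical helper, not Flink API. A table that selects the third and then the first input field would have fieldIndices = [2, 0].

```java
// Hypothetical helper showing how a field-index array maps input positions to table columns.
final class FieldIndexProjection {
    // Column i of the output is input[fieldIndices[i]].
    static Object[] project(Object[] input, int[] fieldIndices) {
        Object[] out = new Object[fieldIndices.length];
        for (int i = 0; i < fieldIndices.length; i++) {
            out[i] = input[fieldIndices[i]];
        }
        return out;
    }
}
```

For the record {"a", 1, 2.5} and indices {2, 0}, project returns {2.5, "a"}.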

Table

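Table is the core component of the Table and SQL API. It defines the data transformation methods such as filter, groupBy and join. A TableEnvironment can convert a Table into a DataStream or a DataSet.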

<code>private TableImpl(
        TableEnvironment tableEnvironment,
        QueryOperation operationTree,
        OperationTreeBuilder operationTreeBuilder,
        LookupCallResolver lookupResolver) {
    this.tableEnvironment = tableEnvironment;
    this.operationTree = operationTree;
    this.operationTreeBuilder = operationTreeBuilder;
    this.lookupResolver = lookupResolver;
}</code>
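A Table holds no data. Each transformation returns a new Table whose operation tree has the previous tree as its child. The constructor above suggests this: Flink builds the new node through OperationTreeBuilder. The hypothetical ToyTable and ToyOperation classes below reproduce only that immutable, lazily built tree, using strings in place of real expressions.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical node of a logical operation tree.
final class ToyOperation {
    final String description;
    final List<ToyOperation> children;

    ToyOperation(String description, List<ToyOperation> children) {
        this.description = description;
        this.children = children;
    }

    // Renders the tree root first, indenting two spaces per level.
    String explain(int depth) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            sb.append("  ");
        }
        sb.append(description).append('\n');
        for (ToyOperation child : children) {
            sb.append(child.explain(depth + 1));
        }
        return sb.toString();
    }
}

// Hypothetical immutable table: each method wraps the current tree in a new node.
final class ToyTable {
    final ToyOperation operationTree;

    ToyTable(ToyOperation operationTree) {
        this.operationTree = operationTree;
    }

    static ToyTable scan(String name) {
        return new ToyTable(new ToyOperation("Scan(" + name + ")", Collections.<ToyOperation>emptyList()));
    }

    ToyTable filter(String predicate) {
        return new ToyTable(new ToyOperation("Filter(" + predicate + ")", Collections.singletonList(operationTree)));
    }

    ToyTable select(String fields) {
        return new ToyTable(new ToyOperation("Project(" + fields + ")", Collections.singletonList(operationTree)));
    }
}
```

In this model, ToyTable.scan("T1").filter("a > 10").select("a, b") leaves the original scan untouched. The result's explain(0) prints Project over Filter over Scan.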

Registering a table (temporary view)

<code>private void createTemporaryView(UnresolvedIdentifier identifier, Table view) {
    // Only tables created by this environment may be registered in it
    if (((TableImpl) view).getTableEnvironment() != this) {
        throw new TableException(
            "Only table API objects that belong to this TableEnvironment can be registered.");
    }
    // Wrap the table's query operation as a view and register it under its fully qualified name
    CatalogBaseTable tableTable = new QueryOperationCatalogView(view.getQueryOperation());
    ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
    catalogManager.createTemporaryTable(tableTable, tableIdentifier, false);
}</code>
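The method performs three steps:
1. It rejects a Table created by a different TableEnvironment.
2. It qualifies the identifier.
3. It registers a view wrapping the table's query operation.

The last argument, false, means ignoreIfExists is off. An existing temporary table with the same identifier is therefore an error rather than being silently kept. The sketch below is a self-contained model of that logic. ToyEnvironment and ToyTableHandle are hypothetical, a plain HashMap stands in for the registry, and the current catalog and database are fixed to the defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical table handle that remembers which environment created it.
final class ToyTableHandle {
    final ToyEnvironment owner;
    final String queryOperation;   // stands in for the QueryOperation tree

    ToyTableHandle(ToyEnvironment owner, String queryOperation) {
        this.owner = owner;
        this.queryOperation = queryOperation;
    }
}

final class ToyEnvironment {
    // Fully qualified name -> wrapped query operation (stands in for QueryOperationCatalogView).
    final Map<String, String> temporaryTables = new HashMap<>();

    ToyTableHandle fromQuery(String queryOperation) {
        return new ToyTableHandle(this, queryOperation);
    }

    void createTemporaryView(String name, ToyTableHandle view) {
        // 1. Only tables created by this environment may be registered.
        if (view.owner != this) {
            throw new IllegalStateException(
                "Only table API objects that belong to this TableEnvironment can be registered.");
        }
        // 2. Qualify the bare name (current catalog/database fixed in this sketch).
        String qualified = "default_catalog.default_database." + name;
        // 3. Register; as with ignoreIfExists = false, a duplicate is an error.
        if (temporaryTables.containsKey(qualified)) {
            throw new IllegalStateException("Temporary table " + qualified + " already exists");
        }
        temporaryTables.put(qualified, view.queryOperation);
    }
}
```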

Expression

An Expression represents a literal, a function call, or a field reference.

ExpressionVisitor

A visitor that converts an Expression into another representation.

IndexedExprToFieldInfo

An ExpressionVisitor subclass that resolves an Expression into a FieldInfo.

<code>@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
    String fieldName = unresolvedReference.getName();
    return new FieldInfo(fieldName, index, fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}</code>
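IndexedExprToFieldInfo is a plain visitor. Each Expression subclass calls back the matching visit overload from its accept method, and the visitor returns a FieldInfo. The self-contained sketch below reproduces that double dispatch with hypothetical ToyExpr, ToyRef and ToyLiteral classes.

Two simplifications apply. The field type is a Class<?> taken from the input at the visitor's position, whereas Flink derives a DataType via fromLegacyInfoToDataType(getTypeAt(...)). The sketch also handles only plain references and rejects literals.

```java
// Hypothetical expression hierarchy using the visitor (double-dispatch) pattern.
interface ToyExprVisitor<R> {
    R visit(ToyRef ref);
    R visit(ToyLiteral literal);
}

abstract class ToyExpr {
    abstract <R> R accept(ToyExprVisitor<R> visitor);
}

final class ToyRef extends ToyExpr {
    final String name;
    ToyRef(String name) { this.name = name; }
    @Override
    <R> R accept(ToyExprVisitor<R> visitor) { return visitor.visit(this); }
}

final class ToyLiteral extends ToyExpr {
    final Object value;
    ToyLiteral(Object value) { this.value = value; }
    @Override
    <R> R accept(ToyExprVisitor<R> visitor) { return visitor.visit(this); }
}

// Simplified FieldInfo: name, position and (simplified) type of one table field.
final class ToyFieldInfo {
    final String fieldName;
    final int index;
    final Class<?> type;

    ToyFieldInfo(String fieldName, int index, Class<?> type) {
        this.fieldName = fieldName;
        this.index = index;
        this.type = type;
    }
}

// Plays the role of IndexedExprToFieldInfo: a reference at position `index`
// becomes a field named after the reference and typed by the input field at that position.
final class IndexedToFieldInfo implements ToyExprVisitor<ToyFieldInfo> {
    private final Class<?>[] inputTypes;
    private final int index;

    IndexedToFieldInfo(Class<?>[] inputTypes, int index) {
        this.inputTypes = inputTypes;
        this.index = index;
    }

    @Override
    public ToyFieldInfo visit(ToyRef ref) {
        return new ToyFieldInfo(ref.name, index, inputTypes[index]);
    }

    @Override
    public ToyFieldInfo visit(ToyLiteral literal) {
        throw new IllegalArgumentException("Field reference expected, got literal " + literal.value);
    }
}
```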

Example of use, converting Expressions into FieldInfos:

<code>private static List<FieldInfo> extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] exprs) {
    boolean isRefByPos = isReferenceByPosition(inputType, exprs);
    if (isRefByPos) {
        // By position: expression idx describes input field idx
        return IntStream.range(0, exprs.length)
            .mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(inputType, idx)))
            .collect(Collectors.toList());
    } else {
        // By name: expressions refer to existing input field names
        return extractFieldInfosByNameReference(inputType, exprs);
    }
}</code>
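isReferenceByPosition chooses between two modes:
- By position: the expressions only rename the input fields in order.
- By name: each expression must name an existing input field, which allows fields to be reordered or dropped.

Flink's documentation describes by-position referencing as applicable only when none of the given names refers to a field of the input type. The sketch below applies that rule to plain names only, with no aliases or time attributes. FieldMapping and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class FieldMapping {
    // By position only if none of the requested names is an existing input field name.
    static boolean isReferenceByPosition(String[] inputNames, String[] exprNames) {
        List<String> inputs = Arrays.asList(inputNames);
        for (String name : exprNames) {
            if (inputs.contains(name)) {
                return false;
            }
        }
        return true;
    }

    // For each requested field, the index of the input field it is taken from.
    static int[] resolveIndices(String[] inputNames, String[] exprNames) {
        boolean byPosition = isReferenceByPosition(inputNames, exprNames);
        List<String> inputs = Arrays.asList(inputNames);
        int[] indices = new int[exprNames.length];
        for (int i = 0; i < exprNames.length; i++) {
            // By position: the i-th expression renames the i-th input field.
            // By name: look the name up among the input fields.
            int index = byPosition ? i : inputs.indexOf(exprNames[i]);
            if (index < 0 || index >= inputNames.length) {
                throw new IllegalArgumentException("Cannot resolve field '" + exprNames[i] + "'");
            }
            indices[i] = index;
        }
        return indices;
    }
}
```

The following examples use input fields f0, f1, f2. The names a, b are resolved by position to [0, 1]. The names f2, f0 are resolved by name to [2, 0]. The mix f2, x is treated as by-name and fails on x.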

FieldInfo

<code>private final String fieldName;
private final int index;
private final DataType type;</code>

Row & RowTypeInfo

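Row represents one record. It can contain any number of columns, and the columns may hold values of different types. Row is not strongly typed, so it is paired with a RowTypeInfo that supplies the concrete type of each column.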

Row:

<code>/** The array to store actual values. */
private final Object[] fields;</code>

RowTypeInfo:

<code>protected final String[] fieldNames;
protected final TypeInformation<?>[] types;</code>
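A Row is just an Object[], so nothing in the Row itself prevents a field from holding a value of the wrong type. The type information lives alongside it in RowTypeInfo. The toy sketch below shows that pairing by checking a row against its type description. ToyRow and ToyRowType are hypothetical, and Class<?> replaces TypeInformation<?>.

```java
import java.util.Arrays;

// Untyped row storage, like Row.
final class ToyRow {
    private final Object[] fields;

    ToyRow(int arity) { this.fields = new Object[arity]; }
    void setField(int pos, Object value) { fields[pos] = value; }
    Object getField(int pos) { return fields[pos]; }
    int getArity() { return fields.length; }
}

// Names and types of the columns, kept separately from the data, like RowTypeInfo.
final class ToyRowType {
    final String[] fieldNames;
    final Class<?>[] types;

    ToyRowType(String[] fieldNames, Class<?>[] types) {
        if (fieldNames.length != types.length) {
            throw new IllegalArgumentException("Number of field names and types differ");
        }
        this.fieldNames = fieldNames;
        this.types = types;
    }

    // True if the arity matches and every non-null field is an instance of its declared type.
    boolean accepts(ToyRow row) {
        if (row.getArity() != types.length) {
            return false;
        }
        for (int i = 0; i < types.length; i++) {
            Object v = row.getField(i);
            if (v != null && !types[i].isInstance(v)) {
                return false;
            }
        }
        return true;
    }

    int getFieldIndex(String name) {
        return Arrays.asList(fieldNames).indexOf(name);
    }
}
```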

Table -> DataStream

<code>override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
  val returnType = createTypeInformation[T]
  // Wrap the query in a modify operation that converts its rows to T in APPEND mode
  val modifyOperation = new OutputConversionModifyOperation(
    table.getQueryOperation,
    TypeConversions.fromLegacyInfoToDataType(returnType),
    OutputConversionModifyOperation.UpdateMode.APPEND)
  toDataStream[T](table, modifyOperation)
}</code>
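UpdateMode.APPEND means the resulting DataStream may contain only inserted rows. A query whose result is updated over time, such as a GROUP BY without a window, cannot be converted this way; toRetractStream is the alternative for such queries.

The check can be modeled on the plan as below. PlanNode, AppendModeCheck and the generatesUpdates flags are assumptions of this toy model. It treats an unbounded group aggregate as update-producing. It treats a group-window aggregate, like the HOP query in the example later in this article, as insert-only, because with default settings each window result is emitted once.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plan node: knows whether it itself turns inserts into updates.
final class PlanNode {
    final String name;
    final boolean generatesUpdates;
    final List<PlanNode> inputs;

    PlanNode(String name, boolean generatesUpdates, PlanNode... inputs) {
        this.name = name;
        this.generatesUpdates = generatesUpdates;
        this.inputs = Arrays.asList(inputs);
    }

    // Insert-only if this node emits only inserts and all of its inputs do too.
    boolean isInsertOnly() {
        if (generatesUpdates) {
            return false;
        }
        for (PlanNode in : inputs) {
            if (!in.isInsertOnly()) {
                return false;
            }
        }
        return true;
    }
}

final class AppendModeCheck {
    // Models the validation behind UpdateMode.APPEND: reject plans that produce updates.
    static void validateAppend(PlanNode root) {
        if (!root.isInsertOnly()) {
            throw new UnsupportedOperationException(
                "Append mode cannot consume update changes produced below " + root.name);
        }
    }
}
```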

Operation

The result of Parser.parse(sql), which returns a list of Operations. The main kinds are:

  • ModifyOperation (DML)
  • QueryOperation (DQL)
  • CreateOperation & DropOperation (DDL)

FlinkStreamRuleSets

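The rule sets used to optimize streaming SQL plans. They include:
- Rules that convert Calcite nodes into Flink logical nodes, for example FlinkLogicalTableSourceScanConverter, which produces FlinkLogicalTableSourceScan.
- Rules that convert Flink logical nodes into physical execution nodes, for example StreamExecTableSourceScanRule.
- Rules that push filter conditions down into the table source, such as PushFilterIntoTableSourceScanRule.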

ConverterRule

<code>// Calcite's ConverterRule (Java)
/** Converts a relational expression to the target trait(s) of this rule.
 *
 * <p>Returns null if conversion is not possible. */
public abstract RelNode convert(RelNode rel);

public void onMatch(RelOptRuleCall call) {
    RelNode rel = call.rel(0);
    if (rel.getTraitSet().contains(inTrait)) {
        final RelNode converted = convert(rel);
        if (converted != null) {
            call.transformTo(converted);
        }
    }
}

// Flink's converter for table source scans (Scala)
class FlinkLogicalTableSourceScanConverter
  extends ConverterRule(
    classOf[LogicalTableScan],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalTableSourceScanConverter") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
  }
}</code>
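ConverterRule.onMatch calls convert only when the matched node carries the rule's input trait. If conversion succeeds, it registers the result with call.transformTo.

The sketch below replays that idea outside Calcite with hypothetical classes. Each node has a convention (NONE, LOGICAL, STREAM_PHYSICAL). Each rule set rewrites the tree bottom-up, one phase at a time. This mirrors how FlinkLogicalTableSourceScanConverter turns a LogicalTableScan into a FlinkLogicalTableSourceScan.

The node names are borrowed from this article and from the blink planner for illustration only. Calcite's Volcano planner keeps all alternatives and picks the cheapest; this toy instead applies the first matching rule greedily.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Calcite's Convention trait.
enum Convention { NONE, LOGICAL, STREAM_PHYSICAL }

// Toy relational node: an operator kind, its convention and its inputs.
final class Node {
    final String kind;
    final Convention convention;
    final List<Node> inputs;

    Node(String kind, Convention convention, List<Node> inputs) {
        this.kind = kind;
        this.convention = convention;
        this.inputs = inputs;
    }

    @Override
    public String toString() {
        return kind + "." + convention + (inputs.isEmpty() ? "" : inputs.toString());
    }
}

abstract class ToyConverterRule {
    final Convention in;
    final Convention out;

    ToyConverterRule(Convention in, Convention out) {
        this.in = in;
        this.out = out;
    }

    // Like ConverterRule#convert: returns null if this rule cannot convert the node.
    abstract Node convert(Node node, List<Node> convertedInputs);

    // Like ConverterRule#onMatch: only nodes carrying the input trait are converted.
    Node onMatch(Node node, List<Node> convertedInputs) {
        return node.convention == in ? convert(node, convertedInputs) : null;
    }
}

// Converts one operator kind into another while moving from `in` to `out`.
final class RenameRule extends ToyConverterRule {
    private final String from;
    private final String to;

    RenameRule(String from, String to, Convention in, Convention out) {
        super(in, out);
        this.from = from;
        this.to = to;
    }

    @Override
    Node convert(Node node, List<Node> convertedInputs) {
        return node.kind.equals(from) ? new Node(to, out, convertedInputs) : null;
    }
}

final class ToyPlanner {
    // One optimization phase: convert the inputs first, then the node with the first matching rule.
    static Node transform(Node node, List<ToyConverterRule> rules) {
        List<Node> newInputs = new ArrayList<>();
        for (Node input : node.inputs) {
            newInputs.add(transform(input, rules));
        }
        for (ToyConverterRule rule : rules) {
            Node converted = rule.onMatch(node, newInputs);
            if (converted != null) {
                return converted;
            }
        }
        throw new IllegalStateException("No rule converts " + node.kind + "." + node.convention);
    }
}
```

The plan is run through a logical phase first. Its output is then passed to a physical phase, each with its own rule list.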

FlinkLogicalRel

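The base class of Flink's logical RelNodes. Its subclasses express the relational logic that a plain RelNode can express. They also carry extra, Flink-specific information about that relation. For example, FlinkLogicalTableSourceScan also holds the TableSource.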

FlinkPhysicalRel

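The base class of physical relational nodes. Its subclasses also implement the ExecNode interface, which translates a physical node into a Transformation.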

ExecNode

<code>/**
 * Internal method, translates this node into a Flink operator.
 *
 * @param planner The [[Planner]] of the translated Table.
 */
protected def translateToPlanInternal(planner: E): Transformation[T]

// Translates at most once; later calls return the cached Transformation
def translateToPlan(planner: E): Transformation[T] = {
  if (transformation == null) {
    transformation = translateToPlanInternal(planner)
  }
  transformation
}</code>
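translateToPlan caches the Transformation in a field. A physical plan is a DAG rather than necessarily a tree, so one node can feed several downstream nodes. The cache ensures such a shared node is translated only once and every consumer sees the same Transformation object.

The minimal Java model of that memoization below is hypothetical. ToyExecNode and ToyTransformation stand in for ExecNode and Transformation, and translateCount exists only to make the effect observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for Transformation: a named node pointing at its input transformations.
final class ToyTransformation {
    final String name;
    final List<ToyTransformation> inputs;

    ToyTransformation(String name, List<ToyTransformation> inputs) {
        this.name = name;
        this.inputs = inputs;
    }
}

// Hypothetical exec node that memoizes its translation like ExecNode#translateToPlan.
final class ToyExecNode {
    final String name;
    final List<ToyExecNode> inputs;
    int translateCount = 0;                     // how often translateToPlanInternal actually ran
    private ToyTransformation transformation;   // cached result

    ToyExecNode(String name, ToyExecNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    private ToyTransformation translateToPlanInternal() {
        translateCount++;
        List<ToyTransformation> inputTransformations = new ArrayList<>();
        for (ToyExecNode input : inputs) {
            inputTransformations.add(input.translateToPlan());
        }
        return new ToyTransformation(name, inputTransformations);
    }

    ToyTransformation translateToPlan() {
        if (transformation == null) {
            transformation = translateToPlanInternal();
        }
        return transformation;
    }
}
```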

Call sequence diagram

[Figure: call sequence diagram (image not preserved)]

Code generation (gencode)

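Part of the translation from ExecNode to Transformation is implemented with dynamically generated code. The generated source code is stored in an instance of a GeneratedClass subclass and shipped to each TaskManager (TM), where the Janino library compiles it before it is executed. A window aggregation query, for example, generates an implementation of NamespaceAggsHandleFunction.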

GeneratedClass

<code>public T newInstance(ClassLoader classLoader, Object... args) {
    try {
        return (T) compile(classLoader).getConstructors()[0].newInstance(args);
    } catch (Exception e) {
        throw new RuntimeException(
            "Could not instantiate generated class '" + className + "'", e);
    }
}

/**
 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
 */
public Class<T> compile(ClassLoader classLoader) {
    if (compiledClass == null) {
        // cache the compiled class
        compiledClass = CompileUtils.compile(classLoader, className, code);
    }
    return compiledClass;
}</code>
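GeneratedClass carries only the class name and the source code. Compilation happens lazily, on first use, at most once per instance. After that, the first public constructor is invoked reflectively.

The sketch below keeps that structure but replaces Janino with a hypothetical ToyCompiler interface, so it runs with the JDK alone. In the test, a stub "compiler" simply returns an already loaded class (java.lang.Object, which has exactly one public constructor).

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for CompileUtils.compile (backed by Janino in Flink).
interface ToyCompiler {
    Class<?> compile(ClassLoader classLoader, String className, String code) throws Exception;
}

// Mirrors GeneratedClass: holds source code, compiles lazily once, instantiates reflectively.
final class ToyGeneratedClass<T> {
    private final String className;
    private final String code;
    private final ToyCompiler compiler;
    private Class<T> compiledClass;   // cached after the first successful compile

    ToyGeneratedClass(String className, String code, ToyCompiler compiler) {
        this.className = className;
        this.code = code;
        this.compiler = compiler;
    }

    @SuppressWarnings("unchecked")
    Class<T> compile(ClassLoader classLoader) throws Exception {
        if (compiledClass == null) {
            compiledClass = (Class<T>) compiler.compile(classLoader, className, code);
        }
        return compiledClass;
    }

    @SuppressWarnings("unchecked")
    T newInstance(ClassLoader classLoader, Object... args) {
        try {
            // Like GeneratedClass, assume the generated class has a single public constructor.
            Constructor<?> ctor = compile(classLoader).getConstructors()[0];
            return (T) ctor.newInstance(args);
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate generated class '" + className + "'", e);
        }
    }
}
```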

Example

SQL:

<code>val sql =
  """
    |SELECT
    | `string`,
    | HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    | HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    | COUNT(1),
    | SUM(1),
    | COUNT(`int`),
    | COUNT(DISTINCT `float`),
    | concat_distinct_agg(name)
    |FROM T1
    |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
  """.stripMargin</code>

Logical plan:

<code>LogicalProject#3
  LogicalAggregate#2
    LogicalProject#1
      LogicalTableScan#0</code>

Optimized physical plan:

<code>rel#271:StreamExecSink.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecCalc#269,name=DataStreamTableSink,fields=string, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
  rel#269:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecGroupWindowAggregate#267,select=string, w$start AS EXPR$1, w$rowtime AS EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
    rel#267:StreamExecGroupWindowAggregate.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecExchange#265,groupBy=string,window=SlidingGroupWindow('w$, rowtime, 5, 4),properties=w$start, w$end, w$rowtime, w$proctime,select=string, COUNT(*) AS EXPR$3, $SUM0($f2) AS EXPR$4, COUNT(int) AS EXPR$5, COUNT(DISTINCT float) AS EXPR$6, concat_distinct_agg(name) AS EXPR$7, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
      rel#265:StreamExecExchange.STREAM_PHYSICAL.hash[0]true.None: -1.true.Acc(input=StreamExecCalc#263,distribution=hash[string])
        rel#263:StreamExecCalc.STREAM_PHYSICAL.any.None: -1.true.Acc(input=StreamExecDataStreamScan#257,select=string, rowtime, 1 AS $f2, int, float, name)
          rel#257:StreamExecDataStreamScan.STREAM_PHYSICAL.any.None: -1.true.Acc(table=[Unregistered_DataStream_2],fields=rowtime, int, double, float, bigdec, string, name)</code>
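In the physical plan, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) appears as SlidingGroupWindow('w$, rowtime, 5, 4). That is a 5 ms window that slides every 4 ms, so each row falls into one or two windows.

The sketch below computes the windows for a timestamp with the usual alignment: window starts are multiples of the slide. It is modeled on the window assignment of Flink's SlidingEventTimeWindows in the DataStream API, not copied from the SQL runtime. HopWindows and its methods are hypothetical, and Math.floorMod is used so that negative timestamps also align correctly.

```java
import java.util.ArrayList;
import java.util.List;

final class HopWindows {
    // Start of the latest window containing ts; starts are offset + k * slide.
    static long lastWindowStart(long ts, long offset, long slide) {
        return ts - Math.floorMod(ts - offset, slide);
    }

    // Starts of all windows [start, start + size) that contain ts, latest first.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(ts, 0, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With size 5 and slide 4, windowStarts(8, 5, 4) returns [8, 4], so a row at 8 ms belongs to both [8, 13) and [4, 9). windowStarts(7, 5, 4) returns only [4].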

Code generation call chain:

<code>StreamExecGroupWindowAggregateBase -> translateToPlanInternal
StreamExecGroupWindowAggregateBase -> createAggsHandler
AggsHandlerCodeGenerator -> generateNamespaceAggsHandler
new OneInputTransformation
(during job submission, the OneInputTransformation becomes a OneInputStreamTask)
Task -> run
Task -> doRun
StreamTask -> invoke
StreamTask -> openAllOperators
AggregateWindowOperator -> open
WindowOperator -> compileGeneratedCode
GeneratedNamespaceAggsHandleFunction -> newInstance
SimpleCompiler -> cook   (Janino)</code>
