当前位置:Gxlcms > 数据库问题 > [源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

时间:2021-07-01 10:21:17 帮助过:17人阅读

c45b0e23278f15e8f7d075abac9a121b 这个哈希后缀所标识的字段,就是 myUdf 转换之后得到的函数实例。

  // Original SQL: "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
 
    java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7); // this UDF call implements the filter: WHERE myFrequency <> 0

    boolean isNull$14 = result$12 == null; 
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    
    if (result$16) { // true means myFrequency <> 0, so the row passes the filter and we enter the branch
	    java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
	      isNull$8 ? null : (java.lang.Long) result$7); // this corresponds to SELECT myFrequency; note the UDF is invoked AGAIN and recomputed here — which is exactly why a UDF must not keep internal state. 
	    boolean isNull$11 = result$9 == null;
	    long result$10;
	    if (isNull$11) {
	      result$10 = -1L;
	    }
	    else {
	      result$10 = result$9; // only here is the SELECT value actually produced, but by this point the UDF has already been evaluated twice
	    }
    }

2. 完整版

以下是生成的代码,因为是自动生成,所以看起来会有点费劲,不过好在已经是最后一步了。

/**
 * Auto-generated Flink flat-map operator that fuses the projection
 * ("SELECT word, myFrequency") and the filter ("WHERE myFrequency <> 0")
 * of the original query into a single RichFlatMapFunction.
 *
 * NOTE(review): both the filter and the projection call the same UDF
 * instance, so eval() runs twice for every row that passes the filter —
 * a UDF holding internal state would therefore observe each input more
 * than once.
 */
public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {

  // The user's scalar UDF; the hash suffix identifies the serialized instance below.
  final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;

  // Reused output row with two fields: index 0 = word, index 1 = myFrequency.
  final org.apache.flink.types.Row out =
      new org.apache.flink.types.Row(2);
  
  // Current input row, cached by flatMap().
  private org.apache.flink.types.Row in1;

  public DataSetCalcRule$18() throws Exception {
    
    // Rebuild the UDF instance from its Base64-encoded Java serialization;
    // the payload was captured when the function was registered.
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
    org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
      "rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
      org.apache.flink.table.functions.UserDefinedFunction.class); 
  }

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    // Forward the lifecycle call so the UDF's own open() runs once per task instance.
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
  }

  @Override
  public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
    in1 = (org.apache.flink.types.Row) _in1;
    
    // Null-safe read of field 0 (word): empty string as null-substitute value.
    boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
    java.lang.String result$5;
    if (isNull$6) {
      result$5 = "";
    }
    else {
      result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
    }
    
    // Null-safe read of field 1 (the UDF's input): -1L as null-substitute value.
    boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
    long result$7;
    if (isNull$8) {
      result$7 = -1L;
    }
    else {
      result$7 = (java.lang.Long) in1.getField(1);
    }

    // --- Filter part: first UDF invocation, for "WHERE myFrequency <> 0" ---
    java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);

    boolean isNull$14 = result$12 == null;
    long result$13;
    if (isNull$14) {
      result$13 = -1L;
    }
    else {
      result$13 = result$12;
    }

    // The literal 0 from the WHERE clause.
    long result$15 = 0L;
    
    // "|| false" is presumably the null flag generated for the constant 0 — a
    // codegen artifact; the comparison is null (filtered out) when the UDF returned null.
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    
    // --- Projection part: only executed for rows that pass the filter ---
    if (result$16) {
    
        if (isNull$6) {
          out.setField(0, null);
        }
        else {
          out.setField(0, result$5);
        }

        // Second UDF invocation for "SELECT myFrequency": the same eval() on the
        // same argument is recomputed instead of reusing result$12 above.
        java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
          isNull$8 ? null : (java.lang.Long) result$7);

        boolean isNull$11 = result$9 == null;
        long result$10;
        if (isNull$11) {
          result$10 = -1L;
        }
        else {
          result$10 = result$9;
        }

        if (isNull$11) {
          out.setField(1, null);
        }
        else {
          out.setField(1, result$10);
        }

          c.collect(out);
        }
  }

  @Override
  public void close() throws Exception {  
    // Forward the lifecycle call so the UDF can release its resources.
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
  }
}

0x06 总结

至此,我们把Flink SQL如何生成JAVA代码的流程大致走了一遍。

Flink生成的内部代码,是把"投影运算"和"过滤条件"分别生成,然后拼接在一起

即使原始SQL中只出现一次UDF调用,但如果SELECT和WHERE都间接用到了UDF,那么最终"投影运算"和"过滤条件"就会各自调用一次UDF,所以拼接之后就会有多次UDF调用。

这就是 "UDF不应该有内部历史状态" 的最终原因。我们在实际开发过程中一定要注意这个问题。

0x07 参考

UDX概述 https://help.aliyun.com/document_detail/69463.html

[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

标签:lis   重复   some   utils   生命周期   基本   reg   The   new t   

人气教程排行