时间:2021-07-01 10:21:17 帮助过:17人阅读
// 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7); // 这次 UDF 调用对应 WHERE myFrequency <> 0
boolean isNull$14 = result$12 == null;
boolean isNull$17 = isNull$14 || false;
boolean result$16;
if (isNull$17) {
result$16 = false;
}
else {
result$16 = result$13 != result$15;
}
if (result$16) { // 这里说明 myFrequency <> 0,所以可以进入
java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7); // 这里对应的是 SELECT myFrequency,注意的是,这里又调用了 UDF,重新计算了一遍,所以 UDF 才不应该有状态信息。
boolean isNull$11 = result$9 == null;
long result$10;
if (isNull$11) {
result$10 = -1L;
}
else {
result$10 = result$9; // 这里才进行SELECT myFrequency,但是这时候 UDF 已经被计算两次了
}
}
以下是生成的代码,因为是自动生成,所以看起来会有点费劲,不过好在已经是最后一步了。
public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {
final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;
final org.apache.flink.types.Row out =
new org.apache.flink.types.Row(2);
private org.apache.flink.types.Row in1;
public DataSetCalcRule$18() throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
"rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
org.apache.flink.table.functions.UserDefinedFunction.class);
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
}
@Override
public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
in1 = (org.apache.flink.types.Row) _in1;
boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
java.lang.String result$5;
if (isNull$6) {
result$5 = "";
}
else {
result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
}
boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
long result$7;
if (isNull$8) {
result$7 = -1L;
}
else {
result$7 = (java.lang.Long) in1.getField(1);
}
java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7);
boolean isNull$14 = result$12 == null;
long result$13;
if (isNull$14) {
result$13 = -1L;
}
else {
result$13 = result$12;
}
long result$15 = 0L;
boolean isNull$17 = isNull$14 || false;
boolean result$16;
if (isNull$17) {
result$16 = false;
}
else {
result$16 = result$13 != result$15;
}
if (result$16) {
if (isNull$6) {
out.setField(0, null);
}
else {
out.setField(0, result$5);
}
java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7);
boolean isNull$11 = result$9 == null;
long result$10;
if (isNull$11) {
result$10 = -1L;
}
else {
result$10 = result$9;
}
if (isNull$11) {
out.setField(1, null);
}
else {
out.setField(1, result$10);
}
c.collect(out);
}
}
@Override
public void close() throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
}
}
至此,我们把Flink SQL如何生成JAVA代码的流程大致走了一遍。
Flink生成的内部代码,是把"投影运算"和"过滤条件"分别生成,然后拼接在一起。
即使原始SQL中只有一次UDF调用,但是如果SELECT和WHERE都间接用到了UDF,那么最终"投影运算"和"过滤条件"就会分别调用了UDF,所以拼接之后就会有多个UDF调用。
这就是 "UDF不应该有内部历史状态" 的最终原因。我们在实际开发过程中一定要注意这个问题。
UDX概述 https://help.aliyun.com/document_detail/69463.html
[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)
标签:lis 重复 some utils 生命周期 基本 reg The new t