时间:2021-07-01 10:21:17 帮助过:4人阅读
val tableEnv = StreamTableEnvironment.create(env)表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责: ? 注册 catalog ? 在内部 catalog 中注册表 ? 执行 SQL 查询 ? 注册用户自定义函数 ? 将 DataStream 或 DataSet 转换为表 ? 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用 在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置 TableEnvironment 的一些特性。 比如,配置老版本的流式查询(Flink-Streaming-Query):
val settings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本 planner .inStreamingMode() // 流处理模式 .build() val tableEnv = StreamTableEnvironment.create(env, settings)基于老版本的批处理环境(Flink-Batch-Query):
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
基于 blink 版本的流处理环境(Blink-Streaming-Query):
val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
基于 blink 版本的批处理环境(Blink-Batch-Query):
val bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
tableEnv .connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接 .withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("inputTable") // 创建临时表这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.10.0</version> </dependency>代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。 3.3 连接到 Kafka kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。
tableEnv.connect( new Kafka() .version("0.11") // 定义 kafka 的版本 .topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable")当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。
val sensorTable: Table = tableEnv.from("inputTable") val resultTable: Table = senorTable .select("id, temperature") .filter("id =‘sensor_1‘")4.2 SQL 查询 Flink 的 SQL 集成,基于的是 ApacheCalcite,它实现了 SQL 标准。在 Flink 中,用常规字符串来定义 SQL 查询语句。SQL 查询的结果,是一个新的 Table。 代码实现如下:
val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id =‘sensor_1‘")或者:
val resultSqlTable: Table = tableEnv.sqlQuery( """ |select id, temperature |from inputTable |where id = ‘sensor_1‘ """.stripMargin)当然,也可以加上聚合操作,比如我们统计每个 sensor 温度数据出现的个数,做个 count统计:
val aggResultTable = sensorTable
.groupBy(‘id)
.select(‘id, ‘id.count as ‘count)
SQL 的实现:
val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id")这里 Table API 里指定的字段,前面加了一个单引号’,这是 Table API 中定义的 Expression类型的写法,可以很方便地表示一个表中的字段。 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。以后的代码中,一般都用后一种形式。
val inputStream: DataStream[String] = env.readTextFile("sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) val sensorTable: Table = tableEnv.fromDataStream(dataStream) val sensorTable2 = tableEnv.fromDataStream(dataStream, ‘id, ‘timestamp as ‘ts)5.2 数据类型与 Table schema 的对应 在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。 另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。 基于名称的对应:
val sensorTable = tableEnv.fromDataStream(dataStream, ‘timestamp as ‘ts, ‘id as ‘myId, ‘temperature)基于位置的对应:
val sensorTable = tableEnv.fromDataStream(dataStream, ‘myId, ‘ts)Flink 的 DataStream 和 DataSet API 支持多种类型。 组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类 型,则被视为原子类型。 元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的:元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。
tableEnv.createTemporaryView("sensorView", dataStream) tableEnv.createTemporaryView("sensorView", dataStream, ‘id, ‘temperature, ‘timestamp as ‘ts)另外,当然还可以基于 Table 创建视图:
tableEnv.createTemporaryView("sensorView", sensorTable)View 和 Table 的 Schema 完全相同。事实上,在 Table API 中,可以认为 View 和 Table是等价的。
// 注册输出表 tableEnv.connect( new FileSystem().path("…\\resources\\out.txt") ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义格式化方法,Csv 格式 .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("outputTable") // 创建临时表 resultSqlTable.insertInto("outputTable")7.2 更新模式(Update Mode) 在流处理过程中,表的处理并不像传统定义的那样简单。 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。 Flink Table API 中的更新模式有以下三种: 1)追加模式(Append Mode) 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 2)撤回模式(Retract Mode) 在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。 ? 插入(Insert)会被编码为添加消息; ? 删除(Delete)则编码为撤回消息; ? 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 3)Upsert(更新插入)模式 在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一 key 的属性。 ? 插入(Insert)和更新(Update)都被编码为 Upsert 消息; ? 删除(Delete)编码为 Delete 信息。 这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高 7.3 输出到 Kafka 除了输出到文件,也可以输出到 Kafka。我们可以结合前面 Kafka 作为输入数据,构建数据管道,kafka 进,kafka 出。 代码如下:
// 输出到 kafka tableEnv.connect( new Kafka() .version("0.11") .topic("sinkTest") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat( new Csv() ) .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") resultTable.insertInto("kafkaOutputTable")7.4 输出到 ElasticSearch ElasticSearch 的 connector 可以在 upsert(update+insert,更新插入)模式下操作,这样就可以使用 Query 定义的键(key)与外部系统交换 UPSERT/DELETE 消息。 另外,对于“仅追加”(append-only)的查询,connector 还可以在 append 模式下操作,这样就可以与外部系统只交换 insert 消息。 es 目前支持的数据格式,只有 Json,而 flink 本身并没有对应的支持,所以还需要引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.10.0</version> </dependency>代码实现如下:
// 输出到 es tableEnv.connect( new Elasticsearch() .version("6") .host("localhost", 9200, "http") .index("sensor") .documentType("temp") ) .inUpsertMode() // 指定是 Upsert 模式 .withFormat(new Json()) .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") aggResultTable.insertInto("esOutputTable")7.5 输出到 MySql Flink 专门为 Table API 的 jdbc 连接提供了 flink-jdbc 连接器,我们需要先引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.0</version> </dependency>jdbc 连接的代码实现比较特殊,因为没有对应的 java/scala 类实现 ConnectorDescriptor,所以不能直接tableEnv.connect()。不过Flink SQL留下了执行DDL的接口:tableEnv.sqlUpdate()。 对于 jdbc 的创建表操作,天生就适合直接写 DDL 来实现,所以我们的代码可以这样写:
// 输出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | ‘connector.type‘ = ‘jdbc‘, | ‘connector.url‘ = ‘jdbc:mysql://localhost:3306/test‘, | ‘connector.table‘ = ‘sensor_count‘, | ‘connector.driver‘ = ‘com.mysql.jdbc.Driver‘, | ‘connector.username‘ = ‘root‘, | ‘connector.password‘ = ‘123456‘ |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) aggResultSqlTable.insertInto("jdbcOutputTable")
val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable) val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable) resultStream.print("result") aggResultStream.print("aggResult")所以,没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。
val explaination: String = tableEnv.explain(resultTable)
println(explaination)
Query 的解释和执行过程,老 planner 和 blink planner 大体是一致的,又有所不同。整体来讲,Query 都会表示成一个逻辑查询计划,然后分两步解释: 1. 优化查询计划 2. 解释成 DataStream 或者 DataSet 程序 而 Blink 版本是批流统一的,所以所有的 Query,只会被解释成 DataStream 程序;另外在批处理环境 TableEnvironment 下,Blink 版本要到 tableEnv.execute()执行调用才开始解释。
Flink基础(十三):Table API 和 Flink SQL(二)API 调用
标签:drive 表达式 art 适合 text ast ora elastic view