Posted: 2021-07-01 10:21:17
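This page explains how to define time attributes for time-based operations in Flink's Table API and SQL.
Time-based operations in the Table API and SQL, such as windows, need information about the notion of time and where it originates. Tables can therefore provide logical time attributes that indicate time and give table programs access to the corresponding timestamps.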
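Windows are the typical consumer of a time attribute. The following is a rough illustration of why every row needs a timestamp. This plain-Scala sketch is not Flink code. It maps a timestamp to a 10-minute tumbling window, like the "Tumble over 10.minutes" windows used in the examples on this page. It assumes epoch-millisecond timestamps and a window offset of zero. The names TumblingWindowModel, windowStart and assign are hypothetical.

```scala
// Hypothetical plain-Scala model (no Flink dependency) of tumbling-window assignment.
object TumblingWindowModel {
  // Half-open window [start, end) in epoch milliseconds.
  final case class Window(start: Long, end: Long)

  // 10 minutes, matching "Tumble over 10.minutes" in the examples.
  val TenMinutesMs: Long = 10L * 60 * 1000

  // Start of the window that contains ts, assuming a window offset of 0.
  // Math.floorMod keeps the result correct for negative timestamps as well.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Assign a timestamp to the tumbling window that contains it.
  def assign(ts: Long, size: Long = TenMinutesMs): Window = {
    val start = windowStart(ts, size)
    Window(start, start + size)
  }
}
```

For example, a row stamped at minute 34 (34L * 60 * 1000) falls into the window covering minutes 30 to 40. A row stamped exactly at a boundary opens the next window, because windows are half-open.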
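Table programs require that the time characteristic is set on the streaming environment. Processing time is the default:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```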
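The three time characteristics differ in which clock supplies a row's timestamp:

- processing time uses the local clock of the machine running the operator;
- ingestion time uses the time at which the record entered Flink;
- event time uses the timestamp carried by the record itself.

The sketch below is a simplified, hypothetical model of that choice, not Flink's API. TimeCharacteristicModel, Record and timestampOf are illustrative names, and `now` stands in for the operator's clock.

```scala
// Hypothetical model of which timestamp a record gets under each time characteristic.
object TimeCharacteristicModel {
  sealed trait Characteristic
  case object ProcessingTime extends Characteristic // clock of the operator's machine
  case object IngestionTime extends Characteristic  // time the record entered Flink
  case object EventTime extends Characteristic      // timestamp carried by the record

  // A record with its own event timestamp and the time it was ingested.
  final case class Record(payload: String, eventTime: Long, ingestionTime: Long)

  // `now` models the operator's local clock at the moment of processing.
  def timestampOf(c: Characteristic, r: Record, now: Long): Long = c match {
    case ProcessingTime => now
    case IngestionTime  => r.ingestionTime
    case EventTime      => r.eventTime
  }
}
```

Because the processing-time result depends on `now`, replaying the same input can produce different window results. Event time depends only on the data.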
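Defining a processing time attribute during DataStream-to-Table conversion:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// tEnv is assumed to be a StreamTableEnvironment created for env
val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing time attribute;
// it extends the two physical fields (Username, Data) by one logical field
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
```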
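Alternatively, a TableSource that implements DefinedProctimeAttribute can define the processing time attribute. The attribute is appended to the physical schema described by the source's return type:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{DefinedProctimeAttribute, StreamTableSource}
import org.apache.flink.types.Row

// define a table source with a processing time attribute
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

  override def getReturnType = {
    val names = Array[String]("Username", "Data")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create stream
    val stream = ...
    stream
  }

  override def getProctimeAttribute = {
    // field with this name will be appended as a third field
    "UserActionTime"
  }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
```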
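In both processing-time examples, the time attribute is a logical field added after the physical fields; it is not part of the data the source produces. A minimal, hypothetical model of that schema extension follows. ProctimeAppendModel, Schema, withProctime and appendValue are illustrative names, not Flink classes, and rows are modeled as a Vector of values.

```scala
// Hypothetical model: appending a logical processing-time field to a physical schema.
object ProctimeAppendModel {
  final case class Schema(names: Vector[String])

  // Physical schema of UserActionSource: Username, Data.
  val physical: Schema = Schema(Vector("Username", "Data"))

  // The logical attribute is appended after the physical fields; its name must be new.
  def withProctime(s: Schema, attr: String): Schema = {
    require(!s.names.contains(attr), s"field $attr already exists")
    Schema(s.names :+ attr)
  }

  // Each row receives the operator's current clock value for the appended field.
  def appendValue(row: Vector[Any], clock: () => Long): Vector[Any] =
    row :+ clock()
}
```

With the physical schema above, withProctime(physical, "UserActionTime") places the attribute at index 2. That is the "third field" mentioned in getProctimeAttribute.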
Event time attributes can be defined either during DataStream-to-Table conversion or by a TableSource.
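There are two ways to define the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp field is either appended to the schema as a new field or replaces an existing field.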
In either case, the event time timestamp field holds the value of the DataStream's event time timestamp.
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Option 1 and Option 2 are alternatives; use only one of them.

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
```
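The difference between the two options is where the rowtime field ends up. In both, its value comes from the timestamp attached to the stream record, not from the original field contents. The following hypothetical model makes that explicit. RowtimeModel, Element, appendRowtime and replaceFirstWithRowtime are illustrative names. Element pairs a tuple with the record timestamp that assignTimestampsAndWatermarks is assumed to have set.

```scala
// Hypothetical model of Option 1 (append) and Option 2 (replace) for a rowtime attribute.
object RowtimeModel {
  // A stream element: the tuple plus the record timestamp assigned upstream.
  final case class Element[T](value: T, recordTimestamp: Long)

  // Option 1: the rowtime attribute is appended as an extra field.
  def appendRowtime(e: Element[(String, String)]): (String, String, Long) =
    (e.value._1, e.value._2, e.recordTimestamp)

  // Option 2: the first field, already used for timestamp extraction,
  // is replaced by the rowtime attribute.
  def replaceFirstWithRowtime(e: Element[(Long, String, String)]): (Long, String, String) =
    (e.recordTimestamp, e.value._2, e.value._3)
}
```

In Option 2 the model deliberately gives the first field (999L) a different value from the record timestamp (2000L) to show which value wins. In a real pipeline the two would normally match, because the timestamp was extracted from that field.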
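A TableSource defines an event time attribute by implementing DefinedRowtimeAttributes. Its getRowtimeAttributeDescriptors() method describes each attribute's name, timestamp extractor and watermark strategy. Make sure that the DataStream returned by getDataStream() is aligned with the defined time attribute. Timestamps of the DataStream (those assigned by a TimestampAssigner) are only considered if a StreamRecordTimestamp timestamp extractor is defined. Watermarks of the DataStream are only preserved if a PreserveWatermarks watermark strategy is defined. Otherwise, only the values of the TableSource's rowtime attribute are relevant.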
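```scala
import java.util
import java.util.Collections

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.types.Row

// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  override def getReturnType = {
    val names = Array[String]("Username", "Data", "UserActionTime")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create stream
    // ...
    // assign watermarks based on the "UserActionTime" attribute
    val stream = inputStream.assignTimestampsAndWatermarks(...)
    stream
  }

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    // Mark the "UserActionTime" attribute as event-time attribute.
    // We create one attribute descriptor of "UserActionTime".
    val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
      "UserActionTime",                    // attribute name
      new ExistingField("UserActionTime"), // timestamp extractor: read the existing field
      new AscendingTimestamps)             // watermark strategy
    val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
    listRowtimeAttrDescr
  }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
```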
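The descriptor above pairs ExistingField with AscendingTimestamps, a watermark strategy meant for rowtime values that do not go backwards. Below is a small model of what such a strategy does. It tracks the largest timestamp seen so far and emits a watermark just below it. The "maximum minus one" rule is an assumption about this strategy, and AscendingWatermarkModel, observe, currentWatermark and isLate are hypothetical names, not Flink's classes. A row whose timestamp is not greater than the current watermark is late: the watermark promised that no such rows would follow.

```scala
// Hypothetical model of an ascending-timestamps watermark strategy.
final class AscendingWatermarkModel {
  // Start at MinValue + 1 so that currentWatermark cannot underflow.
  private var maxTimestamp: Long = Long.MinValue + 1

  // Record a rowtime value; only a larger timestamp moves the maximum forward.
  def observe(ts: Long): Unit =
    if (ts > maxTimestamp) maxTimestamp = ts

  // Watermark = largest timestamp observed so far, minus one.
  def currentWatermark: Long = maxTimestamp - 1

  // A row is late if its timestamp is at or below the current watermark.
  def isLate(ts: Long): Boolean = ts <= currentWatermark
}
```

Subtracting one keeps rows that share the current maximum timestamp on time, while any row that goes strictly backwards is reported as late.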
[Translation] Flink Table API & SQL: Streaming Concepts: Time Attributes