FlinkSQL之Windowing TVF

Windowing TVF
在Flink1.13版本之后出现的替代之前的Group window的产物 , 官网描述其 is more powerful and effective
FlinkSQL之Windowing TVF
 //TVF 中的tumble滚动窗口 //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在 //特别注意!!!! //如果在sql中使用了tumble窗口 , 则一定需要group by,而且group by后一定有window_start,window_end两个字段sql实现TVF的tumble窗口实现
 package net.cyan.FlinkSql.TVF; ? import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ? import java.time.Duration; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo1_Window_TableAPI_Tumble {     public static void main(String[] args) {         //创建执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         env.setParallelism(1);         DataStream<WaterSensor> waterSensorStream =                 env.fromElements(                         new WaterSensor("sensor_1", 1000L, 10),                         new WaterSensor("sensor_1", 2000L, 20),                         new WaterSensor("sensor_2", 3000L, 30),                         new WaterSensor("sensor_1", 4000L, 40),                         new WaterSensor("sensor_1", 5000L, 50),                         new WaterSensor("sensor_2", 6000L, 60))                       .assignTimestampsAndWatermarks(                                 WatermarkStrategy                                       .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))                                       .withTimestampAssigner((ws, ts) -> ws.getTs()) ?                       );         //创建table         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());         //创建表         tabEnv.createTemporaryView("sensor",table);         //执行sql         //TVF 中的tumble滚动窗口         //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在         //特别注意!!!!         //如果在sql中使用了tumble窗口 , 则一定需要group by,而且group by后一定有window_start,window_end两个字段         tabEnv.sqlQuery("select" +                 "window_start,window_end,id," +                 "sum(vc) sum_vc" +                 " from table (tumble(table sensor,descriptor(et),interval '5' second ))" +                 " group by window_start,window_end,id ")               .execute()               .print(); ?   } }

经验总结扩展阅读