sql实现TVF的累计窗口
累计窗口的应用:
【FlinkSQL之Windowing TVF】需求:每天每隔一个小时统计一次当天的pv(浏览量)
流的方式如何解决:
1、用滚动窗口 , 窗口长度设为1h
2、每天的第一个窗口清除状态 , 后面的不清 , 进行状态的累加
或者
用滚动窗口 , 长度设置为2day
自定义触发器 , 每隔1小时对窗内的元素计算一次 , 不关闭窗口
sql的方式如何解决?
直接使用累计窗口cumulate
//TVF 中的cumulate累计窗口 //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在 //tableName:表名 //timecol:时间属性字段 //step:累计步长 , 跟滑动步长类似 //size:窗口长度 //特别注意!!!! //1.累计窗口的步长与窗口长度同样是需要整数倍关系 // 2.如果在sql中使用了cumulate窗口 , 则一定需要group by,而且group by后一定有window_start,window_end两个字段 package net.cyan.FlinkSql.TVF; ? import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ? import java.time.Duration; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo3_Window_TVF_cumulate { public static void main(String[] args) { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); env.setParallelism(1); DataStream<WaterSensor> waterSensorStream = env.fromElements( new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((ws, ts) -> ws.getTs()) ? ); //创建table Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime()); //创建表 tabEnv.createTemporaryView("sensor",table); //执行sql //TVF 中的cumulate累计窗口 //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在 //tableName:表名 //timecol:时间属性字段 //step:累计步长 , 跟滑动步长类似 //size:窗口长度 //特别注意!!!! //1.累计窗口的步长与窗口长度同样是需要整数倍关系 // 2.如果在sql中使用了cumulate窗口 , 则一定需要group by,而且group by后一定有window_start,window_end两个字段 tabEnv.sqlQuery("select" + "window_start,window_end,id," + " sum(vc) sum_vc" + " from table (cumulate(table sensor,descriptor(et),interval '2' second,interval '6' second)) " + "group by window_start,window_end,id") .execute() .print(); } }
经验总结扩展阅读
- 云小课|MRS基础原理之MapReduce介绍
- SLAM中的内外点
- vulnhub靶场之NOOB: 1
- 世界上最丑陋的鱼类之一
- 世界上脸最像人类的鱼排行榜
- 世界上最大的带鱼是什么 鲱王皇带鱼体长在3米到10米之间
- 原神机械之心任务怎么完成
- 支持JDK19虚拟线程的web框架,之二:完整开发一个支持虚拟线程的quarkus应用
- 金铲铲之战黯灵龙技能是什么
- 金铲铲之战巨岩龙是怎么样的