一、FlinkSql的概念核心概念Flink 的 Table API 和 SQL 是流批统一的 API 。这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义 。因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的,关系型查询在流式场景下不如在批式场景下容易理解.
动态表和连续查询动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念 。
与表示批处理数据的静态表不同,动态表是随时间变化的 。可以像查询静态批处理表一样查询它们 。查询动态表将生成一个连续查询(Continuous Query) 。一个连续查询永远不会终止,结果会生成一个动态表 。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改 。
TableAPI
首先需要导入依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> ? <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-compress</artifactId> <version>1.21</version> </dependency> /** * 使用TableAPI的基本套路: * 1.创建表的执行环境 * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取 * 3.对动态表进行查询 * 4.把动态表转换为流 */这里需要注意的问题:
1.TableAPI 中将动态表转换为流时有两种方法
DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);【FlinkSql之TableAPI详解】toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);toRetractStream则可以使用
2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表 。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class
代码实现:
package net.cyan.FlinkSql; ? import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; ? import static org.apache.flink.table.api.Expressions.$; ? /** * 使用TableAPI的基本套路: * 1.创建表的执行环境 * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取 * 3.对动态表进行查询 * 4.把动态表转换为流 */ public class Demo1 { public static void main(String[] args) { Configuration configuration=new Configuration(); configuration.setInteger("rest.port",3333); //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(1); //模拟数据 DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements( new WaterSensor("S1", 1000L, 10), new WaterSensor("S1", 1000L, 10), new WaterSensor("S2", 2000L, 20), new WaterSensor("S3", 3000L, 30), new WaterSensor("S4", 4000L, 40), new WaterSensor("S5", 5000L, 50) ); //创建表的执行环境 StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env); //创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取 Table table = tableEnvironment.fromDataStream(WaterSensorSource); //对表进行查询 //1、过时的查询书写 Table result = table .where("id='S1'") .select("*"); //2、不过时的书写 Table result1 = table // .where($("id").isEqual("S1")) .select($("id"), $("ts"), $("vc")); //3.聚合函数 Table select = table .groupBy($("id")) .aggregate($("vc").sum().as("sum_vc")) .select($("id"), $("sum_vc")); //把动态表转换为流,使用到了之前创建的表运行环境 ? SingleOutputStreamOperator<Row> tuple2DataStream = tableEnvironment .toRetractStream(select, Row.class) .filter(t -> t.f0) .map(t -> t.f1); // DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class); // DataStream<Row> rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class); // rowDataStream.print(); // rowDataStream1.print(); tuple2DataStream.print(); ? ? try { //启动执行环境 env.execute(); } catch (Exception e) { e.printStackTrace(); } ? ? ? } }
经验总结扩展阅读
- 原神机械之心任务的完成方法是什么
- 奶油冻过之后再化开还能用吗
- 百分之七十五的酒精怎么配
- 四 【单片机入门】应用层软件开发的单片机学习之路-----ESP32开发板PWM控制电机以及中断的使用
- 原神坎蒂丝命之座效果是什么
- 对亡人的哀思之情句子 人突然去世的句子
- flutter 系列之:flutter 中的幽灵offstage
- 高铁中转站怎么换乘
- 美妆蛋一定要吸水之后使用吗?
- vulnhub靶场之ICA: 1