二、TableAPI读取文件使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名
package net.cyan.FlinkSql; ? import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ? import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_readWriteText { public static void main(String[] args) { //创建执行环境 // Configuration configuration = new Configuration(); // configuration.setInteger("rest.port", 3333); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment talEnv = StreamTableEnvironment.create(env); //创建查询的数据结果封装类型 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); talEnv .connect(new FileSystem().path("input/sensor.txt")) //读取文件路径 .withFormat(new Csv()) //读取文件的数据格式 .withSchema(schema) //读取出来的数据格式 .createTemporaryTable("sensor");//定义结果表名 ? //进行查询 Table select = talEnv.from("sensor") .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); ? ? //将查询结果写入到新文件中 //同样建立一个动态表连接 talEnv .connect(new FileSystem().path("input/b.txt")) //写入路径 .withFormat(new Csv()) //写入文件的数据格式 .withSchema(schema) //写入的数据格式 .createTemporaryTable("abc");//定义写入表名 //进行写入操作 ? select.executeInsert("abc"); ? // try { // //启动执行环境 // env.execute(); // } catch (Exception e) { // e.printStackTrace(); // } ? } }三、TableAPI 读取、写入Kakfa基本流程
1>需要创建表的运行环境
//创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);2>创建查询出的数据写出结构
//创建表结构 Schema schema=new Schema() .field("id",DataTypes.STRING()) .field("ts",DataTypes.BIGINT()) .field("vc",DataTypes.INT());
经验总结扩展阅读
- 原神机械之心任务的完成方法是什么
- 奶油冻过之后再化开还能用吗
- 百分之七十五的酒精怎么配
- 四 【单片机入门】应用层软件开发的单片机学习之路-----ESP32开发板PWM控制电机以及中断的使用
- 原神坎蒂丝命之座效果是什么
- 对亡人的哀思之情句子 人突然去世的句子
- flutter 系列之:flutter 中的幽灵offstage
- 高铁中转站怎么换乘
- 美妆蛋一定要吸水之后使用吗?
- vulnhub靶场之ICA: 1