FlinkSql之TableAPI详解( 二 )

二、TableAPI读取文件使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名
 package net.cyan.FlinkSql; ? import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ? import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_readWriteText {     public static void main(String[] args) {         //创建执行环境 //      Configuration configuration = new Configuration(); //      configuration.setInteger("rest.port", 3333);         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);         //创建查询的数据结果封装类型         Schema schema = new Schema()               .field("id", DataTypes.STRING())               .field("ts", DataTypes.BIGINT())               .field("vc", DataTypes.INT());         talEnv               .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径               .withFormat(new Csv()) //读取文件的数据格式               .withSchema(schema) //读取出来的数据格式               .createTemporaryTable("sensor");//定义结果表名 ?         //进行查询         Table select = talEnv.from("sensor")               .where($("id").isEqual("sensor_1"))               .select($("id"), $("ts"), $("vc")); ? ?         //将查询结果写入到新文件中         //同样建立一个动态表连接         talEnv               .connect(new FileSystem().path("input/b.txt"))  //写入路径               .withFormat(new Csv()) //写入文件的数据格式               .withSchema(schema) //写入的数据格式               .createTemporaryTable("abc");//定义写入表名         //进行写入操作 ?         select.executeInsert("abc"); ? //      try { //          //启动执行环境 //          env.execute(); //      } catch (Exception e) { //          e.printStackTrace(); //      } ?   } }三、TableAPI 读取、写入Kakfa基本流程
1>需要创建表的运行环境
 //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);2>创建查询出的数据写出结构
 //创建表结构 Schema schema=new Schema()       .field("id",DataTypes.STRING())       .field("ts",DataTypes.BIGINT())       .field("vc",DataTypes.INT());

经验总结扩展阅读