FlinkSql之TableAPI详解( 三 )

3> 创建kafka连接
 //创建kafka连接 tabEnv.connect(         new Kafka()       .version("universal")// 版本号       .property("bootstrap.servers","hadoop102:9092")//地址       .property("group.id","cy")//消费者组       .topic("first")//消费主题 ?  )       .withFormat(new Json())//写入的格式       .withSchema(schema)       .createTemporaryTable("a");//临时表4> 进行查询
 //创建表 Table select = tabEnv.from("a").select("*");5> 创建写入kafka连接
 //创建写入主题 tabEnv.connect(         new Kafka()               .version("universal")// 版本号               .property("bootstrap.servers","hadoop102:9092")//地址               .topic("first1")//消费主题               .sinkPartitionerRoundRobin()//随机分区 ? )       .withFormat(new Json())//写入的格式       .withSchema(schema)       .createTemporaryTable("c");6> 写入
 //写入 select.executeInsert("c");完整代码如下
 package net.cyan.FlinkSql; ? import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ? public class Demo5_readWriteKafka {     public static void main(String[] args) {        //创建执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         //创建表结构         Schema schema=new Schema()               .field("id",DataTypes.STRING())               .field("ts",DataTypes.BIGINT())               .field("vc",DataTypes.INT());         //创建kafka连接         tabEnv.connect(                 new Kafka()               .version("universal")// 版本号               .property("bootstrap.servers","hadoop102:9092")//地址               .property("group.id","cy")//消费者组               .topic("first")//消费主题 ?          )               .withFormat(new Json())//写入的格式               .withSchema(schema)               .createTemporaryTable("a");         //创建表         Table select = tabEnv.from("a").select("*");         //创建写入主题         tabEnv.connect(                 new Kafka()                       .version("universal")// 版本号                       .property("bootstrap.servers","hadoop102:9092")//地址                       .topic("first1")//消费主题                       .sinkPartitionerRoundRobin()//随即分区 ?       )               .withFormat(new Json())//写入的格式               .withSchema(schema)               .createTemporaryTable("c"); ?         //写入         select.executeInsert("c"); ? ?   } }

经验总结扩展阅读