3> 创建kafka连接
//创建kafka连接 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .property("group.id","cy")//消费者组 .topic("first")//消费主题 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("a");//临时表4> 进行查询
//创建表 Table select = tabEnv.from("a").select("*");5> 创建写入kafka连接
//创建写入主题 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .topic("first1")//消费主题 .sinkPartitionerRoundRobin()//随机分区 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("c");6> 写入
//写入 select.executeInsert("c");完整代码如下
package net.cyan.FlinkSql; ? import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ? public class Demo5_readWriteKafka { public static void main(String[] args) { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); //创建表结构 Schema schema=new Schema() .field("id",DataTypes.STRING()) .field("ts",DataTypes.BIGINT()) .field("vc",DataTypes.INT()); //创建kafka连接 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .property("group.id","cy")//消费者组 .topic("first")//消费主题 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("a"); //创建表 Table select = tabEnv.from("a").select("*"); //创建写入主题 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .topic("first1")//消费主题 .sinkPartitionerRoundRobin()//随即分区 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("c"); ? //写入 select.executeInsert("c"); ? ? } }
经验总结扩展阅读
- 原神机械之心任务的完成方法是什么
- 奶油冻过之后再化开还能用吗
- 百分之七十五的酒精怎么配
- 四 【单片机入门】应用层软件开发的单片机学习之路-----ESP32开发板PWM控制电机以及中断的使用
- 原神坎蒂丝命之座效果是什么
- 对亡人的哀思之情句子 人突然去世的句子
- flutter 系列之:flutter 中的幽灵offstage
- 高铁中转站怎么换乘
- 美妆蛋一定要吸水之后使用吗?
- vulnhub靶场之ICA: 1