执行,输出以下信息,验证成功 。
文章插图
数据查询也支持很多查询条件,比如增量查询,按时间段查询等 。
接下来是 flink 实时数据分析的服务,首先需要在 master 上启动 kafka,并创建 一个名字为 mytopic 的 topic,详见Linux 下搭建 Kafka 环境
相关命令如下
创建topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topicmytopic
生产者启动配置kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic
消费者启动配置kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic
然后运行如下代码package git.snippet.analyzer;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class DataAnalyzer {public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.100.130:9092");properties.setProperty("group.id", "snippet");//构建FlinkKafkaConsumerFlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);//指定偏移量myConsumer.setStartFromLatest();final DataStream<String> stream = env.addSource(myConsumer);env.enableCheckpointing(5000);stream.print();try {env.execute("DataAnalyzer");} catch (Exception e) {e.printStackTrace();}}}
其中properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
根据自己的配置调整,然后通过 kakfa 的生产者客户端输入一些数据,这边可以收到这个数据,验证完毕 。完整代码见
data-lake
【Hudi 数据湖的插入,更新,查询,分析操作示例】
经验总结扩展阅读
- 2023年10月7日是拜观音求子的黄道吉日吗 2023年农历八月廿三宜拜观音求子吗
- 女孩漂亮有涵养的名字大全 有涵养女孩名字
- 吃苏打饼的好处是什么 吃苏打饼坏处有什么
- 带贤字的男孩名字大全 男孩取名名字大全
- 比较诗情画意的女孩名字 女孩名字取名
- 家长怎样培养孩子的社交能力
- 2023年10月7日是祭拜灶神的黄道吉日吗 2023年10月7日祭拜灶神行吗
- 家长在孩子面前吵架的危害
- 女孩漂亮有内涵的名字大全 有内涵名字大全
- 怎么缓解孩子的分离焦虑