Hudi 数据湖的插入,更新,查询,分析操作示例( 三 )

执行,输出以下信息,验证成功 。

Hudi 数据湖的插入,更新,查询,分析操作示例

文章插图
数据查询也支持很多查询条件,比如增量查询,按时间段查询等 。
接下来是 flink 实时数据分析的服务,首先需要在 master 上启动 kafka,并创建 一个名字为 mytopic 的 topic,详见Linux 下搭建 Kafka 环境
相关命令如下
创建topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topicmytopic生产者启动配置
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic消费者启动配置
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic然后运行如下代码
package git.snippet.analyzer;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class DataAnalyzer {public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.100.130:9092");properties.setProperty("group.id", "snippet");//构建FlinkKafkaConsumerFlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);//指定偏移量myConsumer.setStartFromLatest();final DataStream<String> stream = env.addSource(myConsumer);env.enableCheckpointing(5000);stream.print();try {env.execute("DataAnalyzer");} catch (Exception e) {e.printStackTrace();}}}其中
properties.setProperty("bootstrap.servers", "192.168.100.130:9092");根据自己的配置调整,然后通过 kakfa 的生产者客户端输入一些数据,这边可以收到这个数据,验证完毕 。
完整代码见
data-lake
【Hudi 数据湖的插入,更新,查询,分析操作示例】

经验总结扩展阅读