Hudi 数据湖的插入,更新,查询,分析操作示例

Hudi 数据湖的插入,更新,查询,分析操作示例作者:Grey
原文地址:
博客园:Hudi 数据湖的插入,更新,查询,分析操作示例
CSDN:Hudi 数据湖的插入,更新,查询,分析操作示例
前置工作首先,需要先完成
Linux 下搭建 Kafka 环境
Linux 下搭建 Hadoop 环境
Linux 下搭建 HBase 环境
Linux 下搭建 Hive 环境
本文基于上述四个环境已经搭建完成的基础上进行 Hudi 数据湖的插入,更新,查询操作 。
开发环境Scala 2.11.8
JDK 1.8
需要熟悉 Maven 构建项目和 Scala 一些基础语法 。
操作步骤master 节点首先启动集群,执行:
stop-dfs.sh && start-dfs.sh启动 yarn,执行:
stop-yarn.sh && start-yarn.sh然后准备一个 Mave 项目,在 src/main/resources 目录下,将 Hadoop 的一些配置文件拷贝进来,分别是
$HADOOP_HOME/etc/hadoop/core-site.xml 文件
<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" href="https://www.huyubaike.com/biancheng/configuration.xsl"?><configuration><property><name>fs.default.name</name><value>hdfs://master:9000</value></property><property><name>hadoop.tmp.dir</name><value>/usr/local/hadoop/tmp</value></property></configuration>注意,需要在你访问集群的机器上配置 host 文件,这样才可以识别 master 节点 。
$HADOOP_HOME/etc/hadoop/hdfs-site.xml 文件
<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" href="https://www.huyubaike.com/biancheng/configuration.xsl"?><configuration><property><name>dfs.replication</name><value>1</value></property><property><name>dfs.permissions</name><value>false</value></property></configuration>$HADOOP_HOME/etc/hadoop/yarn-site.xml 文件,目前还没有任何配置
<?xml version="1.0"?><configuration></configuration>然后,设计实体的数据结构,
package git.snippet.entitycase class MyEntity(uid: Int,uname: String,dt: String)插入数据代码如下
package git.snippet.testimport git.snippet.entity.MyEntityimport git.snippet.util.JsonUtilimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}object DataInsertion {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val sparkConf = new SparkConf().setAppName("MyFirstDataApp").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()val ssc = sparkSession.sparkContextssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")insertData(sparkSession)}def insertData(sparkSession: SparkSession) = {import org.apache.spark.sql.functions._import sparkSession.implicits._val commitTime = System.currentTimeMillis().toString //生成提交时间val df = sparkSession.read.text("/mydata/data1").mapPartitions(partitions => {partitions.map(item => {val jsonObject = JsonUtil.getJsonData(item.getString(0))MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))})})val result = df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列.withColumn("uuid", col("uid")).withColumn("hudipart", col("dt")) //增加hudi分区列result.write.format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", 2).option("hoodie.upsert.shuffle.parallelism", 2).option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列.option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列.option("hoodie.table.name", "myDataTable").option("hoodie.datasource.write.partitionpath.field", "hudipart") //分区列.mode(SaveMode.Overwrite).save("/snippet/data/hudi")}}

经验总结扩展阅读