2.假设我们的增量数据存储在下表中(非Hudi格式,可以是Hive) 。
+---------+-------------+-----------------+---------------+-------------------+-------------------+|seller_id|prod_category|product_name |product_package|discount_percentage|eff_start_ts |+---------+-------------+-----------------+---------------+-------------------+-------------------+|1234 |Detergent |Tide 5L |6 |25 |2022-01-31 10:00:30||4565 |Gourmet |Dairy Milk Almond|12 |45 |2022-06-12 20:30:40||3345 |Stationary |Sticky Notes |4 |12 |2022-07-09 21:30:45|+---------+-------------+-----------------+---------------+-------------------+-------------------+
- 现在让我们通过对目标表进行Left Anti Join过滤掉增量表中的所有 Insert only 记录 。
val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")val tgtHudiDf = spark.sql("select * from hudi_product_catalog")hudiTableData.createOrReplaceTempView("hudiTable")//Cast as neededval stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts"))).withColumn("seller_id",col("seller_id").cast("int"))//Prepare an insert DF from incremental temp DFval instmpDf = stgDf.as("stg") .join(tgtHudiDf.as("tgt"), col("stg.seller_id") === col("tgt.seller_id") && col("stg.prod_category") === col("tgt.prod_category"),"left_anti").select("stg.*")val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59"))).withColumn("actv_ind",lit(1))insDf.show(false)+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+|seller_id|prod_category|product_name|product_package|discount_percentage| eff_start_ts| eff_end_ts|actv_ind|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+| 3345| Stationary|Sticky Notes| 4| 12|2022-07-09 21:30:45|9999-12-31 23:59:59| 1|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
- 我们有一个只插入记录的DataFrame 。接下来让我们创建一个DataFrame,其中将包含来自 delta 表和目标表的属性,并在目标上使用内连接,它将获取需要更新的记录 。
经验总结扩展阅读
-
2022年10月24日是出火的黄道吉日吗 2022年农历九月廿九宜出火吗
-
亚麻衣服怎么洗不缩水,亚麻面料的衣服这样清洗不会缩水?
-
-
-
-
-
-
-
-
导语:心仪的他就在眼前 心仪的男人不唐突的4种搭讪方式,教你把握机会,迈出追爱第一步
-
-
再过4天,鸿运当头,高人一等,喜事接踵,横财不断,尽享富贵
-
六款2023年最佳手机推荐 手机好用排行榜2023
-
-
疾病 40岁是肠道疾病“高发期”,若想肠道更健康,3个坏习惯要改掉
-
-
-
-
-