渐变维度 使用 Apache Hudi 实现 SCD-2( 三 )

2.假设我们的增量数据存储在下表中(非Hudi格式,可以是Hive) 。
+---------+-------------+-----------------+---------------+-------------------+-------------------+|seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |+---------+-------------+-----------------+---------------+-------------------+-------------------+|1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30||4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40||3345     |Stationary   |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|+---------+-------------+-----------------+---------------+-------------------+-------------------+

  1. 现在让我们通过对目标表进行Left Anti Join过滤掉增量表中的所有 Insert only 记录 。
val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")val tgtHudiDf = spark.sql("select * from hudi_product_catalog")hudiTableData.createOrReplaceTempView("hudiTable")//Cast as neededval stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts"))).withColumn("seller_id",col("seller_id").cast("int"))//Prepare an insert DF from incremental temp DFval instmpDf = stgDf.as("stg")      .join(tgtHudiDf.as("tgt"),        col("stg.seller_id") === col("tgt.seller_id") &&          col("stg.prod_category") === col("tgt.prod_category"),"left_anti").select("stg.*")val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59"))).withColumn("actv_ind",lit(1))insDf.show(false)+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+|seller_id|prod_category|product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+|     3345|   Stationary|Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
  1. 我们有一个只插入记录的DataFrame 。接下来让我们创建一个DataFrame,其中将包含来自 delta 表和目标表的属性,并在目标上使用内连接,它将获取需要更新的记录 。

    经验总结扩展阅读