How To Improve The Performance Of A Merge Operation With An Incremental DeltaLake Table?

I am specifically looking to optimize the performance of updating and inserting data into a DeltaLake base table with about 4 trillion records. Environment: Spark 3.0.0, DeltaLake 0.7.0

Solution 1:

I chose to share this answer so that you can take advantage of some tips.

Delta recommends using all the partition columns in the merge condition; this way less data has to be searched, thanks to the effect of partition "pruning".

So it is necessary to identify all the cases where the merge can update the data. For this, a query is run on the incremental data to generate a dictionary of this type:

filter_columns = spark.sql(f"""
SELECT
    YEAR,
    MONTH,
    DAY,
    COLLECT_LIST(DISTINCT TYPE) AS TYPES
FROM Incremental
GROUP BY YEAR, MONTH, DAY
ORDER BY 1, 2, 3
""").toPandas()

With this df it is possible to generate the conditions where the merge must update / insert:

[Image: df grouped by year, month, day, type]

Then a string called "final_cond" is generated like this:

# Map each (YEAR, MONTH, DAY) to the list of TYPE values found in the incremental data.
dic = {
    (row.YEAR, row.MONTH, row.DAY): list(row.TYPES)
    for row in filter_columns.itertuples(index=False)
}

final_cond = ''
index = 0
for key, value in dic.items():
    year, month, day = key
    # Quote each type so it can be used inside an IN (...) list.
    variables = ','.join(["'" + str(x) + "'" for x in value])
    # Join the per-day conditions with OR, except after the last one.
    or_cond = '' if index + 1 == len(dic) else '\nOR\n'

    cond = f"""({BASE_TABLE_NAME}.YEAR == {year} AND {BASE_TABLE_NAME}.MONTH == {month} AND {BASE_TABLE_NAME}.DAY == {day} AND {BASE_TABLE_NAME}.TYPE IN ({variables}))"""

    final_cond = final_cond + cond + or_cond
    index += 1

print(final_cond)

[Image: string condition]
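The string-building step can also be tried without Spark. The following is only a minimal sketch: the function name `build_partition_filter`, the table alias `base`, and the sample dictionary are hypothetical, and it assumes the type values contain no single quotes.

```python
def build_partition_filter(partitions, table_name):
    """Build an OR-joined SQL predicate, one clause per (year, month, day)."""
    clauses = []
    for (year, month, day), types in sorted(partitions.items()):
        # Assumption: type values contain no single quotes, so plain quoting is enough.
        quoted = ", ".join("'" + str(t) + "'" for t in types)
        clauses.append(
            f"({table_name}.YEAR = {int(year)} AND {table_name}.MONTH = {int(month)} "
            f"AND {table_name}.DAY = {int(day)} AND {table_name}.TYPE IN ({quoted}))"
        )
    return "\nOR\n".join(clauses)


# Hypothetical sample input: two days of incremental data.
sample = {
    (2020, 7, 1): ["A", "B"],
    (2020, 7, 2): ["C"],
}
print(build_partition_filter(sample, "base"))
```

Collecting the clauses in a list and joining them with OR avoids tracking the index of the last element by hand.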

Finally we add these conditions to the MERGE:

...
WHEN MATCHED AND ({final_cond}) THEN
...
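As a possible variant, the Delta Lake documentation suggests adding known partition constraints to the merge's match (ON) condition, so that partition pruning limits which files are scanned. The sketch below only builds the statement text. The helper `build_merge_sql`, the table names `base` and `incremental`, and the join key `ID` are hypothetical, and the `UPDATE SET *` / `INSERT *` clauses are placeholders for the real update and insert logic.

```python
def build_merge_sql(target, source, key, partition_filter):
    """Return a MERGE statement whose ON clause includes the partition filter."""
    return (
        f"MERGE INTO {target}\n"
        f"USING {source}\n"
        f"ON {target}.{key} = {source}.{key}\n"
        f"AND ({partition_filter})\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical names; the filter string would come from final_cond.
merge_sql = build_merge_sql(
    "base", "incremental", "ID",
    "base.YEAR = 2020 AND base.MONTH = 7 AND base.DAY = 1",
)
print(merge_sql)
# With an active SparkSession and Delta tables registered under these names,
# the statement could then be run with spark.sql(merge_sql).
```

Because the filter is derived from the incremental data itself, every source row should fall inside one of the listed partitions; if that assumption does not hold, rows outside the filter would be treated as not matched.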

This simple "filter" reduced the merge time for large operations.

