
Perform A User Defined Function On A Column Of A Large Pyspark Dataframe Based On Some Columns Of Another Pyspark Dataframe On Databricks

My question is related to my previous one at How to efficiently join large pyspark dataframes and small python list for some NLP results on databricks. I have worked out part of it.

Solution 1:

IIUC, you can try something like the following (the processing flow is split into 4 steps; Spark 2.4+ is required):
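For reference, a minimal sketch of the assumed input data (the column names follow the steps below; the sample values are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# df1: one row per topic, with its terms and the corresponding termWeights (assumed schema)
df1 = spark.createDataFrame(
    [(0, ["virus", "mask"], [0.6, 0.4]),
     (1, ["vaccine", "trial"], [0.7, 0.3])],
    ["topic", "terms", "termWeights"]
)

# df2: one row per record, with an id and the tokenized text (assumed schema)
df2 = spark.createDataFrame(
    [(1, ["Virus", "spread"]),
     (2, ["vaccine", "news"])],
    ["r_id", "tokens"]
)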

Step-1: convert all df2.tokens to lowercase so the text comparison is case-insensitive:

from pyspark.sql.functions import expr, desc, row_number, broadcast

df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))
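Optionally, the same normalization can be applied to df1.terms so that both sides of the comparison are lowercase (the alternative at the end of this answer mentions this as well):

# optional: lowercase df1.terms too, if they are not already lowercase
df1 = df1.withColumn('terms', expr("transform(terms, x -> lower(x))"))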

Step-2: left-join df2 with df1 using arrays_overlap:

df3 = df2.join(broadcast(df1), expr("arrays_overlap(terms, tokens)"), "left")
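arrays_overlap(a, b) returns true when the two arrays share at least one non-null element, so the left join keeps every df2 row and attaches each df1 row whose terms intersect the row's tokens. A quick standalone check (hypothetical values):

from pyspark.sql.functions import expr

# true, because both arrays contain 'virus'
spark.range(1).select(
    expr("arrays_overlap(array('virus', 'mask'), array('virus', 'spread'))").alias("overlap")
).show()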

Step-3: use the aggregate function to calculate matched_sum_of_weights from terms, termWeights and tokens:

df4 = df3.selectExpr(
    "r_id",
    "tokens",
    "topic",
    """
      aggregate(
        /* find all terms+termWeights which are shown in tokens array */
        filter(arrays_zip(terms,termWeights), x -> array_contains(tokens, x.terms)),
        0D,
        /* get the sum of all termWeights from the matched terms */
        (acc, y) -> acc + y.termWeights
      ) as matched_sum_of_weights
    """)

Step-4: for each r_id, find the row with the highest matched_sum_of_weights using a Window function and keep only the rows with row_number == 1:

from pyspark.sql import Window
w1 = Window.partitionBy('r_id').orderBy(desc('matched_sum_of_weights'))

df_new = df4.withColumn('rn', row_number().over(w1)).filter('rn=1').drop('rn', 'matched_sum_of_weights')
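With the hypothetical sample data sketched above, the result can be inspected directly. Note that row_number keeps exactly one row per r_id even when two topics tie on matched_sum_of_weights (use rank() instead if ties should all be kept), and rows whose tokens match no topic keep a null topic because of the left join:

df_new.select('r_id', 'tokens', 'topic').show(truncate=False)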

Alternative: if df1 is not very large, this can be handled without the join/Window partition. The code below only outlines the idea; you should refine it based on your actual data:

from pyspark.sql.functions import expr, when, coalesce, array_contains, lit, struct

# create a dict from df1 with topic as key and list of termWeights+terms as value
d = df1.selectExpr("string(topic)", "arrays_zip(termWeights,terms) as terms").rdd.collectAsMap()

# skip this if the text comparison is case-sensitive; you might do the same to df1 as well
df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))

# save the column names of the original df2
cols = df2.columns

# iterate through all items of d (or df1) and add a new column to df2 for each
# topic, holding a struct of `sum_of_weights`, `topic` and `has_match`
# (true if any of the topic's terms is matched)
for x, y in d.items():
  df2 = df2.withColumn(x,
      struct(
        sum([when(array_contains('tokens', t.terms), t.termWeights).otherwise(0) for t in y]).alias('sum_of_weights'),
        lit(x).alias('topic'),
        coalesce(*[when(array_contains('tokens', t.terms),1) for t in y]).isNotNull().alias('has_match')
      )
  )

# create a new array containing all the new columns (topics), find the array_max
# among items with `has_match == true`, and then retrieve its `topic` field
df_new = df2.selectExpr(
    *cols,
    f"array_max(filter(array({','.join(map('`{}`'.format,d.keys()))}), x -> x.has_match)).topic as topic"
)
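To see what the alternative is doing, here is a plain-Python sketch of the per-row logic (hypothetical data; array_max compares the structs field by field, so sum_of_weights decides which topic wins among those with has_match == true):

# roughly the structure built by collectAsMap above (actual entries are Row objects)
d = {'0': [(0.6, 'virus'), (0.4, 'mask')], '1': [(0.7, 'vaccine'), (0.3, 'trial')]}
tokens = ['virus', 'spread']

candidates = []
for topic, pairs in d.items():
    sum_of_weights = sum(w for w, t in pairs if t in tokens)  # the sum_of_weights field
    has_match = any(t in tokens for w, t in pairs)            # the has_match field
    if has_match:                                             # filter(..., x -> x.has_match)
        candidates.append((sum_of_weights, topic))

# array_max picks the struct with the largest sum_of_weights; .topic retrieves its topic
best_topic = max(candidates)[1] if candidates else None
print(best_topic)  # '0'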
