
python - PySpark: Fill NAs with mode of column based on aggregation of other columns

** Please note the edits at the bottom, with an adapted solution script from Anna K. (thank you!) **

I have a dataframe with 4 columns:

# Sample dataframe with missing values in the Item column
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
         ('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
         ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
         (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()

#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null|   Red|     3|   10|
#|null|   Red|     6|   20|
#|   A| Green|     5|   30|
#|   A|   Red|     3|   10|
#|null| Green|     6|   10|
#|   B| Green|     5|   10|
#|   B|Orange|     6|   20|
#|   A|   Red|     3|   20|
#|   B| Green|     5|   30|
#|null|   Red|     3|   10|
#+----+------+------+-----+

I would like to fill the null 'Item' values with the mode of the 'Item' column when grouped by 'Weight' and 'Color':

from pyspark.sql import Window
from pyspark.sql.functions import col, desc, row_number

grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()
window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = (grouped
    .withColumn('order', row_number().over(window))
    .where(col('order') == 1))

grouped.show()

#+----+------+------+-----+-----+
#|Item|Weight| Color|count|order|
#+----+------+------+-----+-----+
#|   B|     6|Orange|    1|    1|
#|   B|     5| Green|    2|    1|
#|   A|     3|   Red|    2|    1|
#+----+------+------+-----+-----+

In this case, all null 'Item' values with a 'Weight' and 'Color' combination of 6 and Orange will be assigned 'B'.

All null 'Item' values with a 'Weight' = 5 and 'Color' = Green will also be assigned 'B'.

Null 'Item' records with 'Weight' = 3 and 'Color' = Red will be assigned Item = A.

My first thought was to perform some sort of join using this new grouped df and my original df, but it's failing, and it's also massively inelegant. Is there a more streamlined way to do this?

# Not elegant, and not working...

from pyspark.sql.functions import lit, when

# Add new empty column to fill NAs
items = items.withColumn('item_weight_impute', lit(None))

# Select columns to include in the join based on weight
(items.join(grouped.select('Item', 'Weight', 'Color'), ['Item', 'Weight', 'Color'], 'left_outer')
    .withColumn('item_weight_impute', when(col('Item').isNull(), grouped.Item).otherwise(items.Item))
    .select('Item', 'Color', 'Weight', 'Price', 'item_weight_impute')
    .show())

#+----+------+------+-----+------------------+
#|Item| Color|Weight|Price|item_weight_impute|
#+----+------+------+-----+------------------+
#|   B|Orange|     6|   20|                 B|
#|   A| Green|     5|   30|                 A|
#|null|   Red|     3|   10|              null|
#|null|   Red|     3|   10|              null|
#|   B| Green|     5|   30|                 B|
#|   B| Green|     5|   10|                 B|
#|null| Green|     6|   10|              null|
#|   A|   Red|     3|   20|                 A|
#|   A|   Red|     3|   10|                 A|
#|null|   Red|     6|   20|              null|
#+----+------+------+-----+------------------+

EDITS! Courtesy of Anna K. Here is an adaptation of the answer with a twist: perhaps we would like to programmatically impute more than one categorical column. Here, we impute Item and Color based on the combination of Weight and Price:

# Practice: compute modes to fill NAs for both Item and Color
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
         ('A', None, 3, 10), (None, None, 6, 10), ('B', 'Green', 5, 10),
         ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', None, 5, 30),
         (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null|   Red|     3|   10|
#|null|   Red|     6|   20|
#|   A| Green|     5|   30|
#|   A|  null|     3|   10|
#|null|  null|     6|   10|
#|   B| Green|     5|   10|
#|   B|Orange|     6|   20|
#|   A|   Red|     3|   20|
#|   B|  null|     5|   30|
#|null|   Red|     3|   10|
#+----+------+------+-----+

from pyspark.sql import functions as f

mode_columns = ['Item', 'Color']

# Impute each column with the mode of its Weight + Price combination
for item in mode_columns:
    # Count all occurrences of Weight + Price combos
    df1 = (items
       .dropna(subset=[f'{item}'])
       .groupBy(f'{item}', "Weight", "Price")
       .agg(f.count("Price").alias("count")))

    # Reduce df1 to only include those most frequent Weight + Price combos
    df2 = (df1
       .groupBy("Weight", "Price")
       .agg(f.max("count").alias("count")))

    # Join with df of counts to get the {item} mode
    grouped = (df1
           .join(df2, ["Weight", "Price", "count"])
           .withColumnRenamed(f'{item}', f'{item}_fill_value')
           .drop("count"))
    
    #Join with original df
    items = items.join(grouped, ["Weight", "Price"], "left" )
    
    # Coalesce the original and imputed {item} columns
    items = items.withColumn(f'{item}_weight_price_impute', f.coalesce(f'{item}', f'{item}_fill_value'))
    items = items.drop(f'{item}', f'{item}_fill_value')
    items = items.withColumnRenamed(f'{item}_weight_price_impute', f'{item}')
    items.show()
#+------+-----+----+------+
#|Weight|Price|Item| Color|
#+------+-----+----+------+
#|     5|   30|   B| Green|
#|     5|   30|   B| Green|
#|     5|   30|   A| Green|
#|     5|   30|   A| Green|
#|     3|   20|   A|   Red|
#|     6|   10|null|  null|
#|     5|   10|   B| Green|
#|     3|   10|   A|   Red|
#|     3|   10|   A|   Red|
#|     3|   10|   A|   Red|
#|     6|   20|   B|   Red|
#|     6|   20|   B|   Red|
#|     6|   20|   B|Orange|
#|     6|   20|   B|Orange|
#+------+-----+----+------+
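
As a quick sanity check (a small sketch using the same f alias as above, not part of the original post), you can list the rows that could not be imputed because their Weight + Price combination never occurs with a non-null value:

items.where(f.col('Item').isNull() | f.col('Color').isNull()).show()

#+------+-----+----+------+
#|Weight|Price|Item| Color|
#+------+-----+----+------+
#|     6|   10|null|  null|
#+------+-----+----+------+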


Question from: https://stackoverflow.com/questions/65928710/pyspark-fill-nas-with-mode-of-column-based-on-aggregation-of-other-columns


1 Reply


Once you have grouped, you can do a left join of items with grouped and then use coalesce to fill in the null values in the Item column. The coalesce function returns the first column that is not null.
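
To illustrate coalesce on its own (a minimal standalone sketch with a made-up demo dataframe, not part of the original answer):

from pyspark.sql import functions as F

# Each row keeps the first non-null value across columns a and b
demo = spark.createDataFrame([('x', None), (None, 'y'), (None, None)], ['a', 'b'])
demo.withColumn('filled', F.coalesce('a', 'b')).show()

#+----+----+------+
#|   a|   b|filled|
#+----+----+------+
#|   x|null|     x|
#|null|   y|     y|
#|null|null|  null|
#+----+----+------+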

Step 1. Get the grouped df:

from pyspark.sql import functions as F

# Count occurrences of each Item per Weight + Color combo
df1 = (items
   .dropna()
   .groupBy("Item", "Weight", "Color")
   .agg(F.count("Price").alias("count")))

# Keep only the highest count per Weight + Color combo
df2 = (df1
   .groupBy("Weight", "Color")
   .agg(F.max("count").alias("count")))

# Join back to recover the Item that achieved that count (the mode)
grouped = (df1
       .join(df2, ["Weight", "Color", "count"])
       .withColumnRenamed("Item", "fill_value")
       .drop("count"))

Step 2. Left join items with grouped

df = items.join(grouped, ["Weight", "Color"], "left" )

Step 3. Apply coalesce

df_filled = df.withColumn("item_weight_impute", F.coalesce("Item", "fill_value"))

df_filled is now the original items dataframe with the joined fill_value column plus an item_weight_impute column holding the filled values (the original answer shows the result as a screenshot).
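
Tracing the three steps on the sample data, df_filled should look roughly like this (row and column order may vary). Note that the two rows whose Weight and Color combination never appears with a non-null Item (6/Red and 6/Green) stay null, since there is no mode to fall back on:

df_filled.show()

#+------+------+----+-----+----------+------------------+
#|Weight| Color|Item|Price|fill_value|item_weight_impute|
#+------+------+----+-----+----------+------------------+
#|     3|   Red|null|   10|         A|                 A|
#|     3|   Red|   A|   10|         A|                 A|
#|     3|   Red|   A|   20|         A|                 A|
#|     3|   Red|null|   10|         A|                 A|
#|     5| Green|   A|   30|         B|                 A|
#|     5| Green|   B|   10|         B|                 B|
#|     5| Green|   B|   30|         B|                 B|
#|     6|Orange|   B|   20|         B|                 B|
#|     6|   Red|null|   20|      null|              null|
#|     6| Green|null|   10|      null|              null|
#+------+------+----+-----+----------+------------------+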

