** Please note the edits at the bottom, with an adapted solution script from Anna K. (thank you!) **
I have a dataframe with 4 columns:
# Sample data- the goal is to compute the mode to fill NAs for Item
from pyspark.sql import Window
from pyspark.sql.functions import col, desc, lit, row_number, when

values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
          ('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
          ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
          (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null| Red| 3| 10|
#|null| Red| 6| 20|
#| A| Green| 5| 30|
#| A| Red| 3| 10|
#|null| Green| 6| 10|
#| B| Green| 5| 10|
#| B|Orange| 6| 20|
#| A| Red| 3| 20|
#| B| Green| 5| 30|
#|null| Red| 3| 10|
#+----+------+------+-----+
I would like to fill the null 'Item' values with the mode of the 'Item' column when grouped by 'Weight' and 'Color':
grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()
window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = (grouped
           .withColumn('order', row_number().over(window))
           .where(col('order') == 1))
grouped.show()
#+----+------+------+-----+-----+
#|Item|Weight| Color|count|order|
#+----+------+------+-----+-----+
#| B| 6|Orange| 1| 1|
#| B| 5| Green| 2| 1|
#| A| 3| Red| 2| 1|
#+----+------+------+-----+-----+
In this case, all null 'Item' values with a 'Weight' and 'Color' combination of 6 and Orange will be assigned 'B'.
All null 'Item' values with 'Weight' = 5 and 'Color' = Green will also be assigned 'B'.
Null 'Item' records with 'Weight' = 3 and 'Color' = Red will be assigned 'A'.
My first thought was to perform some sort of join between this new grouped df and my original df, but it's failing, and it's also massively inelegant. Is there a more streamlined way to do this?
# Not elegant, and not working...
# Add new empty column to fill NAs
items = items.withColumn('item_weight_impute', lit(None))
# Select columns to include in the join based on weight
(items
 .join(grouped.select('Item', 'Weight', 'Color'), ['Item', 'Weight', 'Color'], 'left_outer')
 .withColumn('item_weight_impute', when(col('Item').isNull(), grouped.Item).otherwise(items.Item))
 .select('Item', 'Color', 'Weight', 'Price', 'item_weight_impute')
 .show())
#+----+------+------+-----+------------------+
#|Item| Color|Weight|Price|item_weight_impute|
#+----+------+------+-----+------------------+
#| B|Orange| 6| 20| B|
#| A| Green| 5| 30| A|
#|null| Red| 3| 10| null|
#|null| Red| 3| 10| null|
#| B| Green| 5| 30| B|
#| B| Green| 5| 10| B|
#|null| Green| 6| 10| null|
#| A| Red| 3| 20| A|
#| A| Red| 3| 10| A|
#|null| Red| 6| 20| null|
#+----+------+------+-----+------------------+
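For what it's worth, I suspect the problem is that 'Item' is one of my join keys, so the null-Item rows never match anything in grouped. Here is a minimal, untested sketch of the join-then-coalesce I was aiming for instead, using the items and grouped dataframes from above and joining only on 'Weight' and 'Color' (Item_mode and filled are just placeholder names):
from pyspark.sql.functions import coalesce, col

# Bring in the per-(Weight, Color) mode as a separate column (placeholder name Item_mode),
# keeping every original row
filled = (items
          .join(grouped.select(col('Item').alias('Item_mode'), 'Weight', 'Color'),
                ['Weight', 'Color'], 'left')
          # Keep the original Item where present, otherwise fall back to the mode
          .withColumn('Item', coalesce('Item', 'Item_mode'))
          .drop('Item_mode'))
filled.show()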
EDITS! Courtesy of Anna K.
Here is an adaptation of the answer with a twist: perhaps we would like to programmatically impute more than one categorical column. Here, we impute Item and Color based on the combination of Weight and Price:
# Practice- compute hierarchical modes to fill NAs for Item and Color
from pyspark.sql import functions as f

values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
          ('A', None, 3, 10), (None, None, 6, 10), ('B', 'Green', 5, 10),
          ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', None, 5, 30),
          (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null| Red| 3| 10|
#|null| Red| 6| 20|
#| A| Green| 5| 30|
#| A| null| 3| 10|
#|null| null| 6| 10|
#| B| Green| 5| 10|
#| B|Orange| 6| 20|
#| A| Red| 3| 20|
#| B| null| 5| 30|
#|null| Red| 3| 10|
#+----+------+------+-----+
mode_columns = ['Item', 'Color']

# Impute each column using its mode within each Weight + Price combo
for item in mode_columns:
    # Count the occurrences of each value of the current column per Weight + Price combo
    df1 = (items
           .dropna(subset=[f'{item}'])
           .groupBy(f'{item}', "Weight", "Price")
           .agg(f.count("Price").alias("count")))
    # For each Weight + Price combo, keep only the highest count (the mode's count)
    df2 = (df1
           .groupBy("Weight", "Price")
           .agg(f.max("count").alias("count")))
    # Join the counts back to df1 to get the {item} mode per combo
    grouped = (df1
               .join(df2, ["Weight", "Price", "count"])
               .withColumnRenamed(f'{item}', f'{item}_fill_value')
               .drop("count"))
    # Join with the original df
    items = items.join(grouped, ["Weight", "Price"], "left")
    # Coalesce the original and imputed {item} columns
    items = items.withColumn(f'{item}_weight_price_impute', f.coalesce(f'{item}', f'{item}_fill_value'))
    items = items.drop(f'{item}', f'{item}_fill_value')
    items = items.withColumnRenamed(f'{item}_weight_price_impute', f'{item}')
items.show()
#+------+-----+----+------+
#|Weight|Price|Item| Color|
#+------+-----+----+------+
#| 5| 30| B| Green|
#| 5| 30| B| Green|
#| 5| 30| A| Green|
#| 5| 30| A| Green|
#| 3| 20| A| Red|
#| 6| 10|null| null|
#| 5| 10| B| Green|
#| 3| 10| A| Red|
#| 3| 10| A| Red|
#| 3| 10| A| Red|
#| 6| 20| B| Red|
#| 6| 20| B| Red|
#| 6| 20| B|Orange|
#| 6| 20| B|Orange|
#+------+-----+----+------+
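One thing to note about the adapted script: items ends up with 14 rows instead of the original 10, because ties in the counts (for example, Weight = 5 and Price = 30 has one 'A' and one 'B') leave more than one mode candidate in grouped, and the left join then duplicates those rows. If a single winner per combo is preferred, the df2/grouped step inside the loop could probably be swapped for a row_number window, roughly like this sketch (w and rn are just placeholder names; f, df1, and item are the same alias and loop variables as above):
from pyspark.sql import Window

# Rank the candidates within each (Weight, Price): highest count first, ties broken by value
w = Window.partitionBy("Weight", "Price").orderBy(f.desc("count"), f.col(item))
grouped = (df1
           .withColumn("rn", f.row_number().over(w))
           .where(f.col("rn") == 1)
           .withColumnRenamed(f'{item}', f'{item}_fill_value')
           .drop("count", "rn"))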
question from:
https://stackoverflow.com/questions/65928710/pyspark-fill-nas-with-mode-of-column-based-on-aggregation-of-other-columns