Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
414 views
in Technique[技术] by (71.8m points)

python - Comparing columns in Pyspark

I am working on a PySpark DataFrame with n columns. I have a set of m columns (m < n) and my task is choose the column with max values in it.

For example:

Input: PySpark DataFrame containing :

col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]

Ouput :

col_4 = max(col1, col_2, col_3) = [3,2,5]

There is something similar in pandas as explained in this question.

Is there any way of doing this in PySpark or should I change convert my PySpark df to Pandas df and then perform the operations?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can reduce using SQL expressions over a list of columns:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max")))

Spark 1.5+ also provides least, greatest

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

If you want to keep name of the max you can use `structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

 maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

And finally you can use above to find select "top" column:

from pyspark.sql.functions import max

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...