Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

python - Combine PySpark DataFrame ArrayType fields into single ArrayType field

I have a PySpark DataFrame with 2 ArrayType fields:

>>> df
DataFrame[id: string, tokens: array<string>, bigrams: array<string>]
>>> df.take(1)
[Row(id='ID1', tokens=['one', 'two', 'two'], bigrams=['one two', 'two two'])]

I would like to combine them into a single ArrayType field:

>>> df2
DataFrame[id: string, tokens_bigrams: array<string>]
>>> df2.take(1)
[Row(id='ID1', tokens_bigrams=['one', 'two', 'two', 'one two', 'two two'])]

The syntax that works with strings does not seem to work here:

df2 = df.withColumn('tokens_bigrams', df.tokens + df.bigrams)

Thanks!

1 Reply

Spark >= 2.4

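You can use the concat function, which accepts array columns since Spark 2.4 (SPARK-23736). The examples in this answer use a two-row DataFrame with array columns tokens and tokens_bigrams (defined in the Spark < 2.4 section); the second row has NULL in tokens_bigrams:

from pyspark.sql.functions import col, concat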

df.select(concat(col("tokens"), col("tokens_bigrams"))).show(truncate=False)

# +---------------------------------+
# |concat(tokens, tokens_bigrams)   |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |null                             |
# +---------------------------------+

concat returns NULL if any of its inputs is NULL. To keep the data when one of the values is NULL, coalesce each column with an empty array:

from pyspark.sql.functions import array, coalesce, col, concat

df.select(concat(
    coalesce(col("tokens"), array()),
    coalesce(col("tokens_bigrams"), array())
)).show(truncate=False)

# +--------------------------------------------------------------------+
# |concat(coalesce(tokens, array()), coalesce(tokens_bigrams, array()))|
# +--------------------------------------------------------------------+
# |[one, two, two, one two, two two]                                   |
# |[three]                                                             |
# +--------------------------------------------------------------------+

Spark < 2.4

Unfortunately, to concatenate array columns in the general case you'll need a UDF, for example:

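from itertools import chain
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


def concat(element_type):
    """Return a UDF that concatenates array columns of the given element type."""
    def concat_(*args):
        # Treat NULL (None) inputs as empty arrays so the other values are kept.
        return list(chain.from_iterable(arg if arg else [] for arg in args))
    return udf(concat_, ArrayType(element_type))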

which can be used as:

df = spark.createDataFrame(
    [(["one", "two", "two"], ["one two", "two two"]), (["three"], None)],
    ("tokens", "tokens_bigrams")
)

concat_string_arrays = concat(StringType())
df.select(concat_string_arrays("tokens", "tokens_bigrams")).show(truncate=False)

# +---------------------------------+
# |concat_(tokens, tokens_bigrams)  |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |[three]                          |
# +---------------------------------+
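
The flattening done inside the UDF is plain Python, so its NULL handling can be checked without a Spark session. A small sketch, reusing the body of the inner concat_ function on the two sample rows; the all_null case is an extra input I added for illustration:

```python
from itertools import chain


def concat_(*args):
    # Same body as the UDF's inner function: None (or empty) inputs count as [].
    return list(chain.from_iterable(arg if arg else [] for arg in args))


row_1 = concat_(["one", "two", "two"], ["one two", "two two"])
row_2 = concat_(["three"], None)
all_null = concat_(None, None)  # hypothetical extra case, not in the sample data

print(row_1)     # ['one', 'two', 'two', 'one two', 'two two']
print(row_2)     # ['three']
print(all_null)  # []
```

Note one difference from the built-in concat: when every input is NULL this function returns an empty list rather than NULL.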
