Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.7k views
in Technique[技术] by (71.8m points)

apache spark - Efficient pyspark join

I've read a lot about how to do efficient joins in pyspark. The ways to achieve efficient joins I've found are basically:

  • Use a broadcast join if you can. (I usually can't because the dataframes are too large)
  • Consider using a very large cluster. (I'd rather not because of $$$).
  • Use the same partitioner.

The last one is the one i'd rather try, but I can't find a way to do it in pyspark. I've tried:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])

but it doesn't help, it still takes way too long until I stop it, because spark get's stucked in the last few jobs.

So, how can I use the same partitioner in pyspark and speed up my joins, or even get rid of the shuffles that takes forever ? Which code do I need to use ?

PD: I've checked other articles, even on stackoverflow, but I still can't see code.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

you can also use a two-pass approach, in case it suits your requirement.First, re-partition the data and persist using partitioned tables (dataframe.write.partitionBy()). Then, join sub-partitions serially in a loop, "appending" to the same final result table. It was nicely explained by Sim. see link below

two pass approach to join big dataframes in pyspark

based on case explained above I was able to join sub-partitions serially in a loop and then persisting joined data to hive table.

Here is the code.

from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

So, if you are joining on an integer emp_id, you can partition by the ID modulo some number and this way you can re distribute the load across the spark partitions and records having similar keys will be grouped together and reside on same partition. you can then read and loop through each sub partition data and join both the dataframes and persist them together.

counter =0;
paritioncount = 4;
while counter<=paritioncount:
    query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 =spark.sql(query1)
    EMP_DF2 =spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
    innerjoin_EMP.show()
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter +1

I have tried this and this is working fine. This is just an example to demo the two-pass approach. your join conditions may vary and the number of partitions also depending on your data size.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...