Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
234 views
in Technique[技术] by (71.8m points)

apache spark - DataFrame join optimization - Broadcast Hash Join

I am trying to effectively join two DataFrames, one of which is large and the second is a bit smaller.

Is there a way to avoid all this shuffling? I cannot set autoBroadCastJoinThreshold, because it supports only Integers - and the table I am trying to broadcast is slightly bigger than integer number of bytes.

Is there a way to force broadcast ignoring this variable?

question from:https://stackoverflow.com/questions/32435263/dataframe-join-optimization-broadcast-hash-join

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Broadcast Hash Joins (similar to map side join or map-side combine in Mapreduce) :

In SparkSQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for join by calling method broadcast on the DataFrame before joining it

Example: largedataframe.join(broadcast(smalldataframe), "key")

in DWH terms, where largedataframe may be like fact
smalldataframe may be like dimension

As described by my fav book (HPS) pls. see below to have better understanding.. enter image description here

Note : Above broadcast is from import org.apache.spark.sql.functions.broadcast not from SparkContext

Spark also, automatically uses the spark.sql.conf.autoBroadcastJoinThreshold to determine if a table should be broadcast.

Tip : see DataFrame.explain() method

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.

Is there a way to force broadcast ignoring this variable?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


NOTE :

Another similar out of box note w.r.t. Hive (not spark) : Similar thing can be achieved using hive hint MAPJOIN like below...

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

Further Reading : Please refer my article on BHJ, SHJ, SMJ


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...