Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
296 views
in Technique[技术] by (71.8m points)

Spark SQL broadcast hash join

I'm trying to perform a broadcast hash join on dataframes using SparkSQL as documented here: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html

In that example, the (small) DataFrame is persisted via saveAsTable and then there's a join via spark SQL (i.e. via sqlContext.sql("..."))

The problem I have is that I need to use the sparkSQL API to construct my SQL (I am left joining ~50 tables with an ID list, and don't want to write the SQL by hand).

How do I tell spark to use the broadcast hash join via the API?  The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can explicitly mark the DataFrame as small enough for broadcasting using broadcast function:

Python:

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

or broadcast hint (Spark >= 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

Scala:

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))

or broadcast hint (Spark >= 2.2):

largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL

You can use hints (Spark >= 2.2):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+  BROADCASTJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON larger.foo = small.foo

R (SparkR):

With hint (Spark >= 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)

With broadcast (Spark >= 2.3)

join(large, broadcast(small), large$foo == small$foo)

Note:

Broadcast join is useful if one of structures is relatively small. Otherwise it can be significantly more expensive than a full shuffle.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...