
scala - Explanation of the fold method of Spark RDD

I am running Spark 1.4.0 pre-built for Hadoop 2.4 (in local mode) to calculate the sum of squares of a DoubleRDD. My Scala code looks like this:

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

It gives the surprising result 97.0.

This is quite counter-intuitive compared to the Scala version of fold:

Array(2., 3.).fold(0.0)((p, v) => p+v*v)

which gives the expected answer 13.0.

It seems quite likely that I have made some tricky mistake due to a lack of understanding. I have read that the function passed to RDD.fold() should be commutative, otherwise the result may depend on the partitioning. For example, if I change the number of partitions to 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

the code gives me 169.0 on my machine!

Can someone explain what exactly is happening here?


1 Reply


Well, it is actually pretty well explained by the official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
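In other words, the zero value is applied once per partition and then once more when the per-partition results are merged on the driver. Here is a minimal local sketch of that two-level semantics; simulateRddFold is an illustrative helper, not a Spark API, and plain Seqs stand in for partitions:

def simulateRddFold[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T =
  partitions
    .map(_.foldLeft(zero)(op)) // fold inside each partition, starting from zero
    .foldLeft(zero)(op)        // fold the partition results on the driver, again from zero

val op = (p: Double, v: Double) => p + v * v

simulateRddFold(Seq(Seq(2.0), Seq(3.0)), 0.0)(op)  // 97.0  - one element per partition
simulateRddFold(Seq(Seq(2.0, 3.0)), 0.0)(op)       // 169.0 - a single partition

Empty partitions fold to the zero value, and op(acc, 0.0) leaves the accumulator unchanged here, so two non-empty partitions are enough to model the eight-partition run below.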

To illustrate what is going on in this particular case, let's simulate it step by step:

val rdd = sc.parallelize(Array(2.0, 3.0))

// Fold each partition separately and collect the per-partition results:
val byPartition = rdd.mapPartitions(
    iter => Iterator(iter.fold(0.0)((p, v) => p + v * v))).collect()

It gives us something similar to Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0), one entry per partition (most partitions are empty and contribute only the zero value), and

byPartition.reduce((p, v) => (p + v * v))

returns 97.0.
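Tracing that reduce makes the 97.0 unsurprising: reduce takes the first element (0.0) as the initial accumulator, every further 0.0 leaves it unchanged (p + 0.0 * 0.0 = p), the 4.0 takes it to 0.0 + 4.0 * 4.0 = 16.0, and the 9.0 finally takes it to 16.0 + 9.0 * 9.0 = 97.0.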

An important thing to note is that the result can differ from run to run, depending on the order in which the partitions are combined.
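The same model also explains the 169.0 from the single-partition run in the question: the lone partition folds to (0.0 + 2.0 * 2.0) + 3.0 * 3.0 = 13.0, and the driver then merges the zero value with that partition result, giving 0.0 + 13.0 * 13.0 = 169.0. The underlying problem is that (p, v) => p + v * v treats its arguments asymmetrically, so it is neither associative nor commutative, yet fold applies it both to (accumulator, element) pairs inside a partition and to (accumulator, partition result) pairs during the merge. One safe way to get the sum of squares, for the sake of example, is to square first and then fold with a function for which the zero value is a true neutral element:

sc.parallelize(Array(2.0, 3.0)).map(v => v * v).fold(0.0)(_ + _)  // 13.0 regardless of partitioning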

