Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
664 views
in Technique[技术] by (71.8m points)

dataframe - Spark custom aggregation : collect_list+UDF vs UDAF

I often have the need to perform custom aggregations on dataframes in spark 2.1, and used these two approaches :

  • using groupby/collect_list to get all the values in a single row, then apply an UDF to aggregate the values
  • Writing a custom UDAF (User defined aggregate function)

I generally prefer the first option as its easier to implement and more readable than the UDAF implementation. But I would assume that the first option is generally slower, because more data is sent around the network (no partial aggregation), but my experience shows that UDAF are generally slow. Why is that?

Concrete example: Calculating histograms:

Data is in a hive table (1E6 random double values)

val df = spark.table("testtable")

def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple

UDF approach:

val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))

df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)

+--------------------------------------------------------------------------------+
|UDF(xs)                                                                         |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+

UDAF-Approach

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable

class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(
      StructField("value", DoubleType) :: Nil
    )

  override def bufferSchema: StructType =
    new StructType()
      .add("histo", MapType(DoubleType, IntegerType))

  override def deterministic: Boolean = true
  override def dataType: DataType = MapType(DoubleType, IntegerType)
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[Double, Int]()
  }

  private def mergeMaps(a: Map[Double, Int], b: Map[Double, Int]) = {
    a ++ b.map { case (k,v) => k -> (v + a.getOrElse(k, 0)) }
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
     val oldBuffer = buffer.getAs[Map[Double, Int]](0)
     val newInput = Map(roundToMultiple(input.getDouble(0),binWidth) -> 1) 
     buffer(0) = mergeMaps(oldBuffer, newInput)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val a = buffer1.getAs[Map[Double, Int]](0)
    val b = buffer2.getAs[Map[Double, Int]](0)
    buffer1(0) = mergeMaps(a, b)
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[Double, Int]](0)
  }
}

val histo = new HistoUDAF(0.25)

df.groupBy().agg(histo($"x")).show(false)

+--------------------------------------------------------------------------------+
|histoudaf(x)                                                                    |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+

My tests show that the collect_list/UDF approach is about 2 times faster than the UDAF approach. Is this a general rule, or are there cases where UDAF is really much faster and the rather awkward implemetation is justified?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

1.4m articles

1.4m replys

5 comments

56.9k users

...