Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
682 views
in Technique[技术] by (71.8m points)

scala - Calculate the standard deviation of grouped data in a Spark DataFrame

I have user logs that I have taken from a csv and converted into a DataFrame in order to leverage the SparkSQL querying features. A single user will create numerous entries per hour, and I would like to gather some basic statistical information for each user; really just the count of the user instances, the average, and the standard deviation of numerous columns. I was able to quickly get the mean and count information by using groupBy($"user") and the aggregator with SparkSQL functions for count and avg:

val meanData = selectedData.groupBy($"user").agg(count($"logOn"),
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"),
avg($"repliesPerHour"), avg($"duration"))

However, I cannot seem to find an equally elegant way to calculate the standard deviation. So far I can only calculate it by mapping a string, double pair and use StatCounter().stdev utility:

val stdevduration = duration.groupByKey().mapValues(value =>
org.apache.spark.util.StatCounter(value).stdev)

This returns an RDD however, and I would like to try and keep it all in a DataFrame for further queries to be possible on the returned data.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Spark 1.6+

You can use stddev_pop to compute population standard deviation and stddev / stddev_samp to compute unbiased sample standard deviation:

import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

selectedData.groupBy($"user").agg(stdev_pop($"duration"))

Spark 1.5 and below (The original answer):

Not so pretty and biased (same as the value returned from describe) but using formula:

wikipedia sdev

you can do something like this:

import org.apache.spark.sql.functions.sqrt

selectedData
    .groupBy($"user")
    .agg((sqrt(
        avg($"duration" * $"duration") -
        avg($"duration") * avg($"duration")
     )).alias("duration_sd"))

You can of course create a function to reduce the clutter:

import org.apache.spark.sql.Column
def mySd(col: Column): Column = {
    sqrt(avg(col * col) - avg(col) * avg(col))
}

df.groupBy($"user").agg(mySd($"duration").alias("duration_sd"))

It is also possible to use Hive UDF:

df.registerTempTable("df")
sqlContext.sql("""SELECT user, stddev(duration)
                  FROM df
                  GROUP BY user""")

Source of the image: https://en.wikipedia.org/wiki/Standard_deviation


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...