Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
786 views
in Technique[技术] by (71.8m points)

scala - Spark and SparkSQL: How to imitate window function?

Description

Given a dataframe df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

I want to create a running counter or index,

  • grouped by the same id and
  • sorted by date in that group,

thus

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

This is something I can achieve with window function, e.g.

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )

Unfortunately, Spark 1.4.1 does not support window functions for regular dataframes:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

Questions

  • How can I achieve the above computation on current Spark 1.4.1 without using window functions?
  • When will window functions for regular dataframes be supported in Spark?

Thanks!

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can use HiveContext for local DataFrames as well and, unless you have a very good reason not to, it is probably a good idea anyway. It is a default SQLContext available in spark-shell and pyspark shell (as for now sparkR seems to use plain SQLContext) and its parser is recommended by Spark SQL and DataFrame Guide.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...