Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

spark streaming: select record with max timestamp for each id in dataframe (pyspark)

I have a DataFrame with the following schema:

 |-- record_id: integer (nullable = true)
 |-- Data1: string (nullable = true)
 |-- Data2: string (nullable = true)
 |-- Data3: string (nullable = true)
 |-- Time: timestamp (nullable = true)

For each record_id, I want to retrieve the record with the greatest timestamp.

So, if the data is initially this:

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+---------+---------+-----------------------+
 |        1 | aaa     | null    | null    | 2018-06-04 21:51:53.0 |
 |        1 | null    | bbbb    | cccc    | 2018-06-05 21:51:53.0 |
 |        1 | aaa     | null    | dddd    | 2018-06-06 21:51:53.0 |
 |        1 | qqqq    | wwww    | eeee    | 2018-06-07 21:51:53.0 |
 |        2 | aaa     | null    | null    | 2018-06-04 21:51:53.0 |
 |        2 | aaaa    | bbbb    | cccc    | 2018-06-05 21:51:53.0 |
 |        3 | aaa     | null    | dddd    | 2018-06-06 21:51:53.0 |
 |        3 | aaaa    | bbbb    | eeee    | 2018-06-08 21:51:53.0 |
 +----------+---------+---------+---------+-----------------------+

I want the output to be

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+---------+---------+-----------------------+
 |        1 | qqqq    | wwww    | eeee    | 2018-06-07 21:51:53.0 |
 |        2 | aaaa    | bbbb    | cccc    | 2018-06-05 21:51:53.0 |
 |        3 | aaaa    | bbbb    | eeee    | 2018-06-08 21:51:53.0 |
 +----------+---------+---------+---------+-----------------------+
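
To pin down the intended semantics independently of Spark, here is a minimal plain-Python sketch of the selection (one row per record_id, the one with the greatest Time). The helper names `parse_time` and `latest_per_id` are made up for illustration; this is not a streaming solution, only a statement of the expected result on the sample data above.

```python
from datetime import datetime

# Sample rows from the question: (record_id, Data1, Data2, Data3, Time).
rows = [
    (1, "aaa", None, None, "2018-06-04 21:51:53.0"),
    (1, None, "bbbb", "cccc", "2018-06-05 21:51:53.0"),
    (1, "aaa", None, "dddd", "2018-06-06 21:51:53.0"),
    (1, "qqqq", "wwww", "eeee", "2018-06-07 21:51:53.0"),
    (2, "aaa", None, None, "2018-06-04 21:51:53.0"),
    (2, "aaaa", "bbbb", "cccc", "2018-06-05 21:51:53.0"),
    (3, "aaa", None, "dddd", "2018-06-06 21:51:53.0"),
    (3, "aaaa", "bbbb", "eeee", "2018-06-08 21:51:53.0"),
]


def parse_time(text):
    """Parse the timestamp format used in the sample table."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def latest_per_id(records):
    """Return one row per record_id: the row with the greatest Time."""
    latest = {}
    for row in records:
        record_id = row[0]
        if (record_id not in latest
                or parse_time(row[4]) > parse_time(latest[record_id][4])):
            latest[record_id] = row
    # Sort by record_id so the output order matches the expected table.
    return [latest[key] for key in sorted(latest)]


for row in latest_per_id(rows):
    print(row)
```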

I tried to join two queries on the same stream, similar to the answer here. My code (where df1 is the original DataFrame):

from pyspark.sql.functions import col

df1 = df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()

# Latest timestamp per record_id.
query = "select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2 = spark.sql(query)
df2 = df2.withWatermark("Timetemp", "2 seconds")

# Join the stream back to the per-id maximum to recover the full row.
qws = df1.alias('a').join(
    df2.alias('b'),
    (col('a.record_id') == col('b.record_id')) & (col('a.Timetemp') == col('b.Timetemp')))

query = qws.writeStream.outputMode('append').format('console').start()

query.awaitTermination()

I keep getting this error, though:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

Yet there is clearly a watermark. What can be done? I cannot use a window function, since non-time-based windows are not supported on streaming DataFrames.

1 Reply

I had the same task. I tried several options, such as adding a current_timestamp column to the dataset and then grouping by window and record ID with watermarking, but nothing worked.

As far as I could find, there is no API that solves this task directly. A window with PARTITION BY and ORDER BY does not work on streaming datasets.

I solved it using the mapGroupsWithState API, but without keeping any state, as follows:

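import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.concurrent.duration._
import spark.implicits._

// sourcePath, streamName, streamingConfig and the case class input.Data
// (with fields id and last_modified) are defined elsewhere.

// Called once per key for each trigger; `values` holds the rows for that key
// that arrived in the current trigger. `state` is never read or updated.
def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
  values.maxBy(_.last_modified)
}

val stream = spark.readStream
  .option("maxFileAge", "24h")
  .option("maxFilesPerTrigger", "1000")
  .parquet(sourcePath)
  .as[input.Data]

// Group by primary key and keep only the most recent row of each group.
val mostRecentRowPerPrimaryKey =
  stream
    .groupByKey(_.id)
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)

mostRecentRowPerPrimaryKey
  .repartition(5)
  .writeStream
  .option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
  .option("truncate", "false")
  .format("console")
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .queryName(streamName)
  .start()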

NOTE: this only works in Update mode.
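
One consequence of not keeping state is worth spelling out: the function only sees the rows for a key that arrived in the current trigger, so an older row arriving in a later micro-batch would still be emitted as the "most recent" one. The plain-Python sketch below simulates two micro-batches to show the difference between the stateless approach and a variant that remembers the newest row per key. The `Data` tuple, the function names, and the dict standing in for Spark's per-key state are all assumptions made for illustration; in Spark the stateful variant would read and update the `GroupState` argument instead.

```python
from collections import namedtuple

# Hypothetical record type standing in for the answer's input.Data.
Data = namedtuple("Data", ["id", "payload", "last_modified"])


def most_recent_stateless(batch):
    """Newest row per id, considering only the rows in this micro-batch."""
    newest = {}
    for row in batch:
        if row.id not in newest or row.last_modified > newest[row.id].last_modified:
            newest[row.id] = row
    return newest


def most_recent_stateful(batch, state):
    """Newest row per id, also compared against the row remembered in `state`."""
    updates = {}
    for key, row in most_recent_stateless(batch).items():
        if key not in state or row.last_modified > state[key].last_modified:
            state[key] = row
        updates[key] = state[key]
    return updates


# Two simulated micro-batches; the second carries a late, older row for id 1.
batch1 = [Data(1, "new", 20), Data(2, "a", 5)]
batch2 = [Data(1, "old", 10)]

state = {}
most_recent_stateful(batch1, state)

print(most_recent_stateless(batch2)[1].payload)        # old
print(most_recent_stateful(batch2, state)[1].payload)  # new
```

If late or out-of-order rows cannot occur in the source, the stateless version is sufficient and avoids growing state.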

Hope it helps!

