Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - How to find the difference between 1st row and nth row of a dataframe based on a condition using Spark Windowing

Here is my exact requirement. I have to add a new column named "DAYS_TO_NEXT_PD_ENCOUNTER". As the name indicates, each value in the new column should be the difference between the RANK of the row whose CLAIM_TYP is 'PD' and the RANK of the current row, within the same ID. Rows that come after the first occurrence of CLAIM_TYP 'PD' should get null.

The 'last' API works only if the 'PD' row is the last element for an ID, which will not always be the case: within one ID, 'PD' can occur in between any of the 'RV's and 'RJ's. The expected output is shown below:

+----------+--------+---------+----+-------------------------+
|        ID| WEEK_ID|CLAIM_TYP|RANK|DAYS_TO_NEXT_PD_ENCOUNTER|
+----------+--------+---------+----+-------------------------+
|  30641314|20180209|       RV|   1|                        5|
|  30641314|20180209|       RJ|   2|                        4|
|  30641314|20180216|       RJ|   3|                        3|
|  30641314|20180216|       RJ|   4|                        2|
|  30641314|20180216|       RJ|   5|                        1|
|  30641314|20180216|       PD|   6|                        0|
|  48115882|20180209|       RV|   1|                        3|
|  48115882|20180209|       RV|   2|                        2|
|  48115882|20180209|       RV|   3|                        1|
|  48115882|20180209|       PD|   4|                        0|
|  48115882|20180216|       RJ|   5|                     null|
|  48115882|20180302|       RJ|   6|                     null|
+----------+--------+---------+----+-------------------------+

1 Reply

Shown here is a PySpark solution.

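You can use conditional aggregation with max(when(...)) over a window partitioned by ID to get the difference between the rank of the 'PD' row and the rank of each row. After computing the difference, use a when(...) to null out the rows whose difference is negative, since they all occur after the 'PD' row. Note that max picks the last 'PD' row when an ID has several; min would pick the first one instead.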

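# necessary imports
from pyspark.sql import Window
from pyspark.sql import functions as F

# number the rows of each id in service-date order
w1 = Window.partitionBy(df.id).orderBy(df.svc_dt)
df = df.withColumn('rnum', F.row_number().over(w1))
# rank of the 'PD' row within each id, minus the current row's rank
w2 = Window.partitionBy(df.id)
res = df.withColumn('diff_pd_rank', F.max(F.when(df.clm_typ == 'PD', df.rnum)).over(w2) - df.rnum)
# keep non-negative differences; rows after the 'PD' row become null
res = res.withColumn('days_to_next_pd_encounter', F.when(res.diff_pd_rank >= 0, res.diff_pd_rank))
res.show()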
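
To sanity-check the logic without a Spark session, the same computation can be sketched in plain Python against the sample rows from the question. This is only an illustration of the conditional-aggregation idea, not Spark code: the helper name days_to_next_pd and its use_first_pd flag are hypothetical, and the RANK column is assumed to be already computed.

```python
from collections import defaultdict

# Sample rows from the question: (ID, WEEK_ID, CLAIM_TYP, RANK)
rows = [
    (30641314, 20180209, "RV", 1),
    (30641314, 20180209, "RJ", 2),
    (30641314, 20180216, "RJ", 3),
    (30641314, 20180216, "RJ", 4),
    (30641314, 20180216, "RJ", 5),
    (30641314, 20180216, "PD", 6),
    (48115882, 20180209, "RV", 1),
    (48115882, 20180209, "RV", 2),
    (48115882, 20180209, "RV", 3),
    (48115882, 20180209, "PD", 4),
    (48115882, 20180216, "RJ", 5),
    (48115882, 20180302, "RJ", 6),
]


def days_to_next_pd(rows, use_first_pd=False):
    """Hypothetical helper mirroring the window logic in plain Python."""
    # Collect the ranks of the 'PD' rows per ID; this plays the role of
    # when(clm_typ == 'PD', rnum) evaluated over a partition by id.
    pd_ranks = defaultdict(list)
    for id_, _week, typ, rank in rows:
        if typ == "PD":
            pd_ranks[id_].append(rank)

    # max reproduces the answer above (last 'PD' row);
    # min is an assumed variant that targets the first 'PD' row.
    pick = min if use_first_pd else max

    result = []
    for id_, week, typ, rank in rows:
        ranks = pd_ranks.get(id_)
        diff = pick(ranks) - rank if ranks else None
        # Same role as when(diff >= 0, diff): negative differences become None.
        if diff is not None and diff < 0:
            diff = None
        result.append((id_, week, typ, rank, diff))
    return result


for row in days_to_next_pd(rows):
    print(row)
```

When an ID has a single 'PD' row, as in the sample data, max and min give the same result. They differ only when 'PD' appears more than once in an ID, in which case use_first_pd=True nulls out everything after the first 'PD' row.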
