Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

dataframe - Compare two values with Scala Spark

I have the following parquet file:

+--------------+------------+-------+
|gf_cutoff     | country_id |gf_mlt |
+--------------+------------+-------+
|2020-12-14    |DZ          |5      |
|2020-08-06    |DZ          |4      |
|2020-07-03    |DZ          |4      |
|2020-12-14    |LT          |1      |
|2020-08-06    |LT          |1      |
|2020-07-03    |LT          |1      |
+--------------+------------+-------+

As you can see, it is partitioned by country_id and ordered by gf_cutoff DESC. What I want to do is compare gf_mlt to check whether the value has changed. To do that, I want to compare the row with the most recent gf_cutoff against the second most recent one.

An example of this case would be comparing:

 2020-12-14 DZ 5
with
 2020-08-06 DZ 4

I want to record the result in two new columns on the most recent row (the one holding 5 for DZ): one with the value from the second most recent row, and one that is True if the value has changed and False if it has not. After the comparison, the older rows should be deleted.

For DZ the value has changed; for LT it has not, because it is always 1.

So the output would be like this:

+--------------+------------+-------+------------+-----------+
|gf_cutoff     | country_id |gf_mlt | Has_change | old_value |
+--------------+------------+-------+------------+-----------+
|2020-12-14    |DZ          |5      |    True    |     4     |
|2020-12-14    |LT          |1      |    False   |     1     |
+--------------+------------+-------+------------+-----------+
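To pin down the rule independently of Spark, here is a minimal plain-Scala sketch of the comparison on in-memory data. The names `Record`, `Latest`, and `ChangeCheck` are hypothetical and exist only for this illustration. It also assumes that a country with a single row counts as "not changed", which the question does not specify.

```scala
// Hypothetical in-memory model of one row of the parquet file.
final case class Record(gfCutoff: String, countryId: String, gfMlt: Int)

// Hypothetical result type mirroring the desired output columns.
final case class Latest(
    gfCutoff: String,
    countryId: String,
    gfMlt: Int,
    hasChange: Boolean,
    oldValue: Option[Int]
)

object ChangeCheck {
  // For each country, keep the newest row and compare its gf_mlt
  // with the gf_mlt of the second newest row.
  def latestWithChange(rows: Seq[Record]): Seq[Latest] =
    rows.groupBy(_.countryId).toSeq.sortBy(_._1).map { case (country, group) =>
      // ISO yyyy-MM-dd strings sort chronologically, so a string sort is enough.
      val newestFirst = group.sortBy(_.gfCutoff).reverse
      val latest = newestFirst.head
      // None when the country has only one row (assumption: treated as unchanged).
      val previous = newestFirst.drop(1).headOption.map(_.gfMlt)
      Latest(
        latest.gfCutoff,
        country,
        latest.gfMlt,
        hasChange = previous.exists(_ != latest.gfMlt),
        oldValue = previous
      )
    }
}
```

Applied to the six sample rows, this yields one row per country: DZ with hasChange = true and oldValue = Some(4), and LT with hasChange = false and oldValue = Some(1).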

If you need more explanation, just tell me it.

question from:https://stackoverflow.com/questions/65842685/compare-two-values-with-scala-spark


1 Reply

You can use lag over a window ordered by ascending gf_cutoff to get the previous value for each row, and then keep only the most recent row per country using row_number over a window ordered by descending gf_cutoff:

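import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, lag, row_number}
import spark.implicits._  // for the $"..." syntax; assumes a SparkSession named spark

val df2 = df.withColumn(
    // gf_mlt of the previous cutoff date within each country
    "last_value",
    lag("gf_mlt", 1).over(Window.partitionBy("country_id").orderBy("gf_cutoff"))
).withColumn(
    // rn = 1 marks the most recent row of each country
    "rn",
    row_number().over(Window.partitionBy("country_id").orderBy(desc("gf_cutoff")))
).filter("rn = 1").withColumn(
    // true when the latest value differs from the previous one
    "changed",
    $"gf_mlt" =!= $"last_value"
).drop("rn")

df2.show
+----------+----------+------+----------+-------+
| gf_cutoff|country_id|gf_mlt|last_value|changed|
+----------+----------+------+----------+-------+
|2020-12-14|        DZ|     5|         4|   true|
|2020-12-14|        LT|     1|         1|  false|
+----------+----------+------+----------+-------+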
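
If the result should use the column names from the question (Has_change, old_value), and a country with only one row should not end up with a null flag, one possible variant is sketched below. It rests on several assumptions: the object name `LatestChange` is made up for this example, the sample rows are inlined instead of being read from the parquet file, gf_mlt is taken to be an integer column, and a single-row country is treated as unchanged.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, desc, lag, lit, row_number}

object LatestChange {
  // Keeps the newest row per country and compares gf_mlt with the previous cutoff.
  def latestWithChange(df: DataFrame): DataFrame = {
    val oldestFirst = Window.partitionBy("country_id").orderBy("gf_cutoff")
    val newestFirst = Window.partitionBy("country_id").orderBy(desc("gf_cutoff"))

    df.withColumn("old_value", lag("gf_mlt", 1).over(oldestFirst))
      .withColumn("rn", row_number().over(newestFirst))
      .filter(col("rn") === 1)
      // lag yields null when a country has a single row, which would make the
      // comparison null as well; defaulting to false is an assumption.
      .withColumn(
        "Has_change",
        coalesce(col("gf_mlt") =!= col("old_value"), lit(false))
      )
      .select("gf_cutoff", "country_id", "gf_mlt", "Has_change", "old_value")
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("latest-change")
      .getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question.
    val df = Seq(
      ("2020-12-14", "DZ", 5),
      ("2020-08-06", "DZ", 4),
      ("2020-07-03", "DZ", 4),
      ("2020-12-14", "LT", 1),
      ("2020-08-06", "LT", 1),
      ("2020-07-03", "LT", 1)
    ).toDF("gf_cutoff", "country_id", "gf_mlt")

    latestWithChange(df).orderBy("country_id").show()
    spark.stop()
  }
}
```

The coalesce step is the only behavioural difference from the snippet above: without it, a country with a single row gets null in both the old value and the flag.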
