Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

spark-shell scala to map the exchange rate value in dataframe

I have a dataframe df

+-----+--------+----------+-----+
|count|currency|      date|value|
+-----+--------+----------+-----+
|    3|     GBP|2021-01-14|    4|
|  102|     USD|2021-01-14|    3|
|  234|     EUR|2021-01-14|    5|
|   28|     GBP|2021-01-16|    5|
|   48|     USD|2021-01-16|    7|
|   68|     EUR|2021-01-15|    6|
|   20|     GBP|2021-01-15|    1|
|   33|     EUR|2021-01-17|    2|
|  106|     GBP|2021-01-17|   10|
+-----+--------+----------+-----+

I have a separate dataframe for USD exchange_rate

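// header=true so the first row of the file supplies the column names (INSERTTIME, EXCAHNGERATE, CURRENCY)
val exchange_rate = spark.read.format("csv").option("header", "true").load("/Users/khan/data/exchange_rate.csv")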
exchange_rate.show()

INSERTTIME  EXCAHNGERATE    CURRENCY
2021-01-14  0.731422        GBP
2021-01-14  0.784125        EUR
2021-01-15  0.701922        GBP
2021-01-15  0.731422        EUR
2021-01-16  0.851422        GBP
2021-01-16  0.721128        EUR
2021-01-17  0.771621        GBP
2021-01-17  0.751426        EUR

I want to convert the GBP and EUR values in df to USD by looking up the rate for the matching date and currency in the exchange_rate dataframe.

My idea is to use com.currency_converter.CurrencyConverter from http://xavierguihot.com/currency_converter/#com.currency_converter.CurrencyConverter$

Is there a simpler way to do it?

Question from: https://stackoverflow.com/questions/65901406/spark-shell-scala-to-map-the-exchange-rate-value-in-dataframe


1 Reply


You can use a correlated subquery (a fancy way of doing joins):

df.createOrReplaceTempView("df1")
exchange_rate.createOrReplaceTempView("df2")

val result = spark.sql("""
select count, 'USD' as currency, date, value,
    value * coalesce(
        (select min(df2.EXCAHNGERATE)
         from df2
         where df1.date = df2.INSERTTIME and df1.currency = df2.CURRENCY),
        1  -- use 1 as exchange rate if no exchange rate found
    ) as converted
from df1
""")

result.show
+-----+--------+----------+-----+---------+
|count|currency|      date|value|converted|
+-----+--------+----------+-----+---------+
|    3|     USD|2021-01-14|    4| 2.925688|
|  102|     USD|2021-01-14|    3|      3.0|
|  234|     USD|2021-01-14|    5| 3.920625|
|   28|     USD|2021-01-16|    5|  4.25711|
|   48|     USD|2021-01-16|    7|      7.0|
|   68|     USD|2021-01-15|    6| 4.388532|
|   20|     USD|2021-01-15|    1| 0.701922|
|   33|     USD|2021-01-17|    2| 1.502852|
|  106|     USD|2021-01-17|   10|  7.71621|
+-----+--------+----------+-----+---------+
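
The same lookup can probably also be written with the DataFrame API as a left join. The following is only a sketch, not part of the original answer. It assumes it is run in spark-shell (so `spark` already exists), and `sampleDf`, `sampleRates`, `rates` and `joinedResult` are hypothetical names introduced here; substitute `df` and `exchange_rate` for the two sample dataframes. The rates are first reduced to one row per date and currency with `min`, mirroring the `min(...)` in the subquery, and a missing rate falls back to 1.

```scala
import org.apache.spark.sql.functions.{coalesce, col, lit, min}
import spark.implicits._

// Hypothetical sample data: a few rows copied from the question.
val sampleDf = Seq(
  (3, "GBP", "2021-01-14", 4),
  (102, "USD", "2021-01-14", 3),
  (234, "EUR", "2021-01-14", 5)
).toDF("count", "currency", "date", "value")

val sampleRates = Seq(
  ("2021-01-14", "0.731422", "GBP"),
  ("2021-01-14", "0.784125", "EUR")
).toDF("INSERTTIME", "EXCAHNGERATE", "CURRENCY")

// One rate per (date, currency); the cast handles rates read from CSV as strings.
val rates = sampleRates.groupBy(
  col("INSERTTIME").as("rate_date"),
  col("CURRENCY").as("rate_currency")
).agg(min(col("EXCAHNGERATE").cast("double")).as("rate"))

// Left join keeps every row of sampleDf, including currencies with no rate (USD).
val joined = sampleDf.join(
  rates,
  sampleDf("date") === rates("rate_date") && sampleDf("currency") === rates("rate_currency"),
  "left"
)

val joinedResult = joined.select(
  col("count"),
  lit("USD").as("currency"),
  col("date"),
  col("value"),
  // Use 1 as the exchange rate when no matching rate was found.
  (col("value") * coalesce(col("rate"), lit(1.0))).as("converted")
)

joinedResult.show()
```

Like the SQL version, this multiplies `value` by the rate; whether multiplying or dividing is correct depends on how the rates in the file are quoted, which the question does not state.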
