Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
228 views
in Technique[技术] by (71.8m points)

sql - Daily forecast on a PySpark dataframe

I have the following dataframe in PySpark:

DT_BORD_REF: Date column for the month
REF_DATE: A date reference for current day separating past and future
PROD_ID: Product ID
COMPANY_CODE: Company ID
CUSTOMER_CODE: Customer ID
MTD_WD: Month to Date count of working days (Date = DT_BORD_REF)
QUANTITY: Number of items sold
QTE_MTD: Number of items month to date

+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|        DT_BORD_REF|           REF_DATE|          PROD_ID|COMPANY_CODE|CUSTOMER_CODE|COUNTRY_ALPHA|MTD_WD|QUANTITY|QTE_MTD|
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|2020-11-02 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     1|     4.0|    4.0|
|2020-11-05 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     3|    null|    4.0|
|2020-11-06 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     4|    null|    4.0|
|2020-11-09 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     5|    null|    4.0|
|2020-11-10 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     6|    null|    4.0|
|2020-11-11 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     7|    null|    4.0|
|2020-11-12 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     8|    null|    4.0|
|2020-11-13 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     9|    null|    4.0|
|2020-11-16 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    10|    null|    4.0|
|2020-11-17 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    11|    null|    4.0|
|2020-11-18 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    12|    null|    4.0|
|2020-11-19 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    13|    null|    4.0|
|2020-11-20 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    14|    null|    4.0|
|2020-11-23 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    15|    null|    4.0|
|2020-11-24 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    16|    null|    4.0|
|2020-11-25 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    17|    null|    4.0|

for DT_BORD_REF < REF_DATE all rows are actual sales and do not necessarily occurs every working day. Sometimes happens in non working days too.

for DT_BORD_REF >= REF_DATE there are no sales (it's the future)

The objective is to forecast the sales for all future rows using the formula: QTE_MTD/MTD_WD calculated on the REF_DATE for each product, customer and country.

The QTE_MTD was calculated from QUANTITY column using a window function. I need to divide that for MTD_WD on REF_DATE which in this exemple is 3

How can I add a column with MTD_WD on REF_DATE partitioning by product, customer and country?

In other words, I need to add a column with the first occurrence of MTD_WD when the condition DT_BORD_REF > REF_DATE is met (again, which is 3 in this exemple) for each product, customer and country.

This dataset has millions of row for different products, customers and countries The working days are provided by country

Hope it was clear :)

question from:https://stackoverflow.com/questions/65941041/daily-forecast-on-a-pyspark-dataframe

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can use first with ignorenulls=True, and when with the appropriate condition, to get the first MTD_WD where DT_BORD_REF > REF_DATE:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'val',
    F.first(
        F.when(
            F.col('DT_BORD_REF') > F.col('REF_DATE'),
            F.col('MTD_WD')
        ), 
        ignorenulls=True
    ).over(
        Window.partitionBy('PROD_ID','COMPANY_CODE','CUSTOMER_CODE','COUNTRY_ALPHA')
              .orderBy('DT_BORD_REF')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...