Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
195 views
in Technique[技术] by (71.8m points)

python - Unexpected Airflow behaviour in dynamic task generation

For reasons acceptable to me, I am trying to dynamically generate ExternalTaskSensor tasks with different execution_date_fn in each iteration. Callable provided to execution_date_fn kwarg requires to have dt as input and provide execution_date as output, which I am writing down as a lambda function, e.g. lambda dt: get_execution_date(i).

I noticed that execution_date_fn provided as a lambda function in a loop results in unexpected behaviour - all generated tasks have the same execution_date

I noticed that this behaviour is not intrinsic to ExternalTaskSensor but originates somewhere else. This behaviour can be seen in this example:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG(
    'test_lambda',
    schedule_interval=None,
    start_date=datetime(2021,1,1),
    catchup=False
)

for task_id in ['task1', 'task2']:
    task = PythonOperator(
        task_id='printer_'+task_id,
        python_callable=lambda: print(task_id),
        dag=dag
    )

This results in both tasks printer_task1 and printer_task2 printing 'task2' in logs.

I have managed to correct the behaviour by moving sensor instantiation into a function:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_task(task_id):
    task = PythonOperator(
        task_id='printer_'+task_id,
        python_callable=lambda: print(task_id),
        dag=dag
    )
    return task

dag = DAG(
    'test_lambda',
    schedule_interval=None,
    start_date=datetime(2021,1,1),
    catchup=False
)

for task_id in ['task1', 'task2']:
    task = create_task(task_id)

In this case task printer_task1 prints 'task1' and printer_task2 prints 'task2' in the logs.

I would be interested to know why am I observing such behaviour?

DISCLAIMER: I am aware that normal way to provide arguments to a PythonOperator is via op_args kwarg. Lambda functions were used solely to provide an example as op_args option is not available in ExternalTaskSensor when using execution_date_fn.

EDIT: This is a lambda issue and not Airflow-specific. Official Python documentation has a topic on the issue: https://docs.python.org/3/faq/programming.html#why-do-lambdas-defined-in-a-loop-with-different-values-all-return-the-same-result


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

This has little to do with Airflow, it is a lambda issue:

>>> ls = [lambda: i for i in [1,2]]
>>> ls[0]()
2
>>> ls[1]()
2

To know why it does that, I recommend reading that Stackoverflow post that will probably explains why better than I could


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...