Help me crack this one. I am looking for an elegant solution for dynamically generating ExternalTaskSensor tasks in Airflow with unique execution_date_fn functions while avoiding problems arising from function scopes.
I am trying to create a DAG that depends on several other DAGs in that they shouldn't run simultaneously. I am using ExternalTaskSensor operators, best illustrated visually:
Sensor tasks, e.g. sense_dag_1, rely on the DagRun object to find the latest execution_date of e.g. dag_1. This method, called get_execution_date in my code, is created and passed to the ExternalTaskSensor via the execution_date_fn kwarg. This works well when "manually" creating tasks, as shown in this example:
sensor = ExternalTaskSensor(
    task_id='sense_dag_1',
    external_dag_id='dag_1',
    execution_date_fn=lambda dt: get_execution_date('dag_1'),
    dag=dag
)
Note that a little workaround is used, providing get_execution_date to ExternalTaskSensor as a lambda function with input dt. This solution, however, faces a problem when automatically generating multiple ExternalTaskSensor tasks for a list of DAG names, as seen in this naive example:
for dag_id in ['dag_1', 'dag_2']:
    sensor = ExternalTaskSensor(
        task_id='sense_' + dag_id,
        external_dag_id=dag_id,
        execution_date_fn=lambda dt: get_execution_date(dag_id),
        dag=dag
    )
When creating tasks this way, all tasks are set with the same execution_date, coinciding with the execution_date of the last dag_id in the list. I am aware that this problem has to do with functions and scope, as described in Python's official documentation.
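The effect can be reproduced in plain Python without Airflow. In this minimal sketch, get_execution_date is a hypothetical stand-in that just returns a string (the real one queries DagRun), and the lambdas are collected in a list instead of being passed to sensors:

```python
def get_execution_date(dag_id):
    # Hypothetical stand-in for the real function, which looks up DagRun.
    return f"latest execution_date of {dag_id}"


fns = []
for dag_id in ['dag_1', 'dag_2']:
    # The lambda does not store the current value of dag_id. It looks up
    # the name dag_id only when it is called, after the loop has finished.
    fns.append(lambda dt: get_execution_date(dag_id))

# Call every lambda once, with a dummy dt.
results = [fn(None) for fn in fns]
print(results)
# → ['latest execution_date of dag_2', 'latest execution_date of dag_2']
```

Both lambdas report dag_2, which mirrors what happens to the generated sensors.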
We end up with two limiting factors:
- execution_date_fn only accepts dt and **context as arguments
- Lambda functions (and functions in general) require arguments defined in function scope in order to correctly assign variable values.
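To make the two constraints concrete, here is a plain-Python sketch of a factory that binds dag_id in its own scope and returns a callable taking only dt. The names make_execution_date_fn and the stub get_execution_date are hypothetical, and whether ExternalTaskSensor accepts such a callable unchanged is an assumption I have not verified:

```python
def get_execution_date(dag_id):
    # Hypothetical stand-in for the real function, which looks up DagRun.
    return f"latest execution_date of {dag_id}"


def make_execution_date_fn(dag_id):
    # dag_id is a parameter of the factory, so each call to the factory
    # creates a separate scope holding its own dag_id value.
    def execution_date_fn(dt):
        # Only dt appears in the signature; dag_id comes from the
        # enclosing factory call.
        return get_execution_date(dag_id)
    return execution_date_fn


bound = {d: make_execution_date_fn(d) for d in ['dag_1', 'dag_2']}
values = {d: fn(None) for d, fn in bound.items()}
print(values)
```

In this sketch each returned function keeps the dag_id it was created with, so the two entries in values differ.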
QUESTION: How do I dynamically generate ExternalTaskSensor tasks where execution_date only depends on dag_id?
I have one solution in mind: adding tasks upstream of the sensors that get the execution_date and push it to context. However, in this case the order of task execution becomes important, making the end DAG look something like this:
This is not aesthetically pleasing, and I am looking for something better.