Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - Automatically generating ExternalTaskSensor where execution date depends only on DAG id

Help me crack this one. I am looking for an elegant solution for dynamically generating ExternalTaskSensor tasks in Airflow with unique execution_date_fn functions while avoiding problems arising from function scopes.

I am trying to create a DAG that depends on several other DAGs, in the sense that they shouldn't run simultaneously. I am using ExternalTaskSensor operators for this, as the following diagram illustrates:

[Image: sensor dag]

Sensor tasks, e.g. sense_dag_1, rely on the DagRun model to find the latest execution_date of the DAG they watch, e.g. dag_1. The function that performs this lookup, called get_execution_date in my code, is passed to the ExternalTaskSensor via the execution_date_fn kwarg. This works well when creating tasks "manually", as in this example:

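# Airflow 2.x import path (on Airflow 1.10: airflow.sensors.external_task_sensor)
from airflow.sensors.external_task import ExternalTaskSensor

sensor = ExternalTaskSensor(
    task_id='sense_dag_1',
    external_dag_id='dag_1',
    # dt (the sensor's own execution date) is ignored; the date to wait for
    # comes from the latest run of dag_1
    execution_date_fn=lambda dt: get_execution_date('dag_1'),
    dag=dag,
)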

Note the small workaround: ExternalTaskSensor calls execution_date_fn with the sensor's own execution date as its first argument, so get_execution_date is wrapped in a lambda that takes dt and ignores it. This approach, however, breaks down when automatically generating multiple ExternalTaskSensor tasks from a list of DAG names, as in this naive example:

for dag_id in ['dag_1', 'dag_2']:
    sensor = ExternalTaskSensor(
        task_id='sense_'+dag_id,
        external_dag_id=dag_id,
        execution_date_fn=lambda dt: get_execution_date(dag_id),
        dag=dag
    )

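When creating tasks this way, all sensors end up with the same execution_date: the one belonging to the last dag_id in the list. The lambda does not capture the value of dag_id at the moment it is defined; it looks the variable up only when it is called, by which time the loop has finished and dag_id holds its final value. This late-binding behaviour of Python closures is described in Python's official documentation.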
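The late binding can be reproduced without Airflow. In this minimal sketch, get_execution_date is a hypothetical stub that just echoes the DAG id (the real one queries DagRun), and the lambdas are collected in a dict instead of being passed to sensors:

```python
from datetime import datetime


def get_execution_date(dag_id):
    # Hypothetical stub standing in for the real DagRun lookup.
    return f"latest execution_date of {dag_id}"


execution_date_fns = {}
for dag_id in ['dag_1', 'dag_2']:
    # Each lambda refers to the *variable* dag_id, not to its current value.
    execution_date_fns['sense_' + dag_id] = lambda dt: get_execution_date(dag_id)

# The lambdas are only called later (as Airflow does when the sensor pokes);
# by then the loop has finished and dag_id == 'dag_2'.
now = datetime(2021, 1, 1)
for task_id, fn in execution_date_fns.items():
    print(task_id, '->', fn(now))
# sense_dag_1 -> latest execution_date of dag_2
# sense_dag_2 -> latest execution_date of dag_2
```

Both task ids are distinct because 'sense_' + dag_id is evaluated immediately inside the loop; only the body of the lambda is deferred.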

We end up with two limiting factors:

  • execution_date_fn only accepts dt and **context as arguments
  • Lambdas (and functions in general) look up free variables such as dag_id when they are called, not when they are defined, so the value has to be bound within the function's own scope (e.g. as an argument) to stay fixed.

QUESTION: How do I dynamically generate ExternalTaskSensor tasks where execution_date only depends on dag_id?

I have one solution in mind: adding tasks upstream of the sensors that look up the execution_date and push it to the context. In that case, however, the order of task execution becomes important, and the final DAG looks something like this:

[Image: employ context]

This is not aesthetically pleasing, and I am looking for something better.



1 Reply


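I remembered a working solution: move the sensor instantiation into a separate function. Each call to that function gets its own scope, so the lambda closes over the dag_id parameter of that particular call, which never changes, instead of over the shared loop variable. Code example: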

def create_sensor(dag_id):
    task = ExternalTaskSensor(
        task_id='sense_'+dag_id,
        external_dag_id=dag_id,
        execution_date_fn=lambda dt: get_execution_date(dag_id),
        dag=dag
    )
    return task

for dag_id in ['dag_1', 'dag_2']:
    sensor = create_sensor(dag_id)
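To check that the function-scope fix really gives each sensor its own dag_id, here is a minimal sketch that runs without Airflow. StubSensor is a hypothetical stand-in for ExternalTaskSensor that only stores its arguments, and get_execution_date is a hypothetical stub that echoes the DAG id instead of querying DagRun:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable


def get_execution_date(dag_id):
    # Hypothetical stub standing in for the real DagRun lookup.
    return f"latest execution_date of {dag_id}"


@dataclass
class StubSensor:
    # Hypothetical stand-in for ExternalTaskSensor: it only stores its arguments.
    task_id: str
    external_dag_id: str
    execution_date_fn: Callable


def create_sensor(dag_id):
    # dag_id is a parameter of this call, so the lambda closes over a binding
    # that belongs to this call alone and never changes after it returns.
    return StubSensor(
        task_id='sense_' + dag_id,
        external_dag_id=dag_id,
        execution_date_fn=lambda dt: get_execution_date(dag_id),
    )


sensors = [create_sensor(dag_id) for dag_id in ['dag_1', 'dag_2']]

now = datetime(2021, 1, 1)
for sensor in sensors:
    print(sensor.task_id, '->', sensor.execution_date_fn(now))
# sense_dag_1 -> latest execution_date of dag_1
# sense_dag_2 -> latest execution_date of dag_2
```

Swapping StubSensor back for ExternalTaskSensor (and passing dag=dag) gives the create_sensor function from the answer; the lambda itself is unchanged, only the scope it is created in differs.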

More on the topic here: Unexpected Airflow behaviour in dynamic task generation

