python - Can a failed Airflow DAG task retry with a changed parameter?

With Airflow, is it possible to restart an upstream task if a downstream task fails? This seems to go against the "Acyclic" part of the term DAG, but I would think this is a common problem.

Background

I'm looking into using Airflow to manage a data processing workflow that has been managed manually.

There is a task that fails if a parameter x is set too high, but increasing the parameter value gives better-quality results. We have not found a way to calculate a safe but maximally high value for parameter x. The manual process has been to restart the job with a lower parameter whenever it fails, until it succeeds.

The workflow looks something like this:

Task A - Gather the raw data

Task B - Generate config file for job

Task C - Modify config file parameter x

Task D - Run the data manipulation Job

Task E - Process Job results

Task F - Generate reports
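For concreteness, here is a minimal sketch of that pipeline as a linear Airflow DAG (Airflow 2.x import paths; the dag_id is illustrative and every callable is an empty stub):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="data_processing",          # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Tasks A-F from the list above; the callables are placeholders.
    a = PythonOperator(task_id="gather_raw_data", python_callable=lambda: None)
    b = PythonOperator(task_id="generate_config", python_callable=lambda: None)
    c = PythonOperator(task_id="modify_parameter_x", python_callable=lambda: None)
    d = PythonOperator(task_id="run_job", python_callable=lambda: None)
    e = PythonOperator(task_id="process_results", python_callable=lambda: None)
    f = PythonOperator(task_id="generate_reports", python_callable=lambda: None)

    a >> b >> c >> d >> e >> f
```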

Issue

If task D fails because of parameter x being too high, I want to rerun task C and task D. This doesn't seem to be supported. I would really appreciate some guidance on how to handle this.

1 Reply


First of all: that's an excellent question; I wonder why it hasn't been discussed widely until now.


I can think of two possible approaches:

  1. Fusing Operators: As pointed out by @Kris, combining the operators into a single task appears to be the most obvious workaround (see the sketch after this list)

  2. Separate Top-Level DAGs: Read below
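
Here is a hypothetical sketch of option 1, assuming Airflow 2.x import paths: Task C and Task D are fused into a single PythonOperator whose callable lowers x after each failure, so the retry loop happens inside one task and the DAG stays acyclic. modify_config, run_job, and JobFailedError are placeholders for your own job interface.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


class JobFailedError(Exception):
    """Placeholder for whatever exception the real job raises on failure."""


def modify_config(x):
    """Placeholder for Task C: write parameter x into the job's config file."""


def run_job():
    """Placeholder for Task D: run the data manipulation job."""


def run_job_with_backoff(start_x=100, step=10, min_x=10, **_):
    # Fuse C and D: keep lowering x inside one task until the job succeeds,
    # so the DAG itself never has to loop back to an upstream task.
    x = start_x
    while x >= min_x:
        modify_config(x)
        try:
            run_job()
            return x  # the highest value of x that worked
        except JobFailedError:
            x -= step
    raise RuntimeError("job failed even at the minimum value of x")


with DAG(dag_id="fused_example", start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    set_x_and_run_job = PythonOperator(
        task_id="set_x_and_run_job",
        python_callable=run_job_with_backoff,
    )
```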


Separate Top-Level DAGs approach

Given

  • Say you have tasks A & B
  • A is upstream of B
  • You want execution to resume (retry) from A if B fails

(Possible) Idea: If you're feeling adventurous

  • Put tasks A & B in separate top-level DAGs, say DAG-A & DAG-B
  • At the end of DAG-A, trigger DAG-B using TriggerDagRunOperator
    • In all likelihood, you will also have to use an ExternalTaskSensor after TriggerDagRunOperator
  • In DAG-B, put a BranchPythonOperator after Task-B with trigger_rule=all_done
  • This BranchPythonOperator should branch out to another TriggerDagRunOperator that then invokes DAG-A (again!); both DAGs are sketched below
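
A rough sketch of that wiring, again assuming Airflow 2.x import paths (the dag_ids, callables, and the failure check are all illustrative, and the optional ExternalTaskSensor step is omitted for brevity):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State

DEFAULTS = dict(start_date=datetime(2021, 1, 1),
                schedule_interval=None, catchup=False)

# DAG-A: run Task-A, then hand off to DAG-B.
with DAG(dag_id="dag_a", **DEFAULTS) as dag_a:
    task_a = PythonOperator(task_id="task_a", python_callable=lambda: None)
    trigger_b = TriggerDagRunOperator(task_id="trigger_dag_b",
                                      trigger_dag_id="dag_b")
    task_a >> trigger_b


def choose_branch(**context):
    # Runs even when task_b has failed (trigger_rule="all_done" below):
    # re-trigger DAG-A on failure, otherwise finish normally.
    ti = context["dag_run"].get_task_instance("task_b")
    return "retrigger_dag_a" if ti.state == State.FAILED else "done"


# DAG-B: run Task-B, then branch on its outcome.
with DAG(dag_id="dag_b", **DEFAULTS) as dag_b:
    task_b = PythonOperator(task_id="task_b", python_callable=lambda: None)
    branch = BranchPythonOperator(
        task_id="branch_on_result",
        python_callable=choose_branch,
        trigger_rule="all_done",
    )
    retrigger_a = TriggerDagRunOperator(task_id="retrigger_dag_a",
                                        trigger_dag_id="dag_a")
    done = DummyOperator(task_id="done")
    task_b >> branch >> [retrigger_a, done]
```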



EDIT-1

Here's a much simpler way to achieve similar behaviour:

How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)

