Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Google Cloud Composer - Loop many times over many Airflow tasks in one DAG

I am creating a DAG with the following task structure. The DAG will be scheduled to run every day at 1:00 AM UTC. It gets rows from a database, then loops over the rows to run several tasks that each need the row's data.

For example, I have a method in my DAG that queries a MySQL database and returns many rows. I have to pass each row's data to 4 tasks as a parameter. I followed some docs I found through Google search, but the result does not run correctly.

return_result is the method that gets the result from Cloud SQL in GCP.

def return_result():
    db_engine_connection = create_cloud_sql_connection()
    session = get_db_session(db_engine_connection)
    result = session.query(Scheduled).filter(Scheduled.job_status == "Scheduled").all()
    session.commit()
    return result

I tried using a for loop, something like the following:

for row in return_result():
    # "operator" is a placeholder for the real operator class
    op1 = operator(param=row.id)
    op2 = operator(param=row.id)
    op3 = operator(param=row.id)
    op4 = operator(param=row.id)
    op1 >> op2 >> op3 >> op4

But these tasks do not show up in the Airflow UI.

question from:https://stackoverflow.com/questions/65651790/loop-many-times-on-many-airflow-tasks-on-one-dag


1 Reply


Based on your comments, assuming your operator is:

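from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(self,
                 input_id,
                 input_date,
                 input_status,
                 *args, **kwargs):

        # task_id and other standard arguments are passed through to BaseOperator
        super(MyOperator, self).__init__(*args, **kwargs)
        self.input_id=input_id
        self.input_date=input_date
        self.input_status=input_status

    def execute(self, context):
        # the per-row work goes here
        pass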

You can use it as follows:

# Assumes this runs inside a "with DAG(...)" block, or that dag=... is passed to each operator.
start_op = DummyOperator(task_id='start_op')
for row in return_result():
    # each task_id includes row.id, so every task in the DAG gets a unique id
    op1 = MyOperator(task_id=f"op1_{row.id}", input_id=row.id, input_date=row.date, input_status=row.status)
    op2 = MyOperator(task_id=f"op2_{row.id}", input_id=row.id, input_date=row.date, input_status=row.status)
    op3 = MyOperator(task_id=f"op3_{row.id}", input_id=row.id, input_date=row.date, input_status=row.status)
    op4 = MyOperator(task_id=f"op4_{row.id}", input_id=row.id, input_date=row.date, input_status=row.status)
    start_op >> op1 >> op2 >> op3 >> op4
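
To see what the loop builds without needing an Airflow installation, here is a minimal plain-Python sketch. FakeDag, FakeTask, Row and build_chains are hypothetical stand-ins invented for illustration, not Airflow classes. They only imitate two behaviours: task ids must be unique within a DAG, and `>>` returns the right-hand task so that a chain reads left to right.

```python
from collections import namedtuple

# Hypothetical stand-in for a database row; only `id` is used here.
Row = namedtuple("Row", ["id"])


class FakeDag:
    """Stand-in for a DAG: just a dictionary of task_id -> task."""

    def __init__(self):
        self.tasks = {}


class FakeTask:
    """Stand-in for an operator instance (NOT the real BaseOperator)."""

    def __init__(self, task_id, dag):
        # The stand-in rejects duplicates to make the uniqueness
        # requirement visible.
        if task_id in dag.tasks:
            raise ValueError(f"duplicate task_id: {task_id}")
        self.task_id = task_id
        self.downstream = []
        dag.tasks[task_id] = self

    def __rshift__(self, other):
        # Record the dependency and return the right-hand task,
        # so a >> b >> c links a to b and then b to c.
        self.downstream.append(other.task_id)
        return other


def build_chains(rows):
    """Create one start task plus a chain of four tasks per row."""
    dag = FakeDag()
    start = FakeTask("start_op", dag)
    for row in rows:
        ops = [FakeTask(f"op{n}_{row.id}", dag) for n in range(1, 5)]
        start >> ops[0] >> ops[1] >> ops[2] >> ops[3]
    return dag


dag = build_chains([Row(id=7), Row(id=9)])
print(dag.tasks["start_op"].downstream)  # ['op1_7', 'op1_9']
print(dag.tasks["op1_7"].downstream)     # ['op2_7']
print(len(dag.tasks))                    # 9
```

With two rows, the sketch ends up with one start task fanning out into two independent four-task chains. If two rows shared the same id, the stand-in would raise an error on the second one, which is why the task ids in the loop above include row.id.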
