Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

airflow - Fusing operators together

I'm still in the process of deploying Airflow, and I've already felt the need to merge operators together. The most common use case is coupling an operator with its corresponding sensor; for instance, one might want to chain together the EmrStepOperator and EmrStepSensor.

I'm creating my DAGs programmatically, and the biggest of them contains 150+ identical branches, each performing the same series of operations on a different piece of data (a table). Therefore, clubbing together the tasks that make up a single logical step in my DAG would be of great help.

Here are two examples from my project that motivate this question:

1. Deleting data from an S3 path and then writing new data

This step comprises two operators:

  • DeleteS3PathOperator: Extends from BaseOperator & uses S3Hook
  • HadoopDistcpOperator: Extends from SSHOperator

2. Conditionally performing MSCK REPAIR on a Hive table

This step comprises four operators:

  • BranchPythonOperator: Checks whether the Hive table is partitioned
  • MsckRepairOperator: Extends from HiveOperator and performs MSCK REPAIR on the (partitioned) table
  • Dummy(Branch)Operator: Forms the alternate branch path to MsckRepairOperator (for non-partitioned tables)
  • Dummy(Join)Operator: Forms the join step for both branches
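Once the logic lives in a single operator, the four tasks of the second example collapse into one plain conditional. The sketch below is framework-free so it runs without Airflow: BaseOperator is a tiny stand-in for Airflow's class, and FakeHiveHook (with its is_partitioned and run methods) and ConditionalMsckRepairOperator are hypothetical names. A real version would subclass Airflow's BaseOperator and talk to Hive through a hook.

```python
from typing import Any, Dict, List, Optional


class BaseOperator:
    """Tiny stand-in for Airflow's BaseOperator (hypothetical, illustration only)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError


class FakeHiveHook:
    """Hypothetical hook: knows which tables are partitioned and records the HQL it runs."""

    def __init__(self, partitioned_tables: List[str]) -> None:
        self.partitioned_tables = set(partitioned_tables)
        self.executed: List[str] = []

    def is_partitioned(self, table: str) -> bool:
        return table in self.partitioned_tables

    def run(self, hql: str) -> None:
        self.executed.append(hql)


class ConditionalMsckRepairOperator(BaseOperator):
    """One task replacing BranchPythonOperator + MsckRepairOperator + two dummy operators."""

    def __init__(self, task_id: str, table: str, hook: FakeHiveHook) -> None:
        super().__init__(task_id)
        self.table = table
        self.hook = hook

    def execute(self, context: Dict[str, Any]) -> Optional[str]:
        # The branch decision becomes an ordinary `if`; the skip path needs no dummy task.
        if not self.hook.is_partitioned(self.table):
            return None
        hql = f"MSCK REPAIR TABLE {self.table}"
        self.hook.run(hql)
        return hql
```

The trade-off raised in the question still applies: the UI now shows one task per table instead of four, so whether a repair actually ran is visible only in that task's log or return value.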

Using operators in isolation certainly offers smaller modules and more fine-grained logging and debugging, but in large DAGs reducing the clutter might be desirable. From my current understanding, there are two ways to chain operators together:

  1. Hooks

    Write the actual processing logic in hooks, then use as many hooks as you want within a single operator (certainly the better way, in my opinion).

  2. SubDagOperator

    A risky and controversial way of doing things; additionally, the naming convention for SubDagOperator makes me frown.
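A minimal sketch of the hooks approach applied to the first example: one operator owns two hooks and calls them in sequence inside a single execute. To keep it runnable without Airflow, BaseOperator, FakeS3Hook and FakeSSHHook are hypothetical stand-ins (for Airflow's BaseOperator, S3Hook and an SSH hook), DeleteAndDistcpOperator is an illustrative name, and the SSH stand-in only records the hadoop distcp command instead of running it.

```python
from typing import Any, Dict, List


class BaseOperator:
    """Minimal stand-in for Airflow's BaseOperator (illustration only)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class FakeS3Hook:
    """Hypothetical stand-in for S3Hook: a set of object keys in one bucket."""

    def __init__(self, keys: List[str]) -> None:
        self.keys = set(keys)

    def delete_prefix(self, prefix: str) -> List[str]:
        doomed = sorted(k for k in self.keys if k.startswith(prefix))
        self.keys.difference_update(doomed)
        return doomed


class FakeSSHHook:
    """Hypothetical stand-in for an SSH hook: records commands instead of running them."""

    def __init__(self) -> None:
        self.commands: List[str] = []

    def run_command(self, command: str) -> int:
        self.commands.append(command)
        return 0  # pretend exit status


class DeleteAndDistcpOperator(BaseOperator):
    """One logical step built from two hooks: clear the S3 target, then distcp into it."""

    def __init__(self, task_id: str, s3_hook: FakeS3Hook, ssh_hook: FakeSSHHook,
                 bucket: str, prefix: str, source_path: str) -> None:
        super().__init__(task_id)
        self.s3_hook = s3_hook
        self.ssh_hook = ssh_hook
        self.bucket = bucket
        self.prefix = prefix
        self.source_path = source_path

    def execute(self, context: Dict[str, Any]) -> List[str]:
        # Former DeleteS3PathOperator: remove output left behind by an earlier run.
        deleted = self.s3_hook.delete_prefix(self.prefix)
        # Former HadoopDistcpOperator: copy fresh data over SSH.
        target = f"s3a://{self.bucket}/{self.prefix}"
        status = self.ssh_hook.run_command(f"hadoop distcp {self.source_path} {target}")
        if status != 0:
            raise RuntimeError(f"distcp exited with status {status}")
        return deleted
```

In a real Airflow operator the hooks would normally be created inside execute from connection IDs rather than passed to the constructor, since the constructor runs every time the DAG file is parsed; they are injected here only so the sketch can be exercised without external services.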


My questions are:

  • Should operators be composed at all or is it better to have discrete steps?
  • Any pitfalls or improvements in the above approaches?
  • Any other ways to combine operators together?
  • In Airflow's taxonomy, is the primary purpose of hooks the same as above, or do they serve other purposes too?

UPDATE-1

3. Multiple Inheritance

While this is a Python feature rather than an Airflow-specific one, it's worth pointing out that multiple inheritance can come in handy for combining the functionality of operators. QuboleCheckOperator, for instance, is already written that way. In the past, I tried this to fuse EmrCreateJobFlowOperator and EmrJobFlowSensor, but at the time I ran into issues with the @apply_defaults decorator and abandoned the idea.
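The usual way to make multiple inheritance work with operator constructors is cooperative __init__ methods: each class consumes its own keyword arguments and forwards the rest with super().__init__(**kwargs), so the method resolution order (MRO) calls every base exactly once. The sketch below is plain Python with hypothetical class names (SubmitStepOperator, WaitForStepSensor, SubmitAndWaitOperator) and a stand-in BaseOperator; the "sensor" half subclasses that stand-in rather than Airflow's BaseSensorOperator. It does not reproduce @apply_defaults, so whether this pattern avoids the problems seen with that decorator is an assumption that depends on the Airflow version.

```python
from typing import Any, Callable, Dict


class BaseOperator:
    """Stand-in for Airflow's BaseOperator: takes task_id and rejects leftover arguments."""

    def __init__(self, *, task_id: str, **kwargs: Any) -> None:
        if kwargs:
            raise TypeError(f"unexpected arguments: {sorted(kwargs)}")
        self.task_id = task_id


class SubmitStepOperator(BaseOperator):
    """Hypothetical 'operator' half: submits a step and returns its id."""

    def __init__(self, *, submit: Callable[[], str], **kwargs: Any) -> None:
        super().__init__(**kwargs)  # forward everything this class does not consume
        self.submit = submit


class WaitForStepSensor(BaseOperator):
    """Hypothetical 'sensor' half: polls a status function until the step finishes."""

    def __init__(self, *, get_status: Callable[[str], str], max_polls: int = 10,
                 **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.get_status = get_status
        self.max_polls = max_polls

    def wait(self, step_id: str) -> str:
        for _ in range(self.max_polls):
            # A real sensor would sleep poke_interval seconds between checks; omitted here.
            status = self.get_status(step_id)
            if status in ("COMPLETED", "FAILED"):
                return status
        raise TimeoutError(f"{step_id} unfinished after {self.max_polls} polls")


class SubmitAndWaitOperator(SubmitStepOperator, WaitForStepSensor):
    """Fused operator + sensor. MRO: SubmitAndWait -> SubmitStep -> WaitForStep -> BaseOperator."""

    def execute(self, context: Dict[str, Any]) -> str:
        step_id = self.submit()
        status = self.wait(step_id)
        if status != "COMPLETED":
            raise RuntimeError(f"{step_id} ended with status {status}")
        return step_id
```

Because every constructor is keyword-only and forwards what it does not recognise, the combined class needs no __init__ of its own, and a misspelled argument still fails loudly in the base class.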

1 Reply

Should operators be composed at all or is it better to have discrete steps?

I have combined various hooks to create a single operator based on my needs. A simple example: I clubbed the GCS hook's delete, copy, list and get_size methods into a single operator called GcsDataValidationOperator. A rule of thumb is to keep such an operator idempotent, i.e. running it multiple times should produce the same result.

Any pitfalls or improvements in the above approaches?

The only pitfall is maintainability: sometimes, when the hooks change in the master branch, you will need to update all your operators manually if there are any breaking changes.

Any other ways to combine operators together?

You can use a PythonOperator and call the built-in hooks (or an existing operator's .execute method) from its callable, but that still means a lot of details in the DAG file. Hence, I would still go for the new-operator approach.

In Airflow's taxonomy, is the primary purpose of hooks the same as above, or do they serve other purposes too?

Hooks are just interfaces to external platforms and databases like Hive, GCS, etc., and they form the building blocks for operators, which is what allows the creation of new operators. This also means you can customize templated fields, add a Slack notification for each granular step inside your new operator, and have your own logging details.
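To illustrate the point about per-step logging and notifications, the sketch below runs a fused operator as a list of named sub-steps, logs each one, and calls a notify callback after it; the callback is a hypothetical stand-in for something like a Slack message. The steps loosely mirror the delete/copy/list/size idea behind the GcsDataValidationOperator mentioned in the reply, but StepwiseOperator, CopyAndValidateOperator and the dict-backed "bucket" are illustrative assumptions, not that operator's code, and nothing here depends on Airflow.

```python
import logging
from typing import Any, Callable, Dict, List, Tuple

log = logging.getLogger("fused_operator")

Step = Tuple[str, Callable[[], Any]]


class StepwiseOperator:
    """Runs named sub-steps in order, logging and notifying after each one (hypothetical design)."""

    def __init__(self, task_id: str, notify: Callable[[str], None]) -> None:
        self.task_id = task_id
        self.notify = notify  # e.g. a function that posts to a chat channel

    def steps(self) -> List[Step]:
        raise NotImplementedError

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        for name, fn in self.steps():
            log.info("%s: starting step %s", self.task_id, name)
            try:
                results[name] = fn()
            except Exception as exc:
                # Report which granular step failed, then let the task fail as usual.
                self.notify(f"{self.task_id}/{name} failed: {exc}")
                raise
            self.notify(f"{self.task_id}/{name} ok")
        return results


class CopyAndValidateOperator(StepwiseOperator):
    """Delete the destination prefix, copy the source prefix, then compare count and total size."""

    def __init__(self, task_id: str, notify: Callable[[str], None],
                 bucket: Dict[str, bytes], src: str, dst: str) -> None:
        super().__init__(task_id, notify)
        self.bucket = bucket  # in-memory stand-in for a GCS bucket: key -> object bytes
        self.src = src
        self.dst = dst

    def _list(self, prefix: str) -> List[str]:
        return sorted(k for k in self.bucket if k.startswith(prefix))

    def _size(self, prefix: str) -> int:
        return sum(len(self.bucket[k]) for k in self._list(prefix))

    def _delete(self) -> int:
        keys = self._list(self.dst)
        for key in keys:
            del self.bucket[key]
        return len(keys)

    def _copy(self) -> int:
        keys = self._list(self.src)
        for key in keys:
            self.bucket[self.dst + key[len(self.src):]] = self.bucket[key]
        return len(keys)

    def _validate(self) -> int:
        same_count = len(self._list(self.src)) == len(self._list(self.dst))
        if not same_count or self._size(self.src) != self._size(self.dst):
            raise ValueError("destination does not match source")
        return self._size(self.dst)

    def steps(self) -> List[Step]:
        return [("delete", self._delete), ("copy", self._copy), ("validate", self._validate)]
```

Each sub-step still produces its own log line and notification, which recovers part of the fine-grained visibility lost by collapsing several tasks into one. A retry, however, still restarts the whole operator from its first step, which is why the delete-first, idempotent ordering matters.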

FWIW: I am a PMC member and a contributor to the Airflow project.

