Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

python - Airflow: how to use trigger parameters in functions

We are using Airflow's KubernetesPodOperator for our data pipelines. What we would like to add is the option to pass in parameters via the UI.

Currently, the parameters for the operator are stored in separate yaml files. Instead of calling the operator directly, we call a function that does some prep and returns the operator, like this:

def prep_kubernetes_pod_operator(yaml):

    # ... read yaml and extract params

    return KubernetesPodOperator(**params)  # unpack the params dict as keyword arguments

with DAG(...):
    
    task1 = prep_kubernetes_pod_operator(yaml)

This works well for us and keeps our DAG files lightweight. However, we would now like to pass in some extra params via the UI. I understand that the trigger params can be accessed via kwargs['dag_run'].conf, but I had no success pulling these into the Python function.

Another thing I tried was to create a custom operator, because a custom operator recognises the args, but I couldn't manage to call the KubernetesPodOperator in the execute part (and I guess calling an operator inside another operator is not the right solution anyway).

Update:

Following NicoE's advice, I started to extend the KubernetesPodOperator instead.

The error I am getting now is that when I parse the yaml and assign the arguments afterwards, the parent arguments become tuples, which throws a type error.

dag:

task = NewKPO(
    task_id="task1",
    yaml_path=yaml_path)

operator:

class NewKPO(KubernetesPodOperator):
   @apply_defaults
   def __init__(
           self,
           yaml_path: str,
           name: str = "default",
           *args,
           **kwargs) -> None:
       self.yaml_path = yaml_path
       self.name = name
       super(NewKPO, self).__init__(
           name=name, # DAG is not parsed without this line - 'key has to be string'
           *args,
           **kwargs)

   def execute(self, context):
       # parsing yaml and adding context["dag_run"].conf (...)
       self.name = yaml.name
       self.image = yaml.image
       self.secrets = yaml.secrets
       # (...) calling type(self.secrets) here returns tuple
       return super(NewKPO, self).execute(context)

1 Reply

You could use params, a dictionary that can be defined at the DAG level and remains accessible in every task. It works for every operator derived from BaseOperator and can also be set from the UI.

The following example shows how to use it with different operators. params can be defined in the default_args dict or passed as an argument to the DAG object.

default_args = {
    "owner": "airflow",
    'params': {
        "param1": "first_param",
        "param2": "second_param"
    }
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False
)

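When triggering this DAG from the UI, you can add an extra param in the run configuration (param3, with the value param_from_the_UI, in the outputs further down):

[Screenshot: setting params while triggering the DAG from the UI]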

Params can be accessed in templated fields, as in the BashOperator case:

with dag:

    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo bash_task: {{ params.param1 }}')

bash_task logs output:

{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0

Params are also accessible within the execution context, for example inside a python_callable:

    def _print_params(**kwargs):
        print(f"Task_id: {kwargs['ti'].task_id}")
        for k, v in kwargs['params'].items():
            print(f"{k}:{v}")

    python_task = PythonOperator(
        task_id='python_task',
        python_callable=_print_params,
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
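
If you want to check a callable like this without running a DAG, one option is to call it with a hand-built stand-in for the context. The snippet below is only a sketch: format_params and fake_context are hypothetical names, it returns the lines instead of printing them so they can be asserted on, and it assumes the callable needs nothing from the context beyond 'ti' and 'params'.

```python
from types import SimpleNamespace


def format_params(**kwargs):
    """Build the same lines a callable like _print_params would print."""
    lines = [f"Task_id: {kwargs['ti'].task_id}"]
    for k, v in kwargs["params"].items():
        lines.append(f"{k}:{v}")
    return lines


# Hand-built stand-in for the context Airflow passes at run time.
# Assumption: only the 'ti' and 'params' keys are read by the callable.
fake_context = {
    "ti": SimpleNamespace(task_id="python_task"),
    "params": {"param1": "first_param", "param3": "param_from_the_UI"},
}

lines = format_params(**fake_context)
print("\n".join(lines))
```

This does not exercise Airflow itself; it only confirms that the function handles a params dictionary the way you expect.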

You could also add params in the task definition:

    python_task_2 = PythonOperator(
        task_id='python_task_2',
        python_callable=_print_params,
        params={'param4': 'param defined at task level'}
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
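
The logs above suggest a layering: DAG-level params first, then task-level params, then whatever was passed when triggering the run. The plain-Python sketch below only illustrates that merge order; resolve_params is a hypothetical helper and not Airflow's own implementation. Whether trigger values are allowed to override existing params may also depend on your Airflow version and configuration (as far as I know, the dag_run_conf_overrides_params setting controls this), so check that for your setup.

```python
from typing import Any, Dict, Optional


def resolve_params(
    dag_params: Dict[str, Any],
    task_params: Optional[Dict[str, Any]] = None,
    run_conf: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: merge params, later sources win on key clashes."""
    merged = dict(dag_params)         # DAG-level defaults
    merged.update(task_params or {})  # task-level params
    merged.update(run_conf or {})     # values supplied when triggering the run
    return merged


dag_params = {"param1": "first_param", "param2": "second_param"}
task_params = {"param4": "param defined at task level"}
run_conf = {"param3": "param_from_the_UI"}

resolved = resolve_params(dag_params, task_params, run_conf)
for key, value in resolved.items():
    print(f"{key}:{value}")
```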

Following the example, you could define a custom operator that inherits from BaseOperator:

class CustomDummyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
        self.custom_arg = custom_arg
        super(CustomDummyOperator, self).__init__(*args, **kwargs)

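    def execute(self, context):
        print(f"Task_id: {self.task_id}")
        # Print the operator's own argument, as shown in the output below
        print(f"custom_arg: {self.custom_arg}")
        for k, v in context['params'].items():
            print(f"{k}:{v}")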

And the task example would be:

    custom_op_task = CustomDummyOperator(
        task_id='custom_operator_task'
    )

Output:

{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
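
To tie this back to the question: inside execute(self, context) of an operator that extends KubernetesPodOperator, you could overlay the trigger values on the arguments read from the yaml file before calling the parent's execute. The sketch below shows only that merge step in plain Python. merge_run_conf, the allowed_keys whitelist, and the sample values are all assumptions for illustration; in the operator, run_conf would come from context["dag_run"].conf, which may be empty when nothing was passed.

```python
from typing import Any, Dict, Iterable, Optional


def merge_run_conf(
    yaml_args: Dict[str, Any],
    run_conf: Optional[Dict[str, Any]],
    allowed_keys: Iterable[str],
) -> Dict[str, Any]:
    """Hypothetical helper: overlay whitelisted trigger values on yaml-defined args."""
    allowed = set(allowed_keys)
    merged = dict(yaml_args)  # copy, so the yaml-defined args stay untouched
    for key, value in (run_conf or {}).items():
        if key in allowed:  # ignore anything the DAG author did not whitelist
            merged[key] = value
    return merged


# Sample values are made up for illustration.
yaml_args = {"name": "etl-pod", "image": "example/etl:1.0"}
run_conf = {"image": "example/etl:2.0", "unexpected": "ignored"}

final_args = merge_run_conf(yaml_args, run_conf, allowed_keys=["image"])
print(final_args)
```

Restricting the override to a whitelist is a design choice, not a requirement: it keeps a typo in the trigger JSON from silently changing an argument you did not intend to expose.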

Imports:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults

I hope that works for you!

