1. ホーム
  2. python

[解決済み] Airflowで条件付きタスクを作成する方法

2022-02-17 14:45:20

質問

Airflowで下記のスキーマのような条件付きタスクを作りたいのですが、どうすればいいですか?想定されるシナリオは以下の通りです。

  • タスク1が実行される
  • タスク1が成功したら、タスク2aを実行する。
  • タスク1が失敗した場合、タスク2bを実行します。
  • 最後にタスク3を実行する

上記のタスクは全てSSHExecuteOperatorです。 ShortCircuitOperatorと/またはXComを使って条件を管理する必要があると思うのですが、どのように実装すればよいのかがよくわかりません。解決策を説明していただけませんか?

解決方法を教えてください。

を使用する必要があります。 エアフロールトリガールール

すべてのオペレータは、生成されたタスクがトリガーされるルールを定義するtrigger_rule引数を持っています。

トリガールールの可能性。

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

ここで、あなたの問題を解決するためのアイデアを紹介します。

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)