Добавить в корзинуПозвонить
Найти в Дзене
дата инженеретта

Airflow для менеджеров

В Ariflow 3.1 появилась группа hitl-операторов, которая позволяет что-то вводить пользаку во время работы дага HITL = Human-in-the-loop 🫠 Что делает пример на картинках? Мы задаем даты, выбираем сервисы из списка, вводим почту и получаем отчет. Операторы по сути помогают менеджерам не дергать DA/DE своими адхоками, а пойти самому накликать и выгрузить то, что нужно 🤓 А как это происходит? В базовых примерах после разворачивания airflow появится даг example_hitl_operator. Там есть несколько операторов из пакета airflow.providers.standard.operators.hitl: 🤩HITLOperator — выбрать одну или несколько опций из списка 🤩HITLEntryOperator — ввести любой текст 🤩HITLBranchOperator — выбрать следующую таску 🤩ApprovalOperator — одобрить или отклонить В коде выглядит это вот так: wait_for_multiple_options = HITLOperator( task_id="wait_for_multiple_options", subject="Please choose option to proceed: ", options=["option 1", "option 2", "option 3"], multiple=True, defaults=["option 1"], )

Airflow для менеджеров

В Ariflow 3.1 появилась группа hitl-операторов, которая позволяет что-то вводить пользаку во время работы дага

HITL = Human-in-the-loop

🫠 Что делает пример на картинках?

Мы задаем даты, выбираем сервисы из списка, вводим почту и получаем отчет. Операторы по сути помогают менеджерам не дергать DA/DE своими адхоками, а пойти самому накликать и выгрузить то, что нужно

🤓 А как это происходит?

В базовых примерах после разворачивания airflow появится даг example_hitl_operator. Там есть несколько операторов из пакета airflow.providers.standard.operators.hitl:

🤩HITLOperator — выбрать одну или несколько опций из списка

🤩HITLEntryOperator — ввести любой текст

🤩HITLBranchOperator — выбрать следующую таску

🤩ApprovalOperator — одобрить или отклонить

В коде выглядит это вот так:

wait_for_multiple_options = HITLOperator(

task_id="wait_for_multiple_options",

subject="Please choose option to proceed: ",

options=["option 1", "option 2", "option 3"],

multiple=True,

defaults=["option 1"],

)

Когда заходите в UI, после запуска дага появляется доп вкладка Required Actions (1), где и нужно прожать опцию. Это все потом отправляется в xcom:

{

"params_input": {},

"responded_at": "datetime.datetime@version=2(tz=(UTC,pendulum.tz.timezone.Timezone,1,True),timestamp=1778505898.269782)",

"chosen_options": [

"option 2",

"option 3"

],

"responded_by_user": {

"id": "1",

"name": "airflow"

}

}

Из xcom потом можно достать в других тасках. Сначала обращаетесь по названию таски, а потом работаете, как с обычным словарем:

{{ ti.xcom_pull(task_ids='wait_for_multiple_options')["chosen_options"] }}

{{ ti.xcom_pull(task_ids='wait_for_input')["params_input"]["information"] }}

Use case очень прикольный. Интересно, а вот на практике этим будут пользоваться?

@data_engineerette

-2
-3