Наиболее легкий способ мониторить состояние задач и пайплайнов - это настроить отправку соответствующих уведомлений в телеграм.
Создание ботов в телеграм
Я создал двух ботов в телеграме. В первый бот отправляются уведомления об успешном завершении DAG. У этого бота отключены уведомления, и вообще он отправлен в архив. Так как нужен редко. А второй бот получает уведомления о провалах. У него я звук не отключал. Процесс создания бота в телеграм описан в соседней статье.
Настройка подключений в Airflow
После получения кредов от ботов настраиваем соединения в Airflow: Airflow > Admin > Connections.
Параметры:
- Connection Id — название подключения. Оно указывается в Task
- Connection Type — HTTP.
- Host — ID чата, в который бот будет присылать сообщение.
- Password — Токен бота.
В статье "Построение дашборда DataLens для клиентского сервиса на базе Jivo, AirFlow и ClickHouse" можно почитать подробности.
Установка провайдера Телеграм в Airflow
Перед описанием методов, убеждаемся что в Airflow установлены библиотеки для работы с telegram.
Описание методов отправки уведомлений в телегу
Airflow позволяет задать разные методы, разным статусом на уровне DAG и Task. Я написал два метода.
Отправка события успешного завершения DAG
# Импорт оператора
from airflow.providers.telegram.operators.telegram import TelegramOperator
def on_success_callback(context):
"""
Отправляет в телеграм сообщение об успешном завершении DAG
"""
send_message = TelegramOperator(
task_id='send_message_to_telegram',
telegram_conn_id='telegram_jupyter_to_tg_bot',
text=f"\U00002705 DAG <b>{context['ti'].dag_id}</b>"
f" завершился успешно в {context['ti'].end_date}"
)
return send_message.execute(context=context)
Отправка события ошибки завершения DAG
def on_failure_callback(context):
"""
Отправляет в телеграм сообщение об ошибке во время выполнения DAG
"""
send_message = TelegramOperator(
task_id='send_message_to_telegram',
telegram_conn_id='tg_bad_news',
text=f"\U0000274C DAG <b>{context['ti'].dag_id}</b>"
f" завершился с ошибкой в task {context['ti'].task_id}."
)
return send_message.execute(context=context)
Настройка отправки уведомлений
Добавляем отправку уведомлений в настройки DAG
with DAG(dag_id='test_dag',
schedule='@daily,
start_date=days_ago(0),
on_success_callback=on_success_callback,
on_failure_callback=on_failure_callback,
tags=['test'],
default_args=default_args
) as dag: