Найти тему
In Data We Trust

Отправка уведомлений Airflow в Telegram

Оглавление

Наиболее легкий способ мониторить состояние задач и пайплайнов - это настроить отправку соответствующих уведомлений в телеграм.

Создание ботов в телеграм

Я создал двух ботов в телеграме. В первый бот отправляются уведомления об успешном завершении DAG. У этого бота отключены уведомления, и вообще он отправлен в архив. Так как нужен редко. А второй бот получает уведомления о провалах. У него я звук не отключал. Процесс создания бота в телеграм описан в соседней статье.

Настройка подключений в Airflow

После получения кредов от ботов настраиваем соединения в Airflow: Airflow > Admin > Connections.

Параметры:

  • Connection Id — название подключения. Оно указывается в Task
  • Connection Type — HTTP.
  • Host — ID чата, в который бот будет присылать сообщение.
  • Password — Токен бота.

В статье "Построение дашборда DataLens для клиентского сервиса на базе Jivo, AirFlow и ClickHouse" можно почитать подробности.

Установка провайдера Телеграм в Airflow

Перед описанием методов, убеждаемся что в Airflow установлены библиотеки для работы с telegram.

-2

Описание методов отправки уведомлений в телегу

Airflow позволяет задать разные методы, разным статусом на уровне DAG и Task. Я написал два метода.

Отправка события успешного завершения DAG

# Импорт оператора
from airflow.providers.telegram.operators.telegram import TelegramOperator
def on_success_callback(context):
"""
Отправляет в телеграм сообщение об успешном завершении DAG
"""
send_message = TelegramOperator(
task_id='send_message_to_telegram',
telegram_conn_id='telegram_jupyter_to_tg_bot',
text=f"\U00002705 DAG <b>{context['ti'].dag_id}</b>"
f" завершился успешно в {context['ti'].end_date}"
)
return send_message.execute(context=context)

Отправка события ошибки завершения DAG

def on_failure_callback(context):
"""
Отправляет в телеграм сообщение об ошибке во время выполнения DAG
"""
send_message = TelegramOperator(
task_id='send_message_to_telegram',
telegram_conn_id='tg_bad_news',
text=f"\U0000274C DAG <b>{context['ti'].dag_id}</b>"
f" завершился с ошибкой в task {context['ti'].task_id}."
)
return send_message.execute(context=context)

Настройка отправки уведомлений

Добавляем отправку уведомлений в настройки DAG

with DAG(dag_id='test_dag',
schedule='@daily,
start_date=days_ago(0),
on_success_callback=on_success_callback,
on_failure_callback=on_failure_callback,
tags=['test'],
default_args=default_args
) as dag: