Найти в Дзене

Dataset vs XCom: что выбрать для обмена данными между задачами в Apache AirFlow

Чем обмен данными через XCom отличается от использования Dataset и какой из механизмов выбирать для обмена данными между задачами Apache Airflow: разбираем на практическом примере. В Apache Airflow есть несколько механизмов для обмена данными между задачами: XCom и набор данных (Dataset). При общей цели они предназначены для разных сценариев и имеют свои особенности. XCom(Cross-Communication) позволяет задачам обмениваться небольшими сообщениями. Обычно XCom используется в следующих случаях: Понятие Dataset появилось в Airflow с версии 2.4, о чем мы писали здесь. Набор данных помогает организовать зависимости между задачами на уровне данных. Dataset определяет, какие данные были обновлены и какие задачи должны быть запущены в ответ на эти изменения. Например, есть задача, которая обновляет таблицу в базе данных, и нужно, чтобы другие задачи запускались только после того, как данные будут обновлены. Dataset используется для отслеживания состояния данных, и задачи, зависящие от этого Dat
Оглавление

Чем обмен данными через XCom отличается от использования Dataset и какой из механизмов выбирать для обмена данными между задачами Apache Airflow: разбираем на практическом примере.

Обмен данными через XCom

В Apache Airflow есть несколько механизмов для обмена данными между задачами: XCom и набор данных (Dataset). При общей цели они предназначены для разных сценариев и имеют свои особенности. XCom(Cross-Communication) позволяет задачам обмениваться небольшими сообщениями. Обычно XCom используется в следующих случаях:

  • когда нужно передать небольшие объемы данных между задачами;
  • когда данные должны быть доступны немедленно после выполнения задачи;
  • когда необходимо передать параметры конфигурации или метаданные.

Понятие Dataset появилось в Airflow с версии 2.4, о чем мы писали здесь. Набор данных помогает организовать зависимости между задачами на уровне данных. Dataset определяет, какие данные были обновлены и какие задачи должны быть запущены в ответ на эти изменения. Например, есть задача, которая обновляет таблицу в базе данных, и нужно, чтобы другие задачи запускались только после того, как данные будут обновлены. Dataset используется для отслеживания состояния данных, и задачи, зависящие от этого Dataset, будут выполнены автоматически.

Обычно набор данных используется в следующих ситуациях:

  • когда необходимо организовать зависимости задач на основе состояния данных;
  • когда одна задача может влиять на выполнение множества других задач в зависимости от обновленных данных;
  • когда данные обновляются асинхронно, и необходимо запускать задачи только после завершения этого процесса.

Таким образом, выбирая между XCom и Dataset, первый вариант подходит, когда нужно передать конкретные значения между задачами в рамках одного DAG, и эти данные являются частью логики выполнения задач. Dataset подойдет для более сложных зависимостей на уровне данных, особенно когда их обновление влияет на выполнение нескольких задач в разных DAG или важна отслеживаемость состояния данных.

Чтобы продемонстрировать, как это работает, напишем DAG из 3-х задач генерации и обработки данных о пользовательских действиях на веб-страницах. DAG называется запускается ежедневно, начиная с текущей даты, без повтора выполнения задач за прошедший период, т.к. catchup=False. 1-ая задача генерирует фейковые данные о пользовательских действиях, 2-ая обрабатывает эти данные для получения статистики, а 3-я логирует результаты в CSV-файл. Обмен данными между задачами организуем через XCom. В качестве стартовой и конечной точки добавим задачи с пустыми операторами. Код DAG-файла, написанный в стиле смешения традиционного и TaskFlow API, выглядит так:

-2
-3
-4

Запустим этот DAG в AirFlow.

Выполнение DAG в AirFlow
Выполнение DAG в AirFlow

XCom-объекты отображаются в интерфейсе фреймворка.

Сохраненные XCom-объекты для обмена данными между задачами
Сохраненные XCom-объекты для обмена данными между задачами

Сгенерированный файл статистики пользовательского поведения сохранен в директории Colab, где я запускала AirFlow. Этот CSV-файл можно посмотреть и скачать.

Созданный файл с данными
Созданный файл с данными

Обмен данными через Dataset в Airflow

Рассмотрим аналогичный пример с Dataset для обмена данными. Dataset предоставляет возможность обмениваться данными между DAG-ами, но его реализация требует некоторой настройки среды. Например, я буду использовать ранее сгенерированный файл в качестве источника данных. Сперва определим два набора данных: исходный файл данных data_file и файл с обработанными данными processed_data_file. Задача create_dataset проверяет существование исходного файла. Если файл не найден или пуст, записывает сообщение об ошибке в лог-файл. Задача use_dataset считывает данные из CSV-файла и подсчитывает количество событий для каждого пользователя, а также записывает результаты в новый CSV-файл user_statistics.csv. Чтобы выполнять эту задачу, если ни одна из предыдущих задач не завершилась с ошибкой и хотя бы одна завершилась успешно, зададим ей правило триггера none_failed_min_one_success. Аналогично предыдущему примеру в качестве стартовой и конечной точки добавим задачи с пустыми операторами. Код DAG-файла выглядит так:

-8
-9

Запустим этот DAG в AirFlow.

Выполнение DAG в AirFlow
Выполнение DAG в AirFlow

Сгенерированные датасеты отображаются в веб-интерфейсе фреймворка.

Созданные датасеты для обмена данными между задачами
Созданные датасеты для обмена данными между задачами

Как и в предыдущем случае, сгенерированный CSV-файл статистики пользовательского поведения сохранен в директории Colab, где запущен AirFlow.

Файл с результатами аналитики данных
Файл с результатами аналитики данных

Обмен данными через XCom-объекты мне показался немного проще, чем через наборы данных. Однако, поскольку XCom имеет ограниченный размер и по умолчанию сохраняется в базе данных метаданных, он подходит только для простых кейсов. Поэтому дата-инженеру полезно знать про возможности Dataset, который может быть подгружен из любого, в т.ч. внешнего источника, и уметь пользоваться этим механизмом.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Статья

Курсы: AIRF YARF

Наш сайт

Копирование, размножение, распространение, перепечатка (целиком или частично), или иное использование материала допускается только с письменного разрешения правообладателя ООО "УЦ Коммерсант"