Найти тему

Планы выполнения запросов при работе с API pandas в Apache Spark

Для чего смотреть планы выполнения запросов при работе с API pandas в Spark и как это сделать: примеры использования метода spark.explain() и его аргументов для вывода логических и физических планов. Разбираем на примере PySpark-скрипта.

API pandas и физический план выполнения запроса в Apache Spark

Мы уже писали, что PySpark, API-интерфейс Python в Apache Spark, позволяет работать с популярной библиотекой pandas, которая довольно известна, но по своей природе не очень хорошо работает с большим объемом данных. Тем не менее, многие аналитики данных и дата-инженеры по-прежнему используют pandas в своих PySpark-приложениях. Чтобы делать это более эффективно, можно попытаться избежать дорогостоящих операций с датафреймами, вызвав метод spark.explain(). Он построит физический план выполнения этой операции, который всегда представляет собой RDD. Если вызвать метод spark.explain() с аргументом True, он также покажет и логический план — краткое изложение всех шагов преобразования, которые необходимо выполнить. Логический план позволяет получить наиболее оптимизированную версию пользовательского выражения, но не предоставляет подробную информацию о драйвере (главном узле) или исполнителе (рабочем узле). За создание и хранение логического плана отвечает SparkContext, который также использует API pandas в Apache Spark.

Посмотрим, как это работает, написав несложный PySpark-скрипт с использованием API pandas для генерации датафрейма с зарплатами по разным специальностям, вычисления медианной зарплаты и фильтрации тех строк, где зарплата оказалась выше медианной. PySpark-скрипт, запускаемый в Colab, выглядит таким образом:

-2

Физический план выполнения запроса с датафреймом pandas, вызываемый методом spark.explain() без аргумента True, выглядит так:

-3
Просмотр физического плана выполнения запроса PySpark
Просмотр физического плана выполнения запроса PySpark

Некоторые операции, например, sort_values(), сложнее выполнять в параллельной или распределенной среде, чем в памяти на одном компьютере, поскольку необходимо отправлять данные на другие узлы через сеть. Об этом свидетельствует операция Exchange в плане выполнения запросов. По возможности рекомендуется избегать таких shuffle-операций, поскольку они замедляют вычисления из-за накладных расходов передачи данных по сети.

Логический план выполнения запроса

При вызове метода spark.explain() с аргументом True, выводимая информация будет намного полнее. Теперь она включает не только физический план, но и логический план, причем все стадии работы с ним: от синтаксического анализа до оптимизации перед генерацией физического плана.

-5
-6
Расширенное отображение планов выполнения запроса: логический и физический план
Расширенное отображение планов выполнения запроса: логический и физический план

Вместо аргумента True в методе spark.explain() можно использовать аргумент mode=»extended«, который также выводит логические и физические планы выполнения запроса. А аргумент mode=»formatted» показывает разделенный вывод, созданный на основе оптимизированного физического плана, и раздел с деталями каждого узла:

-8
Вызов метода spark.explain() аргументом mode=»extended»
Вызов метода spark.explain() аргументом mode=»extended»

Когда над объектами Spark вызывается много операций API pandas, базовый планировщик фреймворка может замедлиться из-за огромного и сложного плана. В таком случае можно вызвать метод работы с контрольной точкой DataFrame.spark.checkpoint() или DataFrame.spark.local_checkpoint(), который удалит предыдущий план выполнения и построит его заново в более простом варианте. Результат предыдущего DataFrame сохраняется в настроенной файловой системе при вызове DataFrame.spark.checkpoint() или в исполнителе при вызове DataFrame.spark.local_checkpoint().

Поскольку в Apache Spark раздел является единицей параллелизма, рекомендуется избегать вычислений на одном разделе. Однако, некоторые API, такие как DataFrame.rank, используют оконные функции PySpark без указания раздела. Это перемещает все данные в один раздел на одном узле и может привести к серьезному снижению производительности. Таких API следует избегать для очень больших наборов данных. Например, добавим к фильтрации профессий с зарплатой выше медианной функцию rank() и посмотрим, как это отразится на физическом плане выполнения запроса.

В физическом плане появилась оконная функция, а сам он теперь выглядит так:

-10
Оконные функции в плане выполнения Apache Spark
Оконные функции в плане выполнения Apache Spark

Чтобы rank() не замедлял работу распределенной программы, перемещая данные в один раздел, можно использовать метод groupBy.rank, который менее затратен, поскольку данные можно распределять и вычислять для каждой группы. В ранее приведенном примере нет явно выделяемых групп, поэтому оставим этот совет без иллюстрации.

Узнайте больше про возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Основы Apache Spark для разработчиков Потоковая обработка в Apache Spark Анализ данных с Apache Spark Машинное обучение в Apache Spark Графовые алгоритмы в Apache Spark Архитектура данных с Apache Spark Статья: https://bigdataschool.ru/blog/news/spark/pandas-on-spark-and-execution-plans.html

Курсы: https://bigdataschool.ru/courses/apache-spark-core https://bigdataschool.ru/courses/apache-spark-structured-streaming https://bigdataschool.ru/courses/apache-spark-sql https://bigdataschool.ru/courses/apache-spark-machine-learning https://bigdataschool.ru/courses/apache-spark-graphframe https://bigdataschool.ru/courses/apache-spark-for-data-engineer

Наш сайт: https://bigdataschool.ru

Копирование, размножение, распространение, перепечатка (целиком или частично), или иное использование материала допускается только с письменного разрешения правообладателя ООО "УЦ Коммерсант"