Найти в Дзене
ЦифроПроф

Обработка и качество данных в PySpark

Для обработки пропущенных значений используют функцию dropna(), а также fillna(). Задача 1 Удалите из датафрейма пропущенные значения. Затем напечатайте на экране количество строк в датафрейме. import numpy as np import pandas as pd from pyspark.sql import SparkSession APP_NAME = "DataFrames" SPARK_URL = "local[*]" spark = SparkSession.builder.appName(APP_NAME) \ .config('spark.ui.showConsoleProgress', 'false') \ .getOrCreate() taxi = spark.read.load('/datasets/pickups_terminal_5.csv', format='csv', header='true', inferSchema='true') taxi = taxi.dropna() print(taxi.count()) Задача 2 Заполните пропущенные значения в датафрейме нулями. Функцией describe() выведите на экран результаты, чтобы убедиться в корректности заполнения значений. import numpy as np import pandas as pd from pyspark.sql import SparkSession APP_NAME = "DataFrames" SPARK_URL = "local[*]" spark = SparkSession.builder.appName(APP_NAME) \ .config('spark.ui.showConsoleProgress', 'false') \ .getOrCreate() taxi = spark.read.
Оглавление
Фото из открытых источников
Фото из открытых источников

Для обработки пропущенных значений используют функцию dropna(), а также fillna().

Задача 1

Удалите из датафрейма пропущенные значения. Затем напечатайте на экране количество строк в датафрейме.

import numpy as np

import pandas as pd

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi = taxi.dropna()

print(taxi.count())

Задача 2

Заполните пропущенные значения в датафрейме нулями. Функцией describe() выведите на экран результаты, чтобы убедиться в корректности заполнения значений.

import numpy as np

import pandas as pd

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi=taxi.fillna(0)

print(taxi.describe().show())

SQL-запросы в датафреймах

Для выполнения SQL-запроса необходимо импортировать SparkSession.

Задача 1

Изучите статистические выбросы. В переменной result сохраните результат запроса, который выберет строки с заказами такси у терминала №5. Расположите их от большего числа заказов в день к меньшему. Выведите на экран первые пять строк, используя функцию show.

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql("SELECT date, hour, minute, pickups FROM taxi order by pickups desc limit 5")

print(result.show())

Задача 2

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql("SELECT COUNT(DISTINCT(date)) FROM taxi WHERE pickups > 200")

print(result.show())

Группировка и агрегирующие функции в PySpark

Для группировки данных в PySpark используется функция groupby()

Задача 1

Сгруппируйте записи по месяцам. По каждому месяцу рассчитайте среднее количество заказов.

Напечатайте на экране таблицу с месяцами и средним количеством заказов по убыванию.

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

print(spark.sql('SELECT EXTRACT(MONTH FROM date), AVG(pickups) FROM taxi '

'GROUP BY EXTRACT(MONTH FROM date) ORDER BY AVG(pickups) DESC').show())

Задача 2

Вычислите среднее количество заказов за каждый час. Затем отсортируйте данные по убыванию.

Выведите самые загруженные 10 часов и среднее количество заказов такси в эти часы.

from pyspark.sql import SparkSession

APP_NAME = "DataFrames"

SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \

.config('spark.ui.showConsoleProgress', 'false') \

.getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv',

format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

print(spark.sql('SELECT hour, AVG(pickups) FROM taxi '

'GROUP BY hour ORDER BY AVG(pickups) DESC').show(10))