Найти в Дзене
10,2 тыс подписчиков

⏩ Методы ускорения кода  часть 2 : Параллелизация


Большинство современных компьютеров имеют процессор с более чем одним ядром, т.е. с возможностью делать несколько вычислений одновременно. Более того, часто аналитики и DS работают на сервере (например, c JupyterHub), у которого число ядер может достигать сотен.

Изначально интерпретатор python — однопоточная программа с глобальным локом во время вычисления, а значит в ванильном “pandas” нельзя параллелить задачи. Однако, в стандартной библиотеке python есть несколько модулей, позволяющих работать с многопоточностью, и далее мы разберем их применение.

Первый модуль — multiprocessing. Он обходит глобальный лок, работая не через под-потоки, а через под-процессы. Основной его параметр — это кол-во процессов, на которые будет биться основной. Обычно его выбирают равным кол-ву ядер в процессоре, но я советую брать число на 1 меньше, оставляя одно ядро на накладные расходы и синхронизацию. Далее создается пул воркеров (процессов), на которые будут параллелиться вычисления. Есть несколько вариантов, как разбить по ним датафрейм, но обычно используется .array_split():

import pandas as pd
import multiprocessing as mp

def your_datarame_func(df):
...

n_cores = max(mp.cpu_count() - 1, 1)
p = mp.Pool(n_cores) # Data parallelism Object

def parallelize_dataframe(df, func, n_cores):
df_split = np.array_split(df, n_cores)
df = pd.concat( pool.map(func, df_split) )
pool.close() ; pool.join()
return df

df_results = parallelize_dataframe(df, func=your_datarame_func)

Более высокоуровневый интерфейс предоставляет другой пакет из стандартной библиотеки — concurrent.futures, однако возможностей у него меньше. Он предоставляет 2 аналогичных API для работы с процессами и тредами — ProcessPoolExecutor и ThreadPoolExecutor.

import psutil
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed

def your_datarame_func(df):
...

num_procs = max(psutil.cpu_count(logical=True) - 1, 1)

splitted_df = np.array_split(df, num_procs)
df_results = []

with ProcessPoolExecutor(max_workers=num_procs) as executor:
results = [executor.submit(your_datarame_func, df=df) for df in splitted_df]
for result in as_completed(results):
try:
df_results.append(result.result())
except Exception as ex:
print(str(ex))
pass

df_results = pd.concat(df_results)

В concurrent.futures можно получить pid порождаемых процессов (пример). Также, можно использовать сторонние пакеты для параллельных вычислений.


2 минуты