Найти тему
10,2 тыс подписчиков

📂 Кластеризация текста в PySpark


Наша задача состоит в том, чтобы разбить все сообщения на группы, каждая из которых будет содержать в себе сообщения одного типа.

1. Создание сессии Spark и импорт необходимых модулей

• Для того чтобы создать Spark сессию, мы написали следующий код:
from pyspark import SparkContext, SparkConf, HiveContext
# запуск сессии спарка
conf = SparkConf().setAppName('spark_dlab_app')
conf.setAll(
[
#Укажите тут нужные параметры Spark
])
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

• Импортируем модули для дальнейшей работы:
# для создания пользовательских функций
from pyspark.sql.functions import udf
# для использования оконных функций
from pyspark.sql.window import Window
# для работы с PySpark DataFrame
from pyspark.sql import DataFrame
# для задания типа возвращаемого udf функцией
from pyspark.sql.types import StringType
# для создания регулярных выражений
import re
# для работы с Pandas DataFrame
import pandas as pd
# для предобработки текста
from pyspark.ml.feature import HashingTF, IDF, Word2Vec,\
CountVectorizer, Tokenizer, StopWordsRemover
# для кластеризации
from pyspark.ml.clustering import Kmeans, BisectingKmeans
# для создания пайплайна
from pyspark.ml import Pipeline
# для подсчета частоты слов в тексте
from nltk.probability import FreqDist

2. Предварительная обработка текста

• Первым делом создадим DataFrame из данных, которые находятся на Hadoop, в нашей сессии:
t = spark.table('data')

• Поскольку в тексте содержится много информации, которая не несёт никакой смысловой нагрузки, например, различные цифры или знаки препинания, мы её удалим. Для этого написали UDF-функцию, которая почистит текст с помощью регулярных выражений.
def text_prep(text):
# переводим текст в нижний регистр
text = str(text).lower()
# убираем всё, что не русская буква, и убираем слово «баланс»
text = re.sub('[^а-яё]|баланс',' ',text)
# убираем всё, что начинается с «от»
text = re.sub('от.+','',text)
# убираем одиночные буквы
text = re.sub('\s[а-яё]\s{0,1}','',text)
# если пробелов больше одного заменяем их на один
text = re.sub('\s+',' ',text)
# убираем лишние пробелы слева и справа
text = text.strip()
return text
# создание пользовательской функции
prep_text_udf = udf(text_prep, StringType())

• Применим нашу функцию и уберем пустые строки:
t = t.withColumn('prep_text', prep_text_udf('sms_text'))\
.filter('prep_text <> ""')


1 минута