Найти в Дзене

Пример потокового конвейера из Kafka в Elasticsearch на платформе Decodable

Оглавление

Практическая демонстрация потокового SQL-конвейера, который преобразует данные, потребленные из Apache Kafka, и записывает результаты в Elasticsearch, используя Debezium-коннекторы и задания Apache Flink в облачной платформе Decodable.

Потребление сообщений из Apache Kafka

Я уже показывала пример интеграции Apache Kafka и Elasticsearch с помощью sink-коннектора, а также конвейер с ClickHouse Cloud. Сегодня совместим некоторые из этих прежних работ, чтобы протестировать облачную платформу Decodable на базе Apache Flink и Debezium. Она является полностью управляемым сервисом, который обеспечивает обработку данных в реальном времени. С Decodable можно получать данные из различных источников и направлять их в разные системы-приемники, а также преобразовывать и дополнять эти потоковые данные с помощью SQL или языка программирования на основе JVM (Java, Scala). Я буду использовать именно SQL в качестве основного средства построения потокового конвейера, суть которого состоит в извлечении данных из топика Kafka, их преобразовании и записи в Elasticsearch.

Архитектура потокового конвейера
Архитектура потокового конвейера

Как обычно, данные в Kafka, развернутой на платформе Upstash, буду публиковать с помощью простого Python-приложения, запускаемого в Google Colab, со следующим исходным кодом:

-3
-4
-5
-6

Этот  Python-скрипт каждые 3 секунды генерирует и публикует в Kafka клиентские обращения от компаний или частных лиц: заявки на покупку товаров в интернет-магазине или вопросы по работе магазина.

Python-скрипт публикации сообщений в Kafka
Python-скрипт публикации сообщений в Kafka

Схема полезной нагрузки сообщения в формате JSON выглядит так:

-8
-9

Далее получим эти данные с помощью Debezium-коннектора в платформе Decodable. Поскольку Decodable основана на Apache Flink, она использует именно его термины при создании потокового конвейера. В частности, сперва нужно создать коннекторы к источникам и приемникам данных, настроив учетные записи подключения к внешним системам. Decodeable создает сетевые подключения к ресурсам, которые указаны в подключениях. Я создала 2 коннектора: Kafka как источник данных и Elasticsearch как приемник.

Подключения к внешним системам: источник и приемник
Подключения к внешним системам: источник и приемник

Преобразование потока данных и публикация в Elasticsearch

Предположим, нужно отправлять в индекс Elasticsearch под названием apps_index не все подряд обращения, а только те, которые являются заявками на покупку, а также количество позиций в каждой заявке. Чтобы сделать это, необходимо создать новый поток, который извлекает данные из исходного потока from_test_kafka, немного преобразует и обогащает их с помощью следующего SQL-запроса:

-11
Потоки в Decodable
Потоки в Decodable

Протестировав и настроив источник и приемник данных, следует запустить оба подключения и сам настроенный конвейер, чтобы он работал непрерывно.

Потоковый конвейер обработки данных
Потоковый конвейер обработки данных

Статистика по принятым из источника и отправленным в приемник данным отображается в свойствах созданного конвейера.

Статистика работы потокового конвейера
Статистика работы потокового конвейера

Проверить, что данные, потребленные из Kafka, попадают в систему-приемник, т.е. Elasticsearch, можно из интерфейса самой это NoSQL-СУБД с помощью API поиска, отправив POST-запрос к /apps_index/_search с полезной нагрузкой для просмотра всех результатов:

-15
Содержимое индекса Elasticsearch
Содержимое индекса Elasticsearch

Более наглядный вид содержимого индекса показывает веб-интерфейс панелей Kibana, интегрированный с Elasticsearch

Просмотр данных в Kibana
Просмотр данных в Kibana

Разумеется, вся информация о публикации и потреблении данных отображается в GUI веб-развертывания Kafka.

Визуализация пропускной способности публикации и потребления сообщений в Kafka
Визуализация пропускной способности публикации и потребления сообщений в Kafka

Разумеется, для практического использования полученной из Kafka информации, конвейер должен включать больше операций преобразования данных. В частности, чтобы построить поминутную или почасовую гистограмму получения заявок в Kibana, необходимо преобразовать типы полей индекса, чтобы поле producer_timestamp было не строкой, а датой и временем. Кроме того, следует преобразовать поле content, рассматривая его как документ, чтобы выполнять поиск по названию и количеству заказанных продуктов, а также строить по ним различные визуализации. Для этого SQL-запрос, преобразующий поток исходных данных из Kafka, будет намного сложнее.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Статья: https://bigdataschool.ru/blog/news/kafka/from-kafka-t..
Курсы:
https://bigdataschool.ru/courses/apache-kafka-develop.. https://bigdataschool.ru/courses/apache-kafka-basics https://bigdataschool.ru/courses/apache-kafka-adminis.. https://bigdataschool.ru/courses/arenadata-streaming-..
Наш сайт:
https://bigdataschool.ru

Копирование, размножение, распространение, перепечатка (целиком или частично), или иное использование материала допускается только с письменного разрешения правообладателя ООО "УЦ Коммерсант»