Для чего нужна данная статья? :
Найти компромиссы между библиотеками для взаимодействия с Apache Kafka.
Зачем Вам это уметь? :
Написать систему уведомлений в реальном времени с использованием ML.
Cистема уведомлений в реальном времени с использованием ML
Производитель: Генерирует сложные события с метаданными (устройство, геолокация, длительность сессии).
Использует случайное партиционирование и повторные попытки для надежности.
Работает в фоновом потоке для имитации реального трафика.
Это программа, которая создаёт и отправляет данные (события) в систему Kafka. В вашем примере она генерирует события с метаданными — например, информацию о том, что пользователь зашёл на сайт с телефона, его геолокация, сколько времени он провёл на сайте.
Пример:
Представьте, что у вас есть магазин, и каждая касса отправляет информацию о покупках в общую базу. Здесь касса — это производитель, а покупки — события.
ML-модель: Обучается на синтетических данных (в реальном проекте используйте настоящие данные).
Преобразует категориальные признаки в one-hot encoding.
Сохраняется в файл для использования потребителем.
One-hot encoding - способ преобразования категориальных данных (например, цвета, города, устройства) в числа, которые может понять компьютер. Например, если у вас есть категории "Москва", "Питер", "Казань", то one-hot encoding превратит их в три колонки с нулями и единицей в нужной.
Это алгоритм, который обучается на данных и потом может делать предсказания. В вашем примере модель обучается на синтетических (искусственно созданных) данных, чтобы потом предсказывать, какие уведомления будут полезны пользователю.
Пример:
Если вы часто покупаете книги о путешествиях, модель может предсказать, что вам будет интересно уведомление о скидке на путеводители.
Потребитель: Обрабатывает события из Kafka в реальном времени.
Использует ML-модель для предсказания релевантности уведомлений.
Поддерживает асинхронную отправку уведомлений через очередь и разные каналы (email, SMS, push).
Включает обработку ошибок, логирование и ручное подтверждение оффсетов для надежности.
Это программа, которая читает данные из Kafka и что-то с ними делает. В вашем примере потребитель получает события, использует ML-модель, чтобы решить, отправлять ли уведомление, и отправляет его по email, SMS или push.
Пример:
Вернёмся к магазину: если касса отправила информацию о покупке, то программа, которая анализирует эти данные и отправляет вам чек на почту — это потребитель.
Масштабируемость: Kafka обеспечивает распределенную обработку событий.
Потребители могут быть запущены в нескольких экземплярах с одной группой (group_id).
Очередь уведомлений позволяет масштабировать отправку.
Безопасность и мониторинг: Логирование помогает отслеживать ошибки и производительность.
В реальной системе добавьте шифрование Kafka и аутентификацию.
Код системы
Программа может работать в трёх режимах, определяемых аргументом командной строки:
- train: Обучает ML-модель на синтетических данных.
- producer: Генерирует и отправляет события в Kafka-топик.
- consumer: Потребляет события из Kafka, применяет ML для проверки, и отправляет уведомления, если нужно.
Код использует асинхронность (с tokio), обработку ошибок (anyhow), логирование (log), и различные библиотеки для сетевых взаимодействий.
- anyhow: Для удобной обработки ошибок (Result и Context для цепочек ошибок).
- aws_config и aws_sdk_sns: Для интеграции с AWS Simple Notification Service (SNS) — отправка email.
- chrono: Работа с датами и временем (Utc для таймстампов).
- linfa и связанные: Библиотека для ML. DecisionTree — дерево решений, Fit и Predict — трейты для обучения и предсказания, ParamGuard — для валидации параметров модели.
- log: Логирование (info, error).
- ndarray: Массивы для данных ML (Array1 для векторов, Array2 для матриц).
- rdkafka: Клиент Kafka для продюсера (отправка сообщений) и консьюмера (получение).
- reqwest: HTTP-клиент для Twilio и Firebase.
- serde: Сериализация/десериализация JSON (для событий).
- std::collections::HashMap: Для параметров HTTP-запросов.
- std::sync::Arc и tokio::sync::Mutex: Для thread-safe обмена данными (Arc — атомарный счётчик ссылок, Mutex — для блокировки).
- std::time::Duration и tokio: Для задержек и асинхронности.
Эти импорты обеспечивают все необходимые инструменты для потоковой обработки, ML и уведомлений.
События пользователей (UserEvent)
- UserEvent: Представляет событие пользователя. Содержит ID пользователя, действие (например, "login"), таймстамп, и метаданные.
- EventMetadata: Метаданные события — устройство (mobile/desktop/tablet), локация (широта/долгота), длительность сессии (в секундах).
- Эти структуры сериализуемы (serde) для передачи в Kafka как JSON.
Уведомления (Notification)
- NotificationChannel: Enum для каналов уведомлений (email, SMS, push).
- Notification: Структура уведомления — ID пользователя, канал, сообщение, приоритет (не используется в коде, но зарезервировано).
Обёртка ML-модели (MLModel)
- MLModel: Обёртка вокруг DecisionTree из linfa. Хранит обученную модель.
- new: Конструктор.
- predict: Метод для предсказания. Принимает матрицу признаков (Array2<f64>), возвращает массив меток (usize: 0 или 1). Использует трейт Predict.
Модель классифицирует события: 1 — высокая вовлечённость (нужно уведомить), 0 — нет.
Продюсер событий (EventProducer)
- EventProducer: Хранит Kafka-продюсера и топик.
- new: Создаёт продюсера с конфигурацией (brokers — адрес Kafka, timeout).
- send_event: Сериализует событие в JSON, создаёт запись (key — user_id), отправляет асинхронно.
- generate_events: Генерирует синтетические события (цикл по count). Действия и устройства циклично, локация — с небольшим сдвигом, сессия — модуль 3600. Отправляет с задержкой 100 мс.
Консьюмер уведомлений (NotificationConsumer)
- NotificationConsumer: Хранит Kafka-консьюмера, ML-модель (в Arc для шаринга), очередь уведомлений (Mutex для синхронизации), клиенты AWS SNS и HTTP.
- new: Создаёт консьюмера, подписывается на топик, инициализирует SNS.
- process_events: Бесконечный цикл: получает сообщения из Kafka, десериализует, обрабатывает (handle_event).
- handle_event: Извлекает признаки, делает предсказание ML, если 1 — создаёт и enqueue уведомление.
- extract_features: Создаёт матрицу признаков: длительность сессии, текущий час, день недели.
- create_notification: Определяет канал по устройству, создаёт сообщение.
- enqueue_notification: Добавляет в очередь (под Mutex), логирует, сразу отправляет (send_notification).
- send_notification: Диспетчер по каналу: email/SMS/push.
- send_email: Использует AWS SNS для публикации в топик (плейсхолдер ARN).
- send_sms: Плейсхолдер для Twilio — POST-запрос с параметрами (номера фиктивные).
- send_push: Плейсхолдер для Firebase — POST с JSON (токен — "user_{id}", ключ фиктивный).
Очередь используется, но уведомления отправляются сразу после enqueue (не накапливаются).
Функция обучения модели (train_model)
- Генерирует 1000 синтетических примеров: признаки (длительность, час, день), метки по правилу (1 если сессия >30 мин и час 9-17).
- Создаёт Dataset из ndarray.
- Обучает DecisionTree: критерий Gini, глубина 10.
- Вычисляет accuracy на обучающих данных (обычно высокая, ~100% из-за простоты).
- Возвращает MLModel.
Главная функция (main)
- Инициализирует логгер.
- Читает аргумент (mode).
- train: Обучает модель, логирует.
- producer: Создаёт продюсера, генерирует 100 событий.
- consumer: Обучает модель, создаёт консьюмера, запускает обработку (бесконечный цикл).
- По умолчанию — помощь.