Добавить в корзинуПозвонить
Найти в Дзене
Один Rust не п...Rust

Kafka в Rust

t.me/oneRustnoqRust Для чего нужна данная статья? : Найти компромиссы между библиотеками для взаимодействия с Apache Kafka. Зачем Вам это уметь? : Написать систему уведомлений в реальном времени с использованием ML. Производитель: Генерирует сложные события с метаданными (устройство, геолокация, длительность сессии).
Использует случайное партиционирование и повторные попытки для надежности.
Работает в фоновом потоке для имитации реального трафика. Это программа, которая создаёт и отправляет данные (события) в систему Kafka. В вашем примере она генерирует события с метаданными — например, информацию о том, что пользователь зашёл на сайт с телефона, его геолокация, сколько времени он провёл на сайте. Пример:
Представьте, что у вас есть магазин, и каждая касса отправляет информацию о покупках в общую базу. Здесь касса — это производитель, а покупки — события.
ML-модель: Обучается на синтетических данных (в реальном проекте используйте настоящие данные).
Преобразует категориальные признак
Оглавление
GitHub - nicktretyakov/kafka_ml
ML на RUST без заморочек

t.me/oneRustnoqRust

Для чего нужна данная статья? :

Найти компромиссы между библиотеками для взаимодействия с Apache Kafka.

Зачем Вам это уметь? :

Написать систему уведомлений в реальном времени с использованием ML.

Cистема уведомлений в реальном времени с использованием ML

Производитель: Генерирует сложные события с метаданными (устройство, геолокация, длительность сессии).
Использует случайное партиционирование и повторные попытки для надежности.
Работает в фоновом потоке для имитации реального трафика.

Это программа, которая создаёт и отправляет данные (события) в систему Kafka. В вашем примере она генерирует события с метаданными — например, информацию о том, что пользователь зашёл на сайт с телефона, его геолокация, сколько времени он провёл на сайте.

Пример:
Представьте, что у вас есть магазин, и каждая касса отправляет информацию о покупках в общую базу. Здесь касса — это производитель, а покупки — события.

ML-модель: Обучается на синтетических данных (в реальном проекте используйте настоящие данные).
Преобразует категориальные признаки в one-hot encoding.
Сохраняется в файл для использования потребителем.

One-hot encoding - способ преобразования категориальных данных (например, цвета, города, устройства) в числа, которые может понять компьютер. Например, если у вас есть категории "Москва", "Питер", "Казань", то one-hot encoding превратит их в три колонки с нулями и единицей в нужной.

Это алгоритм, который обучается на данных и потом может делать предсказания. В вашем примере модель обучается на синтетических (искусственно созданных) данных, чтобы потом предсказывать, какие уведомления будут полезны пользователю.

Пример:
Если вы часто покупаете книги о путешествиях, модель может предсказать, что вам будет интересно уведомление о скидке на путеводители.

Потребитель: Обрабатывает события из Kafka в реальном времени.
Использует ML-модель для предсказания релевантности уведомлений.
Поддерживает асинхронную отправку уведомлений через очередь и разные каналы (email, SMS, push).
Включает обработку ошибок, логирование и ручное подтверждение оффсетов для надежности.

Это программа, которая читает данные из Kafka и что-то с ними делает. В вашем примере потребитель получает события, использует ML-модель, чтобы решить, отправлять ли уведомление, и отправляет его по email, SMS или push.

Пример:
Вернёмся к магазину: если касса отправила информацию о покупке, то программа, которая анализирует эти данные и отправляет вам чек на почту — это потребитель.

Масштабируемость: Kafka обеспечивает распределенную обработку событий.
Потребители могут быть запущены в нескольких экземплярах с одной группой (group_id).
Очередь уведомлений позволяет масштабировать отправку.

Безопасность и мониторинг: Логирование помогает отслеживать ошибки и производительность.
В реальной системе добавьте шифрование Kafka и аутентификацию.

Код системы

Программа может работать в трёх режимах, определяемых аргументом командной строки:

  • train: Обучает ML-модель на синтетических данных.
  • producer: Генерирует и отправляет события в Kafka-топик.
  • consumer: Потребляет события из Kafka, применяет ML для проверки, и отправляет уведомления, если нужно.

Код использует асинхронность (с tokio), обработку ошибок (anyhow), логирование (log), и различные библиотеки для сетевых взаимодействий.

  • anyhow: Для удобной обработки ошибок (Result и Context для цепочек ошибок).
  • aws_config и aws_sdk_sns: Для интеграции с AWS Simple Notification Service (SNS) — отправка email.
  • chrono: Работа с датами и временем (Utc для таймстампов).
  • linfa и связанные: Библиотека для ML. DecisionTree — дерево решений, Fit и Predict — трейты для обучения и предсказания, ParamGuard — для валидации параметров модели.
  • log: Логирование (info, error).
  • ndarray: Массивы для данных ML (Array1 для векторов, Array2 для матриц).
  • rdkafka: Клиент Kafka для продюсера (отправка сообщений) и консьюмера (получение).
  • reqwest: HTTP-клиент для Twilio и Firebase.
  • serde: Сериализация/десериализация JSON (для событий).
  • std::collections::HashMap: Для параметров HTTP-запросов.
  • std::sync::Arc и tokio::sync::Mutex: Для thread-safe обмена данными (Arc — атомарный счётчик ссылок, Mutex — для блокировки).
  • std::time::Duration и tokio: Для задержек и асинхронности.

Эти импорты обеспечивают все необходимые инструменты для потоковой обработки, ML и уведомлений.

События пользователей (UserEvent)

  • UserEvent: Представляет событие пользователя. Содержит ID пользователя, действие (например, "login"), таймстамп, и метаданные.
  • EventMetadata: Метаданные события — устройство (mobile/desktop/tablet), локация (широта/долгота), длительность сессии (в секундах).
  • Эти структуры сериализуемы (serde) для передачи в Kafka как JSON.

Уведомления (Notification)

  • NotificationChannel: Enum для каналов уведомлений (email, SMS, push).
  • Notification: Структура уведомления — ID пользователя, канал, сообщение, приоритет (не используется в коде, но зарезервировано).

Обёртка ML-модели (MLModel)

  • MLModel: Обёртка вокруг DecisionTree из linfa. Хранит обученную модель.
  • new: Конструктор.
  • predict: Метод для предсказания. Принимает матрицу признаков (Array2<f64>), возвращает массив меток (usize: 0 или 1). Использует трейт Predict.

Модель классифицирует события: 1 — высокая вовлечённость (нужно уведомить), 0 — нет.

Продюсер событий (EventProducer)

  • EventProducer: Хранит Kafka-продюсера и топик.
  • new: Создаёт продюсера с конфигурацией (brokers — адрес Kafka, timeout).
  • send_event: Сериализует событие в JSON, создаёт запись (key — user_id), отправляет асинхронно.
  • generate_events: Генерирует синтетические события (цикл по count). Действия и устройства циклично, локация — с небольшим сдвигом, сессия — модуль 3600. Отправляет с задержкой 100 мс.

Консьюмер уведомлений (NotificationConsumer)

  • NotificationConsumer: Хранит Kafka-консьюмера, ML-модель (в Arc для шаринга), очередь уведомлений (Mutex для синхронизации), клиенты AWS SNS и HTTP.
  • new: Создаёт консьюмера, подписывается на топик, инициализирует SNS.
  • process_events: Бесконечный цикл: получает сообщения из Kafka, десериализует, обрабатывает (handle_event).
  • handle_event: Извлекает признаки, делает предсказание ML, если 1 — создаёт и enqueue уведомление.
  • extract_features: Создаёт матрицу признаков: длительность сессии, текущий час, день недели.
  • create_notification: Определяет канал по устройству, создаёт сообщение.
  • enqueue_notification: Добавляет в очередь (под Mutex), логирует, сразу отправляет (send_notification).
  • send_notification: Диспетчер по каналу: email/SMS/push.
  • send_email: Использует AWS SNS для публикации в топик (плейсхолдер ARN).
  • send_sms: Плейсхолдер для Twilio — POST-запрос с параметрами (номера фиктивные).
  • send_push: Плейсхолдер для Firebase — POST с JSON (токен — "user_{id}", ключ фиктивный).

Очередь используется, но уведомления отправляются сразу после enqueue (не накапливаются).

Функция обучения модели (train_model)

  • Генерирует 1000 синтетических примеров: признаки (длительность, час, день), метки по правилу (1 если сессия >30 мин и час 9-17).
  • Создаёт Dataset из ndarray.
  • Обучает DecisionTree: критерий Gini, глубина 10.
  • Вычисляет accuracy на обучающих данных (обычно высокая, ~100% из-за простоты).
  • Возвращает MLModel.

Главная функция (main)

  • Инициализирует логгер.
  • Читает аргумент (mode).
  • train: Обучает модель, логирует.
  • producer: Создаёт продюсера, генерирует 100 событий.
  • consumer: Обучает модель, создаёт консьюмера, запускает обработку (бесконечный цикл).
  • По умолчанию — помощь.