Добавить в корзинуПозвонить
Найти в Дзене
Один Rust не п...Rust

Data Lake ETL, ML, Kafka

t.me/oneRustnoqRust Создать систему Data Lake с ETL, ML, Kafka, Amazon S3 и REST API на Axum. Центральный enum DataLakeError (создан с помощью thiserror) объединяет все возможные ошибки: Ошибки реализуют IntoResponse для Axum — при ошибке в API возвращается JSON с кодом статуса (например, 400 для валидации, 500 для остальных). Логирование через tracing::instrument добавляет контекст (skip параметров, уровни info/warn/error). Центральный класс, собирающий компоненты через dependency injection.
Оглавление
nicktretyakov1/data_lake | Gitverse
ML на RUST без заморочек

t.me/oneRustnoqRust

Для чего нужна данная статья? :

Создать систему Data Lake с ETL, ML, Kafka, Amazon S3 и REST API на Axum.

Зачем Вам это уметь? :

  • Высокая производительность: Polars + rayon (параллелизм), lazy evaluation.
  • Надёжность: Детальные ошибки, логи, валидация, graceful handling Kafka.
  • Расширяемость: Добавьте новые хранилища/трансформеры без изменения оркестратора.
  • Production-ready: Tracing, метрики, async.

Архитектура

1. Система ошибок (DataLakeError)

Центральный enum DataLakeError (создан с помощью thiserror) объединяет все возможные ошибки:

  • IO, Polars, S3 (с конкретным источником из AWS SDK), Kafka, ML, сериализация, конфигурация, валидация.

Ошибки реализуют IntoResponse для Axum — при ошибке в API возвращается JSON с кодом статуса (например, 400 для валидации, 500 для остальных).

Логирование через tracing::instrument добавляет контекст (skip параметров, уровни info/warn/error).

2. Абстракции (Traits)

  • StorageProvider (async trait): Интерфейс для хранилищ. Реализация — S3StorageProvider (использует официальный AWS SDK).Методы: upload и download.
    Логирует успех/ошибки, использует ByteStream для эффективной загрузки.
  • DataTransformer (async trait): Интерфейс для трансформаций данных.Реализация — AnomalyDetectionTransformer с автоэнкодером.
    Модель: Простой автоэнкодер (64 → 32 → 16 → 32 → 64 нейронов с ReLU).
    Преобразует DataFrame в тензор, вычисляет ошибку реконструкции, добавляет колонку is_anomaly (true если ошибка > threshold).
  • StreamProcessor (async trait): Интерфейс для потоковой обработки.Реализация — KafkaStreamProcessor с FutureProducer.
    Асинхронно отправляет данные в топик Kafka с уникальным ключом (UUID).

3. Оркестратор (DataLakeOrchestrator)

Центральный класс, собирающий компоненты через dependency injection.

  • Конструктор new() (async): Инициализирует S3, трансформер и Kafka-продьюсер. Возвращает Result для early-fail.
  • Метод run_pipeline (основной ETL + ML + Streaming):Валидация файла (существует ли?).
    Загрузка CSV в DataFrame через Polars (infer schema, header).
    Трансформация: Lazy-получение, масштабирование колонок (*2), приведение к Float64.
    ML: Применение автоэнкодера для обнаружения аномалий.
    Сериализация в Parquet (эффективный columnar формат для Data Lake).
    Загрузка в S3.
    Отправка в Kafka (fire-and-forget: ошибка логируется, но не прерывает пайплайн — graceful degradation).
  • Метод start_api: Запускает простой Axum-сервер (роуты / и /health).

4. Точка входа (main)

  • Инициализация tracing.
  • Создание оркестратора (с обработкой ошибок инициализации).
  • Запуск пайплайна на примере файла.
  • Запуск API в отдельном таске (tokio::spawn).
  • Graceful shutdown по Ctrl+C.

Как это работает в целом (поток данных)

  1. Локальный CSV → Polars DataFrame.
  2. Параллельная трансформация (lazy + collect).
  3. ML-анализ (аномалии) → новая колонка.
  4. Parquet-сериализация → байты.
  5. Байты → S3 (Data Lake).
  6. Байты → Kafka (для потоковой обработки downstream-системами).
  7. API для мониторинга/расширения.