t.me/oneRustnoqRust Создать систему Data Lake с ETL, ML, Kafka, Amazon S3 и REST API на Axum. Центральный enum DataLakeError (создан с помощью thiserror) объединяет все возможные ошибки: Ошибки реализуют IntoResponse для Axum — при ошибке в API возвращается JSON с кодом статуса (например, 400 для валидации, 500 для остальных). Логирование через tracing::instrument добавляет контекст (skip параметров, уровни info/warn/error). Центральный класс, собирающий компоненты через dependency injection.
t.me/oneRustnoqRust Создать систему Data Lake с ETL, ML, Kafka, Amazon S3 и REST API на Axum. Центральный enum DataLakeError (создан с помощью thiserror) объединяет все возможные ошибки: Ошибки реализуют IntoResponse для Axum — при ошибке в API возвращается JSON с кодом статуса (например, 400 для валидации, 500 для остальных). Логирование через tracing::instrument добавляет контекст (skip параметров, уровни info/warn/error). Центральный класс, собирающий компоненты через dependency injection.
...Читать далее
Для чего нужна данная статья? :
Создать систему Data Lake с ETL, ML, Kafka, Amazon S3 и REST API на Axum.
Зачем Вам это уметь? :
- Высокая производительность: Polars + rayon (параллелизм), lazy evaluation.
- Надёжность: Детальные ошибки, логи, валидация, graceful handling Kafka.
- Расширяемость: Добавьте новые хранилища/трансформеры без изменения оркестратора.
- Production-ready: Tracing, метрики, async.
Архитектура
1. Система ошибок (DataLakeError)
Центральный enum DataLakeError (создан с помощью thiserror) объединяет все возможные ошибки:
- IO, Polars, S3 (с конкретным источником из AWS SDK), Kafka, ML, сериализация, конфигурация, валидация.
Ошибки реализуют IntoResponse для Axum — при ошибке в API возвращается JSON с кодом статуса (например, 400 для валидации, 500 для остальных).
Логирование через tracing::instrument добавляет контекст (skip параметров, уровни info/warn/error).
2. Абстракции (Traits)
- StorageProvider (async trait): Интерфейс для хранилищ. Реализация — S3StorageProvider (использует официальный AWS SDK).Методы: upload и download.
Логирует успех/ошибки, использует ByteStream для эффективной загрузки. - DataTransformer (async trait): Интерфейс для трансформаций данных.Реализация — AnomalyDetectionTransformer с автоэнкодером.
Модель: Простой автоэнкодер (64 → 32 → 16 → 32 → 64 нейронов с ReLU).
Преобразует DataFrame в тензор, вычисляет ошибку реконструкции, добавляет колонку is_anomaly (true если ошибка > threshold). - StreamProcessor (async trait): Интерфейс для потоковой обработки.Реализация — KafkaStreamProcessor с FutureProducer.
Асинхронно отправляет данные в топик Kafka с уникальным ключом (UUID).
3. Оркестратор (DataLakeOrchestrator)
Центральный класс, собирающий компоненты через dependency injection.
- Конструктор new() (async): Инициализирует S3, трансформер и Kafka-продьюсер. Возвращает Result для early-fail.
- Метод run_pipeline (основной ETL + ML + Streaming):Валидация файла (существует ли?).
Загрузка CSV в DataFrame через Polars (infer schema, header).
Трансформация: Lazy-получение, масштабирование колонок (*2), приведение к Float64.
ML: Применение автоэнкодера для обнаружения аномалий.
Сериализация в Parquet (эффективный columnar формат для Data Lake).
Загрузка в S3.
Отправка в Kafka (fire-and-forget: ошибка логируется, но не прерывает пайплайн — graceful degradation). - Метод start_api: Запускает простой Axum-сервер (роуты / и /health).
4. Точка входа (main)
- Инициализация tracing.
- Создание оркестратора (с обработкой ошибок инициализации).
- Запуск пайплайна на примере файла.
- Запуск API в отдельном таске (tokio::spawn).
- Graceful shutdown по Ctrl+C.
Как это работает в целом (поток данных)
- Локальный CSV → Polars DataFrame.
- Параллельная трансформация (lazy + collect).
- ML-анализ (аномалии) → новая колонка.
- Parquet-сериализация → байты.
- Байты → S3 (Data Lake).
- Байты → Kafka (для потоковой обработки downstream-системами).
- API для мониторинга/расширения.