Найти в Дзене
Один Rust не п...Rust

CDC (Change Data Capture) ML

t.me/oneRustnoqRust Написать код CDC real-time data processing обладающий следующими свойствами: Тысячи партиций: данные делятся на множество независимых частей (партиций), каждая из которых обрабатывается отдельно. Например, как в супермаркете: вместо одной кассы — десятки, и каждый покупатель идёт к своей. Авто-backpressure: если система не успевает обрабатывать данные, она автоматически замедляет их поступление, чтобы не перегрузиться (как если бы кассир просил покупателей подождать, пока освободится место на ленте). ML-батчинг на GPU: данные собираются в небольшие пакеты (батчи) и анализируются на видеокартах (GPU), что ускоряет процесс. Например, как если бы вы сортировали фрукты не по одному, а сразу горстями. Детектирование аномалий: система находит необычные события (например, внезапный скачок температуры на производстве или подозрительная транзакция в банке). Роутинг в quarantine: подозрительные данные отправляются на дополнительную проверку (как в аэропорту: подозрительный
Оглавление

t.me/oneRustnoqRust

ML на RUST без заморочек
nicktretyakov1/CDCML | Gitverse

Для чего нужна данная статья?:

Написать код CDC real-time data processing обладающий следующими свойствами:

  • Масштабируемость: тысячи партиций (добавляйте readers), авто-backpressure предотвращает OOM.

Тысячи партиций: данные делятся на множество независимых частей (партиций), каждая из которых обрабатывается отдельно. Например, как в супермаркете: вместо одной кассы — десятки, и каждый покупатель идёт к своей.

Авто-backpressure: если система не успевает обрабатывать данные, она автоматически замедляет их поступление, чтобы не перегрузиться (как если бы кассир просил покупателей подождать, пока освободится место на ленте).

  • Интеллект: ML-батчинг на GPU детектирует аномалии в реал-тайм, роутинг в quarantine.

ML-батчинг на GPU: данные собираются в небольшие пакеты (батчи) и анализируются на видеокартах (GPU), что ускоряет процесс. Например, как если бы вы сортировали фрукты не по одному, а сразу горстями.

Детектирование аномалий: система находит необычные события (например, внезапный скачок температуры на производстве или подозрительная транзакция в банке).

Роутинг в quarantine: подозрительные данные отправляются на дополнительную проверку (как в аэропорту: подозрительный багаж проверяют отдельно).

  • Надёжность: exactly-once (Kafka transactions), checkpointing (resume после рестарта), error handling с метриками.

Exactly-once (Kafka transactions): каждое событие обрабатывается ровно один раз, без дубликатов или потерь. Например, как если бы вы платили за товар один раз, а не дважды или вообще не платили.

Checkpointing: система сохраняет состояние обработки, чтобы после перезапуска продолжить с того же места (как закладка в книге).

Error handling с метриками: ошибки фиксируются и анализируются, чтобы быстро находить и исправлять проблемы.

  • Поддерживаемость: SOLID + Hexagonal — новые источники/синки/ML-модели добавляются как плагины без изменения core/orchestrator.

SOLID + Hexagonal архитектура: код разбит на независимые модули, которые можно менять или добавлять, не ломая всю систему. Например, как конструктор Lego: можно заменить одну деталь, не разбирая всю постройку.

Плагины: новые источники данных, модели или способы сохранения добавляются как отдельные модули (как установка нового приложения на телефон).

  • Performance: parallel batch inference (256+ событий за инференс), staged parallelism (CPU/GPU/I/O отдельно).

Parallel batch inference: данные анализируются параллельно большими партиями (например, 256 событий за один раз), что ускоряет процесс.

Staged parallelism: разные этапы обработки (CPU, GPU, ввод/вывод) работают параллельно, не мешая друг другу. Например, как на кухне: пока один повар режет овощи, другой уже жарит мясо, а третий моет посуду.

Архитектура (Ports & Adapters):

  • Core: чистая доменная логика (типы изменений, ошибки).

Простыми словами: Это сердце программы, где описано, что она должна делать, но не как это делать.
Пример:
Представьте, что вы пишете программу для банка. В Core будет логика: "Если на счёте достаточно денег, разрешить перевод". Но не будет кода, который реально списывает деньги с базы данных.

  • Ports: абстрактные трейты (интерфейсы) — обеспечивают DIP (Dependency Inversion) и ISP (Interface Segregation).

Простыми словами: Это "контракты" или правила, по которым другие части программы должны общаться с Core.
Пример:
Продолжим банковский пример. Port может быть интерфейсом "ПереводДенег" с методом выполнитьПеревод(сумма). Конкретная реализация (например, через базу данных или API) будет подключаться к этому интерфейсу.

  • Adapters: конкретные реализации (PostgreSQL polling reader, simple transformer, Torch-based ML enricher с GPU-батчингом, Kafka transactional writer, sled checkpoint).

Простыми словами: Это реальные "детали", которые выполняют работу по правилам из Ports.
Пример:
Адаптер для базы данных PostgreSQL будет реализовывать метод выполнитьПеревод(сумма) так, чтобы деньги реально списывались из базы.

  • Orchestrator: координация всего пайплайна — многопоточный, асинхронный, с backpressure и интеллектуальным роутингом аномалий.

Простыми словами: Это "дирижёр", который управляет, в каком порядке и как быстро выполняются все части программы.
Пример:
В банке Orchestrator может следить, чтобы переводы не накапливались в очереди, а обрабатывались быстро и без ошибок, даже если одновременно пришло много запросов.

  • Metrics: детальная observability для production (latency, throughput, queues, errors).

Простыми словами: Это инструменты, которые показывают, как работает программа в реальном времени (скорость, ошибки, загрузка).
Пример:
Как в банке на экране оператора видно, сколько переводов в секунду проходит, нет ли задержек или ошибок.

Проект использует Tokio для асинхронности, tch для ML-инференса на GPU/CPU, rdkafka для exactly-once, sled для checkpointing.

  • Tokio — библиотека для асинхронной работы (чтобы программа могла делать много дел одновременно).
  • tch — обёртка для PyTorch, чтобы запускать машинное обучение на GPU/CPU.
  • rdkafka — библиотека для работы с Kafka (система для передачи сообщений между программами).
  • sled — база данных для сохранения состояния (например, чтобы запомнить, какие данные уже обработали).

1. Core модуль (src/core/)

src/core/change.rs — доменные типы (SRP: только данные изменений).

  • RawChange: сырое событие из источника (id, partition_key для шардинга, operation, payload как JSON, timestamp). Это исходное, необработанное событие, которое приходит из какого-то источника данных (например, базы данных, лога, очереди сообщений). Оно содержит минимально необходимую информацию для дальнейшей обработки.
  • Operation: enum для Insert/Update/Delete. Это событие, которое уже преобразовано в удобный для бизнеса формат. Оно содержит не сырые данные, а структурированную информацию, понятную для дальнейшей логики (например, для машинного обучения).
  • ProcessedChange: enum (расширяемый) — сейчас только Order(OrderChange) с бизнес-данными и synthetic_features для ML. Это событие, которое уже преобразовано в удобный для бизнеса формат. Оно содержит не сырые данные, а структурированную информацию, понятную для дальнейшей логики (например, для машинного обучения).

Что внутри?

Сейчас упоминается только Order(OrderChange) — то есть изменения, связанные с заказами, с бизнес-данными (например, сумма заказа, статус, список товаров).

synthetic_features — искусственно созданные признаки для машинного обучения (например, "средняя сумма заказов пользователя за месяц").

  • EnrichedChange: обогащённое изменение (inner + anomaly_score + classification от ML). Это событие, которое уже прошло через машинное обучение или другие системы анализа. Оно содержит не только бизнес-данные, но и результаты анализа (например, оценку аномальности, классификацию).

Что внутри?

  • inner — исходное ProcessedChange.
  • anomaly_score — оценка, насколько это событие необычное (например, 0.9 — очень подозрительно).
  • classification — результат классификации (например, "мошенничество", "нормальный заказ")

Это обеспечивает OCP (Open/Closed): новые типы изменений добавляются без изменения существующего кода. Это принцип проектирования: код должен быть открыт для расширения, но закрыт для изменения. То есть, чтобы добавить новый тип изменения (например, не только заказы, но и возвраты), не нужно менять уже существующий код — достаточно добавить новый вариант в enum.

Пример:
Если завтра понадобится обрабатывать не только заказы, но и возвраты, можно просто добавить новый вариант в enum:

rustCopyenum ProcessedChange {
Order(OrderChange),
Return(ReturnChange),
// новый тип
}

При этом старый код для обработки заказов не меняется.

src/core/error.rs : thiserror-derive enum с From для всех внешних ошибок (IO, Postgres, Sled, Tch, Kafka) — цепочка ошибок для надёжности. Это файл, где описаны все возможные ошибки, которые могут возникнуть в системе (например, ошибки чтения/записи, базы данных, очередей). Используется библиотека thiserror для удобного создания цепочек ошибок (чтобы понимать, какая ошибка к какой привела).

Пример:
Если не удалось записать данные в базу, ошибка может выглядеть так:

CopyError: DatabaseError { source: IOError { ... } }

Это помогает быстро найти причину проблемы.

Итоговая схема:

CopyRawChange (сырые данные)
→ ProcessedChange (бизнес-логика)
→ EnrichedChange (ML-анализ)

Каждый этап добавляет новые данные, не меняя предыдущие.

2. Ports (src/ports/) — SOLID в чистом виде

Трейты — аналоги интерфейсов Java/C#.

  • reader.rs: BatchChangeReader (наследует ChangeReader) — чтение батчами для эффективности (batch inference). Простыми словами:
    Это компонент, который читает данные не по одному, а большими порциями (батчами), чтобы ускорить обработку. Представьте, что вы загружаете фото в Instagram: вместо того, чтобы отправлять их по одному, вы отправляете сразу 10 — так быстрее и эффективнее.

Пример:
Если у вас есть 1000 записей в базе данных, вместо того, чтобы читать каждую по отдельности, вы читаете сразу по 100 записей за раз.

  • processor.rs: ChangeProcessor — трансформация raw → processed (валидация, фичи). Простыми словами:
    Это компонент, который берёт "сырые" данные (например, текст из формы обратной связи) и преобразует их в удобный формат: проверяет на ошибки, добавляет нужные поля, очищает от мусора.

Пример:
Вы заполняете анкету на сайте: ваш возраст — "двадцать пять". Процессор преобразует это в число 25, проверяет, что это корректный возраст, и добавляет метку "взрослый".

  • enricher.rs: ChangeEnricher — батчевое обогащение ML (parallel inference). Простыми словами:
    Это компонент, который добавляет к данным новую информацию с помощью машинного обучения. Например, по фотографии определяет, что на ней кот, и добавляет тег "кот". Работает параллельно — сразу с несколькими данными, чтобы быстрее.

Пример:
У вас есть список товаров. Обогатитель анализирует описания и добавляет теги: "электроника", "бытовая техника", "акция".

  • writer.rs: ChangeWriter — батчевая запись + commit для exactly-once. Простыми словами:
    Это компонент, который сохраняет обработанные данные большими порциями и гарантирует, что каждая запись будет сохранена ровно один раз — без дубликатов и потерь.

Пример:
Вы переводите деньги с карты на карту. Важно, чтобы сумма списалась и зачислилась ровно один раз, а не дважды или ни разу.

Все трейты async, Send + Sync — для многопоточности. LSP соблюдается: все реализации взаимозаменяемы. Простыми словами:

  • async — компоненты работают асинхронно, то есть не блокируют друг друга, пока ждут ответа (например, пока один компонент ждёт данные из базы, другой может заниматься обработкой).
  • Send + Sync — компоненты можно безопасно использовать в нескольких потоках одновременно, как несколько поваров на кухне, которые не мешают друг другу.

Пример:
Вы загружаете видео на YouTube и одновременно редактируете описание — оба процесса идут параллельно.

3. Adapters — конкретные плагины (OCP: расширяемость)

postgres_polling.rs (partition-level reader):

  • Подключается к Postgres, поллит по id > last_id (incremental).
  • last_id в Mutex для thread-safety.
  • read_batch возвращает Vec<RawChange> — батч для эффективности.
  • poll_interval для idle.

Что делает:
Модуль постоянно проверяет базу данных PostgreSQL на наличие новых записей, начиная с последнего сохранённого id.

Ключевые понятия:

  • Incremental polling (инкрементальный опрос):
    Вместо того, чтобы каждый раз скачивать все данные, код запоминает последний обработанный id (например, last_id = 1000) и в следующий раз запрашивает только записи с id > 1000.
    Пример: Если в базе появились записи с id 1001, 1002, 1003 — код обработает только их.
  • Mutex (мьютекс):
    Это "замок" для защиты переменной last_id от одновременного изменения несколькими потоками. Без мьютекса два потока могут одновременно изменить last_id, что приведёт к ошибкам.
    Пример: Представьте, что два человека пытаются одновременно записать номер последней обработанной записи в блокнот — без замка они перезапишут друг друга.
  • read_batch (чтение пачками):
    Вместо обработки каждой записи по отдельности, код читает сразу несколько записей (например, 100 штук) — это ускоряет работу.
    Пример: Как если бы вы несли книги из библиотеки не по одной, а сразу стопкой.
  • poll_interval (интервал опроса):
    Если новых данных нет, код не будет постоянно нагружать базу — он делает паузу (например, 1 секунду) перед следующим запросом.

simple_transformer.rs:

  • Process: парсит payload, валидация (negative amount → error), генерирует synthetic_features (log10(amount), normalized user_id, timestamp) для ML.
  • Возвращает ProcessedChange::Order.

Что делает:
Модуль парсит и валидирует данные, а также готовит их для машинного обучения.

Ключевые понятия:

  • Парсинг payload (разбор данных):
    Код извлекает из сырых данных нужные поля (например, сумму заказа, id пользователя, время).
    Пример: Из строки {"order_id": 123, "amount": 500} извлекаются order_id и amount.
  • Валидация (проверка данных):
    Если сумма заказа отрицательная — код возвращает ошибку.
    Пример: Если amount = -100, это ошибка — сумма не может быть отрицательной.
  • Synthetic features (синтетические признаки):
    Код создаёт новые признаки на основе исходных данных, чтобы улучшить работу ML-модели.
    Примеры:log10(amount) — логарифм суммы (чтобы большие суммы не искажали модель).
    normalized user_id — приведение id пользователя к диапазону 0–1.
    timestamp — время заказа в удобном формате.

ml_enricher.rs (многопоточный ML с батчингом):

  • TorchModelProvider: загружает TorchScript модель один раз, set_eval, device (GPU if available).
  • extract_features: берёт synthetic_features из OrderChange.
  • enrich_batch:Собирает фичи в Vec<Vec<f32>>.
    Stack в Tensor на device (GPU для скорости).
    forward_ts — инференс всего батча параллельно на GPU.
    to_cpu + f_values — результаты.
    Парсит anomaly_score и classification.
  • Максимальная сложность: dynamic batch size, GPU acceleration, fallback.

Что делает:
Модуль загружает модель машинного обучения и применяет её к данным, используя GPU для ускорения.

Ключевые понятия:

  • TorchScript модель:
    Модель машинного обучения, сохранённая в формате TorchScript (можно быстро загрузить и использовать).
    Пример: Как если бы вы сохранили рецепт торта в книге, чтобы потом быстро его испечь.
  • Batching (пакетная обработка):
    Вместо того, чтобы обрабатывать каждую запись по отдельности, код собирает их в пакеты (например, по 32 записи) и обрабатывает сразу.
    Пример: Как если бы вы пекли не один пирог, а сразу несколько на одном противне.
  • GPU acceleration (ускорение на GPU):
    Модель работает на видеокарте (GPU), что намного быстрее, чем на процессоре (CPU).
    Пример: Как если бы вы считали на калькуляторе вместо счётов на бумаге.
  • Dynamic batch size (динамический размер пакета):
    Размер пакета может меняться в зависимости от доступной памяти.
    Пример: Если памяти мало — пакет из 10 записей, если много — из 100.
  • Fallback (откат):
    Если GPU недоступен, модель может работать на CPU.
    Пример: Если электричество отключили, вы можете готовить на газовой плите.

kafka_transactional.rs (exactly-once writer):

  • FutureProducer с transactional.id и idempotence.
  • init_transactions при создании.
  • write_batch: begin_transaction → send с key (order_id) → commit_transaction.
  • Обрабатывает ошибки через CdcError::Kafka.

Что делает:
Модуль записывает данные в Kafka так, чтобы каждая запись была доставлена ровно один раз (exactly-once).

Ключевые понятия:

  • Transactional producer (транзакционный продюсер):
    Код начинает транзакцию, отправляет данные, и только если всё прошло успешно — фиксирует транзакцию.
    Пример: Как если бы вы переводили деньги: сначала проверяете баланс, потом списываете, и только потом подтверждаете перевод.
  • Idempotence (идempотентность):
    Если одна и та же запись отправится дважды, Kafka не создаст дубликат.
    Пример: Если вы дважды нажали "Отправить" в мессенджере, сообщение отправится только один раз.
  • Key (order_id):
    Каждая запись имеет уникальный ключ (например, order_id), чтобы Kafka могла правильно распределять данные по партициям.

checkpoint.rs:

  • Sled Db (embedded KV) для offsets по partition_key.
  • save/load_offset с to_be_bytes — атомарно и надёжно.

Что делает:
Модуль сохраняет, до какого id или offset данные уже обработаны, чтобы при перезапуске не начинать с нуля.

Ключевые понятия:

  • Sled Db (встроенная база данных):
    Это лёгкая база данных, которая хранит данные прямо на диске, без отдельного сервера.
    Пример: Как если бы вы записывали прогресс чтения книги на листочке, который всегда лежит рядом.
  • Offset (смещение):
    Это номер последней обработанной записи (например, offset = 1000).
    Пример: Если вы читаете книгу и запомнили, что дочитали до 100 страницы — это ваш offset.
  • Атомарность (atomic operations):
    Сохранение offset происходит целиком и сразу, без риска потерять данные при сбое.
    Пример: Как если бы вы записывали номер страницы одним движением ручки, а не по цифрам.

4. Orchestrator — многопоточный staged pipeline

run():

  • Создаёт bounded mpsc channels (backpressure: max_inflight_batches ограничивает память).

Простыми словами:
Это как очередь в магазине, где количество мест ограничено. Если очередь заполнилась, новые покупатели не могут встать, пока кто-то не освободит место. Такой подход предотвращает перегрузку системы.

Пример:
Представьте, что у вас есть корзина для почты. Вы можете положить туда только 10 писем. Если корзина заполнена, новые письма не будут приниматься, пока вы не обработаете хотя бы одно.

  • Reader stage: по задаче на reader (partition parallelism) — бесконечный loop, read_batch, метрики (inc_changes_read), send в tx_raw.

Простыми словами:
Это часть программы, которая читает данные из какого-то источника (например, из базы данных или файла) и отправляет их дальше на обработку.

Пример:
Представьте, что у вас есть книга, и вы читаете её по одной странице, а потом передаёте прочитанное другу для анализа.

  • Processing stage: parallelism_per_stage задач — recv raw_batch → parallel transform (join_all) → enrich_batch с latency метрикой → partition на normal/quarantine по threshold → send в tx_enriched.

Простыми словами:
Здесь данные, полученные от читателей, обрабатываются: трансформируются, обогащаются, проверяются на ошибки.

Пример:
Ваш друг получает страницы книги, выделяет главные мысли, добавляет свои комментарии и сортирует страницы по темам.

  • Writer stage: отдельные задачи — recv, detect anomaly in batch, choose writer (normal/quarantine), write_batch.

Простыми словами:
Это финальный этап, где обработанные данные сохраняются в нужное место (например, в другую базу данных или файл).

Пример:
После анализа страниц ваш друг записывает итоговые выводы в тетрадь.

  • Graceful shutdown по Ctrl+C или завершению задач.
  • Метрики на всех этапах: queues size, latency, counts, errors.

Это pipeline parallelism + data parallelism: readers I/O-bound, processing CPU/GPU-bound, writers I/O-bound.

Простыми словами:
Это как конвейер на заводе: каждый работник выполняет свою часть работы, и все части работают одновременно, но независимо друг от друга.

Пример:
На заводе по сборке машин один рабочий ставит колёса, другой — двигатель, третий — красит кузов. Все работают одновременно, но каждый над своей задачей.

5. Metrics — полная observability

Prometheus exporter: describe всех ключевых метрик (counters для counts, gauges для dynamic: queues, latency).
Функции inc_* и set_* — вызываются в orchestrator для реал-тайм мониторинга (autoscaling в K8s по latency/queues).

Prometheus — это система для сбора и хранения метрик (чисел, которые показывают, как работает ваша программа или сервер). А exporter — это специальная программа, которая собирает эти метрики из вашего приложения и отдаёт их Prometheus.

Пример:
Допустим, у вас есть сайт. Вы хотите знать, сколько людей заходит на сайт каждую секунду. Exporter будет считать этих людей и отправлять данные в Prometheus.

  • Counters — счётчики, которые только увеличиваются (например, сколько раз что-то произошло).
    Пример: Количество заказов на сайте. Каждый новый заказ увеличивает счётчик на 1.
  • Gauges — метрики, которые могут как увеличиваться, так и уменьшаться (например, текущая температура, количество людей в очереди).
    Пример: Количество людей, одновременно находящихся на сайте. Оно может то расти, то падать.
  • Queues — это очереди задач, которые ждут выполнения.
    Пример: На сайте 100 человек одновременно хотят купить билет. Если сервер может обработать только 10 заказов в секунду, остальные 90 попадают в очередь.
  • Latency — это время, которое проходит между отправкой запроса и получением ответа.
    Пример: Вы нажали на кнопку «Купить», а ответ пришёл через 2 секунды — это и есть latency.
  • inc_* — функции, которые увеличивают счётчики (counters) на 1.
    Пример: inc_orders() — увеличивает счётчик заказов на 1 каждый раз, когда кто-то делает заказ.
  • set_* — функции, которые устанавливают текущее значение для gauges.
    Пример: set_queue_length(5) — говорит, что в очереди сейчас 5 задач.
  • Реал-тайм мониторинг — это наблюдение за метриками прямо сейчас, без задержек.
  • Autoscaling — автоматическое увеличение или уменьшение количества серверов (подов в Kubernetes) в зависимости от нагрузки.
    Пример: Если очередь задач стала слишком большой, Kubernetes автоматически запустит ещё один сервер, чтобы справиться с нагрузкой.

6. Main — DI и запуск

  • Инициализация tracing + metrics.
  • Создание адаптеров (reader, processor, enricher с GPU, writers transactional, checkpoint).
  • Сборка orchestrator с конфигурацией parallelism/batch_size/threshold.
  • orchestrator.run().await — запуск пайплайна.

Подробное объяснение ML-модели в Enricher (максимальная архитектурная сложность)

MlEnricherмногопоточный параллельный batch-инференс на GPU/CPU с использованием PyTorch (через tch-rs bindings) строго следует принципам SOLID:

  • SRP: только обогащение батча изменений ML-предсказаниями (anomaly detection + classification).
  • OCP: расширяем (новые модели/фичи добавляются без изменения кода).
  • LSP: реализует ChangeEnricher трейт — взаимозаменяем с другими enricher'ами.
  • ISP: узкий интерфейс — только enrich_batch.
  • DIP: зависит от абстракции TorchModelProvider (загрузка модели).

это production-grade ML-интеграция в real-time поток с backpressure, latency-метриками и fallback на CPU.

1. TorchModelProvider — провайдер модели (одноразовая загрузка)

pub struct TorchModelProvider {
model: CModule, // TorchScript модуль (экспортированная модель)
device: Device, // GPU (Cuda) если доступно, иначе CPU
}

impl TorchModelProvider {
pub fn new(path: &str, device: Device) -> Result<Self, TchError> {
let mut model = CModule::load(path)?; // Загрузка .pt (TorchScript) один раз
model.set_eval(); // Режим inference (no dropout, batchnorm fixed)
Ok(Self { model, device })
}
}

  • Как работает: Модель загружается один раз при старте (lazy initialization в main через Arc для шаринга между потоками).
    CModule::load — читает TorchScript файл (.pt), экспортированный из PyTorch (torch.jit.script или trace).
    set_eval() — отключает training-режим для детерминированного инференса.
    device: Device::cuda_if_available() — автоматический GPU-fallback (максимальная производительность: тысячи событий/сек на modern GPU).
  • Поддержка GPU без дополнительного кода (tch-rs использует LibTorch с CUDA).
    Модель может быть любой: Autoencoder для anomaly reconstruction error, LSTM для последовательных паттернов, Transformer для контекста, или ensemble.

Пример модели (как обучить в PyTorch и экспортировать):

class AnomalyClassifier(torch.nn.Module):
def __init__(self, input_dim=10):
super().__init__()
self.fc = torch.nn.Sequential(
torch.nn.Linear(input_dim, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 64),
torch.nn.ReLU(),
torch.nn.Linear(64, 2) # [anomaly_score (0-1), class_prob (critical)]
)

def forward(self, x):
return torch.sigmoid(self.fc(x)) # или raw logits

# Обучение...
scripted = torch.jit.script(model.eval())
scripted.save("anomaly.pt")

2. MlEnricher — батчевый инференс с feature extraction

pub struct MlEnricher {
model: Arc<TorchModelProvider>, // Шаринг между потоками (thread-safe)
}

impl MlEnricher {
pub fn new(model: Arc<TorchModelProvider>) -> Self { Self { model } }

fn extract_features(&self, change: &ProcessedChange) -> Vec<f32> {
match change {
ProcessedChange::Order(o) => o.synthetic_features.clone(), // Из transformer
_ => vec![0.0; 10], // Fallback для неизвестных типов (OCP)
}
}
}

#[async_trait]
impl ChangeEnricher for MlEnricher {
async fn enrich_batch(&self, batch: Vec<ProcessedChange>) -> Result<Vec<EnrichedChange>, CdcError> {
if batch.is_empty() { return Ok(vec![]); }

// 1. Feature extraction (параллельно по батчу)
let features: Vec<Vec<f32>> = batch.iter().map(|c| self.extract_features(c)).collect();

// 2. Батчинг в Tensor (максимальная эффективность GPU)
let tensors: Vec<Tensor> = features.iter()
.map(|v| Tensor::from_slice(v).to(self.model.device)) // На GPU!
.collect();
let input = Tensor::stack(&tensors, 0); // [batch_size, feature_dim]

// 3. Parallel batch inference (весь батч за один forward!)
let output = self.model.model.forward_ts(&[input])?; // GPU-accelerated
let cpu_output = output.to_device(Device::Cpu); // Только для чтения результатов
let values: Vec<f32> = cpu_output.f_values()?; // Flat vector результатов

// 4. Парсинг output (предполагаем модель возвращает [anomaly_score, class_prob] на строку)
let feature_dim = values.len() / batch.len(); // Динамический (поддержка разных моделей)
let chunks = values.chunks(feature_dim);

let mut enriched = Vec::with_capacity(batch.len());
for (i, chunk) in chunks.enumerate() {
let anomaly_score = chunk[0]; // 0-1 sigmoid
let classification = if chunk.get(1).map(|&p| p > 0.5).unwrap_or(false) {
"critical".to_string()
} else {
"normal".to_string()
};

enriched.push(EnrichedChange {
inner: batch[i].clone(),
anomaly_score: Some(anomaly_score),
classification: Some(classification),
});
}

Ok(enriched)
}
}

Как работает inference шаг за шагом (максимальная сложность)

  1. Input: батч ProcessedChange (из transformer: synthetic_features — нормализованные признаки вроде log(amount), user_id, timestamp).
  2. Feature extraction: O(1) per event — простая копия/генерация (расширяемо для complex embedding).
  3. Батчинг:Каждый вектор фич → Tensor на device (GPU memcpy — асинхронно).
    Tensor::stack — один большой тензор [batch_size, dim] (256–1024 для оптимальной GPU utilization).
  4. Forward pass:forward_ts(&[input]) — один вызов модели на весь батч (параллелизм CUDA kernels: тысячи операций одновременно).
    Latency ~1-10ms на modern GPU для батча 256 (vs секунды на sequential CPU).
  5. Output parsing:to_cpu + f_values — эффективное чтение.
    Динамический chunking — модель может возвращать любой вектор (OCP).
    anomaly_score: reconstruction error или probability.
    classification: threshold на prob (critical если >0.5).

Почему это максимально эффективно?

  • Многопоточность: enrich_batch вызывается параллельно в orchestrator (parallelism_per_stage потоков) — динамический батчинг (orchestrator накапливает до batch_size).
  • GPU acceleration: весь батч на CUDA — 10-100x speedup vs CPU loop.
  • Backpressure integration: если GPU overloaded — очередь растёт, orchestrator замедляет readers.
  • Observability: latency метрика в Prometheus (record_inference_latency в orchestrator).
  • Расширяемость: новая модель (Siamese для dedup, LSTM для sequences) — просто новый .pt файл.
  • Надёжность: TchError → CdcError, fallback CPU.

В orchestrator это интегрировано с роутингом: если anomaly_score >= threshold — в quarantine Kafka topic (интеллектуальная фильтрация подозрительных изменений в реал-тайм).