Для чего нужна данная статья?:
Написать код CDC real-time data processing обладающий следующими свойствами:
- Масштабируемость: тысячи партиций (добавляйте readers), авто-backpressure предотвращает OOM.
Тысячи партиций: данные делятся на множество независимых частей (партиций), каждая из которых обрабатывается отдельно. Например, как в супермаркете: вместо одной кассы — десятки, и каждый покупатель идёт к своей.
Авто-backpressure: если система не успевает обрабатывать данные, она автоматически замедляет их поступление, чтобы не перегрузиться (как если бы кассир просил покупателей подождать, пока освободится место на ленте).
- Интеллект: ML-батчинг на GPU детектирует аномалии в реал-тайм, роутинг в quarantine.
ML-батчинг на GPU: данные собираются в небольшие пакеты (батчи) и анализируются на видеокартах (GPU), что ускоряет процесс. Например, как если бы вы сортировали фрукты не по одному, а сразу горстями.
Детектирование аномалий: система находит необычные события (например, внезапный скачок температуры на производстве или подозрительная транзакция в банке).
Роутинг в quarantine: подозрительные данные отправляются на дополнительную проверку (как в аэропорту: подозрительный багаж проверяют отдельно).
- Надёжность: exactly-once (Kafka transactions), checkpointing (resume после рестарта), error handling с метриками.
Exactly-once (Kafka transactions): каждое событие обрабатывается ровно один раз, без дубликатов или потерь. Например, как если бы вы платили за товар один раз, а не дважды или вообще не платили.
Checkpointing: система сохраняет состояние обработки, чтобы после перезапуска продолжить с того же места (как закладка в книге).
Error handling с метриками: ошибки фиксируются и анализируются, чтобы быстро находить и исправлять проблемы.
- Поддерживаемость: SOLID + Hexagonal — новые источники/синки/ML-модели добавляются как плагины без изменения core/orchestrator.
SOLID + Hexagonal архитектура: код разбит на независимые модули, которые можно менять или добавлять, не ломая всю систему. Например, как конструктор Lego: можно заменить одну деталь, не разбирая всю постройку.
Плагины: новые источники данных, модели или способы сохранения добавляются как отдельные модули (как установка нового приложения на телефон).
- Performance: parallel batch inference (256+ событий за инференс), staged parallelism (CPU/GPU/I/O отдельно).
Parallel batch inference: данные анализируются параллельно большими партиями (например, 256 событий за один раз), что ускоряет процесс.
Staged parallelism: разные этапы обработки (CPU, GPU, ввод/вывод) работают параллельно, не мешая друг другу. Например, как на кухне: пока один повар режет овощи, другой уже жарит мясо, а третий моет посуду.
Архитектура (Ports & Adapters):
- Core: чистая доменная логика (типы изменений, ошибки).
Простыми словами: Это сердце программы, где описано, что она должна делать, но не как это делать.
Пример:
Представьте, что вы пишете программу для банка. В Core будет логика: "Если на счёте достаточно денег, разрешить перевод". Но не будет кода, который реально списывает деньги с базы данных.
- Ports: абстрактные трейты (интерфейсы) — обеспечивают DIP (Dependency Inversion) и ISP (Interface Segregation).
Простыми словами: Это "контракты" или правила, по которым другие части программы должны общаться с Core.
Пример:
Продолжим банковский пример. Port может быть интерфейсом "ПереводДенег" с методом выполнитьПеревод(сумма). Конкретная реализация (например, через базу данных или API) будет подключаться к этому интерфейсу.
- Adapters: конкретные реализации (PostgreSQL polling reader, simple transformer, Torch-based ML enricher с GPU-батчингом, Kafka transactional writer, sled checkpoint).
Простыми словами: Это реальные "детали", которые выполняют работу по правилам из Ports.
Пример:
Адаптер для базы данных PostgreSQL будет реализовывать метод выполнитьПеревод(сумма) так, чтобы деньги реально списывались из базы.
- Orchestrator: координация всего пайплайна — многопоточный, асинхронный, с backpressure и интеллектуальным роутингом аномалий.
Простыми словами: Это "дирижёр", который управляет, в каком порядке и как быстро выполняются все части программы.
Пример:
В банке Orchestrator может следить, чтобы переводы не накапливались в очереди, а обрабатывались быстро и без ошибок, даже если одновременно пришло много запросов.
- Metrics: детальная observability для production (latency, throughput, queues, errors).
Простыми словами: Это инструменты, которые показывают, как работает программа в реальном времени (скорость, ошибки, загрузка).
Пример:
Как в банке на экране оператора видно, сколько переводов в секунду проходит, нет ли задержек или ошибок.
Проект использует Tokio для асинхронности, tch для ML-инференса на GPU/CPU, rdkafka для exactly-once, sled для checkpointing.
- Tokio — библиотека для асинхронной работы (чтобы программа могла делать много дел одновременно).
- tch — обёртка для PyTorch, чтобы запускать машинное обучение на GPU/CPU.
- rdkafka — библиотека для работы с Kafka (система для передачи сообщений между программами).
- sled — база данных для сохранения состояния (например, чтобы запомнить, какие данные уже обработали).
1. Core модуль (src/core/)
src/core/change.rs — доменные типы (SRP: только данные изменений).
- RawChange: сырое событие из источника (id, partition_key для шардинга, operation, payload как JSON, timestamp). Это исходное, необработанное событие, которое приходит из какого-то источника данных (например, базы данных, лога, очереди сообщений). Оно содержит минимально необходимую информацию для дальнейшей обработки.
- Operation: enum для Insert/Update/Delete. Это событие, которое уже преобразовано в удобный для бизнеса формат. Оно содержит не сырые данные, а структурированную информацию, понятную для дальнейшей логики (например, для машинного обучения).
- ProcessedChange: enum (расширяемый) — сейчас только Order(OrderChange) с бизнес-данными и synthetic_features для ML. Это событие, которое уже преобразовано в удобный для бизнеса формат. Оно содержит не сырые данные, а структурированную информацию, понятную для дальнейшей логики (например, для машинного обучения).
Что внутри?
Сейчас упоминается только Order(OrderChange) — то есть изменения, связанные с заказами, с бизнес-данными (например, сумма заказа, статус, список товаров).
synthetic_features — искусственно созданные признаки для машинного обучения (например, "средняя сумма заказов пользователя за месяц").
- EnrichedChange: обогащённое изменение (inner + anomaly_score + classification от ML). Это событие, которое уже прошло через машинное обучение или другие системы анализа. Оно содержит не только бизнес-данные, но и результаты анализа (например, оценку аномальности, классификацию).
Что внутри?
- inner — исходное ProcessedChange.
- anomaly_score — оценка, насколько это событие необычное (например, 0.9 — очень подозрительно).
- classification — результат классификации (например, "мошенничество", "нормальный заказ")
Это обеспечивает OCP (Open/Closed): новые типы изменений добавляются без изменения существующего кода. Это принцип проектирования: код должен быть открыт для расширения, но закрыт для изменения. То есть, чтобы добавить новый тип изменения (например, не только заказы, но и возвраты), не нужно менять уже существующий код — достаточно добавить новый вариант в enum.
Пример:
Если завтра понадобится обрабатывать не только заказы, но и возвраты, можно просто добавить новый вариант в enum:
rustCopyenum ProcessedChange {
Order(OrderChange),
Return(ReturnChange), // новый тип
}
При этом старый код для обработки заказов не меняется.
src/core/error.rs : thiserror-derive enum с From для всех внешних ошибок (IO, Postgres, Sled, Tch, Kafka) — цепочка ошибок для надёжности. Это файл, где описаны все возможные ошибки, которые могут возникнуть в системе (например, ошибки чтения/записи, базы данных, очередей). Используется библиотека thiserror для удобного создания цепочек ошибок (чтобы понимать, какая ошибка к какой привела).
Пример:
Если не удалось записать данные в базу, ошибка может выглядеть так:
CopyError: DatabaseError { source: IOError { ... } }
Это помогает быстро найти причину проблемы.
Итоговая схема:
CopyRawChange (сырые данные)
→ ProcessedChange (бизнес-логика)
→ EnrichedChange (ML-анализ)
Каждый этап добавляет новые данные, не меняя предыдущие.
2. Ports (src/ports/) — SOLID в чистом виде
Трейты — аналоги интерфейсов Java/C#.
- reader.rs: BatchChangeReader (наследует ChangeReader) — чтение батчами для эффективности (batch inference). Простыми словами:
Это компонент, который читает данные не по одному, а большими порциями (батчами), чтобы ускорить обработку. Представьте, что вы загружаете фото в Instagram: вместо того, чтобы отправлять их по одному, вы отправляете сразу 10 — так быстрее и эффективнее.
Пример:
Если у вас есть 1000 записей в базе данных, вместо того, чтобы читать каждую по отдельности, вы читаете сразу по 100 записей за раз.
- processor.rs: ChangeProcessor — трансформация raw → processed (валидация, фичи). Простыми словами:
Это компонент, который берёт "сырые" данные (например, текст из формы обратной связи) и преобразует их в удобный формат: проверяет на ошибки, добавляет нужные поля, очищает от мусора.
Пример:
Вы заполняете анкету на сайте: ваш возраст — "двадцать пять". Процессор преобразует это в число 25, проверяет, что это корректный возраст, и добавляет метку "взрослый".
- enricher.rs: ChangeEnricher — батчевое обогащение ML (parallel inference). Простыми словами:
Это компонент, который добавляет к данным новую информацию с помощью машинного обучения. Например, по фотографии определяет, что на ней кот, и добавляет тег "кот". Работает параллельно — сразу с несколькими данными, чтобы быстрее.
Пример:
У вас есть список товаров. Обогатитель анализирует описания и добавляет теги: "электроника", "бытовая техника", "акция".
- writer.rs: ChangeWriter — батчевая запись + commit для exactly-once. Простыми словами:
Это компонент, который сохраняет обработанные данные большими порциями и гарантирует, что каждая запись будет сохранена ровно один раз — без дубликатов и потерь.
Пример:
Вы переводите деньги с карты на карту. Важно, чтобы сумма списалась и зачислилась ровно один раз, а не дважды или ни разу.
Все трейты async, Send + Sync — для многопоточности. LSP соблюдается: все реализации взаимозаменяемы. Простыми словами:
- async — компоненты работают асинхронно, то есть не блокируют друг друга, пока ждут ответа (например, пока один компонент ждёт данные из базы, другой может заниматься обработкой).
- Send + Sync — компоненты можно безопасно использовать в нескольких потоках одновременно, как несколько поваров на кухне, которые не мешают друг другу.
Пример:
Вы загружаете видео на YouTube и одновременно редактируете описание — оба процесса идут параллельно.
3. Adapters — конкретные плагины (OCP: расширяемость)
postgres_polling.rs (partition-level reader):
- Подключается к Postgres, поллит по id > last_id (incremental).
- last_id в Mutex для thread-safety.
- read_batch возвращает Vec<RawChange> — батч для эффективности.
- poll_interval для idle.
Что делает:
Модуль постоянно проверяет базу данных PostgreSQL на наличие новых записей, начиная с последнего сохранённого id.
Ключевые понятия:
- Incremental polling (инкрементальный опрос):
Вместо того, чтобы каждый раз скачивать все данные, код запоминает последний обработанный id (например, last_id = 1000) и в следующий раз запрашивает только записи с id > 1000.
Пример: Если в базе появились записи с id 1001, 1002, 1003 — код обработает только их. - Mutex (мьютекс):
Это "замок" для защиты переменной last_id от одновременного изменения несколькими потоками. Без мьютекса два потока могут одновременно изменить last_id, что приведёт к ошибкам.
Пример: Представьте, что два человека пытаются одновременно записать номер последней обработанной записи в блокнот — без замка они перезапишут друг друга. - read_batch (чтение пачками):
Вместо обработки каждой записи по отдельности, код читает сразу несколько записей (например, 100 штук) — это ускоряет работу.
Пример: Как если бы вы несли книги из библиотеки не по одной, а сразу стопкой. - poll_interval (интервал опроса):
Если новых данных нет, код не будет постоянно нагружать базу — он делает паузу (например, 1 секунду) перед следующим запросом.
simple_transformer.rs:
- Process: парсит payload, валидация (negative amount → error), генерирует synthetic_features (log10(amount), normalized user_id, timestamp) для ML.
- Возвращает ProcessedChange::Order.
Что делает:
Модуль парсит и валидирует данные, а также готовит их для машинного обучения.
Ключевые понятия:
- Парсинг payload (разбор данных):
Код извлекает из сырых данных нужные поля (например, сумму заказа, id пользователя, время).
Пример: Из строки {"order_id": 123, "amount": 500} извлекаются order_id и amount. - Валидация (проверка данных):
Если сумма заказа отрицательная — код возвращает ошибку.
Пример: Если amount = -100, это ошибка — сумма не может быть отрицательной. - Synthetic features (синтетические признаки):
Код создаёт новые признаки на основе исходных данных, чтобы улучшить работу ML-модели.
Примеры:log10(amount) — логарифм суммы (чтобы большие суммы не искажали модель).
normalized user_id — приведение id пользователя к диапазону 0–1.
timestamp — время заказа в удобном формате.
ml_enricher.rs (многопоточный ML с батчингом):
- TorchModelProvider: загружает TorchScript модель один раз, set_eval, device (GPU if available).
- extract_features: берёт synthetic_features из OrderChange.
- enrich_batch:Собирает фичи в Vec<Vec<f32>>.
Stack в Tensor на device (GPU для скорости).
forward_ts — инференс всего батча параллельно на GPU.
to_cpu + f_values — результаты.
Парсит anomaly_score и classification. - Максимальная сложность: dynamic batch size, GPU acceleration, fallback.
Что делает:
Модуль загружает модель машинного обучения и применяет её к данным, используя GPU для ускорения.
Ключевые понятия:
- TorchScript модель:
Модель машинного обучения, сохранённая в формате TorchScript (можно быстро загрузить и использовать).
Пример: Как если бы вы сохранили рецепт торта в книге, чтобы потом быстро его испечь. - Batching (пакетная обработка):
Вместо того, чтобы обрабатывать каждую запись по отдельности, код собирает их в пакеты (например, по 32 записи) и обрабатывает сразу.
Пример: Как если бы вы пекли не один пирог, а сразу несколько на одном противне. - GPU acceleration (ускорение на GPU):
Модель работает на видеокарте (GPU), что намного быстрее, чем на процессоре (CPU).
Пример: Как если бы вы считали на калькуляторе вместо счётов на бумаге. - Dynamic batch size (динамический размер пакета):
Размер пакета может меняться в зависимости от доступной памяти.
Пример: Если памяти мало — пакет из 10 записей, если много — из 100. - Fallback (откат):
Если GPU недоступен, модель может работать на CPU.
Пример: Если электричество отключили, вы можете готовить на газовой плите.
kafka_transactional.rs (exactly-once writer):
- FutureProducer с transactional.id и idempotence.
- init_transactions при создании.
- write_batch: begin_transaction → send с key (order_id) → commit_transaction.
- Обрабатывает ошибки через CdcError::Kafka.
Что делает:
Модуль записывает данные в Kafka так, чтобы каждая запись была доставлена ровно один раз (exactly-once).
Ключевые понятия:
- Transactional producer (транзакционный продюсер):
Код начинает транзакцию, отправляет данные, и только если всё прошло успешно — фиксирует транзакцию.
Пример: Как если бы вы переводили деньги: сначала проверяете баланс, потом списываете, и только потом подтверждаете перевод. - Idempotence (идempотентность):
Если одна и та же запись отправится дважды, Kafka не создаст дубликат.
Пример: Если вы дважды нажали "Отправить" в мессенджере, сообщение отправится только один раз. - Key (order_id):
Каждая запись имеет уникальный ключ (например, order_id), чтобы Kafka могла правильно распределять данные по партициям.
checkpoint.rs:
- Sled Db (embedded KV) для offsets по partition_key.
- save/load_offset с to_be_bytes — атомарно и надёжно.
Что делает:
Модуль сохраняет, до какого id или offset данные уже обработаны, чтобы при перезапуске не начинать с нуля.
Ключевые понятия:
- Sled Db (встроенная база данных):
Это лёгкая база данных, которая хранит данные прямо на диске, без отдельного сервера.
Пример: Как если бы вы записывали прогресс чтения книги на листочке, который всегда лежит рядом. - Offset (смещение):
Это номер последней обработанной записи (например, offset = 1000).
Пример: Если вы читаете книгу и запомнили, что дочитали до 100 страницы — это ваш offset. - Атомарность (atomic operations):
Сохранение offset происходит целиком и сразу, без риска потерять данные при сбое.
Пример: Как если бы вы записывали номер страницы одним движением ручки, а не по цифрам.
4. Orchestrator — многопоточный staged pipeline
run():
- Создаёт bounded mpsc channels (backpressure: max_inflight_batches ограничивает память).
Простыми словами:
Это как очередь в магазине, где количество мест ограничено. Если очередь заполнилась, новые покупатели не могут встать, пока кто-то не освободит место. Такой подход предотвращает перегрузку системы.
Пример:
Представьте, что у вас есть корзина для почты. Вы можете положить туда только 10 писем. Если корзина заполнена, новые письма не будут приниматься, пока вы не обработаете хотя бы одно.
- Reader stage: по задаче на reader (partition parallelism) — бесконечный loop, read_batch, метрики (inc_changes_read), send в tx_raw.
Простыми словами:
Это часть программы, которая читает данные из какого-то источника (например, из базы данных или файла) и отправляет их дальше на обработку.
Пример:
Представьте, что у вас есть книга, и вы читаете её по одной странице, а потом передаёте прочитанное другу для анализа.
- Processing stage: parallelism_per_stage задач — recv raw_batch → parallel transform (join_all) → enrich_batch с latency метрикой → partition на normal/quarantine по threshold → send в tx_enriched.
Простыми словами:
Здесь данные, полученные от читателей, обрабатываются: трансформируются, обогащаются, проверяются на ошибки.
Пример:
Ваш друг получает страницы книги, выделяет главные мысли, добавляет свои комментарии и сортирует страницы по темам.
- Writer stage: отдельные задачи — recv, detect anomaly in batch, choose writer (normal/quarantine), write_batch.
Простыми словами:
Это финальный этап, где обработанные данные сохраняются в нужное место (например, в другую базу данных или файл).
Пример:
После анализа страниц ваш друг записывает итоговые выводы в тетрадь.
- Graceful shutdown по Ctrl+C или завершению задач.
- Метрики на всех этапах: queues size, latency, counts, errors.
Это pipeline parallelism + data parallelism: readers I/O-bound, processing CPU/GPU-bound, writers I/O-bound.
Простыми словами:
Это как конвейер на заводе: каждый работник выполняет свою часть работы, и все части работают одновременно, но независимо друг от друга.
Пример:
На заводе по сборке машин один рабочий ставит колёса, другой — двигатель, третий — красит кузов. Все работают одновременно, но каждый над своей задачей.
5. Metrics — полная observability
Prometheus exporter: describe всех ключевых метрик (counters для counts, gauges для dynamic: queues, latency).
Функции inc_* и set_* — вызываются в orchestrator для реал-тайм мониторинга (autoscaling в K8s по latency/queues).
Prometheus — это система для сбора и хранения метрик (чисел, которые показывают, как работает ваша программа или сервер). А exporter — это специальная программа, которая собирает эти метрики из вашего приложения и отдаёт их Prometheus.
Пример:
Допустим, у вас есть сайт. Вы хотите знать, сколько людей заходит на сайт каждую секунду. Exporter будет считать этих людей и отправлять данные в Prometheus.
- Counters — счётчики, которые только увеличиваются (например, сколько раз что-то произошло).
Пример: Количество заказов на сайте. Каждый новый заказ увеличивает счётчик на 1. - Gauges — метрики, которые могут как увеличиваться, так и уменьшаться (например, текущая температура, количество людей в очереди).
Пример: Количество людей, одновременно находящихся на сайте. Оно может то расти, то падать. - Queues — это очереди задач, которые ждут выполнения.
Пример: На сайте 100 человек одновременно хотят купить билет. Если сервер может обработать только 10 заказов в секунду, остальные 90 попадают в очередь. - Latency — это время, которое проходит между отправкой запроса и получением ответа.
Пример: Вы нажали на кнопку «Купить», а ответ пришёл через 2 секунды — это и есть latency. - inc_* — функции, которые увеличивают счётчики (counters) на 1.
Пример: inc_orders() — увеличивает счётчик заказов на 1 каждый раз, когда кто-то делает заказ. - set_* — функции, которые устанавливают текущее значение для gauges.
Пример: set_queue_length(5) — говорит, что в очереди сейчас 5 задач. - Реал-тайм мониторинг — это наблюдение за метриками прямо сейчас, без задержек.
- Autoscaling — автоматическое увеличение или уменьшение количества серверов (подов в Kubernetes) в зависимости от нагрузки.
Пример: Если очередь задач стала слишком большой, Kubernetes автоматически запустит ещё один сервер, чтобы справиться с нагрузкой.
6. Main — DI и запуск
- Инициализация tracing + metrics.
- Создание адаптеров (reader, processor, enricher с GPU, writers transactional, checkpoint).
- Сборка orchestrator с конфигурацией parallelism/batch_size/threshold.
- orchestrator.run().await — запуск пайплайна.
Подробное объяснение ML-модели в Enricher (максимальная архитектурная сложность)
MlEnricher — многопоточный параллельный batch-инференс на GPU/CPU с использованием PyTorch (через tch-rs bindings) строго следует принципам SOLID:
- SRP: только обогащение батча изменений ML-предсказаниями (anomaly detection + classification).
- OCP: расширяем (новые модели/фичи добавляются без изменения кода).
- LSP: реализует ChangeEnricher трейт — взаимозаменяем с другими enricher'ами.
- ISP: узкий интерфейс — только enrich_batch.
- DIP: зависит от абстракции TorchModelProvider (загрузка модели).
это production-grade ML-интеграция в real-time поток с backpressure, latency-метриками и fallback на CPU.
1. TorchModelProvider — провайдер модели (одноразовая загрузка)
pub struct TorchModelProvider {
model: CModule, // TorchScript модуль (экспортированная модель)
device: Device, // GPU (Cuda) если доступно, иначе CPU
}
impl TorchModelProvider {
pub fn new(path: &str, device: Device) -> Result<Self, TchError> {
let mut model = CModule::load(path)?; // Загрузка .pt (TorchScript) один раз
model.set_eval(); // Режим inference (no dropout, batchnorm fixed)
Ok(Self { model, device })
}
}
- Как работает: Модель загружается один раз при старте (lazy initialization в main через Arc для шаринга между потоками).
CModule::load — читает TorchScript файл (.pt), экспортированный из PyTorch (torch.jit.script или trace).
set_eval() — отключает training-режим для детерминированного инференса.
device: Device::cuda_if_available() — автоматический GPU-fallback (максимальная производительность: тысячи событий/сек на modern GPU). - Поддержка GPU без дополнительного кода (tch-rs использует LibTorch с CUDA).
Модель может быть любой: Autoencoder для anomaly reconstruction error, LSTM для последовательных паттернов, Transformer для контекста, или ensemble.
Пример модели (как обучить в PyTorch и экспортировать):
class AnomalyClassifier(torch.nn.Module):
def __init__(self, input_dim=10):
super().__init__()
self.fc = torch.nn.Sequential(
torch.nn.Linear(input_dim, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 64),
torch.nn.ReLU(),
torch.nn.Linear(64, 2) # [anomaly_score (0-1), class_prob (critical)]
)
def forward(self, x):
return torch.sigmoid(self.fc(x)) # или raw logits
# Обучение...
scripted = torch.jit.script(model.eval())
scripted.save("anomaly.pt")
2. MlEnricher — батчевый инференс с feature extraction
pub struct MlEnricher {
model: Arc<TorchModelProvider>, // Шаринг между потоками (thread-safe)
}
impl MlEnricher {
pub fn new(model: Arc<TorchModelProvider>) -> Self { Self { model } }
fn extract_features(&self, change: &ProcessedChange) -> Vec<f32> {
match change {
ProcessedChange::Order(o) => o.synthetic_features.clone(), // Из transformer
_ => vec![0.0; 10], // Fallback для неизвестных типов (OCP)
}
}
}
#[async_trait]
impl ChangeEnricher for MlEnricher {
async fn enrich_batch(&self, batch: Vec<ProcessedChange>) -> Result<Vec<EnrichedChange>, CdcError> {
if batch.is_empty() { return Ok(vec![]); }
// 1. Feature extraction (параллельно по батчу)
let features: Vec<Vec<f32>> = batch.iter().map(|c| self.extract_features(c)).collect();
// 2. Батчинг в Tensor (максимальная эффективность GPU)
let tensors: Vec<Tensor> = features.iter()
.map(|v| Tensor::from_slice(v).to(self.model.device)) // На GPU!
.collect();
let input = Tensor::stack(&tensors, 0); // [batch_size, feature_dim]
// 3. Parallel batch inference (весь батч за один forward!)
let output = self.model.model.forward_ts(&[input])?; // GPU-accelerated
let cpu_output = output.to_device(Device::Cpu); // Только для чтения результатов
let values: Vec<f32> = cpu_output.f_values()?; // Flat vector результатов
// 4. Парсинг output (предполагаем модель возвращает [anomaly_score, class_prob] на строку)
let feature_dim = values.len() / batch.len(); // Динамический (поддержка разных моделей)
let chunks = values.chunks(feature_dim);
let mut enriched = Vec::with_capacity(batch.len());
for (i, chunk) in chunks.enumerate() {
let anomaly_score = chunk[0]; // 0-1 sigmoid
let classification = if chunk.get(1).map(|&p| p > 0.5).unwrap_or(false) {
"critical".to_string()
} else {
"normal".to_string()
};
enriched.push(EnrichedChange {
inner: batch[i].clone(),
anomaly_score: Some(anomaly_score),
classification: Some(classification),
});
}
Ok(enriched)
}
}
Как работает inference шаг за шагом (максимальная сложность)
- Input: батч ProcessedChange (из transformer: synthetic_features — нормализованные признаки вроде log(amount), user_id, timestamp).
- Feature extraction: O(1) per event — простая копия/генерация (расширяемо для complex embedding).
- Батчинг:Каждый вектор фич → Tensor на device (GPU memcpy — асинхронно).
Tensor::stack — один большой тензор [batch_size, dim] (256–1024 для оптимальной GPU utilization). - Forward pass:forward_ts(&[input]) — один вызов модели на весь батч (параллелизм CUDA kernels: тысячи операций одновременно).
Latency ~1-10ms на modern GPU для батча 256 (vs секунды на sequential CPU). - Output parsing:to_cpu + f_values — эффективное чтение.
Динамический chunking — модель может возвращать любой вектор (OCP).
anomaly_score: reconstruction error или probability.
classification: threshold на prob (critical если >0.5).
Почему это максимально эффективно?
- Многопоточность: enrich_batch вызывается параллельно в orchestrator (parallelism_per_stage потоков) — динамический батчинг (orchestrator накапливает до batch_size).
- GPU acceleration: весь батч на CUDA — 10-100x speedup vs CPU loop.
- Backpressure integration: если GPU overloaded — очередь растёт, orchestrator замедляет readers.
- Observability: latency метрика в Prometheus (record_inference_latency в orchestrator).
- Расширяемость: новая модель (Siamese для dedup, LSTM для sequences) — просто новый .pt файл.
- Надёжность: TchError → CdcError, fallback CPU.
В orchestrator это интегрировано с роутингом: если anomaly_score >= threshold — в quarantine Kafka topic (интеллектуальная фильтрация подозрительных изменений в реал-тайм).