Для чего нужна данная статья? :
- Разработать API и веб приложение для обработки данных БД с использованием машинного обучения:
HTTP Server Actix-web API endpoints
Async Runtime Tokio Неблокирующий I/O
DB Pool bb8-postgres Управление соединениями
ML Engine ndarray Матричные вычисления
Parallelism Rayon Параллельная обработка данных
Thread Safety RwLock + Arc Безопасный доступ к модели
Logging tracing Структурированные логи
Error Handling thiserror Типобезопасные ошибки
Зачем Вам это уметь? :
- Увеличить скорость создания схем данных с последующей обработкой.
📐 Общая архитектура
│ HTTP API (Actix-web) │
│ /health /train /metrics │
▼
│ Application State │
│ │ NeuralNet │ │ Repository │ │ Config (norm, map) │ │
│ │ (ML Model) │ │ (DB/Demo) │ │ │ │
▼ ▼
│ PostgreSQL (bb8) │ │ Demo Mode (RAM) │
│ Connection Pool │ │ Generated Data │
🛡️ Обработка ошибок
#[from] Автоматическое преобразование ошибок через ?
thiserror Генерация trait Error автоматически
ResponseError Превращает ошибки в JSON-ответы HTTP 500
🧠 Нейронная сеть (ML Core)
Архитектура сети
Входной слой (2 + gender) → Скрытый слой (64 нейрона, ReLU) → Выход (1 нейрон, Sigmoid)
[age, income, M/F] ↓ ↓
64 значения 0 или 1 (вероятность)
Структура модели
struct NeuralNetwork {
weights_input_hidden: Array2<f64>, // Веса вход→скрытый (n_features × 64)
bias_hidden: Array1<f64>, // Смещения скрытого слоя (64)
weights_hidden_output: Array1<f64>, // Веса скрытый→выход (64)
bias_output: f64, // Смещение выхода (1)
learning_rate: f64, // Скорость обучения (0.01)
}
Прямое распространение (Forward Pass)
Математика:
ReLU max(0, x) Нелинейность в скрытом слое
Sigmoid 1/(1+e^-x) Вероятность на выходе (0-1)
Обратное распространение (Backward Pass)
Визуализация backprop:
Ошибка (a2 - y)
↓
Градиент выхода (dz2)
↓
Градиент скрытого слоя (da1 = dz2 × W_ho.T)
↓
Градиент с учётом ReLU (dz1 = da1 × ReLU'(z1))
↓
Обновление весов (W -= lr × gradient)
Обновление весов (SGD)
Формула: W_new = W_old - learning_rate × gradient
🗄️ Слой работы с данными (Repository Pattern)
Пул соединений (Connection Pool)
Преимущества пула:
max_size 10 Ограничение соединений
min_idle 2 Всегда готовы 2 соединения
connection_timeout 30 сек Защита от зависаний
max_lifetime 30 мин Пересоздание старых соединений
Репозиторий (абстракция доступа к данным)
Методы репозитория:
get_normalization_params() SQL: MIN/MAX
get_gender_mapping() SQL: SELECT DISTINCT
fetch_training_batch() SQL: SELECT ... LIMIT
fetch_test_data() SQL: SELECT FROM test_table
Генерация демо-данных
Логика метки: Человек получает label=1, если доход > 80k И возраст > 35 лет.
🔄 Application State (Состояние приложения)
RwLock для потокобезопасности
Почему RwLock:
- /metrics → read() → много одновременных запросов
- /train → write() → эксклюзивный доступ на время обучения
Предобработка данных (Preprocessing)
age=30, income=50000, gender="M" [0.25, 0.17, 1.0, 0.0]
age=45, income=100000, gender="F" [0.63, 0.58, 0.0, 1.0]
🌐 API Layer (HTTP Endpoints)
Маршруты
/health GET Проверка работоспособности
/train POST Обучение модели
/metrics GET Точность модели
Обработчик обучения
Пример запроса:
curl -X POST http://127.0.0.1:8080/train \
-H "Content-Type: application/json" \
-d '{"epochs": 5}'
📊 Поток данных (Data Flow)
│ 1. HTTP POST /train?epochs=5 │
▼
│ 2. train_model() → цикл 5 эпох │
▼
│ 3. AppState::train_epoch() │
│ ├─ repo.fetch_training_batch(1000) ← БД или демо │
│ ├─ preprocess_batch() ← Rayon параллельно │
│ └─ model.write().fit_batch() ← SGD обновление │
▼
│ 4. AppState::evaluate() │
│ ├─ repo.fetch_test_data() │
│ ├─ preprocess_batch() │
│ ├─ model.read().predict() │
│ └─ accuracy = correct / total │
▼
│ 5. JSON Response: {"status": "trained", "accuracy": 0.87} │
🎯 Как использовать
Быстрый старт (демо):
cargo run --release
curl http://127.0.0.1:8080/health
curl -X POST http://127.0.0.1:8080/train -d '{"epochs": 10}'
curl http://127.0.0.1:8080/metrics
Продакшен (с БД):
DEMO_MODE=false DB_HOST=localhost DB_USER=postgres DB_NAME=mydb DB_PASSWORD=pass cargo run --release
Создайте базу и таблицы
psql -U postgres << 'EOF'
CREATE DATABASE mydb;
\c mydb
CREATE TABLE IF NOT EXISTS mytable (
age INTEGER,
income FLOAT8,
gender TEXT,
label INTEGER
);
CREATE TABLE IF NOT EXISTS test_table (
age INTEGER,
income FLOAT8,
gender TEXT,
label INTEGER
);
INSERT INTO mytable (age, income, gender, label)
SELECT
floor(random() * 40 + 20)::int,
random() * 120000 + 30000,
CASE WHEN random() > 0.5 THEN 'M' ELSE 'F' END,
CASE WHEN random() > 0.5 THEN 1 ELSE 0 END
FROM generate_series(1, 1000);
INSERT INTO test_table (age, income, gender, label)
SELECT
floor(random() * 40 + 20)::int,
random() * 120000 + 30000,
CASE WHEN random() > 0.5 THEN 'M' ELSE 'F' END,
CASE WHEN random() > 0.5 THEN 1 ELSE 0 END
FROM generate_series(1, 200);
EOF