Найти в Дзене
Один Rust не п...Rust

Планировщик с ML

Tokio для полноценной асинхронной поддержки.
Таймер Tokio для неблокирующих задержек.
Безблокировочная архитектура с асинхронными мьютексами.
Выделенный канал для передачи обучающих данных.
Фоновый поток для обучения модели.
Неблокирующее прогнозирование во время планирования.
Фьючерсы.
Система Waker - обрабатывается средой выполнения Tokio
Импорты: Ключевые структуры: PriorityTask реализует трейты для сравнения: Метод spawn выполняет: Алгоритм run: Метод calculate_features формирует вектор признаков на основе: Асинхронный метод train_loop: Упрощённая обёртка над tokio::time::sleep для приостановки выполнения на заданное время. Функция main:
Оглавление

GitHub - nicktretyakov/SchedulerML-
ML на RUST без заморочек
Rust ML без заморочек

Для минимальной реализации планировщика задач с использоанием ML необходимо следующее:

Tokio для полноценной асинхронной поддержки.
Таймер Tokio для неблокирующих задержек.
Безблокировочная архитектура с асинхронными мьютексами.
Выделенный канал для передачи обучающих данных.
Фоновый поток для обучения модели.
Неблокирующее прогнозирование во время планирования.
Фьючерсы.
Система Waker - обрабатывается средой выполнения Tokio

1. Используемые библиотеки и структуры данных

Импорты:

  • std::collections::{HashMap, BinaryHeap} — для хранения завершённых задач и очереди с приоритетами;
  • std::sync::{Arc, Mutex} — потокобезопасное разделение данных;
  • tokio::sync::{mpsc, Mutex as AsyncMutex} — асинхронные каналы и мьютексы;
  • ndarray::Array1 — одномерные массивы для ML‑операций;
  • rand::Rng — генерация случайных приоритетов.

Ключевые структуры:

  1. TaskMetadata — метаданные задачи:
    id — уникальный идентификатор;
    priority — приоритет (влияет на порядок выполнения);
    dependencies — список ID зависимых задач;
    start_time — время создания задачи.
  2. MLModel — упрощённая ML‑модель:
    weights — веса модели (инициализируются единицами);
    predict() — делает предсказание через скалярное произведение;
    train_loop() — асинхронный цикл обучения (принимает данные через канал).
  3. Scheduler — основной планировщик:
    task_queue — очередь задач с приоритетами (BinaryHeap);
    ml_sender — канал для отправки данных в поток обучения;
    task_counter — счётчик созданных задач;
    completed_tasks — словарь времени выполнения завершённых задач.
  4. PriorityTask — задача с приоритетом:
    metadata — метаданные;
    future — асинхронная операция, которую нужно выполнить.

2. Реализация приоритетов

PriorityTask реализует трейты для сравнения:

  • PartialOrd и Ord — определяют порядок в BinaryHeap;
  • приоритет сравнивается в обратном порядке (other.priority vs self.priority), чтобы BinaryHeap работал как max‑heap (задачи с высшим приоритетом выполняются первыми);
  • Eq и PartialEq — сравнение по id.

3. Инициализация планировщика (Scheduler::new)

  1. Создаёт канал mpsc для связи с ML‑моделью.
  2. Запускает асинхронный поток обучения через tokio::spawn, передавая ml_receiver в train_loop.
  3. Инициализирует:
    пустую очередь задач (BinaryHeap::new());
    счётчик задач (Mutex<usize>);
    словарь завершённых задач (HashMap).

4. Добавление задач (spawn)

Метод spawn выполняет:

  1. Генерирует уникальный id через атомарный счётчик.
  2. Создаёт TaskMetadata:
    случайный приоритет (rand::thread_rng().gen_range(0.0..1.0));
    зависимости (dependencies);
    время старта (Instant::now()).
  3. Упаковывает асинхронную операцию (future) в PriorityTask.
  4. Добавляет задачу в task_queue.

5. Основной цикл выполнения (run)

Алгоритм run:

  1. Создаёт упрощённую ML‑модель (MLModel::new(3)).
  2. В бесконечном цикле:
    извлекает задачу из task_queue;
    вычисляет признаки для задачи через calculate_features;
    обновляет приоритет задачи с помощью model.predict;
    возвращает задачу в очередь с новым приоритетом;
    повторно извлекает задачу с наивысшим приоритетом;
    выполняет асинхронную операцию (task.future.await);
    фиксирует время выполнения (start_time.elapsed());
    отправляет данные для обучения в ML‑поток:
    target = 1.0, если задача выполнена быстро (< 100 мс);
    target = 0.0 в противном случае.

6. Расчёт признаков (calculate_features)

Метод calculate_features формирует вектор признаков на основе:

  • текущего приоритета задачи (metadata.priority);
  • количества зависимостей (metadata.dependencies.len());
  • доли успешно выполненных зависимостей (success_rate):
    подсчитывает, сколько зависимостей уже завершены (completed.contains_key(id));
    делит на общее количество зависимостей (с защитой от деления на ноль).

7. Обучение модели (train_loop)

Асинхронный метод train_loop:

  • непрерывно принимает пары (features, target) через канал;
  • имитирует обучение (в данном коде — просто выводит данные в консоль);
  • работает в фоновом потоке, запущенном при инициализации планировщика.

8. Асинхронный сон (async_sleep)

Упрощённая обёртка над tokio::time::sleep для приостановки выполнения на заданное время.

9. Точка входа (main)

Функция main:

  1. Создаёт экземпляр Scheduler.
  2. Добавляет тестовую задачу через spawn:
    задача печатает «Task 1 started»;
    спит 1 секунду (async_sleep);
    печатает «Task 1 completed».
  3. Запускает основной цикл планировщика (scheduler.run().await).

Поток данных и взаимодействие компонентов

  1. Создание задачи → spawn добавляет её в очередь.
  2. Планировщик → run извлекает задачу, обновляет приоритет через ML‑модель, выполняет.
  3. Признаки → calculate_features собирает данные о задаче и её зависимостях.
  4. Обучение → train_loop получает пары (признаки, целевая метка) и адаптирует модель.
  5. Обратная связь → время выполнения задачи влияет на целевую метку для обучения.