Добавить в корзинуПозвонить
Найти в Дзене
Один Rust не п...Rust

Примитивы синхронизации и как их использовать для решения ML задач

Кроме того, существуют экспериментальные подмодули: Arc для совместного использования, RwLock для чтения/записи весов, Mutex и Condvar для ожидания, Barrier для синхронизации эпох, mpsc-каналы для градиентов, атомарные типы для счётчиков и сигналов, Once для одноразовой инициализации, LazyLock для ленивой загрузки данных. Ленивая загрузка данных (LazyLock): static DATA: LazyLock<Vec<(f64, f64)>> = LazyLock::new(|| {
vec![
(1.0, 2.1), (2.0, 4.0), (3.0, 6.1),
// ...
]
}); Однократная инициализация (Once): static INIT: Once = Once::new();
INIT.call_once(|| { init_value = 42.0; }); Каждый воркер выполняет цикл: Для каждой эпохи (num_epochs = 50): Эпоха обучения:
Оглавление
ML на RUST без заморочек

Rust ML без заморочек

Основные

  • Arc (атомарно подсчитываемый указатель для совместного использования)
  • Barrier (барьер для синхронизации нескольких потоков)
  • BarrierWaitResult (результат ожидания на барьере)
  • Condvar (условная переменная для ожидания условий)
  • LazyLock (ленивая инициализация значения при первом доступе)
  • Mutex (мьютекс для взаимного исключения)
  • MutexGuard (RAII-охранник для мьютекса)
  • Once (примитив для одноразовой инициализации)
  • OnceLock (примитив для одноразовой записи значения)
  • OnceState (состояние для Once)
  • PoisonError (ошибка отравления замка)
  • RwLock (замок чтения-записи, позволяет несколько читателей или одного писателя)
  • RwLockReadGuard (RAII-охранник для чтения в RwLock)
  • RwLockWriteGuard (RAII-охранник для записи в RwLock)
  • WaitTimeoutResult (результат ожидания с таймаутом)
  • Weak (слабая ссылка для Arc)

Экспериментальные (требуют feature-флагов)

  • Exclusive (эксклюзивный доступ к значению)
  • MappedMutexGuard (отображенный охранник мьютекса на подполя)
  • MappedRwLockReadGuard (отображенный охранник чтения RwLock на подполя)
  • MappedRwLockWriteGuard (отображенный охранник записи RwLock на подполя)
  • ReentrantLock (реентрантный замок)
  • ReentrantLockGuard (RAII-охранник для реентрантного замка)
  • UniqueArc (уникально владеемый Arc)

Перечисления (Enums)

  • TryLockError (ошибки при попытке захвата замка)

Типы-алиасы (Type Aliases)

  • LockResult (результат захвата замка с учетом отравления)
  • TryLockResult (результат не-блокирующего захвата замка)

Атомарные типы (из подмодуля std::sync::atomic)

  • AtomicBool (атомарный булев тип)
  • AtomicI8 (атомарный signed 8-bit)
  • AtomicI16 (атомарный signed 16-bit)
  • AtomicI32 (атомарный signed 32-bit)
  • AtomicI64 (атомарный signed 64-bit)
  • AtomicIsize (атомарный signed размер платформы)
  • AtomicPtr (атомарный указатель)
  • AtomicU8 (атомарный unsigned 8-bit)
  • AtomicU16 (атомарный unsigned 16-bit)
  • AtomicU32 (атомарный unsigned 32-bit)
  • AtomicU64 (атомарный unsigned 64-bit)
  • AtomicUsize (атомарный unsigned размер платформы)

Перечисления в atomic

  • Ordering (порядок памяти для атомарных операций)

Функции в atomic

  • compiler_fence (компиляторный барьер)
  • fence (барьер памяти)

Каналы (из подмодуля std::sync::mpsc — multi-producer, single-consumer)

  • IntoIter (итератор по значениям)
  • Iter (итератор по значениям)
  • Receiver (получатель сообщений)
  • RecvError (ошибка получения)
  • SendError (ошибка отправки)
  • Sender (отправитель сообщений)
  • SyncSender (синхронный отправитель)
  • TryIter (итератор для попыток получения)

Перечисления в mpsc

  • RecvTimeoutError (ошибки получения с таймаутом)
  • TryRecvError (ошибки попытки получения)
  • TrySendError (ошибки попытки отправки)

Кроме того, существуют экспериментальные подмодули:

  • mpmc (multi-producer, multi-consumer каналы, аналогичные mpsc, но для нескольких потребителей).
  • poison (механизмы для синхронизации).

Максимально быстрый способ обучения линейной модели с использованием синхронизации.

Arc для совместного использования, RwLock для чтения/записи весов, Mutex и Condvar для ожидания, Barrier для синхронизации эпох, mpsc-каналы для градиентов, атомарные типы для счётчиков и сигналов, Once для одноразовой инициализации, LazyLock для ленивой загрузки данных.

1. Используемые примитивы синхронизации и их назначение

  • LazyLock — ленивая инициализация данных (один раз, потокобезопасно).
  • Once — однократное выполнение инициализации.
  • RwLock — блокировка чтения‑записи: много читателей, один писатель.
  • AtomicUsize / AtomicBool — атомарные операции без блокировок.
  • Mutex + Condvar — ожидание условий.
  • Barrier — синхронизация начала эпох между потоками.
  • mpsc::channel — передача градиентов от воркеров к главному потоку.
  • Arc — совместное владение данными между потоками.

2. Инициализация данных

Ленивая загрузка данных (LazyLock):

static DATA: LazyLock<Vec<(f64, f64)>> = LazyLock::new(|| {
vec![
(1.0, 2.1), (2.0, 4.0), (3.0, 6.1),
// ...
]
});

  • Данные загружаются один раз при первом обращении.
  • Потокобезопасная инициализация.

Однократная инициализация (Once):

static INIT: Once = Once::new();
INIT.call_once(|| { init_value = 42.0; });

  • Блок выполняется ровно один раз, даже если вызывается из нескольких потоков.

3. Общие структуры данных

  • Веса модели (weights):
    Arc<RwLock<Vec<f64>> — совместное владение, много читателей (воркеры), один писатель (главный поток).
    Инициализируются нулями: [w0 = 0.0, w1 = 0.0].
  • Канал градиентов (grad_tx, grad_rx):
    MPSC (multi‑producer, single‑consumer) — несколько воркеров отправляют градиенты, главный поток их собирает.
  • Счётчик полученных градиентов (received):
    AtomicUsize — атомарное увеличение без блокировок.
  • Условие готовности (cv_pair):
    (Mutex<bool>, Condvar) — главный поток ждёт, пока все воркеры отправят градиенты.
  • Барьер эпох (barrier):
    Barrier::new(num_workers + 1) — синхронизирует начало каждой эпохи (все воркеры + главный поток).
  • Сигнал остановки (stop):
    AtomicBool — флаг для остановки воркеров.

4. Работа воркеров (поток воркера)

Каждый воркер выполняет цикл:

  1. Проверка сигнала остановки:
    Если stop.load() == true, завершает работу.
  2. Ожидание начала эпохи:
    barrier.wait() — все воркеры и главный поток ждут здесь до начала новой эпохи.
    Лидер барьера (случайный поток) может выполнять специальные действия.
  3. Чтение весов:
    weights.read().unwrap() — получает текущие веса (не блокирует других читателей).
  4. Вычисление градиентов:
    Для каждого (x, y) в батче:
    Предсказание: pred=w0​+w1​⋅x.
    Ошибка: error=pred−y.
    Градиенты:
    gradw0​+=error;
    gradw1​+=error⋅x.
    Нормализация: деление на размер батча.
  5. Отправка градиента:
    grad_tx.send(local_grad) — отправляет градиент в канал.
  6. Уведомление главного потока:
    Атомарно увеличивает счётчик received.
    Если это последний воркер (prev + 1 == num_workers), устанавливает ready = true и вызывает cvar.notify_one().

5. Работа главного потока (цикл эпох)

Для каждой эпохи (num_epochs = 50):

  1. Сброс счётчика:
    received.store(0) — обнуляет счётчик полученных градиентов.
  2. Запуск эпохи:
    barrier.wait() — сигнализирует воркеры начать новую эпоху.
  3. Ожидание градиентов:
    Ждёт на Condvar, пока последний воркер не установит ready = true.
  4. Сбор градиентов:
    Принимает num_workers градиентов из канала grad_rx.
    Суммирует градиенты в total_grad.
  5. Усреднение градиентов:
    Делит total_grad на num_workers.
  6. Обновление весов:
    Берёт эксклюзивную блокировку weights.write().
    Обновляет веса: wi​=wi​−learning_rate⋅avg_gradi​.
  7. Вывод результатов:
    Печатает текущие веса и значение init_value.

6. Завершение работы

  1. Сигнал остановки:
    stop.store(true) — устанавливает флаг остановки для воркеров.
  2. Финальный барьер:
    barrier.wait() — позволяет воркеры проверить флаг stop.
  3. Завершение потоков:
    Главный поток вызывает handle.join() для каждого воркера.

Поток данных и взаимодействие компонентов

Эпоха обучения:

  1. Главный поток:
    сбрасывает счётчик градиентов;
    запускает эпоху через барьер.
  2. Воркеры:
    ждут на барьере;
    читают веса;
    вычисляют градиенты;
    отправляют градиенты в канал;
    уведомляют главный поток через Condvar.
  3. Главный поток:
    ждёт на Condvar;
    собирает градиенты из канала;
    усредняет градиенты;
    обновляет веса;
    печатает результаты.

Ключевые особенности реализации

  • Параллельное вычисление градиентов: воркеры работают одновременно, читают веса параллельно (через RwLock).
  • Синхронизация эпох: барьер гарантирует, что все воркеры начинают новую эпоху одновременно.
  • Атомарный счётчик: позволяет избежать блокировок при подсчёте полученных градиентов.
  • Ожидание готовности: Condvar эффективно ждёт, пока все градиенты будут готовы.
  • Ленивая инициализация: данные загружаются один раз, только при необходимости.
  • Однократная инициализация: Once гарантирует выполнение блока ровно один раз.

Математическая модель

  • Модель: y=w0​+w1​⋅x (линейная регрессия).
  • Функция потерь: MSE (Mean Squared Error).
  • Градиенты:
    ∂w0​∂L​=n1​∑(predi​−yi​);
    ∂w1​∂L​=n1​∑(predi​−yi​)⋅xi​.
  • Обновление весов: wi​←wi​−η⋅∇wi​, где η — learning rate.