Кроме того, существуют экспериментальные подмодули: Arc для совместного использования, RwLock для чтения/записи весов, Mutex и Condvar для ожидания, Barrier для синхронизации эпох, mpsc-каналы для градиентов, атомарные типы для счётчиков и сигналов, Once для одноразовой инициализации, LazyLock для ленивой загрузки данных. Ленивая загрузка данных (LazyLock): static DATA: LazyLock<Vec<(f64, f64)>> = LazyLock::new(|| {
vec![
(1.0, 2.1), (2.0, 4.0), (3.0, 6.1),
// ...
]
}); Однократная инициализация (Once): static INIT: Once = Once::new();
INIT.call_once(|| { init_value = 42.0; }); Каждый воркер выполняет цикл: Для каждой эпохи (num_epochs = 50): Эпоха обучения:
Кроме того, существуют экспериментальные подмодули: Arc для совместного использования, RwLock для чтения/записи весов, Mutex и Condvar для ожидания, Barrier для синхронизации эпох, mpsc-каналы для градиентов, атомарные типы для счётчиков и сигналов, Once для одноразовой инициализации, LazyLock для ленивой загрузки данных. Ленивая загрузка данных (LazyLock): static DATA: LazyLock<Vec<(f64, f64)>> = LazyLock::new(|| {
vec![
(1.0, 2.1), (2.0, 4.0), (3.0, 6.1),
// ...
]
}); Однократная инициализация (Once): static INIT: Once = Once::new();
INIT.call_once(|| { init_value = 42.0; }); Каждый воркер выполняет цикл: Для каждой эпохи (num_epochs = 50): Эпоха обучения:
...Читать далее
Основные
- Arc (атомарно подсчитываемый указатель для совместного использования)
- Barrier (барьер для синхронизации нескольких потоков)
- BarrierWaitResult (результат ожидания на барьере)
- Condvar (условная переменная для ожидания условий)
- LazyLock (ленивая инициализация значения при первом доступе)
- Mutex (мьютекс для взаимного исключения)
- MutexGuard (RAII-охранник для мьютекса)
- Once (примитив для одноразовой инициализации)
- OnceLock (примитив для одноразовой записи значения)
- OnceState (состояние для Once)
- PoisonError (ошибка отравления замка)
- RwLock (замок чтения-записи, позволяет несколько читателей или одного писателя)
- RwLockReadGuard (RAII-охранник для чтения в RwLock)
- RwLockWriteGuard (RAII-охранник для записи в RwLock)
- WaitTimeoutResult (результат ожидания с таймаутом)
- Weak (слабая ссылка для Arc)
Экспериментальные (требуют feature-флагов)
- Exclusive (эксклюзивный доступ к значению)
- MappedMutexGuard (отображенный охранник мьютекса на подполя)
- MappedRwLockReadGuard (отображенный охранник чтения RwLock на подполя)
- MappedRwLockWriteGuard (отображенный охранник записи RwLock на подполя)
- ReentrantLock (реентрантный замок)
- ReentrantLockGuard (RAII-охранник для реентрантного замка)
- UniqueArc (уникально владеемый Arc)
Перечисления (Enums)
- TryLockError (ошибки при попытке захвата замка)
Типы-алиасы (Type Aliases)
- LockResult (результат захвата замка с учетом отравления)
- TryLockResult (результат не-блокирующего захвата замка)
Атомарные типы (из подмодуля std::sync::atomic)
- AtomicBool (атомарный булев тип)
- AtomicI8 (атомарный signed 8-bit)
- AtomicI16 (атомарный signed 16-bit)
- AtomicI32 (атомарный signed 32-bit)
- AtomicI64 (атомарный signed 64-bit)
- AtomicIsize (атомарный signed размер платформы)
- AtomicPtr (атомарный указатель)
- AtomicU8 (атомарный unsigned 8-bit)
- AtomicU16 (атомарный unsigned 16-bit)
- AtomicU32 (атомарный unsigned 32-bit)
- AtomicU64 (атомарный unsigned 64-bit)
- AtomicUsize (атомарный unsigned размер платформы)
Перечисления в atomic
- Ordering (порядок памяти для атомарных операций)
Функции в atomic
- compiler_fence (компиляторный барьер)
- fence (барьер памяти)
Каналы (из подмодуля std::sync::mpsc — multi-producer, single-consumer)
- IntoIter (итератор по значениям)
- Iter (итератор по значениям)
- Receiver (получатель сообщений)
- RecvError (ошибка получения)
- SendError (ошибка отправки)
- Sender (отправитель сообщений)
- SyncSender (синхронный отправитель)
- TryIter (итератор для попыток получения)
Перечисления в mpsc
- RecvTimeoutError (ошибки получения с таймаутом)
- TryRecvError (ошибки попытки получения)
- TrySendError (ошибки попытки отправки)
Кроме того, существуют экспериментальные подмодули:
- mpmc (multi-producer, multi-consumer каналы, аналогичные mpsc, но для нескольких потребителей).
- poison (механизмы для синхронизации).
Максимально быстрый способ обучения линейной модели с использованием синхронизации.
Arc для совместного использования, RwLock для чтения/записи весов, Mutex и Condvar для ожидания, Barrier для синхронизации эпох, mpsc-каналы для градиентов, атомарные типы для счётчиков и сигналов, Once для одноразовой инициализации, LazyLock для ленивой загрузки данных.
1. Используемые примитивы синхронизации и их назначение
- LazyLock — ленивая инициализация данных (один раз, потокобезопасно).
- Once — однократное выполнение инициализации.
- RwLock — блокировка чтения‑записи: много читателей, один писатель.
- AtomicUsize / AtomicBool — атомарные операции без блокировок.
- Mutex + Condvar — ожидание условий.
- Barrier — синхронизация начала эпох между потоками.
- mpsc::channel — передача градиентов от воркеров к главному потоку.
- Arc — совместное владение данными между потоками.
2. Инициализация данных
Ленивая загрузка данных (LazyLock):
static DATA: LazyLock<Vec<(f64, f64)>> = LazyLock::new(|| {
vec![
(1.0, 2.1), (2.0, 4.0), (3.0, 6.1),
// ...
]
});
- Данные загружаются один раз при первом обращении.
- Потокобезопасная инициализация.
Однократная инициализация (Once):
static INIT: Once = Once::new();
INIT.call_once(|| { init_value = 42.0; });
- Блок выполняется ровно один раз, даже если вызывается из нескольких потоков.
3. Общие структуры данных
- Веса модели (weights):
Arc<RwLock<Vec<f64>> — совместное владение, много читателей (воркеры), один писатель (главный поток).
Инициализируются нулями: [w0 = 0.0, w1 = 0.0]. - Канал градиентов (grad_tx, grad_rx):
MPSC (multi‑producer, single‑consumer) — несколько воркеров отправляют градиенты, главный поток их собирает. - Счётчик полученных градиентов (received):
AtomicUsize — атомарное увеличение без блокировок. - Условие готовности (cv_pair):
(Mutex<bool>, Condvar) — главный поток ждёт, пока все воркеры отправят градиенты. - Барьер эпох (barrier):
Barrier::new(num_workers + 1) — синхронизирует начало каждой эпохи (все воркеры + главный поток). - Сигнал остановки (stop):
AtomicBool — флаг для остановки воркеров.
4. Работа воркеров (поток воркера)
Каждый воркер выполняет цикл:
- Проверка сигнала остановки:
Если stop.load() == true, завершает работу. - Ожидание начала эпохи:
barrier.wait() — все воркеры и главный поток ждут здесь до начала новой эпохи.
Лидер барьера (случайный поток) может выполнять специальные действия. - Чтение весов:
weights.read().unwrap() — получает текущие веса (не блокирует других читателей). - Вычисление градиентов:
Для каждого (x, y) в батче:
Предсказание: pred=w0+w1⋅x.
Ошибка: error=pred−y.
Градиенты:
gradw0+=error;
gradw1+=error⋅x.
Нормализация: деление на размер батча. - Отправка градиента:
grad_tx.send(local_grad) — отправляет градиент в канал. - Уведомление главного потока:
Атомарно увеличивает счётчик received.
Если это последний воркер (prev + 1 == num_workers), устанавливает ready = true и вызывает cvar.notify_one().
5. Работа главного потока (цикл эпох)
Для каждой эпохи (num_epochs = 50):
- Сброс счётчика:
received.store(0) — обнуляет счётчик полученных градиентов. - Запуск эпохи:
barrier.wait() — сигнализирует воркеры начать новую эпоху. - Ожидание градиентов:
Ждёт на Condvar, пока последний воркер не установит ready = true. - Сбор градиентов:
Принимает num_workers градиентов из канала grad_rx.
Суммирует градиенты в total_grad. - Усреднение градиентов:
Делит total_grad на num_workers. - Обновление весов:
Берёт эксклюзивную блокировку weights.write().
Обновляет веса: wi=wi−learning_rate⋅avg_gradi. - Вывод результатов:
Печатает текущие веса и значение init_value.
6. Завершение работы
- Сигнал остановки:
stop.store(true) — устанавливает флаг остановки для воркеров. - Финальный барьер:
barrier.wait() — позволяет воркеры проверить флаг stop. - Завершение потоков:
Главный поток вызывает handle.join() для каждого воркера.
Поток данных и взаимодействие компонентов
Эпоха обучения:
- Главный поток:
сбрасывает счётчик градиентов;
запускает эпоху через барьер. - Воркеры:
ждут на барьере;
читают веса;
вычисляют градиенты;
отправляют градиенты в канал;
уведомляют главный поток через Condvar. - Главный поток:
ждёт на Condvar;
собирает градиенты из канала;
усредняет градиенты;
обновляет веса;
печатает результаты.
Ключевые особенности реализации
- Параллельное вычисление градиентов: воркеры работают одновременно, читают веса параллельно (через RwLock).
- Синхронизация эпох: барьер гарантирует, что все воркеры начинают новую эпоху одновременно.
- Атомарный счётчик: позволяет избежать блокировок при подсчёте полученных градиентов.
- Ожидание готовности: Condvar эффективно ждёт, пока все градиенты будут готовы.
- Ленивая инициализация: данные загружаются один раз, только при необходимости.
- Однократная инициализация: Once гарантирует выполнение блока ровно один раз.
Математическая модель
- Модель: y=w0+w1⋅x (линейная регрессия).
- Функция потерь: MSE (Mean Squared Error).
- Градиенты:
∂w0∂L=n1∑(predi−yi);
∂w1∂L=n1∑(predi−yi)⋅xi. - Обновление весов: wi←wi−η⋅∇wi, где η — learning rate.