Найти в Дзене
Один Rust не п...Rust

Mutex в Rust

Оглавление

Для чего нужна данная статья? :

- найти компромиссы между видами мьютексов.
- научиться использовать несколько потоков, обращающихся к защищенному объекту при помощи ML.

Зачем Вам это уметь? :

- для создания нескольких функций для асинхронного доступа к объекту.

Mutex — это «Взаимное исключение» примитив параллельного программирования, гарантирует, что только одна часть программы выполняет какую-то конкретную задачу в данный момент времени.Обычно это доступ к объекту, который используется несколькими потоками.

Какие виды мьютексов?:

- `Mutex`: для доступа к данным из нескольких потоков(`std::sync::Mutex`): Владение данными передаётся через блокировку (lock), что обеспечивает безопасное чтение и изменение данных. При блокировке поток ждёт освобождения мьютекса, если он уже захвачен другим потоком.

-2

use std::sync::Mutex;

use std::thread;

fn main() {

let counter = Mutex::new(0);

let mut handles = vec![];

for _ in 0..10 {

let handle = thread::spawn(move || {

let mut num = counter.lock().unwrap();

*num += 1;

});

handles.push(handle);

}

for handle in handles {

handle.join().unwrap();

}

println!("Final counter value: {}", *counter.lock().unwrap());

}


- `RwLock`: для доступа к данным из нескольких потоков, позволяет одновременное чтение или эксклюзивную запись(`std::sync::RwLock`): Это не мьютекс в классическом понимании, но `RwLock` предоставляет схожие функции защиты. Позволяет множественные потоки одновременно читать данные или эксклюзивно писать. Полезен, когда доступ к данным преимущественно для чтения.

-3

use std::sync::RwLock;

use std::thread;

fn main() {

let data = RwLock::new(vec![1, 2, 3]);

// Multiple threads can read simultaneously

for _ in 0..5 {

let data_clone = data.clone();

thread::spawn(move || {

let r = data_clone.read().unwrap();

println!("Read data: {:?}", *r);

});

}

// One thread for exclusive write access

thread::spawn(move || {

let mut w = data.write().unwrap();

w.push(4);

println!("Added data: {:?}", *w);

}).join().unwrap();

}

- `ReentrantMutex`: позволяет потоку многократно захватывать мьютекс(`std::sync::Mutex` с некоторыми оговорками): В стандартной библиотеке Rust прямо нет типичного рекурсивного мьютекса, который позволяет потоку повторно захватывать мьютекс, который он уже захватил. Однако, можно имитировать поведение рекурсивного мьютекса, используя другие методы синхронизации или библиотеки сторонних разработчиков, такие как `parking_lot::ReentrantMutex`.

-4

// Cargo.toml

// [dependencies]

// parking_lot = "0.11.1"

use parking_lot::ReentrantMutex;

use std::sync::Arc;

use std::thread;

fn main() {

let data = Arc::new(ReentrantMutex::new(5));

let data_clone = Arc::clone(&data);

// Thread 1: Locks the ReentrantMutex multiple times

let handle1 = thread::spawn(move || {

let lock1 = data_clone.lock();

println!("Thread 1: First lock acquired");

let lock2 = data_clone.lock();

println!("Thread 1: Second lock acquired");

drop(lock1); // Release the first lock

println!("Thread 1: First lock released");

drop(lock2); // Release the second lock

println!("Thread 1: Second lock released");

});

let data_clone = Arc::clone(&data);

// Thread 2: Tries to lock the ReentrantMutex

let handle2 = thread::spawn(move || {

let lock = data_clone.lock();

println!("Thread 2: Lock acquired");

});

handle1.join().unwrap();

handle2.join().unwrap();

}

- `RefMutex`: Подобен `Mutex`, но является `Sync` и `Send`, даже если тип `T` не является `Send`.

-5

use std::cell::RefCell;

use std::sync::{Arc, Mutex};

struct RefMutex<T> {

data: Arc<Mutex<RefCell<T>>>,

}
impl<T> RefMutex<T> {

fn new(data: T) -> Self {

RefMutex {

data: Arc::new(Mutex::new(RefCell::new(data))),

}

}
fn lock(&self) -> RefMut<T> {

let guard = self.data.lock().unwrap();

RefMut {

data: guard,

}

}

}
struct RefMut<T> {

data: std::sync::MutexGuard<'static, RefCell<T>>,

}

impl<T> std::ops::Deref for RefMut<T> {

type Target = RefCell<T>;

fn deref(&self) -> &Self::Target {

&*self.data

}

}
fn main() {

let data = RefMutex::new(5);

let mut lock = data.lock();

*lock.borrow_mut() += 10;

println!("Data after modification: {}", *lock.borrow());

}

- `StaticLocks`: Предоставляет мьютексы из `parking_lot`, но их можно использовать в статических и `const fn`.

-6

// Cargo.toml

// [dependencies]

// parking_lot = "0.11.1"

use parking_lot::const_mutex;
static MY_STATIC_MUTEX: parking_lot::Mutex<()> = const_mutex(());

fn main() {

{

let _lock = MY_STATIC_MUTEX.lock();

// Mutex locked in this scope

}

// Mutex automatically unlocked when _lock goes out of scope
}


- Мьютекс с таймаутом (`std::sync::Mutex` с использованием `try_lock`): Позволяет попытаться захватить мьютекс в течение определённого времени. Если мьютекс не получается захватить в течение этого времени, поток может выполнить другие операции или корректно обработать ситуацию таймаута.

use std::sync::{Mutex, Arc};

use std::thread;

use std::time::Duration;

fn main() {

let mutex = Arc::new(Mutex::new(()));

let mutex_timeout = Arc::clone(&mutex);
let handle = thread::spawn(move || {

if let Ok(lock) = mutex_timeout.try_lock() {

println!("Mutex acquired successfully");

// Mutex locked, do something

} else {

println!("Timeout reached, couldn't acquire mutex");

// Handle timeout situation

}

});

// Wait for a while before trying to acquire the mutex

thread::sleep(Duration::from_secs(2));

// Ensure the thread finishes before the main thread exits

handle.join().unwrap();

}

-7

Какие методы у мьютексов?:

Мьютекс имеет два метода Замок и Разблокировать. Код блокирует мьютекс, что-то делает, а затем разблокирует мьютекс. Если другая часть программы уже удерживает мьютекс, код блокируется методом блокировки.

fn exclusive_access(mutex: &MyMutex, protected: &MyObject) {

mutex.lock();

protected.do_something();

mutex.unlock();

}

Какие проблемы у мьютексов?

Проблема в том, что MyObject защищен только соглашением. Не каждый ресурс, к которому MyObject осуществляет доступ, заблокирован одним и тот же мьютексом (если вы заблокируете другой мьютекс, это вообще не поможет). Вы можете забыть разблокировать мьютекс, когда закончите. Представьте, что мы используем оператор для досрочного возврата do_something(), если он возвращает ошибку. Теперь мы больше никогда не сможем заблокировать мьютекс.

Что делать , если мы хотим контролировать доступ к некоторому общему асинхронному ресурсу? - использовать асинхронный мьютекс в tokio::sync::Mutex. Можно безопасно удерживать защиту этого мьютекса на точках ожидания. Потому, что его lock()метод является асинхронным - он не будет блокировать поток во время ожидания блокировки. Поэтому какая-то другая задача, удерживающая блокировку, может выполниться.

Один из вариантов избежать проблемы — никогда не удерживать мьютекс в awaitточке. Однако вам придется проверять это правило при проверке кода, а не во время компиляции, а это значит, что в конечном итоге вы его пропустите. Чтобы восстановить эту проверку времени компиляции, приложение Tokio может обернуть защиту мьютекса в тип, который не реализует Send. Это не позволит асинхронной функции удерживать его в awaitточке… в Токио. Это потому, что Токио требует, чтобы все фьючерсы реализовывались Send. По умолчанию lilos Mutexтип (в 0.3 и более поздних версиях) предоставляет perform операцию вместо lock операции. perform требует закрытия для выполнения, пока блокировка удерживается.

mutex.perform(|data| data.squirrels += 1).await;

это обычное замыкание Rust, а не async блок. Это означает, что невозможно использовать await во время perform, поэтому отмена не может произойти. Если вы попытаетесь это сделать await в теле замыкания, вы получите ошибку компиляции. Иногда вам действительно необходимо удерживать мьютекс во время await выполнения какой-либо операции. (Например, использование мьютекса для совместного доступа к чему-то вроде периферийного устройства). lilosВерсия 0.3 предоставляет несовершенный, но полезный выход для таких ситуаций: она определяет оболочку под названием CancelSafe. CancelSafeэто не магия; это определяется так:

pub struct CancelSafe<T>(pub T);

Обертывая свои данные в CancelSafe, вы подтверждаете мьютексу, что всегда будете поддерживать его в допустимом состоянии в любой возможной точке отмены. Компилятор не может это проверить (пока), поэтому вам нужно будет позаботиться о том, чтобы использовать его правильно. Но если вы создадите Mutex<CancelSafe<T>> вместо Mutex<T>, lock операция появится снова. Как и прежде, lockсама операция является безопасной для отмены, поскольку ее будущее можно удалить, не испортив ваши структуры данных, но фьючерсы, которые вы строите поверх этого, могут легко испортить ваши структуры данных, если они будут отменены. Используя CancelSafe вы указываете, что вы обдумали это и готовы пойти на риск.

Так же Rust стремится упростить безопасное параллельное программирование, предоставляя мощные абстракции, такие как `Mutex<T>`, для безопасной работы с разделяемыми данными.

Защищенные мьютексы

Rust объединяет мьютекс и объект, который он защищает. std::sync::Mutex. Защищенный объект находится внутри мьютекса. Когда вы блокируете мьютекс, вы получаете файл MutexGuard. Вы можете получить доступ к своему защищенному объекту через "охранник". Когда "охранник" выходит за пределы области действия, мьютекс разблокируется автоматически. Такое поведение называется RAII «Приобретение ресурсов — это инициализация». Мы можем получить a только MutexGuardв том случае, если сможем получить блокировку. Охранник — это ресурс или буква «R» в RAII). Замок привязан к жизни охранника. Как только охрана упадет, замок откроется. Поэтому, пока объект не исчез, мьютекс никогда не останется заблокированным навсегда.

fn exclusive_access(mutex: &std::sync::Mutex<MyObject>) {

let guard = mutex

.lock()

.expect("the mutex is poisoned, program cannot continue");

guard.do_something();

}

Мы не можем получить доступ к MyObject не заблокировав мьютекс. Система типов делает это невозможным (нет методов, по std::sync::Mutex которым дают доступ). И мы не можем случайно забыть разблокировать мьютекс. Как только защита будет снята, мьютекс разблокируется.

Если поток, удерживающий защиту мьютекса, паникует - охрана сбрасывается (паника не предотвращает падения) и мьютекс становиться неуправляемым. документация Mutex

Что если несколько потоков, обращающихся к защищенному объекту.

Мы не можем просто передать мьютекс при создании двух потоков. Нам нужно обернуть его умным указателем.

Если два потока пытаются заблокировать мьютекс. Только поток 1 успешен, поток 2 припаркован. Как только поток 1 сбрасывает защиту мьютекса, мьютекс разблокируется. Тогда поток 2 сможет получить блокировку и выполнить свою работу.

Асинхронная функция Hold-Mutex-Guard

Если мы хотим, чтобы мьютекс удерживался в точке ожидания.

Необходимо:

  1. Чтение общего счетчика
  2. Доступ к асинхронному общему ресурсу
  3. Записать новое значение в общий счетчик

use std::sync::{Arc, Mutex};

async fn hold_mutex_guard(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {

let mut guard = data.lock().map_err(|_| DataAccessError {})?;

println!("existing value: {}", *guard);

tokio::task::yield_now().await;

*guard = *guard + 1;

println!("new value: {}", *guard);

Ok(())
}

Если мы хотим получить доступ к значению и изменить его из нескольких задач (защиты мьютексов в Rust). Нужно получить доступ к мьютексу из нескольких задач (std::sync::Arc неатомарная версия Arc std::rc::Rc).

Блокируем мьютекс. Теперь мы «получим доступ к нашему асинхронному ресурсу».

Попробуем использовать #[tokio::main]макрос.

#[tokio::main]

async fn main() {

let data = Arc::new(Mutex::new(0_u64));

hold_mutex_guard(Arc::clone(&data))

.await

.expect("failed to perform operation");

}

Мы создаем наши данные - начальное значение 0 и удерживаем мьютекс в точке ожидания, после чего получаем новое значение 1.

Как возвращать ошибку

Блокировка мьютекса может оказаться неудачной. Поэтому в этом случае мы должны вернуть ошибку.

use std::{error::Error, fmt};

#[derive(Debug)]

struct DataAccessError {}

impl fmt::Display for DataAccessError {

fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

write!(f, "there was an error accessing the shared data")

}

}

impl Error for DataAccessError {}

Как запустить нескольких фьючерсов (tokio::spaw)

Если мы не можем ожидать выполнения нашей асинхронной функции. Существует способ одновременного запуска нескольких фьючерсов в асинхронной среде выполнения tokio::spawn.

#[tokio::main(flavor = "current_thread|multi_thread")]

async fn main() {

tokio::spawn(spawn_again());

do_nothing().await;

tokio::task::yield_now().await

tokio::task::yield_now().await

}

async fn spawn_again() {

tokio::spawn(do_nothing());

}
async fn do_nothing() {

}

Здесь async main функция порождает задачу сspawn_again (асинхронная функция, которая создаст другую задачу). И затем он ожидает асинхронную функцию do_nothing (ничего не делает). Функция async spawn_again порождает задачу с расширением do_nothing.

Если задача порождаться в текущем потоке.

В асинхронной среде выполнения может быть только один рабочий процесс. Например, планировщик текущих потоков в Токио. Тогда мы могли бы создать задачу из другой задачи. Но задача не будет опрошена до тех пор, пока текущая задача не будет передана планировщику (или, может быть, позже, если ждут другие задачи). Создаваемым задачам нужно ждать, пока среда выполнения не освободится. Затем они пройдут опрос.

Если задача создается в многопоточном режиме.

Среда выполнения может иметь несколько рабочих процессов (несколько потоков - многопоточный планировщик). Тогда параллельно опрашиваемых задач может быть столько, сколько рабочих. Дескриптор соединения можно использовать для ожидания завершения задачи и для прерывания порожденной задачи.

Создание нескольких асинхронных функций - ошибка

Мы получим ошибку если создадим несколько экземпляров асинхронной функции.

#[tokio::main]

async fn main() {

let data = Arc::new(Mutex::new(0_u64));

tokio::spawn(hold_mutex_guard(Arc::clone(&data)));

tokio::spawn(hold_mutex_guard(Arc::clone(&data)));

}

Future вместо std::marker

В Rust есть концепция под названием «черты маркеров» (std::marker). Это черты, у которых нет никаких методов. В рассматриваемом случае эта Send черта указывает на то, что тип можно безопасно передавать между потоками. Поэтому можно запускать Send функцию not async дважды одновременно для попытки заблокировать мьютекс. Но это небезопасно с точки зрения работы с памятью.

Давайте создадим еще одну асинхронную функцию, без выхода и без точки ожидания.

async fn yieldless_mutex_access(data: Arc<Mutex<u64>>) -> Result<(),

DataAccessError> {

let mut guard = data.lock().map_err(|_| DataAccessError {})?;

println!("existing value: {}", *guard);

*guard = *guard + 1;

println!("new value: {}", *guard);

Ok(())

}

#[tokio::main(flavor = "current_thread")]

async fn main() {

let data = Arc::new(Mutex::new(0_u64));

tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));

hold_mutex_guard(Arc::clone(&data))

.await

.expect("failed to perform operation");

}

Здесь мы создаем асинхронную функцию yieldless_mutex_access() и вызываем функцию hold_mutex_guard(). И получаем зависание.

Учитывая это, исправим код следующим образом.

use std::sync::{Arc, Mutex};

enum HoldMutexGuard<'a> {

Init {

data: Arc<Mutex<u64>>,

},
Yielded {

guard: MutexGuard<'a, u64>,

_data: Arc<Mutex<u64>>,

},
Done,

}

обернем Future в функцию.

fn hold_mutex_guard(

data: Arc<Mutex<u64>>,

) -> impl Future<Output = Result<(), DataAccessError>> {

HoldMutexGuard::Init { data }

}

-8

Давайте снова воспроизведем эту зависшую программу.

fn main() {

let body = async {

let data = Arc::new(Mutex::new(0_u64));

tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));

hold_mutex_guard(Arc::clone(&data))

.await

.expect("failed to perform operation");

};
return tokio::runtime::Builder::new_current_thread()

.enable_all()

.build()

.expect("failed to build runtime")

.block_on(body);

}

yieldless_mutex_access() появляется первым. HoldMutexGuardего ждет. Среда выполнения является однопоточной. Новая задача, созданная с помощью, yieldless_mutex_access() должна ждать, пока текущая задача не будет передана среде выполнения. HoldMutexGuard запускается в первую очередь. Он блокирует мьютекс и получает файл MutexGuard. После возвращения он будет снова опрошен Poll::Pending. Затем меняет состояние на Yielded, сохраняя MutexGuardв себе. А потом возвращается Poll::Pending. Теперь среда выполнения может опросить следующую задачу. Эта задача блокирует мьютекс. Но мьютекс уже заблокирован, поэтому он блокируется до тех пор, пока не будет разблокирован. Поскольку среда выполнения имеет только один поток, это блокирует всю среду выполнения.

Реализация Future HoldMutexGuard

Используем std::mem::transmute.

impl<'a> Future for HoldMutexGuard<'a> {

type Output = Result<(), DataAccessError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

let state = &mut *self;

match state {

Self::Init { data } => {

let guard = unsafe {

// SAFETY: We will hold on to the Arc containing the mutex as long
// as we hold onto the guard.

std::mem::transmute::<MutexGuard<'_, u64>, MutexGuard<'static, u64>>(
data.lock().map_err(|_| DataAccessError {})?,

)

};

println!("existing value: {}", *guard);

cx.waker().wake_by_ref();

*state = Self::Yielded {

guard: guard,

_data: Arc::clone(data),

};

Poll::Pending

}

Self::Yielded { guard, _data } => {

println!("new value: {}", *guard);

*state = Self::Done;

Poll::Ready(Ok(()))

}

Self::Done => panic!("Please stop polling me!"),

}

}

}

Получив наш MutexGuard, мы печатаем значение. Теперь мы собираемся вернуться к среде выполнения. Затем мы устанавливаем следующее состояние: Yielded. В следующий раз, когда наше будущее будет опрошено, мы уже будем в состоянии Yielded. Мы напечатаем значение из файла MutexGuard. Затем Done и Poll::Ready.

Реализация мьютекса интеграции нейронной сети для управления блокировками.

Добавьте следующие зависимости в ваш Cargo.toml:

[dependencies]

smartcore = "0.2.1"

rand = "0.8.5"

std = { version = "1.0", features = ["thread"] }

use std::sync::{Arc, Mutex, MutexGuard};

use std::thread;

use std::time::{Duration, Instant};

use smartcore::linalg::naive::dense_matrix::DenseMatrix;

use smartcore::neighbors::knn_classifier::KNNClassifier;

use smartcore::model_traits::Predict;

use rand::Rng;

// Структура для хранения состояния мьютекса

struct MLMutex {

inner: Mutex<()>,

// Внутренний мьютекс для синхронизации

model: Mutex<KNNClassifier<f64, u8, DenseMatrix<f64>, Vec<u8>>>,

// ML-модель

training_data: Mutex<Vec<(Vec<f64>, u8)>>,

// Данные для онлайн-обучения

}

// Тип для результата блокировки

type LockResult<'a> = Result<MutexGuard<'a, ()>, String>;

impl MLMutex {

// Создание нового мьютекса с инициализацией ML-модели

fn new() -> Arc<Self> {

// Инициализация начальных данных для модели

let x = DenseMatrix::from_2d_array(&[

&[1.0, 2.0, 3.0],

&[4.0, 5.0, 6.0],

&[7.0, 8.0, 9.0],

]);

let y = vec![0u8, 1u8, 1u8]; // Метки: 0 - блокировка запрещена, 1 - разрешена

let model = KNNClassifier::fit(&x, &y, Default::default()).unwrap();

Arc::new(MLMutex {

inner: Mutex::new(()),

model: Mutex::new(model),

training_data: Mutex::new(Vec::new()),

})

}

// Попытка блокировки мьютекса с использованием ML

fn lock(&self, features: Vec<f64>) -> LockResult {

// Подготовка данных для предсказания

let input = DenseMatrix::from_2d_vec(&[features.clone()]);

// Получение предсказания от модели

let model_guard = self.model.lock().map_err(|e| e.to_string())?;

let prediction = model_guard.predict(&input).map_err(|e| e.to_string())?;

if prediction[0] == 1 {

// Если модель разрешает, блокируем мьютекс

let guard = self.inner.lock().map_err(|e| e.to_string())?;

println!("Mutex locked based on ML prediction");

// Добавляем данные в обучающий набор (обратная связь)

self.add_training_data(features, 1);

Ok(guard)

} else {

println!("Mutex lock denied by ML model");

self.add_training_data(features, 0);

Err("Lock denied by ML model".to_string())

}

}

// Разблокировка мьютекса

fn unlock(&self, _guard: MutexGuard<()>) {

// Разблокировка происходит автоматически при выходе guard из области видимости

println!("Mutex unlocked");

}

// Добавление данных для онлайн-обучения

fn add_training_data(&self, features: Vec<f64>, label: u8) {

let mut training_data = self.training_data.lock().unwrap();

training_data.push((features, label));

// Если достаточно данных, обновляем модель

if training_data.len() >= 10 {

self.retrain_model();

}

}

// Переобучение модели

fn retrain_model(&self) {

let training_data = self.training_data.lock().unwrap();

let (x_vec, y): (Vec<Vec<f64>>, Vec<u8>) = training_data.iter().cloned().unzip();

let x = DenseMatrix::from_2d_vec(&x_vec);

let new_model = KNNClassifier::fit(&x, &y, Default::default()).unwrap();

let mut model_guard = self.model.lock().unwrap();

*model_guard = new_model;

println!("ML model retrained with {} samples", training_data.len());

}

}

// Пример использования

fn main() {

let ml_mutex = MLMutex::new();

// Создаем несколько потоков для тестирования

let mut handles = vec![];

for i in 0..5 {

let ml_mutex_clone = Arc::clone(&ml_mutex);

let handle = thread::spawn(move || {

let mut rng = rand::thread_rng();

let features = vec![

rng.gen_range(0.0..10.0), // Количество активных потоков

rng.gen_range(0.0..5.0), // Время ожидания

i as f64,

// ID потока

];

let start = Instant::now();

match ml_mutex_clone.lock(features) {

Ok(guard) => {

println!("Thread {} locked mutex", i);

thread::sleep(Duration::from_millis(500));

ml_mutex_clone.unlock(guard);

}

Err(e) => println!("Thread {} failed to lock: {}", i, e),

}

let duration = start.elapsed();

println!("Thread {} completed in {:?}", i, duration);

});

handles.push(handle);

}

// Ожидаем завершения всех потоков

for handle in handles {

handle.join().unwrap();

}

}

Объяснение кода

1. Структура MLMutex

  • inner: Стандартный Mutex для обеспечения синхронизации.
  • model: KNN-классификатор, предсказывающий, можно ли заблокировать мьютекс (0 - нет, 1 - да).
  • training_data: Хранит данные для онлайн-обучения модели.

2. Метод new

  • Создает мьютекс с предобученной моделью на синтетических данных.
  • Возвращает Arc для безопасного использования в многопоточной среде.

3. Метод lock

  • Принимает вектор признаков (например, состояние системы).
  • Использует ML-модель для предсказания.
  • Если предсказание положительное, блокирует мьютекс и возвращает MutexGuard.
  • В противном случае возвращает ошибку.

4. Метод unlock

  • Разблокировка происходит автоматически благодаря MutexGuard, но метод добавлен для явного логирования.

5. Онлайн-обучение

  • add_training_data: Добавляет новые данные в обучающий набор.
  • retrain_model: Переобучает модель, когда накоплено достаточно данных.

6. Тестирование

  • В main создается 5 потоков, каждый из которых пытается заблокировать мьютекс с рандомными признаками.
  • Выводятся результаты блокировки и время выполнения.