Добавить в корзинуПозвонить
Найти в Дзене
Один Rust не п...Rust

Rust для распределенных акторов

t.me/oneRustnoqRust Написать систему обучения, где узлы обучают модели на локальных данных, обмениваются обновлениями с центральным сервером через асинхронное сетевое взаимодействие, с обработкой ошибок, и балансировкой нагрузки. Научиться использовать ML, для создания распределенных систем и акторов. etcd — это распределённое ключ-значение хранилище, которое часто используется для хранения конфигураций, управления сервисами и координации между узлами в кластере. Пример:
Представьте, что у вас есть несколько серверов (узлов), которые должны знать друг о друге. etcd позволяет каждому узлу записать информацию о себе (например, свой IP-адрес) и узнать, какие ещё узлы работают в системе. Эта функция регистрирует новый узел в etcd, чтобы другие узлы могли его обнаружить. Что происходит: Пример:
Если у вас есть узел с node_id="node1" и node_addr="192.168.1.1", то в etcd появится запись: /fedlearn/nodes/node1 → "192.168.1.1" Если узел "упадёт" и не продлит аренду, запись автоматически удалитс
Оглавление
GitHub - nicktretyakov/federated_learning
ML на RUST без заморочек

t.me/oneRustnoqRust

Для чего нужна данная статья? :

Написать систему обучения, где узлы обучают модели на локальных данных, обмениваются обновлениями с центральным сервером через асинхронное сетевое взаимодействие, с обработкой ошибок, и балансировкой нагрузки.

  • CentralServer: Агрегирует параметры моделей от узлов и рассылает обновления.
    NodeActor: Локально обучает модель, отправляет параметры на сервер и принимает обновления.
  • Федеративное обучение: Каждый узел обучает модель на своих данных (в примере данные фиктивные).
    Центральный сервер использует алгоритм FedAvg для усреднения параметров.
  • Асинхронность: Используем tokio и actix для асинхронного взаимодействия между узлами и сервером.
  • Обработка ошибок: Логируем сбои с помощью log.
    Используем anyhow для удобной обработки ошибок.

Зачем Вам это уметь? :

Научиться использовать ML, для создания распределенных систем и акторов.

Что такое etcd?

etcd — это распределённое ключ-значение хранилище, которое часто используется для хранения конфигураций, управления сервисами и координации между узлами в кластере.

Пример:
Представьте, что у вас есть несколько серверов (узлов), которые должны знать друг о друге. etcd позволяет каждому узлу записать информацию о себе (например, свой IP-адрес) и узнать, какие ещё узлы работают в системе.

Регистрация узла (register_node)

Эта функция регистрирует новый узел в etcd, чтобы другие узлы могли его обнаружить.

Что происходит:

  • Создаётся клиент для подключения к etcd.
  • Узел записывает информацию о себе (например, node_id и node_addr) по ключу /fedlearn/nodes/{node_id}.
  • Создаётся лиз (lease) — это как "аренда" записи на 30 секунд. Если узел не продлит аренду, запись автоматически удалится.
  • Запускается фоновая задача, которая каждые 10 секунд продлевает аренду, чтобы запись не исчезла.

Пример:
Если у вас есть узел с node_id="node1" и node_addr="192.168.1.1", то в etcd появится запись: /fedlearn/nodes/node1 → "192.168.1.1"

Если узел "упадёт" и не продлит аренду, запись автоматически удалится через 30 секунд.

Обнаружение узлов (discover_nodes)

Эта функция получает список всех зарегистрированных узлов из etcd.

Что происходит:

  • Клиент подключается к etcd.
  • Запрашиваются все ключи, которые начинаются с /fedlearn/nodes/.
  • Из каждой записи извлекается значение (адрес узла) и добавляется в список.

Пример:
Если в etcd есть записи:

/fedlearn/nodes/node1 → "192.168.1.1"
/fedlearn/nodes/node2 → "192.168.1.2"

то функция вернёт список: ["192.168.1.1", "192.168.1.2"].

Зачем это нужно в федеративном обучении?

В федеративном обучении узлы (например, устройства или серверы) обучают модели локально и обмениваются обновлениями. Чтобы узлы могли найти друг друга и координировать работу, им нужен механизм регистрации и обнаружения — именно это и делает данный код.

use actix::prelude::*;

Что это?
Это подключение библиотек из фреймворка Actix — популярного инструмента для создания акторов и асинхронных приложений на Rust.
Простыми словами:
Представь, что ты строишь дом. Тебе нужны кирпичи, цемент, окна — это всё материалы. Здесь мы подключаем "материалы" для работы с акторами (актор — это как независимый работник, который выполняет задачи и общается с другими работниками).

use anyhow::Result;

Что это?
Библиотека anyhow упрощает работу с ошибками. Вместо того, чтобы писать сложные конструкции для обработки ошибок, можно использовать Result из anyhow.
Простыми словами:
Представь, что ты готовишь блюдо. Если что-то идёт не так (например, сгорело), ты хочешь быстро понять, что случилось. anyhow::Result помогает легко сообщать об ошибках.

use serde::{Deserialize, Serialize};

Что это?
serde — библиотека для преобразования данных в формат, который можно легко сохранить или передать (например, в JSON).
Простыми словами:
Представь, что ты отправляешь письмо. Чтобы адресат понял, что ты написал, нужно перевести текст на его язык. Serialize — это перевод данных в удобный формат, а Deserialize — обратный перевод.

#[derive(...)]

Что это?
Атрибут, который автоматически генерирует код для структур или перечислений (enum).
Простыми словами:
Представь, что у тебя есть чертеж машины. Вместо того, чтобы самому собирать все детали, ты говоришь заводу: "Собери мне машину по этому чертежу". #[derive(...)] — это как команда заводу собрать нужные функции автоматически.

#[derive(Serialize, Deserialize, Message, Clone, Debug)]

Что это?

  • Serialize, Deserialize — позволяют преобразовывать данные в JSON и обратно.
  • Message — помечает структуру как сообщение для Actix (чтобы акторы могли обмениваться такими сообщениями).
  • Clone — позволяет копировать объект.
  • Debug — позволяет выводить объект для отладки.

Простыми словами:
Представь, что ты отправляешь послание в бутылке. Тебе нужно:

  • Записать послание (Serialize).
  • Прочитать его (Deserialize).
  • Убедиться, что бутылка не разбилась (Clone).
  • Проверить, что послание правильное (Debug).

#[rtype(result = "...")]

Что это?
Указывает, какой тип данных будет возвращён в ответ на сообщение.
Простыми словами:
Представь, что ты спрашиваешь у друга: "Сколько яблок у тебя?" Он может ответить числом (5) или сказать "Не знаю" (String). Здесь мы указываем, что ответ будет или успешным (Result<(), String>), или списком (Vec<...>).

pub enum NodeMessage { ... }

Что это?
Перечисление (enum) — это набор возможных вариантов сообщений, которые могут отправлять узлы (nodes) серверу.
Простыми словами:
Представь, что у тебя есть меню в ресторане:

  • Train — "Потренируй модель на этих данных".
  • Predict — "Предскажи результат для этих данных".
  • UpdateModel — "Обнови параметры модели".
  • RegisterNode — "Зарегистрируй меня как новый узел".

Примеры сообщений

  • Train { data: Vec<f32>, labels: Vec<f32> }
    Узел просит сервер обучить модель на данных data и метках labels.
  • Predict { data: Vec<f32> }
    Узел просит сервер предсказать результат для данных data.
  • UpdateModel { params: Vec<f32> }
    Сервер отправляет узлу новые параметры модели.
  • RegisterNode { addr: String }
    Новый узел сообщает серверу свой адрес.

GetNodesRequest, GetModelParams, ServerMessage

  • GetNodesRequest — запрос списка всех подключённых узлов.
  • GetModelParams — запрос текущих параметров модели.
  • ServerMessage — сообщение от сервера узлу с новыми параметрами модели.

Структура нейронной сети (SimpleModel)

Это основной «блок» нейросети, который хранит все её параметры:

  • w1, b1 — веса и смещения (bias) первого слоя.
  • w2, b2 — веса и смещения второго слоя.
  • learning_rate — скорость обучения (насколько сильно меняются веса при каждом обновлении).

Пример:
Представьте, что нейросеть — это фабрика, где каждый слой — это цех. w1 и b1 — это инструменты и настройки первого цеха, а w2 и b2 — второго.

Инициализация модели (new)

Здесь создаётся новая нейросеть с случайными весами. Используется Xavier-нициализация — метод, который помогает сети быстрее учиться, задавая веса в разумном диапазоне.

Пример:
Как если бы вы бросали игральные кости, чтобы выбрать начальные настройки для каждого инструмента на фабрике.

Прямой проход (forward)

Это процесс, когда нейросеть «думает»: берёт входные данные, пропускает их через слои и выдаёт результат.

  • x.dot(&self.w1) + &self.b1 — умножение входов на веса и добавление смещения.
  • mapv(|v| if v > 0.0 { v } else { 0.0 }) — функция активации ReLU (если значение отрицательное, заменяет его на 0).

Пример:
Вы даёте фабрике сырьё (входные данные), и она выдаёт готовый продукт (результат).

Обучение (train)

Здесь нейросеть учится на ошибках:

  • Forward pass — сначала получает результат.
  • Backward pass — считает, насколько ошиблась, и обновляет веса, чтобы в следующий раз ошибаться меньше.

Пример:
Фабрика сравнивает свой продукт с эталоном, понимает, где ошиблась, и подкручивает инструменты, чтобы в следующий раз сделать лучше.

Сериализация и десериализация (to_params_vec, from_params_vec)

  • to_params_vec — преобразует все веса и смещения в один длинный вектор (для передачи по сети).
  • from_params_vec — загружает веса из вектора обратно в модель.

Пример:
Как упаковать все инструменты фабрики в один ящик для перевозки, а потом распаковать и расставить на место.

Потокобезопасная модель (SharedModel)

Arc<Mutex<SimpleModel>> — обёртка, которая позволяет нескольким потокам безопасно работать с одной моделью.

Пример:
Если несколько рабочих одновременно пользуются инструментами, то Mutex следит, чтобы они не мешали друг другу.

Генерация данных (generate_data, prepare_data)

  • generate_data — создаёт искусственные данные для обучения.
  • prepare_data — преобразует сырые данные в формат, который понимает нейросеть.

Пример:
Вы создаёте учебные задания для фабрики, чтобы она могла тренироваться.

Actix и Actix-web

Что это?
Actix — это библиотека для работы с акторами (actor model) в Rust, а actix-web — фреймворк для создания веб-приложений (как Flask для Python или Express для JavaScript).

Простыми словами:
Представь, что у тебя есть ресторан. Actix — это система, которая помогает поварам (акторам) готовить блюда, не мешая друг другу. Actix-web — это официант, который принимает заказы от посетителей (запросы от пользователей) и передаёт их поварам.

Пример:

use actix_web::{web, HttpResponse};

Здесь мы подключаем actix-web, чтобы обрабатывать HTTP-запросы.

Addr и Актор (Actor)

Что это?
Addr<NodeActor> — это адрес актора (почтового ящика), через который можно отправлять ему сообщения.

Простыми словами:
Актор — это как работник на фабрике, который выполняет только свою задачу. Чтобы отправить ему задание, нужно знать его адрес (Addr). Сообщения отправляются в его "почтовый ящик".

Пример:

actor.send(msg.0).await

Здесь мы отправляем сообщение актору и ждём ответа.

HTTP-обработчики (Handlers)

Что это?
Функции, которые обрабатывают HTTP-запросы (например, когда пользователь открывает страницу или отправляет данные).

Простыми словами:
Это как кнопки на пульте: каждая кнопка выполняет свою функцию. Например, кнопка "Включить телевизор" — это обработчик запроса "включи телевизор".

Пример:

pub async fn receive_node_message(
msg: web::Json<NodeMessage>,
actor: web::Data<Addr<NodeActor>>,
) -> impl Responder {
// ...
}

Эта функция вызывается, когда на узел приходит сообщение.

Сообщения (Messages)

Что это?
Данные, которые передаются между акторами или между клиентом и сервером.

Простыми словами:
Сообщение — это как письмо или SMS. Например, ты отправляешь SMS другу: "Привет, как дела?" — это сообщение.

Пример:

msg: web::Json<NodeMessage>

Здесь NodeMessage — это структура, которая описывает формат сообщения.

HTTP-ответы (HttpResponse)

Что это?
То, что сервер отправляет обратно клиенту (например, "всё ок" или "ошибка").

Простыми словами:
Когда ты спрашиваешь у друга: "Как дела?", он может ответить: "Хорошо!" или "Плохо". HttpResponse — это ответ сервера на запрос.

Пример:

HttpResponse::Ok().json(serde_json::json!({"status": "success"}))

Здесь сервер отвечает: "Всё хорошо, статус — успех".

Сериализация/Десериализация (serde)

Что это?
Преобразование данных в формат, удобный для передачи (например, JSON), и обратно.

Простыми словами:
Представь, что ты хочешь отправить письмо с игрушкой. Чтобы отправить, нужно упаковать игрушку в коробку (сериализация), а когда получаешь — распаковать (десериализация).

Пример:

#[derive(Serialize, Deserialize)]
pub struct NodeStatus {
pub address: String,
pub status: String,
}

Здесь структура NodeStatus может быть преобразована в JSON и обратно.

Асинхронность (async/await)

Что это?
Код, который может выполняться параллельно, не блокируя другие задачи.

Простыми словами:
Представь, что ты готовишь завтрак: пока варится кофе, ты можешь намазать бутерброд. Асинхронность позволяет делать несколько дел одновременно.

Пример:

match actor.send(msg.0).await {
// ...
}

Здесь .await означает, что мы ждём ответа от актора, но не блокируем другие задачи.

Федеративное обучение (Federated Learning)

Что это?
Это способ обучения модели машинного обучения, при котором данные остаются на местных устройствах (например, на телефонах или компьютерах), а на сервер отправляются только обновлённые параметры модели. Так сохраняется конфиденциальность данных.

Пример:
Представь, что у тебя и твоих друзей есть телефоны с фотографиями кошек и собак. Вместо того, чтобы отправлять все фотографии на сервер, каждый телефон обучает модель на своих данных, а потом отправляет только изменения в модели. Сервер объединяет эти изменения и отправляет обновлённую модель обратно.

NodeActor

Что это?
Это актор (actor) — объект, который обрабатывает сообщения и выполняет действия. В данном случае — узел федеративного обучения.

Что делает?

  • Хранит модель (model).
  • Знает адрес сервера (server_addr) и свой адрес (node_addr).
  • Обрабатывает команды: обучение, предсказание, обновление модели.

Пример:
Представь, что NodeActor — это работник на фабрике. Он получает задания (сообщения), выполняет их (обучает модель, делает предсказания) и отправляет результаты начальнику (серверу).

Сообщения (NodeMessage)

Что это?
Это команды, которые может получать и отправлять узел.

Виды сообщений:

  • Train — обучи модель на локальных данных.
  • Predict — сделай предсказание.
  • UpdateModel — обнови модель новыми параметрами.
  • RegisterNode — зарегистрируйся на сервере.

Пример:
Как если бы ты писал письма с просьбами: "Обучи модель на моих данных", "Сделай предсказание", "Обнови модель".

Модель (SharedModel)

Что это?
Это модель машинного обучения, обёрнутая в Mutex (или другой механизм синхронизации), чтобы несколько потоков могли безопасно с ней работать.

Что делает?

  • train — обучается на данных.
  • forward — делает предсказания.
  • extract_params — извлекает параметры модели для отправки на сервер.
  • update_model — обновляет модель новыми параметрами.

Пример:
Представь, что модель — это рецепт торта. Ты можешь его улучшать (обучать), печь торты (делать предсказания), делиться улучшениями (отправлять параметры) и использовать улучшения других (обновлять модель).

Обработка сообщений (Handler<NodeMessage>)

Что это?
Это метод, который определяет, как узел реагирует на разные сообщения.

Пример:
Если пришло сообщение Train, узел:

  1. Подготавливает данные.
  2. Обучает модель.
  3. Отправляет обновлённые параметры на сервер.

Отправка сообщений на сервер (send_to_server)

Что это?
Метод, который отправляет сообщения на сервер по HTTP.

Пример:
Как если бы ты отправлял письмо по почте: "Вот мои обновлённые параметры модели".

Федеративное обучение (Federated Learning)

Что это?
Это способ обучения модели машинного обучения, при котором данные не собираются в одном месте (например, на сервере), а остаются на устройствах пользователей (нодах). Сервер только агрегирует обновления модели от всех устройств, не видя сами данные.

Пример:
Представь, что у тебя и твоих друзей есть смартфоны. Каждый из вас обучает модель распознавания рукописного текста на своих данных (например, на своих записях). Вместо того, чтобы отправлять все записи на сервер, вы отправляете только обновлённые параметры модели. Сервер усредняет их и отправляет обратно улучшенную модель.

Центральный сервер (CentralServer)

Что это?
Это главная программа, которая:

  • Хранит список всех устройств (нод), участвующих в обучении.
  • Собирает обновления модели от нод.
  • Усредняет их (агрегирует) и отправляет обратно новую версию модели.

В коде:

pub struct CentralServer {
nodes: Vec<String>,
// Список адресов нод
aggregated_params: Option<Vec<f32>>,
// Здесь хранятся усреднённые параметры
model: SharedModel,
// Текущая версия модели
updates_received: usize,
// Сколько обновлений получили
total_nodes: usize,
// Сколько всего нод участвует
}

Агрегация параметров (FedAvg)

Что это?
Когда сервер получает обновления от всех нод, он усредняет их параметры (веса модели). Это называется
Federated Averaging (FedAvg).

Пример:
Допустим, у тебя параметр модели = 10, у друга = 20. Сервер усредняет: (10 + 20) / 2 = 15.

В коде:

for param in aggregated.iter_mut() {
*param /= self.total_nodes as f32;
}

Обработка сообщений (Handlers)

Сервер умеет обрабатывать разные типы сообщений:

  • ServerMessage — сообщение от ноды с обновлёнными параметрами.
  • NodeMessage — сообщение от ноды (например, регистрация или обновление).
  • GetNodesRequest — запрос списка нод.
  • GetModelParams — запрос текущих параметров модели.

Пример:
Если нода отправляет обновление, сервер добавляет её параметры к общему пулу и считает, сколько обновлений уже пришло.

Асинхронная отправка (Broadcast)

Когда сервер усреднил параметры, он отправляет новую модель всем нодам асинхронно (не дожидаясь ответа).

В коде:

actix_web::rt::spawn(async move {
let client = awc::Client::default();
match client.post(&node_addr).send_json(&msg_clone).await {
Ok(_) => info!("Broadcast to {} successful", node_addr),
Err(e) => error!("Failed to broadcast to {}: {}", node_addr, e),
}
});

Регистрация нод

Сервер ведёт список всех нод. Если новая нода регистрируется, её адрес добавляется в список.

Пример:
Нода отправляет сообщение: "Я новая, мой адрес — 192.168.1.1". Сервер добавляет её в self.nodes.