Найти тему
Один Rust не п...Rust

P2P на Rust

Для чего нужна данная статья? :
Узнать как реализовать:

Обмен файлами P2P между пользователями, что позволяет им скачивать и загружать контент напрямую друг у друга. Примеры включают BitTorrent и eDonkey.
Децентрализованные финансовые системы такие как Bitcoin и Ethereum, используют технологию P2P для обеспечения децентрализованных финансовых транзакций без посредников.
Стриминг мультимедийного контента P2P для стриминга аудио и видео контента напрямую от других участников сети.
Децентрализованные социальные сети используя технологию P2P, чтобы пользователи могли обмениваться контентом и общаться без центрального управления.
Облачные вычисления P2P, где ресурсы разделяются между участниками сети для выполнения задач и обработки данных.
Интернет вещей (IoT) P2P для обмена данными между устройствами, улучшая эффективность и безопасность коммуникации.
Децентрализованные ресурсы хранения P2P для создания децентрализованных хранилищ данных, где каждый участник сети предоставляет ресурсы для хранения данных других участников.
Децентрализованные рынки и торговля P2P для создания децентрализованных торговых платформ, где участники могут обмениваться товарами и услугами напрямую.

Зачем Вам это уметь? :

Для поиска компромисса между собственными библиотеками и стандартными.

Пример P2P-сервера, который слушает соединения на адресе 127.0.0.1:8080.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
const SERVER_ADDR: &str = "127.0.0.1:8080";
fn handle_client(mut stream: TcpStream, files: &HashMap<String, Vec<u8>>) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[..]);
let request_parts: Vec<&str> = request.split_whitespace().collect();
if request_parts.len() != 2 || request_parts[0] != "GET" {
return;
}
let file_name = request_parts[1];
if let Some(file_content) = files.get(file_name) {
stream.write_all(file_content).unwrap();
}
}
fn main() {
// у клиента есть набор файлов, для скачивания.
let mut user_files = HashMap::new();
user_files.insert("file1.txt".to_string(), b"Content of file 1".to_vec());
user_files.insert("file2.txt".to_string(), b"Content of file 2".to_vec());
let listener = TcpListener::bind(SERVER_ADDR).unwrap();
println!("Server listening on {}", SERVER_ADDR);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let files = user_files.clone();
thread::spawn(move || {
handle_client(stream, &files);
});
}
Err(e) => {
println!("Error: {}", e);
}
}
}
}

Пример реализации блокчейна.

use chrono::prelude::*;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use std::fmt;
// Блок цепочки блокчейна
#[derive(Debug)]
struct Block {
index: u64,
timestamp: i64,
data: String,
previous_hash: String,
hash: String,
}
impl Block {
fn new(index: u64, data: String, previous_hash: String) -> Block {
let timestamp = Utc::now().timestamp();
let hash = Block::calculate_hash(index, timestamp, &data, &previous_hash);
Block {
index,
timestamp,
data,
previous_hash,
hash,
}
}
fn calculate_hash(index: u64, timestamp: i64, data: &str, previous_hash: &str) -> String {
let input = format!("{}{}{}{}", index, timestamp, data, previous_hash);
let mut sha = Sha256::new();
sha.input_str(&input);
sha.result_str()
}
}
impl fmt::Display for Block {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Block #{} [Timestamp: {}, Data: {}, Previous Hash: {}, Hash: {}]",
self.index, self.timestamp, self.data, self.previous_hash, self.hash
)
}
}
// Блокчейн
struct Blockchain {
chain: Vec<Block>,
}
impl Blockchain {
fn new() -> Blockchain {
let genesis_block = Block::new(0, "Genesis Block".to_string(), "0".to_string());
Blockchain {
chain: vec![genesis_block],
}
}
fn add_block(&mut self, data: String) {
let previous_hash = self.get_latest_block().hash.clone();
let new_block = Block::new((self.chain.len() as u64), data, previous_hash);
self.chain.push(new_block);
}
fn get_latest_block(&self) -> &Block {
self.chain.last().unwrap()
}
}
fn main() {
// Создаем блокчейн
let mut blockchain = Blockchain::new();
// Добавляем несколько блоков
blockchain.add_block("Transaction 1".to_string());
blockchain.add_block("Transaction 2".to_string());
// Выводим содержимое блокчейна
for block in &blockchain.chain {
println!("{}", block);
}
}

Пример создания P2P-сервера, который может отправлять аудиофайл.

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
const SERVER_ADDR: &str = "127.0.0.1:8080";
const AUDIO_FILE_PATH: &str = "path/to/audio/file.mp3";
fn handle_client(mut stream: TcpStream) {
// Открываем и читаем аудиофайл
let mut audio_file = std::fs::File::open(AUDIO_FILE_PATH).expect("Failed to open audio file");
let mut audio_data = Vec::new();
audio_file
.read_to_end(&mut audio_data)
.expect("Failed to read audio file");
// Отправляем аудиофайл клиенту
stream.write_all(&audio_data).expect("Failed to send audio data to client");
}
fn main() {
let listener = TcpListener::bind(SERVER_ADDR).expect("Failed to bind to address");
println!("Server listening on {}", SERVER_ADDR);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
// Запускаем новый поток для обслуживания клиента
thread::spawn(move || {
handle_client(stream);
});
}
Err(e) => {
println!("Error: {}", e);
}
}
}
}

Пример P2P-сети, где пользователи могут обмениваться сообщениями.

//[dependencies]
//tokio = { version = "1", features = ["full"] }

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::time::Instant;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::error::Error;
// Структура представляющая сообщение пользователя
#[derive(Debug)]
struct Message {
sender: String,
content: String,
}
// Структура представляющая пользователя
#[derive(Debug)]
struct User {
name: String,
sender: mpsc::Sender<Message>,
}
// Структура представляющая состояние сервера
struct ServerState {
users: HashMap<String, Arc<Mutex<User>>>,
}
impl ServerState {
fn new() -> ServerState {
ServerState {
users: HashMap::new(),
}
}
// Добавление нового пользователя
fn add_user(&mut self, name: String, sender: mpsc::Sender<Message>) {
let user = User { name: name.clone(), sender };
let user_arc = Arc::new(Mutex::new(user));
self.users.insert(name, user_arc);
}
// Рассылка сообщения всем пользователям
async fn broadcast(&self, message: Message) {
for user in self.users.values() {
let user = user.clone();
let message = message.clone();
tokio::spawn(async move {
let mut user = user.lock().unwrap();
user.sender.send(message).await.unwrap();
});
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let server_state = Arc::new(Mutex::new(ServerState::new()));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
while let Ok((stream, _)) = listener.accept().await {
let server_state = server_state.clone();
// Для каждого нового подключения, создаем новый таск
tokio::spawn(async move {
if let Err(e) = handle_client(stream, server_state).await {
println!("Error handling client: {}", e);
}
});
}
Ok(())
}
async fn handle_client(stream: TcpStream, server_state: Arc<Mutex<ServerState>>) -> Result<(), Box<dyn Error>> {
let (mut reader, mut writer) = tokio::io::split(stream);
// Получаем имя пользователя от клиента
let mut buffer = [0; 1024];
let n = reader.read(&mut buffer).await?;
let username = String::from_utf8_lossy(&buffer[0..n]).trim().to_string();
println!("New user connected: {}", username);
// Создаем канал для обмена сообщениями между сервером и клиентом
let (sender, mut receiver) = mpsc::channel::<Message>(100);
// Добавляем нового пользователя в состояние сервера
server_state.lock().unwrap().add_user(username.clone(), sender);
// Обработка входящих сообщений от клиента
let server_state_clone = server_state.clone();
tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
// Рассылка сообщения всем пользователям
server_state_clone.lock().unwrap().broadcast(message).await;
}
});
// Обработка входящих сообщений от клиента
while let Some(message) = receiver.recv().await {
writer.write_all(message.content.as_bytes()).await?;
}
Ok(())
}

Пример децентрализованных облачных вычислений.

//[dependencies]
//tokio = { version = "1", features = ["full"] }

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::thread;
// Структура представляющая задачу обработки данных
struct Task {
data: Vec<u8>,
}
// Структура представляющая узел сети
struct Node {
id: u64,
tasks: Arc<Mutex<Vec<Task>>>,
}
impl Node {
fn new(id: u64) -> Node {
Node {
id,
tasks: Arc::new(Mutex::new(Vec::new())),
}
}
// Добавление задачи к узлу
fn add_task(&self, task: Task) {
self.tasks.lock().unwrap().push(task);
}
// Обработка задачи
async fn process_tasks(&self) {
let mut tasks = self.tasks.lock().unwrap();
for task in tasks.drain(..) {
// Здесь может быть ваш код обработки задачи
println!("Node {} processing task: {:?}", self.id, task);
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
// Создаем несколько узлов сети
let node1 = Arc::new(Node::new(1));
let node2 = Arc::new(Node::new(2));
// Канал для передачи задач между узлами
let (sender, mut receiver) = mpsc::channel::<Task>(100);
// Клонируем Arc для передачи его в потоки
let sender_clone = sender.clone();
let node1_clone = node1.clone();
let node2_clone = node2.clone();
// Поток для обработки задач узла 1
thread::spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async move {
while let Some(task) = node1_clone.tasks.lock().unwrap().pop() {
// Здесь может быть ваш код обработки задачи
println!("Node 1 processing task: {:?}", task);
}
});
});
// Поток для обработки задач узла 2
thread::spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async move {
while let Some(task) = node2_clone.tasks.lock().unwrap().pop() {
// Здесь может быть ваш код обработки задачи
println!("Node 2 processing task: {:?}", task);
}
});
});
// Поток для передачи задач между узлами
tokio::spawn(async move {
while let Some(task) = receiver.recv().await {
// Рассылка задачи узлам
node1.add_task(task.clone());
node2.add_task(task.clone());
}
});
// Слушаем подключения от клиентов
while let Ok((stream, _)) = listener.accept().await {
let sender_clone = sender.clone();
// Для каждого нового подключения, создаем новый таск
tokio::spawn(async move {
if let Err(e) = handle_client(stream, sender_clone).await {
println!("Error handling client: {}", e);
}
});
}
Ok(())
}
async fn handle_client(mut stream: TcpStream, sender: mpsc::Sender<Task>) -> Result<(), Box<dyn Error>> {
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await?;
let data = buffer[0..n].to_vec();
let task = Task { data };
// Отправляем задачу в общую очередь
sender.send(task).await?;
// Закрываем соединение
Ok(())
}

Пример P2P-коммуникации между устройствами IoT через TCP-соединение.

//[dependencies]
//tokio = { version = "1", features = ["full"] }

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use std::collections::HashMap;
use std::error::Error;
use std::sync::{Arc, Mutex};
// Структура представляющая устройство IoT
struct IoTDevice {
id: String,
data: String,
}
impl IoTDevice {
fn new(id: &str) -> IoTDevice {
IoTDevice {
id: id.to_string(),
data: String::new(),
}
}
// Обновление данных устройства
fn update_data(&mut self, new_data: &str) {
self.data = new_data.to_string();
}
}
// Структура представляющая состояние сети IoT
struct IoTNetwork {
devices: Arc<Mutex<HashMap<String, IoTDevice>>>,
}
impl IoTNetwork {
fn new() -> IoTNetwork {
IoTNetwork {
devices: Arc::new(Mutex::new(HashMap::new())),
}
}
// Добавление нового устройства в сеть
fn add_device(&self, device_id: &str) {
let mut devices = self.devices.lock().unwrap();
devices.insert(device_id.to_string(), IoTDevice::new(device_id));
}
// Обновление данных устройства в сети
fn update_device_data(&self, device_id: &str, new_data: &str) {
let mut devices = self.devices.lock().unwrap();
if let Some(device) = devices.get_mut(device_id) {
device.update_data(new_data);
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
// Создаем сеть IoT
let iot_network = Arc::new(IoTNetwork::new());
// Поток для обработки подключений устройств IoT
while let Ok((stream, _)) = listener.accept().await {
let iot_network = iot_network.clone();
// Для каждого нового подключения, создаем новый таск
tokio::spawn(async move {
if let Err(e) = handle_device_connection(stream, iot_network).await {
println!("Error handling device connection: {}", e);
}
});
}
Ok(())
}
async fn handle_device_connection(mut stream: TcpStream, iot_network: Arc<IoTNetwork>) -> Result<(), Box<dyn Error>> {
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await?;
let data = String::from_utf8_lossy(&buffer[0..n]).trim().to_string();
// Получаем идентификатор устройства и данные
let mut parts = data.split(',');
if let Some(device_id) = parts.next() {
if let Some(device_data) = parts.next() {
// Добавляем устройство в сеть IoT, если оно новое
iot_network.add_device(device_id);
// Обновляем данные устройства в сети IoT
iot_network.update_device_data(device_id, device_data);
// Отправляем подтверждение клиенту
stream.write_all(b"Data received by server").await?;
}
}
Ok(())
}

Пример предоставляет механизм для сохранения и извлечения данных.

//[dependencies]
//tokio = { version = "1", features = ["full"] }

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::time::Duration;
// Структура, представляющая децентрализованное хранилище данных
struct DecentralizedStorage {
data: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}
impl DecentralizedStorage {
fn new() -> DecentralizedStorage {
DecentralizedStorage {
data: Arc::new(Mutex::new(HashMap::new())),
}
}
// Добавление данных в хранилище
fn store_data(&self, key: String, value: Vec<u8>) {
let mut data = self.data.lock().unwrap();
data.insert(key, value);
}
// Получение данных из хранилища
fn retrieve_data(&self, key: &str) -> Option<Vec<u8>> {
let data = self.data.lock().unwrap();
data.get(key).cloned()
}
}
#[tokio::main]
async fn main() {
let storage = Arc::new(DecentralizedStorage::new());
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("Server listening on 127.0.0.1:8080");
while let Ok((stream, addr)) = listener.accept().await {
let storage = storage.clone();
tokio::spawn(async move {
handle_client(stream, addr, storage).await;
});
}
}
async fn handle_client(mut stream: TcpStream, addr: SocketAddr, storage: Arc<DecentralizedStorage>) {
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
let request = String::from_utf8_lossy(&buffer[0..n]).trim().to_string();
let parts: Vec<&str> = request.split(',').collect();
match parts[0] {
"STORE" => {
if parts.len() == 3 {
let key = parts[1].to_string();
let value = parts[2].as_bytes().to_vec();
storage.store_data(key, value);
stream.write_all(b"Data stored successfully").await.unwrap();
} else {
stream.write_all(b"Invalid STORE command").await.unwrap();
}
}
"RETRIEVE" => {
if parts.len() == 2 {
let key = parts[1];
if let Some(data) = storage.retrieve_data(key) {
stream.write_all(&data).await.unwrap();
} else {
stream.write_all(b"Key not found").await.unwrap();
}
} else {
stream.write_all(b"Invalid RETRIEVE command").await.unwrap();
}
}
_ => {
stream.write_all(b"Invalid command").await.unwrap();
}
}
println!("Connection from {}: {}", addr, request);
}

Пример P2P-рынка и торговли.

//[dependencies]
//tokio = { version = "1", features = ["full"] }

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use std::collections::HashMap;
use std::error::Error;
use std::sync::{Arc, Mutex};
// Структура представляющая товар или услугу
struct Product {
name: String,
price: f64,
}
// Структура представляющая децентрализованный рынок
struct DecentralizedMarket {
products: Arc<Mutex<HashMap<String, Product>>>,
}
impl DecentralizedMarket {
fn new() -> DecentralizedMarket {
DecentralizedMarket {
products: Arc::new(Mutex::new(HashMap::new())),
}
}
// Добавление товара в рынок
fn add_product(&self, name: String, price: f64) {
let mut products = self.products.lock().unwrap();
products.insert(name.clone(), Product { name, price });
}
// Получение списка товаров
fn get_products(&self) -> Vec<Product> {
let products = self.products.lock().unwrap();
products.values().cloned().collect()
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let market = Arc::new(DecentralizedMarket::new());
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
while let Ok((stream, _)) = listener.accept().await {
let market = market.clone();
tokio::spawn(async move {
handle_client(stream, market).await;
});
}
Ok(())
}
async fn handle_client(mut stream: TcpStream, market: Arc<DecentralizedMarket>) {
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
let request = String::from_utf8_lossy(&buffer[0..n]).trim().to_string();
match request.as_str() {
"GET_PRODUCTS" => {
let products = market.get_products();
let response = serde_json::to_string(&products).unwrap();
stream.write_all(response.as_bytes()).await.unwrap();
}
_ => {
stream.write_all(b"Invalid request").await.unwrap();
}
}
}