Добавить в корзинуПозвонить
Найти в Дзене
Один Rust не п...Rust

Distributed Network Protocol на Rust для выявления Torrent - запросов

Для чего нужна данная статья? : Научиться использовать pcap для перехвата трафика,с целью выявления Torrent - запросов и gRPC для отправки данных в распределённую систему мониторинга. Зачем Вам это уметь? : Подходит для реализации собственных протоколов или высокопроизводительных решений. 1.1. TCP (потоковые соединения) 🔧 Библиотеки: std::net, mio, tokio::net Пример TCP-сервера: use std::net::{TcpListener, TcpStream}; use std::io::{Read, Write}; use std::thread; fn handle_client(mut stream: TcpStream) { let mut buffer = [0; 1024]; while let Ok(bytes_read) = stream.read(&mut buffer) { if bytes_read == 0 { break; } stream.write_all(&buffer[..bytes_read]).unwrap(); } } fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); for stream in listener.incoming() { match stream { Ok(stream) => { thread::spawn(|| handle_client(stream)); } Err(e) => eprintln!("Error: {}", e), } } } 1.2. UDP (без установления соединения) 🔧 Библиотеки: std::net, tokio::net Пример UDP-сервера с
Оглавление
ML на RUST без заморочек
Один Rust не п...Rust

Для чего нужна данная статья? :

Научиться использовать pcap для перехвата трафика,с целью выявления Torrent - запросов и gRPC для отправки данных в распределённую систему мониторинга.

🔹 Архитектура

  1. Захват пакетов: Используем pcap для перехвата трафика.
  2. Фильтрация BitTorrent: Анализируем пакеты, выявляя BitTorrent-запросы.
  3. Сбор статистики: Учитываем IP-адреса клиентов в LAN, объём переданных данных.
  4. Отчётность: Отправляем данные по gRPC в централизованный сервер или логируем локально.
  5. Распределённость: Можно использовать RabbitMQ/Kafka для сбора данных в распределённой сети.

Зачем Вам это уметь? :

1. Низкоуровневые реализации (Сырой TCP/UDP)

Подходит для реализации собственных протоколов или высокопроизводительных решений.

1.1. TCP (потоковые соединения)

  • Используется для надёжной передачи данных.
  • Контроль ошибок, порядок пакетов гарантирован.

🔧 Библиотеки: std::net, mio, tokio::net

Пример TCP-сервера:

use std::net::{TcpListener, TcpStream};

use std::io::{Read, Write};

use std::thread;

fn handle_client(mut stream: TcpStream) {

let mut buffer = [0; 1024];

while let Ok(bytes_read) = stream.read(&mut buffer) {

if bytes_read == 0 {

break;

}

stream.write_all(&buffer[..bytes_read]).unwrap();

}

}

fn main() {

let listener = TcpListener::bind("127.0.0.1:8080").unwrap();

for stream in listener.incoming() {

match stream {

Ok(stream) => {

thread::spawn(|| handle_client(stream));

}

Err(e) => eprintln!("Error: {}", e),

}

}

}

1.2. UDP (без установления соединения)

  • Используется для быстрых, но ненадёжных протоколов.
  • Хорош для стриминга, VoIP, игр.

🔧 Библиотеки: std::net, tokio::net

Пример UDP-сервера с tokio:

use tokio::net::UdpSocket;

#[tokio::main]

async fn main() {

let socket = UdpSocket::bind("127.0.0.1:8080").await.unwrap();

let mut buf = [0; 1024];

loop {

let (size, peer) = socket.recv_from(&mut buf).await.unwrap();

socket.send_to(&buf[..size], &peer).await.unwrap();

}

}

2. Асинхронные реализации (Tokio, async-std, smol)

Для высокой производительности и работы с большим числом соединений.

🔧 Библиотеки: tokio, async-std, smol

📌 Используется для:

  • Реализации серверов высокой нагрузки
  • P2P-протоколов
  • RPC и микросервисов

3. RPC-протоколы

Применяются для взаимодействия между сервисами.

3.1. gRPC (Tonic)

  • Высокопроизводительный RPC на HTTP/2
  • Стриминг, авторизация, сериализация данных

🔧 Библиотеки: tonic, prost

3.2. JSON-RPC

  • Подходит для API, микросервисов
  • Легковесный протокол поверх HTTP или WebSocket

🔧 Библиотеки: jsonrpc-core, jsonrpc-http-server, jsonrpc-ws-server

3.3. Cap’n Proto и Flatbuffers

  • RPC с низкими накладными расходами
  • Быстрее gRPC за счёт zero-copy

🔧 Библиотеки: capnp, flatbuffers

4. Message Queue (Очереди сообщений)

Используются для распределённых систем, асинхронной обработки задач.

🔧 Библиотеки:

  • lapin (RabbitMQ)
  • async-nats (NATS)
  • rdkafka (Kafka)
  • redis (Redis Pub/Sub)

📌 Подходит для:

  • Микросервисов
  • Логирования событий
  • Очередей задач

5. ZeroMQ (ZMQ)

  • Высокопроизводительная библиотека для обмена сообщениями
  • Позволяет строить распределённые системы без брокеров

🔧 Библиотека: zmq

📌 Применение:

  • P2P-приложения
  • Высоконагруженные распределённые сервисы

6. QUIC (UDP + TLS)

  • Современный протокол, быстрее TCP
  • Подходит для WebRTC, API, low-latency сервисов

🔧 Библиотеки:

  • quinn
  • s2n-quic

7. WebRTC (P2P-взаимодействие)

Используется для передачи аудио, видео и данных между узлами.

🔧 Библиотека: webrtc-rs

📌 Применение:

  • Видеоконференции
  • P2P-файлообмен
  • Игровые приложения

8. P2P-протоколы (BitTorrent, IPFS, libp2p)

Для построения децентрализованных систем.

🔧 Библиотеки:

  • libp2p (используется в IPFS)
  • bittorrent-rs

📌 Подходит для:

  • Децентрализованных сетей
  • Файлообмена
  • Доступа к данным без центральных серверов

9. Distributed Consensus (RAFT, Paxos, PBFT)

Используется для распределённых баз данных, кластерных систем.

🔧 Библиотеки:

  • raft (Rust-реализация RAFT)
  • hotstuff (PBFT)

📌 Применение:

  • Лог-репликация
  • Надёжные распределённые системы

10. Blockchain Protocols

Для создания децентрализованных блокчейнов.

🔧 Библиотеки:

  • substrate (Polkadot)
  • solana-sdk (Solana)

📌 Подходит для:

  • Смарт-контрактов
  • Децентрализованных приложений

Реализация

1. Установка зависимостей

Добавьте следующие зависимости в Cargo.toml:

[dependencies]

pnet = "0.27"

tokio = { version = "1", features = ["full"] }

serde = { version = "1.0", features = ["derive"] }

serde_json = "1.0"

Как это работает:

  1. Сетевой захват : Используется pnet для прослушивания сетевого трафика через выбранный интерфейс.
  2. Анализ пакетов : Для каждого TCP-пакета проверяется, содержит ли он Handshake-сообщение BitTorrent.
  3. Сбор статистики : IP-адреса источников и получателей трафика добавляются в хэш-таблицу вместе с количеством переданных байтов.
  4. Генерация отчета : Каждые 60 секунд статистика сохраняется в JSON-файл.

5. Фильтрация LAN-IP Мы добавим проверку, чтобы обрабатывать только IP-адреса из локальной сети (например, 192.168.x.x или 10.x.x.x). Это позволит исключить внешний трафик из анализа.

// Проверка, является ли IP частью локальной сети

fn is_lan_ip(ip: &str) -> bool {

ip.starts_with("192.168.") || ip.starts_with("10.")

}

Затем используем эту функцию в методе process_ipv4_packet:

fn process_ipv4_packet(ethernet_packet: &EthernetPacket, stats: &mut HashMap<String, TrafficStats>) {

let ipv4_packet = Ipv4Packet::new(ethernet_packet.payload()).unwrap();

if ipv4_packet.get_next_level_protocol() == IpNextHeaderProtocols::Tcp {

let tcp_packet = TcpPacket::new(ipv4_packet.payload());

if let Some(tcp_packet) = tcp_packet {

let src_ip = ipv4_packet.get_source().to_string();

let dst_ip = ipv4_packet.get_destination().to_string();

// Проверяем, что IP принадлежит локальной сети

if is_lan_ip(&src_ip) && is_lan_ip(&dst_ip) {

if is_bittorrent_traffic(&tcp_packet) {

update_stats(stats, &src_ip, tcp_packet.packet().len() as u64);

update_stats(stats, &dst_ip, tcp_packet.packet().len() as u64);

}

}

}

}

}

6. Логирование

Добавим библиотеку log для записи событий в лог-файл. Для этого потребуется еще одна зависимость в Cargo.toml:

[dependencies]

log = "0.4"

simple_logger = "1.12" # Простой логгер для быстрой настройки

Инициализируем логгер в начале программы:

use log::{info, error};

#[tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {

simple_logger::init_with_level(log::Level::Info).expect("Не удалось инициализировать логгер");

info!("Запуск мониторинга трафика...");

// Остальной код программы...

}

Теперь можно использовать info! и error! для записи сообщений в лог:

info!("Получен пакет от {} к {}", src_ip, dst_ip);

error!("Ошибка при обработке пакета: {}", e);

7. Централизованный сервер

Чтобы отправлять данные на удаленный сервер, воспользуемся HTTP-клиентом reqwest. Добавьте зависимость в Cargo.toml:

reqwest = { version = "0.11", features = ["json"] }

Измените метод generate_report, чтобы отправлять данные на удаленный сервер:

use reqwest::Client;

async fn generate_report(

stats: &HashMap<String, TrafficStats>,

server_url: &str,

) -> Result<(), Box<dyn std::error::Error>> {

let mut report_data = Vec::new();

for (_, stat) in stats.iter() {

report_data.push(json!({

"ip": stat.ip,

"bytes_transferred": stat.bytes_transferred,

}));

}

let client = Client::new();

let response = client

.post(server_url)

.header("Content-Type", "application/json")

.body(serde_json::to_string(&report_data)?)

.send()

.await?;

if response.status().is_success() {

info!("Отчет успешно отправлен на сервер");

} else {

error!("Ошибка при отправке отчета: {}", response.status());

}

Ok(())

}

Вызов метода:

let server_url = "http://your-central-server.com/api/report";

generate_report(&stats, server_url).await?;

8. Оптимизация производительности

Для параллельной обработки пакетов используем многопоточность с помощью tokio::task::spawn. Измените основной цикл так, чтобы каждый пакет обрабатывался в отдельной задаче:

loop {

match rx.next() {

Ok(packet) => {

let packet_copy = packet.to_vec(); // Создаем копию пакета

tokio::task::spawn(async move {

let ethernet_packet = EthernetPacket::new(&packet_copy).unwrap();

if ethernet_packet.get_ethertype() == EtherTypes::Ipv4 {

process_ipv4_packet(&ethernet_packet, &mut stats); // Обновляем статистику

}

});

}

Err(e) => {

error!("Ошибка при чтении пакета: {}", e);

}

}

// Генерация отчета каждые 60 секунд

tokio::time::sleep(std::time::Duration::from_secs(60)).await;

generate_report(&stats, "http://your-central-server.com/api/report").await?;

}

Обратите внимание, что доступ к stats должен быть синхронизирован, если вы планируете обновлять его из нескольких потоков. Для этого используйте Arc<Mutex<HashMap<...>>>:

use std::sync::{Arc, Mutex};

let stats: Arc<Mutex<HashMap<String, TrafficStats>>> = Arc::new(Mutex::new(HashMap::new()));

// Внутри процесса:

let stats_clone = Arc::clone(&stats);

tokio::task::spawn(async move {

let mut stats_guard = stats_clone.lock().unwrap();

process_ipv4_packet(&ethernet_packet, &mut *stats_guard);

});

Итоговый код

use pnet::packet::ethernet::{EtherTypes, EthernetPacket};

use pnet::packet::ip::IpNextHeaderProtocols;

use pnet::packet::ipv4::Ipv4Packet;

use pnet::packet::tcp::TcpPacket;

use pnet::packet::Packet;

use pnet::datalink::{self, NetworkInterface};

use pnet::datalink::Channel::Ethernet;

use std::collections::HashMap;

use tokio::fs::File;

use tokio::io::AsyncWriteExt;

use serde_json::json;

use reqwest::Client;

use std::sync::{Arc, Mutex};

use log::{info, error};

use simple_logger;

#[derive(Debug)]

struct TrafficStats {

ip: String,

bytes_transferred: u64,

}

#[tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {

simple_logger::init_with_level(log::Level::Info).expect("Не удалось инициализировать логгер");

let interface_name = "eth0"; // Замените на имя вашего интерфейса

let interfaces = datalink::interfaces();

let interface = interfaces.into_iter()

.find(|iface| iface.name == interface_name)

.expect("Не удалось найти указанный сетевой интерфейс");

let config = datalink::Config::default();

let (mut tx, mut rx) = match datalink::channel(&interface, config) {

Ok(Ethernet(tx, rx)) => (tx, rx),

Ok(_) => panic!("Неподдерживаемый тип канала"),

Err(e) => panic!("Ошибка при создании канала: {}", e),

};

let stats: Arc<Mutex<HashMap<String, TrafficStats>>> = Arc::new(Mutex::new(HashMap::new()));

let server_url = "http://your-central-server.com/api/report";

info!("Запуск мониторинга трафика через интерфейс {}", interface_name);

loop {

match rx.next() {

Ok(packet) => {

let packet_copy = packet.to_vec();

let stats_clone = Arc::clone(&stats);

tokio::task::spawn(async move {

let ethernet_packet = EthernetPacket::new(&packet_copy).unwrap();

if ethernet_packet.get_ethertype() == EtherTypes::Ipv4 {

let mut stats_guard = stats_clone.lock().unwrap();

process_ipv4_packet(&ethernet_packet, &mut *stats_guard);

}

});

}

Err(e) => {

error!("Ошибка при чтении пакета: {}", e);

}

}

tokio::time::sleep(std::time::Duration::from_secs(60)).await;

generate_report(&stats, server_url).await?;

}

}

fn process_ipv4_packet(ethernet_packet: &EthernetPacket, stats: &mut HashMap<String, TrafficStats>) {

let ipv4_packet = Ipv4Packet::new(ethernet_packet.payload()).unwrap();

if ipv4_packet.get_next_level_protocol() == IpNextHeaderProtocols::Tcp {

let tcp_packet = TcpPacket::new(ipv4_packet.payload());

if let Some(tcp_packet) = tcp_packet {

let src_ip = ipv4_packet.get_source().to_string();

let dst_ip = ipv4_packet.get_destination().to_string();

if is_lan_ip(&src_ip) && is_lan_ip(&dst_ip) {

if is_bittorrent_traffic(&tcp_packet) {

update_stats(stats, &src_ip, tcp_packet.packet().len() as u64);

update_stats(stats, &dst_ip, tcp_packet.packet().len() as u64);

}

}

}

}

}

fn is_lan_ip(ip: &str) -> bool {

ip.starts_with("192.168.") || ip.starts_with("10.")

}

fn is_bittorrent_traffic(tcp_packet: &TcpPacket) -> bool {

const BITTORRENT_HANDSHAKE_PREFIX: &[u8] = b"\x13BitTorrent protocol";

tcp_packet.payload().starts_with(BITTORRENT_HANDSHAKE_PREFIX)

}

fn update_stats(stats: &mut HashMap<String, TrafficStats>, ip: &str, bytes: u64) {

let entry = stats.entry(ip.to_string()).or_insert(TrafficStats {

ip: ip.to_string(),

bytes_transferred: 0,

});

entry.bytes_transferred += bytes;

}

async fn generate_report(

stats: &Arc<Mutex<HashMap<String, TrafficStats>>>,

server_url: &str,

) -> Result<(), Box<dyn std::error::Error>> {

let mut report_data = Vec::new();

let stats_guard = stats.lock().unwrap();

for (_, stat) in stats_guard.iter() {

report_data.push(json!({

"ip": stat.ip,

"bytes_transferred": stat.bytes_transferred,

}));

}

let client = Client::new();

let response = client

.post(server_url)

.header("Content-Type", "application/json")

.body(serde_json::to_string(&report_data)?)

.send()

.await?;

if response.status().is_success() {

info!("Отчет успешно отправлен на сервер");

} else {

error!("Ошибка при отправке отчета: {}", response.status());

}

Ok(())

}