Для чего нужна данная статья? :
Научиться использовать pcap для перехвата трафика,с целью выявления Torrent - запросов и gRPC для отправки данных в распределённую систему мониторинга.
🔹 Архитектура
- Захват пакетов: Используем pcap для перехвата трафика.
- Фильтрация BitTorrent: Анализируем пакеты, выявляя BitTorrent-запросы.
- Сбор статистики: Учитываем IP-адреса клиентов в LAN, объём переданных данных.
- Отчётность: Отправляем данные по gRPC в централизованный сервер или логируем локально.
- Распределённость: Можно использовать RabbitMQ/Kafka для сбора данных в распределённой сети.
Зачем Вам это уметь? :
1. Низкоуровневые реализации (Сырой TCP/UDP)
Подходит для реализации собственных протоколов или высокопроизводительных решений.
1.1. TCP (потоковые соединения)
- Используется для надёжной передачи данных.
- Контроль ошибок, порядок пакетов гарантирован.
🔧 Библиотеки: std::net, mio, tokio::net
Пример TCP-сервера:
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 1024];
while let Ok(bytes_read) = stream.read(&mut buffer) {
if bytes_read == 0 {
break;
}
stream.write_all(&buffer[..bytes_read]).unwrap();
}
}
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
for stream in listener.incoming() {
match stream {
Ok(stream) => {
thread::spawn(|| handle_client(stream));
}
Err(e) => eprintln!("Error: {}", e),
}
}
}
1.2. UDP (без установления соединения)
- Используется для быстрых, но ненадёжных протоколов.
- Хорош для стриминга, VoIP, игр.
🔧 Библиотеки: std::net, tokio::net
Пример UDP-сервера с tokio:
use tokio::net::UdpSocket;
#[tokio::main]
async fn main() {
let socket = UdpSocket::bind("127.0.0.1:8080").await.unwrap();
let mut buf = [0; 1024];
loop {
let (size, peer) = socket.recv_from(&mut buf).await.unwrap();
socket.send_to(&buf[..size], &peer).await.unwrap();
}
}
2. Асинхронные реализации (Tokio, async-std, smol)
Для высокой производительности и работы с большим числом соединений.
🔧 Библиотеки: tokio, async-std, smol
📌 Используется для:
- Реализации серверов высокой нагрузки
- P2P-протоколов
- RPC и микросервисов
3. RPC-протоколы
Применяются для взаимодействия между сервисами.
3.1. gRPC (Tonic)
- Высокопроизводительный RPC на HTTP/2
- Стриминг, авторизация, сериализация данных
🔧 Библиотеки: tonic, prost
3.2. JSON-RPC
- Подходит для API, микросервисов
- Легковесный протокол поверх HTTP или WebSocket
🔧 Библиотеки: jsonrpc-core, jsonrpc-http-server, jsonrpc-ws-server
3.3. Cap’n Proto и Flatbuffers
- RPC с низкими накладными расходами
- Быстрее gRPC за счёт zero-copy
🔧 Библиотеки: capnp, flatbuffers
4. Message Queue (Очереди сообщений)
Используются для распределённых систем, асинхронной обработки задач.
🔧 Библиотеки:
- lapin (RabbitMQ)
- async-nats (NATS)
- rdkafka (Kafka)
- redis (Redis Pub/Sub)
📌 Подходит для:
- Микросервисов
- Логирования событий
- Очередей задач
5. ZeroMQ (ZMQ)
- Высокопроизводительная библиотека для обмена сообщениями
- Позволяет строить распределённые системы без брокеров
🔧 Библиотека: zmq
📌 Применение:
- P2P-приложения
- Высоконагруженные распределённые сервисы
6. QUIC (UDP + TLS)
- Современный протокол, быстрее TCP
- Подходит для WebRTC, API, low-latency сервисов
🔧 Библиотеки:
- quinn
- s2n-quic
7. WebRTC (P2P-взаимодействие)
Используется для передачи аудио, видео и данных между узлами.
🔧 Библиотека: webrtc-rs
📌 Применение:
- Видеоконференции
- P2P-файлообмен
- Игровые приложения
8. P2P-протоколы (BitTorrent, IPFS, libp2p)
Для построения децентрализованных систем.
🔧 Библиотеки:
- libp2p (используется в IPFS)
- bittorrent-rs
📌 Подходит для:
- Децентрализованных сетей
- Файлообмена
- Доступа к данным без центральных серверов
9. Distributed Consensus (RAFT, Paxos, PBFT)
Используется для распределённых баз данных, кластерных систем.
🔧 Библиотеки:
- raft (Rust-реализация RAFT)
- hotstuff (PBFT)
📌 Применение:
- Лог-репликация
- Надёжные распределённые системы
10. Blockchain Protocols
Для создания децентрализованных блокчейнов.
🔧 Библиотеки:
- substrate (Polkadot)
- solana-sdk (Solana)
📌 Подходит для:
- Смарт-контрактов
- Децентрализованных приложений
Реализация
1. Установка зависимостей
Добавьте следующие зависимости в Cargo.toml:
[dependencies]
pnet = "0.27"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Как это работает:
- Сетевой захват : Используется pnet для прослушивания сетевого трафика через выбранный интерфейс.
- Анализ пакетов : Для каждого TCP-пакета проверяется, содержит ли он Handshake-сообщение BitTorrent.
- Сбор статистики : IP-адреса источников и получателей трафика добавляются в хэш-таблицу вместе с количеством переданных байтов.
- Генерация отчета : Каждые 60 секунд статистика сохраняется в JSON-файл.
5. Фильтрация LAN-IP Мы добавим проверку, чтобы обрабатывать только IP-адреса из локальной сети (например, 192.168.x.x или 10.x.x.x). Это позволит исключить внешний трафик из анализа.
// Проверка, является ли IP частью локальной сети
fn is_lan_ip(ip: &str) -> bool {
ip.starts_with("192.168.") || ip.starts_with("10.")
}
Затем используем эту функцию в методе process_ipv4_packet:
fn process_ipv4_packet(ethernet_packet: &EthernetPacket, stats: &mut HashMap<String, TrafficStats>) {
let ipv4_packet = Ipv4Packet::new(ethernet_packet.payload()).unwrap();
if ipv4_packet.get_next_level_protocol() == IpNextHeaderProtocols::Tcp {
let tcp_packet = TcpPacket::new(ipv4_packet.payload());
if let Some(tcp_packet) = tcp_packet {
let src_ip = ipv4_packet.get_source().to_string();
let dst_ip = ipv4_packet.get_destination().to_string();
// Проверяем, что IP принадлежит локальной сети
if is_lan_ip(&src_ip) && is_lan_ip(&dst_ip) {
if is_bittorrent_traffic(&tcp_packet) {
update_stats(stats, &src_ip, tcp_packet.packet().len() as u64);
update_stats(stats, &dst_ip, tcp_packet.packet().len() as u64);
}
}
}
}
}
6. Логирование
Добавим библиотеку log для записи событий в лог-файл. Для этого потребуется еще одна зависимость в Cargo.toml:
[dependencies]
log = "0.4"
simple_logger = "1.12" # Простой логгер для быстрой настройки
Инициализируем логгер в начале программы:
use log::{info, error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
simple_logger::init_with_level(log::Level::Info).expect("Не удалось инициализировать логгер");
info!("Запуск мониторинга трафика...");
// Остальной код программы...
}
Теперь можно использовать info! и error! для записи сообщений в лог:
info!("Получен пакет от {} к {}", src_ip, dst_ip);
error!("Ошибка при обработке пакета: {}", e);
7. Централизованный сервер
Чтобы отправлять данные на удаленный сервер, воспользуемся HTTP-клиентом reqwest. Добавьте зависимость в Cargo.toml:
reqwest = { version = "0.11", features = ["json"] }
Измените метод generate_report, чтобы отправлять данные на удаленный сервер:
use reqwest::Client;
async fn generate_report(
stats: &HashMap<String, TrafficStats>,
server_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut report_data = Vec::new();
for (_, stat) in stats.iter() {
report_data.push(json!({
"ip": stat.ip,
"bytes_transferred": stat.bytes_transferred,
}));
}
let client = Client::new();
let response = client
.post(server_url)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&report_data)?)
.send()
.await?;
if response.status().is_success() {
info!("Отчет успешно отправлен на сервер");
} else {
error!("Ошибка при отправке отчета: {}", response.status());
}
Ok(())
}
Вызов метода:
let server_url = "http://your-central-server.com/api/report";
generate_report(&stats, server_url).await?;
8. Оптимизация производительности
Для параллельной обработки пакетов используем многопоточность с помощью tokio::task::spawn. Измените основной цикл так, чтобы каждый пакет обрабатывался в отдельной задаче:
loop {
match rx.next() {
Ok(packet) => {
let packet_copy = packet.to_vec(); // Создаем копию пакета
tokio::task::spawn(async move {
let ethernet_packet = EthernetPacket::new(&packet_copy).unwrap();
if ethernet_packet.get_ethertype() == EtherTypes::Ipv4 {
process_ipv4_packet(ðernet_packet, &mut stats); // Обновляем статистику
}
});
}
Err(e) => {
error!("Ошибка при чтении пакета: {}", e);
}
}
// Генерация отчета каждые 60 секунд
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
generate_report(&stats, "http://your-central-server.com/api/report").await?;
}
Обратите внимание, что доступ к stats должен быть синхронизирован, если вы планируете обновлять его из нескольких потоков. Для этого используйте Arc<Mutex<HashMap<...>>>:
use std::sync::{Arc, Mutex};
let stats: Arc<Mutex<HashMap<String, TrafficStats>>> = Arc::new(Mutex::new(HashMap::new()));
// Внутри процесса:
let stats_clone = Arc::clone(&stats);
tokio::task::spawn(async move {
let mut stats_guard = stats_clone.lock().unwrap();
process_ipv4_packet(ðernet_packet, &mut *stats_guard);
});
Итоговый код
use pnet::packet::ethernet::{EtherTypes, EthernetPacket};
use pnet::packet::ip::IpNextHeaderProtocols;
use pnet::packet::ipv4::Ipv4Packet;
use pnet::packet::tcp::TcpPacket;
use pnet::packet::Packet;
use pnet::datalink::{self, NetworkInterface};
use pnet::datalink::Channel::Ethernet;
use std::collections::HashMap;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use serde_json::json;
use reqwest::Client;
use std::sync::{Arc, Mutex};
use log::{info, error};
use simple_logger;
#[derive(Debug)]
struct TrafficStats {
ip: String,
bytes_transferred: u64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
simple_logger::init_with_level(log::Level::Info).expect("Не удалось инициализировать логгер");
let interface_name = "eth0"; // Замените на имя вашего интерфейса
let interfaces = datalink::interfaces();
let interface = interfaces.into_iter()
.find(|iface| iface.name == interface_name)
.expect("Не удалось найти указанный сетевой интерфейс");
let config = datalink::Config::default();
let (mut tx, mut rx) = match datalink::channel(&interface, config) {
Ok(Ethernet(tx, rx)) => (tx, rx),
Ok(_) => panic!("Неподдерживаемый тип канала"),
Err(e) => panic!("Ошибка при создании канала: {}", e),
};
let stats: Arc<Mutex<HashMap<String, TrafficStats>>> = Arc::new(Mutex::new(HashMap::new()));
let server_url = "http://your-central-server.com/api/report";
info!("Запуск мониторинга трафика через интерфейс {}", interface_name);
loop {
match rx.next() {
Ok(packet) => {
let packet_copy = packet.to_vec();
let stats_clone = Arc::clone(&stats);
tokio::task::spawn(async move {
let ethernet_packet = EthernetPacket::new(&packet_copy).unwrap();
if ethernet_packet.get_ethertype() == EtherTypes::Ipv4 {
let mut stats_guard = stats_clone.lock().unwrap();
process_ipv4_packet(ðernet_packet, &mut *stats_guard);
}
});
}
Err(e) => {
error!("Ошибка при чтении пакета: {}", e);
}
}
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
generate_report(&stats, server_url).await?;
}
}
fn process_ipv4_packet(ethernet_packet: &EthernetPacket, stats: &mut HashMap<String, TrafficStats>) {
let ipv4_packet = Ipv4Packet::new(ethernet_packet.payload()).unwrap();
if ipv4_packet.get_next_level_protocol() == IpNextHeaderProtocols::Tcp {
let tcp_packet = TcpPacket::new(ipv4_packet.payload());
if let Some(tcp_packet) = tcp_packet {
let src_ip = ipv4_packet.get_source().to_string();
let dst_ip = ipv4_packet.get_destination().to_string();
if is_lan_ip(&src_ip) && is_lan_ip(&dst_ip) {
if is_bittorrent_traffic(&tcp_packet) {
update_stats(stats, &src_ip, tcp_packet.packet().len() as u64);
update_stats(stats, &dst_ip, tcp_packet.packet().len() as u64);
}
}
}
}
}
fn is_lan_ip(ip: &str) -> bool {
ip.starts_with("192.168.") || ip.starts_with("10.")
}
fn is_bittorrent_traffic(tcp_packet: &TcpPacket) -> bool {
const BITTORRENT_HANDSHAKE_PREFIX: &[u8] = b"\x13BitTorrent protocol";
tcp_packet.payload().starts_with(BITTORRENT_HANDSHAKE_PREFIX)
}
fn update_stats(stats: &mut HashMap<String, TrafficStats>, ip: &str, bytes: u64) {
let entry = stats.entry(ip.to_string()).or_insert(TrafficStats {
ip: ip.to_string(),
bytes_transferred: 0,
});
entry.bytes_transferred += bytes;
}
async fn generate_report(
stats: &Arc<Mutex<HashMap<String, TrafficStats>>>,
server_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut report_data = Vec::new();
let stats_guard = stats.lock().unwrap();
for (_, stat) in stats_guard.iter() {
report_data.push(json!({
"ip": stat.ip,
"bytes_transferred": stat.bytes_transferred,
}));
}
let client = Client::new();
let response = client
.post(server_url)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&report_data)?)
.send()
.await?;
if response.status().is_success() {
info!("Отчет успешно отправлен на сервер");
} else {
error!("Ошибка при отправке отчета: {}", response.status());
}
Ok(())
}