Для чего нужна данная статья? :
Реализовать многофункциональный пайплайн на Rust:
- Читать данные из локальных файлов (CSV/JSON) и HTTP API.
- Обрабатывать данные с использованием serde_json, csv, regex, chrono.
- Загружать данные в PostgreSQL с sqlx.
- Асинхронно обрабатывать HTTP-запросы с tokio и reqwest.
- Обрабатывать данные в потоках через Kafka (rdkafka) и RabbitMQ (lapin).
- Использовать аналитические библиотеки polars и datafusion.
- Работать с Apache Arrow и Parquet для эффективного хранения больших данных.
- Распределять нагрузку с timely-dataflow и differential-dataflow.
- Поддерживать WebAssembly с wasmer.
- Использовать облачные решения, включая AWS Lambda и Kubernetes (kube-rs).
Зачем Вам это уметь? :
1. Простая ETL-пайплайн с использованием стандартной библиотеки Rust
- Чтение данных из файлов (std::fs::File) или HTTP (reqwest).
- Преобразование данных с использованием serde_json, csv, regex и chrono.
- Загрузка данных в базу с sqlx (PostgreSQL, MySQL) или rusqlite (SQLite).
Минусы: Не масштабируется, нет асинхронной обработки.
2. Асинхронная ETL с использованием tokio и async
- Использование tokio для параллельной загрузки данных.
- reqwest или surf для HTTP-запросов.
- sqlx или diesel для базы данных.
- rayon для многопоточного преобразования.
Минусы: Более сложная реализация.
3. Потоковая обработка ETL через Kafka/RabbitMQ
- rdkafka для Kafka или lapin для RabbitMQ.
- Обработка сообщений в режиме реального времени.
- Поддержка отказоустойчивости через очереди сообщений.
Минусы: Требует внешнего брокера сообщений.
4. Использование polars или datafusion для аналитики
- polars — аналог Pandas для Rust (работает с DataFrame).
- datafusion — SQL-движок для обработки данных в памяти.
Минусы: Высокий расход памяти.
5. ETL с использованием Arrow и Parquet для больших данных
- arrow2 для работы с Apache Arrow.
- parquet для хранения данных в колонном формате.
Минусы: Требует интеграции с большими дата-движками.
6. Распределенный ETL с Timely Dataflow или Differential Dataflow
- timely-dataflow для распределенной потоковой обработки.
- differential-dataflow для инкрементальных вычислений.
Минусы: Высокая сложность, требует кластеров.
7. Использование WebAssembly (WASM) для встраиваемого ETL
- wasmer или wasmtime для выполнения ETL внутри контейнера WASM.
- Можно использовать в серверных или edge-решениях.
Минусы: Ограниченная работа с файловыми системами.
8. Облачные и серверless ETL-решения на Rust
- AWS Lambda + Rust.
- Google Cloud Functions через WebAssembly.
- Встраивание в Kubernetes (kube-rs).
Минусы: Завязка на облака.
Пример асинхронной обработки данных (локальные файлы и HTTP) БД через sqlx, брокера для потоковой передачи данных, Polars и Apache Arrow для аналитики, Timely Dataflow для распределенных вычислений, WebAssembly для встраивания, Kubernetes API для работы в облаке.
use std::fs::File;
use std::io::{BufReader, BufRead};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, postgres::PgPoolOptions};
use tokio::task;
use reqwest::Client;
use regex::Regex;
use chrono::NaiveDate;
use rdkafka::producer::{FutureProducer, FutureRecord};
use polars::prelude::*;
use arrow2::{array::Int32Array, datatypes::DataType, chunk::Chunk};
use timely::dataflow::operators::{Inspect, Map};
use wasmer::{Store, Module, Instance, imports};
use kube::{Client as KubeClient, api::Api};
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Channel};
use std::time::Duration;
use log::{info, error};
#[derive(Debug, Serialize, Deserialize)]
struct DataRecord {
id: i32,
name: String,
email: String,
date_of_birth: String,
}
async fn read_from_http(url: &str) -> Result<Vec<DataRecord>, reqwest::Error> {
let client = Client::new();
let resp = client.get(url).send().await?.json::<Vec<DataRecord>>().await?;
Ok(resp)
}
fn read_from_csv(file_path: &str) -> Result<Vec<DataRecord>, csv::Error> {
let mut rdr = csv::Reader::from_path(file_path)?;
let mut records = Vec::new();
for result in rdr.deserialize() {
let record: DataRecord = result?;
records.push(record);
}
Ok(records)
}
async fn insert_to_db(pool: &PgPool, data: Vec<DataRecord>) {
for record in data {
sqlx::query!(
"INSERT INTO users (id, name, email, date_of_birth) VALUES ($1, $2, $3, $4)",
record.id,
record.name,
record.email,
record.date_of_birth
)
.execute(pool)
.await
.expect("Failed to insert data");
}
}
async fn send_to_kafka(producer: &FutureProducer, topic: &str, data: &str) {
producer.send(
FutureRecord::to(topic).payload(data).key("key"),
std::time::Duration::from_secs(0),
).await.expect("Failed to send to Kafka");
}
async fn send_to_rabbitmq(channel: &Channel, queue: &str, data: &str) {
channel
.basic_publish("", queue, BasicPublishOptions::default(), data.as_bytes(), BasicProperties::default())
.await.expect("Failed to send to RabbitMQ");
}
fn analyze_with_polars(data: &[DataRecord]) {
let df = DataFrame::new(
vec![Series::new("id", data.iter().map(|r| r.id).collect::<Vec<_>>())]
).expect("Failed to create DataFrame");
println!("{}", df);
}
fn process_with_arrow() {
let array = Int32Array::from_slice(&[1, 2, 3]);
let chunk = Chunk::new(vec![array.into()]);
println!("Arrow Chunk: {:?}", chunk);
}
fn timely_dataflow_example() {
timely::execute_from_args(std::env::args(), move |worker| {
worker.dataflow(|scope| {
(0..10).to_stream(scope)
.map(|x| x * 2)
.inspect(|x| println!("Timely Data: {}", x));
});
}).expect("Timely Dataflow failed");
}
#[tokio::main]
async fn main() {
env_logger::init();
let database_url = "postgres://user:password@localhost/dbname";
let pool = PgPoolOptions::new().connect(database_url).await.expect("Failed to connect to database");
let data = read_from_csv("data.csv").expect("Failed to read CSV");
insert_to_db(&pool, data.clone()).await;
let producer: FutureProducer = rdkafka::config::ClientConfig::new().create().expect("Failed to create Kafka producer");
send_to_kafka(&producer, "my_topic", "Test message").await;
let conn = Connection::connect("amqp://localhost:5672/%2f", ConnectionProperties::default()).await.expect("Failed to connect to RabbitMQ");
let channel = conn.create_channel().await.expect("Failed to create channel");
channel.queue_declare("my_queue", QueueDeclareOptions::default(), FieldTable::default()).await.expect("Failed to declare queue");
send_to_rabbitmq(&channel, "my_queue", "Test message to RabbitMQ").await;
analyze_with_polars(&data);
process_with_arrow();
timely_dataflow_example();
let store = Store::default();
let module = Module::new(&store, "(module (func (export \"run\") (result i32) i32.const 42))").unwrap();
let instance = Instance::new(&module, &imports! {}).unwrap();
let run = instance.exports.get_function("run").unwrap();
println!("WASM Result: {:?}", run.call(&[]));
let kube_client = KubeClient::try_default().await.unwrap();
let pods: Api<k8s_openapi::api::core::v1::Pod> = Api::all(kube_client);
let pod_list = pods.list(&Default::default()).await.unwrap();
println!("Kubernetes Pods: {:?}", pod_list);
}