Найти в Дзене
Один Rust не п...Rust

ETL & Rust

Для чего нужна данная статья? : Реализовать многофункциональный пайплайн на Rust: Зачем Вам это уметь? : Минусы: Не масштабируется, нет асинхронной обработки. Минусы: Более сложная реализация. Минусы: Требует внешнего брокера сообщений. Минусы: Высокий расход памяти. Минусы: Требует интеграции с большими дата-движками. Минусы: Высокая сложность, требует кластеров. Минусы: Ограниченная работа с файловыми системами. Минусы: Завязка на облака. use std::fs::File; use std::io::{BufReader, BufRead}; use serde::{Deserialize, Serialize}; use sqlx::{PgPool, postgres::PgPoolOptions}; use tokio::task; use reqwest::Client; use regex::Regex; use chrono::NaiveDate; use rdkafka::producer::{FutureProducer, FutureRecord}; use polars::prelude::*; use arrow2::{array::Int32Array, datatypes::DataType, chunk::Chunk}; use timely::dataflow::operators::{Inspect, Map}; use wasmer::{Store, Module, Instance, imports}; use kube::{Client as KubeClient, api::Api}; use lapin::{options::*, types::FieldTable, Conne
Оглавление

Для чего нужна данная статья? :

Реализовать многофункциональный пайплайн на Rust:

  • Читать данные из локальных файлов (CSV/JSON) и HTTP API.
  • Обрабатывать данные с использованием serde_json, csv, regex, chrono.
  • Загружать данные в PostgreSQL с sqlx.
  • Асинхронно обрабатывать HTTP-запросы с tokio и reqwest.
  • Обрабатывать данные в потоках через Kafka (rdkafka) и RabbitMQ (lapin).
  • Использовать аналитические библиотеки polars и datafusion.
  • Работать с Apache Arrow и Parquet для эффективного хранения больших данных.
  • Распределять нагрузку с timely-dataflow и differential-dataflow.
  • Поддерживать WebAssembly с wasmer.
  • Использовать облачные решения, включая AWS Lambda и Kubernetes (kube-rs).

Зачем Вам это уметь? :

1. Простая ETL-пайплайн с использованием стандартной библиотеки Rust

  • Чтение данных из файлов (std::fs::File) или HTTP (reqwest).
  • Преобразование данных с использованием serde_json, csv, regex и chrono.
  • Загрузка данных в базу с sqlx (PostgreSQL, MySQL) или rusqlite (SQLite).

Минусы: Не масштабируется, нет асинхронной обработки.

2. Асинхронная ETL с использованием tokio и async

  • Использование tokio для параллельной загрузки данных.
  • reqwest или surf для HTTP-запросов.
  • sqlx или diesel для базы данных.
  • rayon для многопоточного преобразования.

Минусы: Более сложная реализация.

3. Потоковая обработка ETL через Kafka/RabbitMQ

  • rdkafka для Kafka или lapin для RabbitMQ.
  • Обработка сообщений в режиме реального времени.
  • Поддержка отказоустойчивости через очереди сообщений.

Минусы: Требует внешнего брокера сообщений.

4. Использование polars или datafusion для аналитики

  • polars — аналог Pandas для Rust (работает с DataFrame).
  • datafusion — SQL-движок для обработки данных в памяти.

Минусы: Высокий расход памяти.

5. ETL с использованием Arrow и Parquet для больших данных

  • arrow2 для работы с Apache Arrow.
  • parquet для хранения данных в колонном формате.

Минусы: Требует интеграции с большими дата-движками.

6. Распределенный ETL с Timely Dataflow или Differential Dataflow

  • timely-dataflow для распределенной потоковой обработки.
  • differential-dataflow для инкрементальных вычислений.

Минусы: Высокая сложность, требует кластеров.

7. Использование WebAssembly (WASM) для встраиваемого ETL

  • wasmer или wasmtime для выполнения ETL внутри контейнера WASM.
  • Можно использовать в серверных или edge-решениях.

Минусы: Ограниченная работа с файловыми системами.

8. Облачные и серверless ETL-решения на Rust

  • AWS Lambda + Rust.
  • Google Cloud Functions через WebAssembly.
  • Встраивание в Kubernetes (kube-rs).

Минусы: Завязка на облака.

Пример асинхронной обработки данных (локальные файлы и HTTP) БД через sqlx, брокера для потоковой передачи данных, Polars и Apache Arrow для аналитики, Timely Dataflow для распределенных вычислений, WebAssembly для встраивания, Kubernetes API для работы в облаке.

use std::fs::File;

use std::io::{BufReader, BufRead};

use serde::{Deserialize, Serialize};

use sqlx::{PgPool, postgres::PgPoolOptions};

use tokio::task;

use reqwest::Client;

use regex::Regex;

use chrono::NaiveDate;

use rdkafka::producer::{FutureProducer, FutureRecord};

use polars::prelude::*;

use arrow2::{array::Int32Array, datatypes::DataType, chunk::Chunk};

use timely::dataflow::operators::{Inspect, Map};

use wasmer::{Store, Module, Instance, imports};

use kube::{Client as KubeClient, api::Api};

use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Channel};

use std::time::Duration;

use log::{info, error};

#[derive(Debug, Serialize, Deserialize)]

struct DataRecord {

id: i32,

name: String,

email: String,

date_of_birth: String,

}

async fn read_from_http(url: &str) -> Result<Vec<DataRecord>, reqwest::Error> {

let client = Client::new();

let resp = client.get(url).send().await?.json::<Vec<DataRecord>>().await?;

Ok(resp)

}

fn read_from_csv(file_path: &str) -> Result<Vec<DataRecord>, csv::Error> {

let mut rdr = csv::Reader::from_path(file_path)?;

let mut records = Vec::new();

for result in rdr.deserialize() {

let record: DataRecord = result?;

records.push(record);

}

Ok(records)

}

async fn insert_to_db(pool: &PgPool, data: Vec<DataRecord>) {

for record in data {

sqlx::query!(

"INSERT INTO users (id, name, email, date_of_birth) VALUES ($1, $2, $3, $4)",

record.id,

record.name,

record.email,

record.date_of_birth

)

.execute(pool)

.await

.expect("Failed to insert data");

}

}

async fn send_to_kafka(producer: &FutureProducer, topic: &str, data: &str) {

producer.send(

FutureRecord::to(topic).payload(data).key("key"),

std::time::Duration::from_secs(0),

).await.expect("Failed to send to Kafka");

}

async fn send_to_rabbitmq(channel: &Channel, queue: &str, data: &str) {

channel

.basic_publish("", queue, BasicPublishOptions::default(), data.as_bytes(), BasicProperties::default())

.await.expect("Failed to send to RabbitMQ");

}

fn analyze_with_polars(data: &[DataRecord]) {

let df = DataFrame::new(

vec![Series::new("id", data.iter().map(|r| r.id).collect::<Vec<_>>())]

).expect("Failed to create DataFrame");

println!("{}", df);

}

fn process_with_arrow() {

let array = Int32Array::from_slice(&[1, 2, 3]);

let chunk = Chunk::new(vec![array.into()]);

println!("Arrow Chunk: {:?}", chunk);

}

fn timely_dataflow_example() {

timely::execute_from_args(std::env::args(), move |worker| {

worker.dataflow(|scope| {

(0..10).to_stream(scope)

.map(|x| x * 2)

.inspect(|x| println!("Timely Data: {}", x));

});

}).expect("Timely Dataflow failed");

}

#[tokio::main]

async fn main() {

env_logger::init();

let database_url = "postgres://user:password@localhost/dbname";

let pool = PgPoolOptions::new().connect(database_url).await.expect("Failed to connect to database");

let data = read_from_csv("data.csv").expect("Failed to read CSV");

insert_to_db(&pool, data.clone()).await;

let producer: FutureProducer = rdkafka::config::ClientConfig::new().create().expect("Failed to create Kafka producer");

send_to_kafka(&producer, "my_topic", "Test message").await;

let conn = Connection::connect("amqp://localhost:5672/%2f", ConnectionProperties::default()).await.expect("Failed to connect to RabbitMQ");

let channel = conn.create_channel().await.expect("Failed to create channel");

channel.queue_declare("my_queue", QueueDeclareOptions::default(), FieldTable::default()).await.expect("Failed to declare queue");

send_to_rabbitmq(&channel, "my_queue", "Test message to RabbitMQ").await;

analyze_with_polars(&data);

process_with_arrow();

timely_dataflow_example();

let store = Store::default();

let module = Module::new(&store, "(module (func (export \"run\") (result i32) i32.const 42))").unwrap();

let instance = Instance::new(&module, &imports! {}).unwrap();

let run = instance.exports.get_function("run").unwrap();

println!("WASM Result: {:?}", run.call(&[]));

let kube_client = KubeClient::try_default().await.unwrap();

let pods: Api<k8s_openapi::api::core::v1::Pod> = Api::all(kube_client);

let pod_list = pods.list(&Default::default()).await.unwrap();

println!("Kubernetes Pods: {:?}", pod_list);

}