Найти в Дзене
Один Rust не п...Rust

Greenplum и Rust

Оглавление

Для чего нужна данная статья? :
- Разработать API и веб приложение для обработки данных Greenplum с использованием машинного обучения.

Зачем Вам это уметь? :
- Увеличить скорость создания схем данных с последующей обработкой в Greenplum.

Создаем схему данных в Greenplum для хранения информации о продажах, клиентах, продуктах и регионах. Это может выглядеть примерно так:

CREATE TABLE sales (
sale_id SERIAL PRIMARY KEY,
customer_id INT,
product_id INT,
sale_date DATE,
amount DECIMAL,
region VARCHAR(50)
);

Загружаем данные о продажах в базу данных Greenplum с использованием инструментов для массовой вставки данных.

COPY sales FROM '/path/to/sales_data.csv' DELIMITER ',' CSV HEADER;

Пишем API на Rust, используя Rocket framework, который будет взаимодействовать с базой данных Greenplum и предоставлять данные для веб-приложения

Добавьте зависимости в ваш Cargo.toml:

[dependencies]
rocket = "0.5.0"
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7.2"

#[macro_use]
extern crate rocket;
use rocket::http::Status;
use rocket::serde::json::Json;
use tokio_postgres::{NoTls, Row};
use tokio_postgres::types::ToSql;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
rocket::ignite()
.mount("/", routes![total_sales_by_region])
.launch()
.await
.expect("Failed to launch Rocket");
}
#[get("/total_sales_by_region")]
async fn total_sales_by_region() -> Result<Json<HashMap<String, f64>>, Status> {
// Строка подключения к вашей базе данных Greenplum
let db_url = "postgresql://username:password@localhost:5432/your_database";
// Установка соединения с базой данных
let (client, connection) =
tokio_postgres::connect(db_url, NoTls).await.map_err(|_| Status::InternalServerError)?;
// Обработка ошибок подключения
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
// Пример SQL-запроса
let sql = "SELECT region, SUM(amount) AS total_sales FROM sales GROUP BY region";
// Выполнение запроса
let rows = client.query(sql, &[]).await.map_err(|_| Status::InternalServerError)?;
// Обработка результатов запроса
let mut result = HashMap::new();
for row in rows {
let region: String = row.get("region");
let total_sales: f64 = row.get("total_sales");
result.insert(region, total_sales);
}
Ok(Json(result))
}

Создадим веб-приложение на Rust с использованием фреймворка Yew для визуализации данных о продажах, нужно создать компоненты интерфейса пользователя, обработать запросы к API и отобразить полученные данные.

Добавьте зависимости в ваш Cargo.toml:

[dependencies]
yew = "0.18.0"
yew-router = "0.15.1"

// src/model.rs
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct SalesData {
pub region: String,
pub total_sales: f64,
}
// src/components.rs
use yew::prelude::*;
use crate::model::SalesData;
pub struct App {
link: ComponentLink<Self>,
sales_data: Vec<SalesData>,
}
pub enum Msg {
FetchData,
DataFetched(Vec<SalesData>),
}
impl Component for App {
type Message = Msg;
type Properties = ();
fn create(_: Self::Properties, link: ComponentLink<Self>) -> Self {
let mut app = App {
link,
sales_data: Vec::new(),
};
app.link.send_message(Msg::FetchData);
app
}
fn update(&mut self, msg: Self::Message) -> ShouldRender {
match msg {
Msg::FetchData => {
// Здесь должен быть код для выполнения запроса к вашему API
// и обработки полученных данных
// Вместо этого используем заглушку
let dummy_data = vec![
SalesData { region: "Region A".to_string(), total_sales: 1000.0 },
SalesData { region: "Region B".to_string(), total_sales: 1500.0 },
];
self.link.send_message(Msg::DataFetched(dummy_data));
}
Msg::DataFetched(data) => {
self.sales_data = data;
}
}
true
}
fn view(&self) -> Html {
html! {
<div>
<h1>{"Sales Data Visualization"}</h1>
<ul>
{ for self.sales_data.iter().map(|data| self.view_sales_data(data)) }
</ul>
</div>
}
}
}
impl App {
fn view_sales_data(&self, data: &SalesData) -> Html {
html! {
<li>
<strong>{ &data.region }</strong>
{ format!(" - Total Sales: ${}", data.total_sales) }
</li>
}
}
}

Создадим основный файл main.rs:

// main.rs
use yew::prelude::*;
use yew_router::prelude::*;
use crate::components::App;
mod components;
mod model;
struct Model;
enum Msg {}
impl Component for Model {
type Message = Msg;
type Properties = ();
fn create(_: Self::Properties, _: ComponentLink<Self>) -> Self {
Model
}
fn update(&mut self, _: Self::Message) -> ShouldRender {
true
}
fn view(&self) -> Html {
html! {
<Router<Model, ()>
render = Router::render(|_| html!{ <App /> })
/>
}
}
}
fn main() {
yew::start_app::<Model>();
}


Откройте браузер и перейдите по адресу
http://localhost:8080

Добавим обработку больших данных и реализацию машинного обучения

. Подключение к Greenplum и извлечение данных

  • Используется библиотека postgres для подключения к Greenplum, который основан на PostgreSQL.
  • Данные извлекаются батчами по 1000 строк с помощью серверного курсора (DECLARE CURSOR и FETCH), чтобы эффективно обрабатывать большие объемы данных без загрузки всего набора в память.

2. Предобработка данных

  • Числовые признаки: age (возраст) и income (доход) нормализуются с использованием min-max масштабирования: (x - min) / (max - min). Минимальные и максимальные значения запрашиваются из базы данных заранее.
  • Категориальный признак: gender преобразуется в one-hot кодирование. Уникальные значения извлекаются через SELECT DISTINCT, создается отображение (HashMap) для преобразования категорий в бинарные признаки.

3. Модель машинного обучения

  • Реализована нейронная сеть с одной скрытой прослойкой:Входной слой: количество признаков равно 2 (числовые) + количество уникальных значений gender.
    Скрытый слой: 64 нейрона с функцией активации ReLU (max(0, x)).
    Выходной слой: 1 нейрон с сигмоидной функцией активации для бинарной классификации.
  • Веса инициализируются с использованием нормального распределения с учетом размера слоев для лучшей сходимости.
  • Обучение происходит с помощью стохастического градиентного спуска (SGD) на каждом батче:Прямое распространение вычисляет выходы скрытого и выходного слоев.
    Обратное распространение вычисляет градиенты и обновляет веса.

4. Оценка модели

  • Тестовый набор данных извлекается из таблицы test_table.
  • Вычисляется точность модели, сравнивая предсказания (порог 0.5) с истинными метками.

Предположения

  • Таблица mytable содержит столбцы: age (int), income (float8), gender (text), label (int, 0 или 1).
  • Тестовая таблица test_table имеет ту же структуру.
  • Данные не содержат пропусков (NULL значений).
  • Параметры подключения к Greenplum (хост, пользователь, база данных) верны.

use postgres::{Client, NoTls};

use std::collections::HashMap;

use ndarray::{Array1, Array2, Axis};

use ndarray_rand::RandomExt;

use ndarray_rand::rand_distr::Normal;

// Функция сигмоиды для выходного слоя

fn sigmoid(x: f64) -> f64 {

1.0 / (1.0 + (-x).exp())

}

// Структура нейронной сети

struct NeuralNetwork {

weights_input_hidden: Array2<f64>, // Веса между входным и скрытым слоем

bias_hidden: Array1<f64>, // Смещения скрытого слоя

weights_hidden_output: Array1<f64>, // Веса между скрытым и выходным слоем

bias_output: f64, // Смещение выходного слоя

learning_rate: f64, // Скорость обучения

}

impl NeuralNetwork {

// Конструктор нейронной сети

fn new(n_features: usize, hidden_size: usize, learning_rate: f64) -> Self {

// Инициализация весов с использованием нормального распределения (Xavier initialization)

let weights_input_hidden = Array2::random(

(n_features, hidden_size),

Normal::new(0.0, 1.0 / (n_features as f64).sqrt()).unwrap(),

);

let bias_hidden = Array1::zeros(hidden_size);

let weights_hidden_output = Array1::random(

hidden_size,

Normal::new(0.0, 1.0 / (hidden_size as f64).sqrt()).unwrap(),

);

let bias_output = 0.0;

Self {

weights_input_hidden,

bias_hidden,

weights_hidden_output,

bias_output,

learning_rate,

}

}

// Обучение на одном батче

fn fit_batch(&mut self, X: &Array2<f64>, y: &Array1<f64>) {

let n_samples = X.nrows() as f64;

// Прямое распространение

let hidden = (X.dot(&self.weights_input_hidden) + &self.bias_hidden)

.mapv(|x| x.max(0.0)); // ReLU активация

let output = (hidden.dot(&self.weights_hidden_output) + self.bias_output)

.mapv(sigmoid); // Сигмоида на выходе

// Вычисление ошибки

let errors = &output - y;

// Градиенты для выходного слоя

let gradient_weights_hidden_output = hidden.t().dot(&errors) / n_samples;

let gradient_bias_output = errors.sum() / n_samples;

// Обратное распространение на скрытый слой

let hidden_errors = errors.dot(&self.weights_hidden_output.t())

* &(hidden.mapv(|x| if x > 0.0 { 1.0 } else { 0.0 }));

let gradient_weights_input_hidden = X.t().dot(&hidden_errors) / n_samples;

let gradient_bias_hidden = hidden_errors.sum_axis(Axis(0)) / n_samples;

// Обновление весов и смещений

self.weights_input_hidden -= &(self.learning_rate * &gradient_weights_input_hidden);

self.bias_hidden -= self.learning_rate * &gradient_bias_hidden;

self.weights_hidden_output -= self.learning_rate * &gradient_weights_hidden_output;

self.bias_output -= self.learning_rate * gradient_bias_output;

}

// Предсказание

fn predict(&self, X: &Array2<f64>) -> Array1<f64> {

let hidden = (X.dot(&self.weights_input_hidden) + &self.bias_hidden)

.mapv(|x| x.max(0.0));

(hidden.dot(&self.weights_hidden_output) + self.bias_output).mapv(sigmoid)

}

}

// Предобработка батча данных

fn preprocess_batch(

rows: &[postgres::Row],

min_age: f64,

max_age: f64,

min_income: f64,

max_income: f64,

gender_map: &HashMap<String, usize>,

) -> (Array2<f64>, Array1<f64>) {

let batch_size = rows.len();

let n_features = 2 + gender_map.len(); // 2 числовых + количество категорий

let mut X = Array2::zeros((batch_size, n_features));

let mut y = Array1::zeros(batch_size);

for (i, row) in rows.iter().enumerate() {

let age: i32 = row.get(0); // age

let income: f64 = row.get(1); // income

let gender: String = row.get(2); // gender

let label: i32 = row.get(3); // label

// Нормализация числовых признаков

let norm_age = (age as f64 - min_age) / (max_age - min_age);

let norm_income = (income - min_income) / (max_income - min_income);

X[[i, 0]] = norm_age;

X[[i, 1]] = norm_income;

// One-hot кодирование для gender

if let Some(&index) = gender_map.get(&gender) {

X[[i, 2 + index]] = 1.0;

}

y[i] = label as f64;

}

(X, y)

}

fn main() -> Result<(), Box<dyn std::error::Error>> {

// Подключение к Greenplum

let mut client = Client::connect("host=localhost user=gpadmin dbname=mydb", NoTls)?;

// Получение минимальных и максимальных значений для нормализации

let min_age: f64 = client

.query_one("SELECT MIN(age::float8) FROM mytable", &[])?

.get(0);

let max_age: f64 = client

.query_one("SELECT MAX(age::float8) FROM mytable", &[])?

.get(0);

let min_income: f64 = client

.query_one("SELECT MIN(income) FROM mytable", &[])?

.get(0);

let max_income: f64 = client

.query_one("SELECT MAX(income) FROM mytable", &[])?

.get(0);

// Получение уникальных значений для категориального признака gender

let genders: Vec<String> = client

.query("SELECT DISTINCT gender FROM mytable ORDER BY gender", &[])?

.into_iter()

.map(|row| row.get(0))

.collect();

let mut gender_map = HashMap::new();

for (i, gender) in genders.iter().enumerate() {

gender_map.insert(gender.clone(), i);

}

let n_features = 2 + genders.len(); // Общее количество признаков

let mut model = NeuralNetwork::new(n_features, 64, 0.01); // Создание модели

// Обучение модели

client.execute("BEGIN", &[])?;

client.execute(

"DECLARE mycursor CURSOR FOR SELECT age, income, gender, label FROM mytable",

&[],

)?;

loop {

let rows = client.query("FETCH 1000 FROM mycursor", &[])?;

if rows.is_empty() {

break;

}

let (X, y) = preprocess_batch(

&rows,

min_age,

max_age,

min_income,

max_income,

&gender_map,

);

model.fit_batch(&X, &y);

}

client.execute("COMMIT", &[])?;

// Оценка модели на тестовом наборе

let test_rows = client.query(

"SELECT age, income, gender, label FROM test_table",

&[],

)?;

let (X_test, y_test) = preprocess_batch(

&test_rows,

min_age,

max_age,

min_income,

max_income,

&gender_map,

);

let predictions = model.predict(&X_test);

// Вычисление точности

let accuracy = predictions

.iter()

.zip(y_test.iter())

.filter(|&(p, y)| (*p > 0.5) as u8 == *y as u8)

.count() as f64

/ y_test.len() as f64;

println!("Точность модели: {}", accuracy);

Ok(())

}