Для чего нужна данная статья? :
- Разработать API и веб приложение для обработки данных Greenplum с использованием машинного обучения.
Зачем Вам это уметь? :
- Увеличить скорость создания схем данных с последующей обработкой в Greenplum.
Создаем схему данных в Greenplum для хранения информации о продажах, клиентах, продуктах и регионах. Это может выглядеть примерно так:
CREATE TABLE sales (
sale_id SERIAL PRIMARY KEY,
customer_id INT,
product_id INT,
sale_date DATE,
amount DECIMAL,
region VARCHAR(50)
);
Загружаем данные о продажах в базу данных Greenplum с использованием инструментов для массовой вставки данных.
COPY sales FROM '/path/to/sales_data.csv' DELIMITER ',' CSV HEADER;
Пишем API на Rust, используя Rocket framework, который будет взаимодействовать с базой данных Greenplum и предоставлять данные для веб-приложения
Добавьте зависимости в ваш Cargo.toml:
[dependencies]
rocket = "0.5.0"
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7.2"
#[macro_use]
extern crate rocket;
use rocket::http::Status;
use rocket::serde::json::Json;
use tokio_postgres::{NoTls, Row};
use tokio_postgres::types::ToSql;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
rocket::ignite()
.mount("/", routes![total_sales_by_region])
.launch()
.await
.expect("Failed to launch Rocket");
}
#[get("/total_sales_by_region")]
async fn total_sales_by_region() -> Result<Json<HashMap<String, f64>>, Status> {
// Строка подключения к вашей базе данных Greenplum
let db_url = "postgresql://username:password@localhost:5432/your_database";
// Установка соединения с базой данных
let (client, connection) =
tokio_postgres::connect(db_url, NoTls).await.map_err(|_| Status::InternalServerError)?;
// Обработка ошибок подключения
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
// Пример SQL-запроса
let sql = "SELECT region, SUM(amount) AS total_sales FROM sales GROUP BY region";
// Выполнение запроса
let rows = client.query(sql, &[]).await.map_err(|_| Status::InternalServerError)?;
// Обработка результатов запроса
let mut result = HashMap::new();
for row in rows {
let region: String = row.get("region");
let total_sales: f64 = row.get("total_sales");
result.insert(region, total_sales);
}
Ok(Json(result))
}
Создадим веб-приложение на Rust с использованием фреймворка Yew для визуализации данных о продажах, нужно создать компоненты интерфейса пользователя, обработать запросы к API и отобразить полученные данные.
Добавьте зависимости в ваш Cargo.toml:
[dependencies]
yew = "0.18.0"
yew-router = "0.15.1"
// src/model.rs
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct SalesData {
pub region: String,
pub total_sales: f64,
}
// src/components.rs
use yew::prelude::*;
use crate::model::SalesData;
pub struct App {
link: ComponentLink<Self>,
sales_data: Vec<SalesData>,
}
pub enum Msg {
FetchData,
DataFetched(Vec<SalesData>),
}
impl Component for App {
type Message = Msg;
type Properties = ();
fn create(_: Self::Properties, link: ComponentLink<Self>) -> Self {
let mut app = App {
link,
sales_data: Vec::new(),
};
app.link.send_message(Msg::FetchData);
app
}
fn update(&mut self, msg: Self::Message) -> ShouldRender {
match msg {
Msg::FetchData => {
// Здесь должен быть код для выполнения запроса к вашему API
// и обработки полученных данных
// Вместо этого используем заглушку
let dummy_data = vec![
SalesData { region: "Region A".to_string(), total_sales: 1000.0 },
SalesData { region: "Region B".to_string(), total_sales: 1500.0 },
];
self.link.send_message(Msg::DataFetched(dummy_data));
}
Msg::DataFetched(data) => {
self.sales_data = data;
}
}
true
}
fn view(&self) -> Html {
html! {
<div>
<h1>{"Sales Data Visualization"}</h1>
<ul>
{ for self.sales_data.iter().map(|data| self.view_sales_data(data)) }
</ul>
</div>
}
}
}
impl App {
fn view_sales_data(&self, data: &SalesData) -> Html {
html! {
<li>
<strong>{ &data.region }</strong>
{ format!(" - Total Sales: ${}", data.total_sales) }
</li>
}
}
}
Создадим основный файл main.rs:
// main.rs
use yew::prelude::*;
use yew_router::prelude::*;
use crate::components::App;
mod components;
mod model;
struct Model;
enum Msg {}
impl Component for Model {
type Message = Msg;
type Properties = ();
fn create(_: Self::Properties, _: ComponentLink<Self>) -> Self {
Model
}
fn update(&mut self, _: Self::Message) -> ShouldRender {
true
}
fn view(&self) -> Html {
html! {
<Router<Model, ()>
render = Router::render(|_| html!{ <App /> })
/>
}
}
}
fn main() {
yew::start_app::<Model>();
}
Откройте браузер и перейдите по адресу http://localhost:8080
Добавим обработку больших данных и реализацию машинного обучения
. Подключение к Greenplum и извлечение данных
- Используется библиотека postgres для подключения к Greenplum, который основан на PostgreSQL.
- Данные извлекаются батчами по 1000 строк с помощью серверного курсора (DECLARE CURSOR и FETCH), чтобы эффективно обрабатывать большие объемы данных без загрузки всего набора в память.
2. Предобработка данных
- Числовые признаки: age (возраст) и income (доход) нормализуются с использованием min-max масштабирования: (x - min) / (max - min). Минимальные и максимальные значения запрашиваются из базы данных заранее.
- Категориальный признак: gender преобразуется в one-hot кодирование. Уникальные значения извлекаются через SELECT DISTINCT, создается отображение (HashMap) для преобразования категорий в бинарные признаки.
3. Модель машинного обучения
- Реализована нейронная сеть с одной скрытой прослойкой:Входной слой: количество признаков равно 2 (числовые) + количество уникальных значений gender.
Скрытый слой: 64 нейрона с функцией активации ReLU (max(0, x)).
Выходной слой: 1 нейрон с сигмоидной функцией активации для бинарной классификации. - Веса инициализируются с использованием нормального распределения с учетом размера слоев для лучшей сходимости.
- Обучение происходит с помощью стохастического градиентного спуска (SGD) на каждом батче:Прямое распространение вычисляет выходы скрытого и выходного слоев.
Обратное распространение вычисляет градиенты и обновляет веса.
4. Оценка модели
- Тестовый набор данных извлекается из таблицы test_table.
- Вычисляется точность модели, сравнивая предсказания (порог 0.5) с истинными метками.
Предположения
- Таблица mytable содержит столбцы: age (int), income (float8), gender (text), label (int, 0 или 1).
- Тестовая таблица test_table имеет ту же структуру.
- Данные не содержат пропусков (NULL значений).
- Параметры подключения к Greenplum (хост, пользователь, база данных) верны.
use postgres::{Client, NoTls};
use std::collections::HashMap;
use ndarray::{Array1, Array2, Axis};
use ndarray_rand::RandomExt;
use ndarray_rand::rand_distr::Normal;
// Функция сигмоиды для выходного слоя
fn sigmoid(x: f64) -> f64 {
1.0 / (1.0 + (-x).exp())
}
// Структура нейронной сети
struct NeuralNetwork {
weights_input_hidden: Array2<f64>, // Веса между входным и скрытым слоем
bias_hidden: Array1<f64>, // Смещения скрытого слоя
weights_hidden_output: Array1<f64>, // Веса между скрытым и выходным слоем
bias_output: f64, // Смещение выходного слоя
learning_rate: f64, // Скорость обучения
}
impl NeuralNetwork {
// Конструктор нейронной сети
fn new(n_features: usize, hidden_size: usize, learning_rate: f64) -> Self {
// Инициализация весов с использованием нормального распределения (Xavier initialization)
let weights_input_hidden = Array2::random(
(n_features, hidden_size),
Normal::new(0.0, 1.0 / (n_features as f64).sqrt()).unwrap(),
);
let bias_hidden = Array1::zeros(hidden_size);
let weights_hidden_output = Array1::random(
hidden_size,
Normal::new(0.0, 1.0 / (hidden_size as f64).sqrt()).unwrap(),
);
let bias_output = 0.0;
Self {
weights_input_hidden,
bias_hidden,
weights_hidden_output,
bias_output,
learning_rate,
}
}
// Обучение на одном батче
fn fit_batch(&mut self, X: &Array2<f64>, y: &Array1<f64>) {
let n_samples = X.nrows() as f64;
// Прямое распространение
let hidden = (X.dot(&self.weights_input_hidden) + &self.bias_hidden)
.mapv(|x| x.max(0.0)); // ReLU активация
let output = (hidden.dot(&self.weights_hidden_output) + self.bias_output)
.mapv(sigmoid); // Сигмоида на выходе
// Вычисление ошибки
let errors = &output - y;
// Градиенты для выходного слоя
let gradient_weights_hidden_output = hidden.t().dot(&errors) / n_samples;
let gradient_bias_output = errors.sum() / n_samples;
// Обратное распространение на скрытый слой
let hidden_errors = errors.dot(&self.weights_hidden_output.t())
* &(hidden.mapv(|x| if x > 0.0 { 1.0 } else { 0.0 }));
let gradient_weights_input_hidden = X.t().dot(&hidden_errors) / n_samples;
let gradient_bias_hidden = hidden_errors.sum_axis(Axis(0)) / n_samples;
// Обновление весов и смещений
self.weights_input_hidden -= &(self.learning_rate * &gradient_weights_input_hidden);
self.bias_hidden -= self.learning_rate * &gradient_bias_hidden;
self.weights_hidden_output -= self.learning_rate * &gradient_weights_hidden_output;
self.bias_output -= self.learning_rate * gradient_bias_output;
}
// Предсказание
fn predict(&self, X: &Array2<f64>) -> Array1<f64> {
let hidden = (X.dot(&self.weights_input_hidden) + &self.bias_hidden)
.mapv(|x| x.max(0.0));
(hidden.dot(&self.weights_hidden_output) + self.bias_output).mapv(sigmoid)
}
}
// Предобработка батча данных
fn preprocess_batch(
rows: &[postgres::Row],
min_age: f64,
max_age: f64,
min_income: f64,
max_income: f64,
gender_map: &HashMap<String, usize>,
) -> (Array2<f64>, Array1<f64>) {
let batch_size = rows.len();
let n_features = 2 + gender_map.len(); // 2 числовых + количество категорий
let mut X = Array2::zeros((batch_size, n_features));
let mut y = Array1::zeros(batch_size);
for (i, row) in rows.iter().enumerate() {
let age: i32 = row.get(0); // age
let income: f64 = row.get(1); // income
let gender: String = row.get(2); // gender
let label: i32 = row.get(3); // label
// Нормализация числовых признаков
let norm_age = (age as f64 - min_age) / (max_age - min_age);
let norm_income = (income - min_income) / (max_income - min_income);
X[[i, 0]] = norm_age;
X[[i, 1]] = norm_income;
// One-hot кодирование для gender
if let Some(&index) = gender_map.get(&gender) {
X[[i, 2 + index]] = 1.0;
}
y[i] = label as f64;
}
(X, y)
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Подключение к Greenplum
let mut client = Client::connect("host=localhost user=gpadmin dbname=mydb", NoTls)?;
// Получение минимальных и максимальных значений для нормализации
let min_age: f64 = client
.query_one("SELECT MIN(age::float8) FROM mytable", &[])?
.get(0);
let max_age: f64 = client
.query_one("SELECT MAX(age::float8) FROM mytable", &[])?
.get(0);
let min_income: f64 = client
.query_one("SELECT MIN(income) FROM mytable", &[])?
.get(0);
let max_income: f64 = client
.query_one("SELECT MAX(income) FROM mytable", &[])?
.get(0);
// Получение уникальных значений для категориального признака gender
let genders: Vec<String> = client
.query("SELECT DISTINCT gender FROM mytable ORDER BY gender", &[])?
.into_iter()
.map(|row| row.get(0))
.collect();
let mut gender_map = HashMap::new();
for (i, gender) in genders.iter().enumerate() {
gender_map.insert(gender.clone(), i);
}
let n_features = 2 + genders.len(); // Общее количество признаков
let mut model = NeuralNetwork::new(n_features, 64, 0.01); // Создание модели
// Обучение модели
client.execute("BEGIN", &[])?;
client.execute(
"DECLARE mycursor CURSOR FOR SELECT age, income, gender, label FROM mytable",
&[],
)?;
loop {
let rows = client.query("FETCH 1000 FROM mycursor", &[])?;
if rows.is_empty() {
break;
}
let (X, y) = preprocess_batch(
&rows,
min_age,
max_age,
min_income,
max_income,
&gender_map,
);
model.fit_batch(&X, &y);
}
client.execute("COMMIT", &[])?;
// Оценка модели на тестовом наборе
let test_rows = client.query(
"SELECT age, income, gender, label FROM test_table",
&[],
)?;
let (X_test, y_test) = preprocess_batch(
&test_rows,
min_age,
max_age,
min_income,
max_income,
&gender_map,
);
let predictions = model.predict(&X_test);
// Вычисление точности
let accuracy = predictions
.iter()
.zip(y_test.iter())
.filter(|&(p, y)| (*p > 0.5) as u8 == *y as u8)
.count() as f64
/ y_test.len() as f64;
println!("Точность модели: {}", accuracy);
Ok(())
}