Найти в Дзене
Один Rust не п...Rust

Kubernetes operator & Rust ML

Для чего нужна данная статья? : - Создать оператор в Kubernetes кластере с использованием ML. Зачем Вам это уметь? : Научиться управлять пользовательскими ресурсами (CRD) в кластере Kubernetes. cargo new my-k8s-operator cd my-k8s-operator Откройте файл Cargo.toml и добавьте следующие зависимости: [dependencies] tokio = { version = "1", features = ["full"] } kube = { version = "0.80", features = ["runtime", "derive"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" log = "0.4" env_logger = "0.9"
Давайте определим нашу пользовательскую структуру данных (CRD), которая будет управляться оператором. use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] pub struct CustomCondition { #[serde(rename = "type")] pub type_: String, pub status: String, pub reason: String, pub message: String, #[serde(rename = "lastTransitionTime")] pub last_transition_time: Strin
Оглавление
GitHub - nicktretyakov/kuber_ml

Для чего нужна данная статья? :

- Создать оператор в Kubernetes кластере с использованием ML.

Зачем Вам это уметь? :

Научиться управлять пользовательскими ресурсами (CRD) в кластере Kubernetes.

Создайте новый проект на Rust:

cargo new my-k8s-operator

cd my-k8s-operator

Откройте файл Cargo.toml и добавьте следующие зависимости:

[dependencies]

tokio = { version = "1", features = ["full"] }

kube = { version = "0.80", features = ["runtime", "derive"] }

serde = { version = "1.0", features = ["derive"] }

serde_json = "1.0"

thiserror = "1.0"

log = "0.4"

env_logger = "0.9"


Давайте определим нашу пользовательскую структуру данных (CRD), которая будет управляться оператором.

crd.rs

use kube::CustomResource;

use schemars::JsonSchema;

use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]

pub struct CustomCondition {

#[serde(rename = "type")]

pub type_: String,

pub status: String,

pub reason: String,

pub message: String,

#[serde(rename = "lastTransitionTime")]

pub last_transition_time: String,

}

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]

#[kube(

group = "example.com",

version = "v1",

kind = "MLScaler",

plural = "mlscalers"

)]

#[kube(namespaced)]

#[kube(status = "MLScalerStatus")]

#[kube(schema = "disabled")]

pub struct MLScalerSpec {

pub target_deployment: String,

pub min_replicas: i32,

pub max_replicas: i32,

pub metric: String,

pub history_size: usize,

}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, Default)]

pub struct MLScalerStatus {

pub current_replicas: i32,

pub predicted_replicas: i32,

pub conditions: Vec<CustomCondition>,

}

В Kubernetes есть стандартные объекты: Pod, Service, Deployment и другие. Но иногда нужно создать свой тип объекта — например, для специфичных задач вашего приложения. Такой объект называется Custom Resource (CRD).Здесь мы говорим: "Создай новый тип ресурса для Kubernetes на основе этой структуры".

Пример:
Представьте, что вы пишете систему для управления приложениями. Вам нужен объект, который хранит информацию о приложении: его имя и количество копий (реплик). Это и есть ваш AppSpec.

  • group = "example.com" — группа, к которой относится ресурс (обычно домен вашей компании).
  • version = "v1" — версия API.
  • kind = "App" — тип ресурса (будет использоваться в манифестах Kubernetes).
  • namespaced — ресурс принадлежит определённому namespace в Kubernetes.
  • AppSpec — это желаемое состояние вашего ресурса (что вы хотите получить).
  • AppStatus — это текущее состояние (что есть на самом деле).
  • AppSpec хранит имя приложения и количество реплик, которые вы хотите запустить.
  • AppStatus хранит количество реально доступных реплик.

Пример:
Вы хотите запустить 3 реплики (replicas: 3), но пока доступно только 2 (available_replicas: 2). Kubernetes будет пытаться довести количество реплик до нужного.

  • Serialize и Deserialize — позволяют преобразовывать структуру в JSON и обратно (чтобы Kubernetes мог читать и записывать данные).
  • Default — позволяет создать объект с значениями по умолчанию.

Теперь реализуем логику оператора, которая будет отслеживать изменения в CRD и выполнять соответствующие действия.

main.rs

mod crd;

use crd::{MLScaler, MLScalerStatus, CustomCondition};

use futures::StreamExt;

use kube::{

api::{Api, Patch, PatchParams, ResourceExt},

client::Client,

runtime::{

controller::{Action, Controller},

finalizer::{finalizer, Event as Finalizer},

watcher::Config,

},

};

use k8s_openapi::api::apps::v1::Deployment;

use serde_json::json;

use std::sync::Arc;

use std::time::Duration;

use thiserror::Error;

use tracing::{error, info, warn};

use tch::{Device, Kind, Tensor};

const FINALIZER_NAME: &str = "mlscaler.example.com/finalizer";

#[derive(Debug, Error)]

enum Error {

#[error("Kubernetes API error: {0}")]

Kube(#[from] kube::Error),

#[error("ML computation error: {0}")]

Ml(String),

#[error("Finalizer error: {0}")]

Finalizer(#[from] kube::runtime::finalizer::Error<Box<Error>>),

#[error("Serialization error: {0}")]

Serialization(#[from] serde_json::Error),

}

async fn reconcile(obj: Arc<MLScaler>, _ctx: Arc<()>) -> Result<Action, Error> {

let client = Client::try_default().await?;

let ns = obj.namespace().ok_or(Error::Ml("No namespace".to_string()))?;

let name = obj.name_any();

info!("Reconciling MLScaler '{}' in '{}'", name, ns);

let mlscalers: Api<MLScaler> = Api::namespaced(client.clone(), &ns);

finalizer(&mlscalers, FINALIZER_NAME, obj, |event| async {

match event {

Finalizer::Apply(mlscaler) => apply(&client, &ns, mlscaler).await.map_err(Box::new),

Finalizer::Cleanup(mlscaler) => cleanup(&client, &ns, mlscaler).await.map_err(Box::new),

}

})

.await

.map_err(Error::Finalizer)

}

async fn apply(client: &Client, ns: &str, mlscaler: Arc<MLScaler>) -> Result<Action, Error> {

// Simulate historical metrics (replace with real metrics fetch in production)

let history_size = mlscaler.spec.history_size;

let historical_times: Vec<f64> = (0..history_size).map(|i| i as f64).collect();

let historical_metrics: Vec<f64> = vec![0.1, 0.2, 0.3, 0.4, 0.5]; // Simulated CPU values

if historical_metrics.len() != history_size {

return Ok(Action::requeue(Duration::from_secs(10)));

}

// ML: Linear regression using tch-rs (simplified for compatibility)

let device = if tch::Cuda::is_available() { Device::Cuda(0) } else { Device::Cpu };

let times = Tensor::f_from_slice(&historical_times).unwrap().to_device(device);

let metrics = Tensor::f_from_slice(&historical_metrics).unwrap().to_device(device);

// Simple linear regression: y = ax + b

// Using least squares: slope = covariance(x,y) / variance(x)

let times_mean = times.mean(Kind::Float);

let metrics_mean = metrics.mean(Kind::Float);

let times_centered = &times - &times_mean;

let metrics_centered = &metrics - &metrics_mean;

let covariance = (&times_centered * &metrics_centered).mean(Kind::Float);

let variance = (&times_centered * &times_centered).mean(Kind::Float);

let slope = covariance / variance;

let intercept = &metrics_mean - &slope * &times_mean;

// Predict next load

let next_time = Tensor::f_from_slice(&[(history_size as f64)]).unwrap().to_device(device);

let predicted_tensor = &slope * &next_time + &intercept;

let predicted_load: f64 = predicted_tensor.double_value(&[]);

// Compute desired replicas

let factor = 10.0; // Arbitrary scaling factor

let desired = ((predicted_load * factor) as i32).clamp(mlscaler.spec.min_replicas, mlscaler.spec.max_replicas);

// Scale target Deployment

let deps: Api<Deployment> = Api::namespaced(client.clone(), ns);

let target = &mlscaler.spec.target_deployment;

let dep = deps.get(target).await?;

let current = dep.spec.as_ref().and_then(|s| s.replicas).unwrap_or(1);

if desired != current {

let patch = json!({"spec": {"replicas": desired}});

deps.patch(target, &PatchParams::default(), &Patch::Merge(patch)).await?;

info!("Scaled '{}' to {} replicas based on prediction {}", target, desired, predicted_load);

}

// Update status with conditions

let condition = CustomCondition {

type_: "Scaled".to_string(),

status: "True".to_string(),

reason: "PredictionBased".to_string(),

message: format!("Predicted load: {:.2}", predicted_load),

last_transition_time: chrono::Utc::now().to_rfc3339(),

};

let status = MLScalerStatus {

current_replicas: desired,

predicted_replicas: desired,

conditions: vec![condition],

};

let status_patch = json!({"status": status});

let mlscalers: Api<MLScaler> = Api::namespaced(client.clone(), ns);

mlscalers.patch_status(&mlscaler.name_any(), &PatchParams::default(), &Patch::Merge(status_patch)).await?;

Ok(Action::requeue(Duration::from_secs(30)))

}

async fn cleanup(client: &Client, ns: &str, mlscaler: Arc<MLScaler>) -> Result<Action, Error> {

// Reset replicas to min on deletion

let deps: Api<Deployment> = Api::namespaced(client.clone(), ns);

let target = &mlscaler.spec.target_deployment;

let patch = json!({"spec": {"replicas": mlscaler.spec.min_replicas}});

deps.patch(target, &PatchParams::default(), &Patch::Merge(patch)).await?;

info!("Cleaned up '{}' for MLScaler deletion", target);

Ok(Action::await_change())

}

fn error_policy(_obj: Arc<MLScaler>, err: &Error, _ctx: Arc<()>) -> Action {

warn!("Reconciliation error: {:?}", err);

Action::requeue(Duration::from_secs(5))

}

#[tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {

tracing_subscriber::fmt::init();

let client = Client::try_default().await?;

let mlscalers: Api<MLScaler> = Api::all(client.clone());

let deps: Api<Deployment> = Api::all(client.clone()); // Watch Deployments for ownership

Controller::new(mlscalers, Config::default())

.owns(deps, Config::default())

.run(reconcile, error_policy, Arc::new(()))

.for_each(|res| async move {

match res {

Ok(_) => info!("Reconciled successfully"),

Err(e) => error!("Reconciliation failed: {:?}", e),

}

})

.await;

Ok(())

}

reconcile— Основная функция, которая "приводит в порядок" состояние вашего кастомного ресурса. Она сравнивает желаемое состояние (что вы хотите) с реальным (что есть сейчас) и исправляет расхождения и возникает при каждом выступе CRD. В ней определены, какие действия необходимо активировать в зависимости от текущего состояния ресурса. Если в AppSpec указано, что должно быть 3 реплики, а в реальности их 2 — функция может создать ещё одну реплику или обновить статус, чтобы отразить реальное положение дел.

error_policy— функция, которая определяет поведение оператора при устранении ошибок. В данном случае она просто повторяет это через 5 секунд.

Controller::new— Создаёт контроллер, который будет отслеживать изменения ресурсов CRD и сохранять функцию reconcile.

CRD — это способ добавить свои собственные типы объектов в Kubernetes. Например, если вам нужно управлять не только подами (pods) и сервисами, но и, скажем, "приложениями" (Apps) с уникальной логикой — вы создаёте CRD для этого типа.

Пример:
В коде есть структура App — это и есть ваш кастомный ресурс. Вы можете создавать объекты типа App в Kubernetes, как если бы это были стандартные поды или деплойменты.

  • Client — это соединение с Kubernetes-кластером.
  • Api<App> — это интерфейс для работы с вашими кастомными ресурсами (App) через Kubernetes API.

Пример:
Вы можете создавать, обновлять, удалять объекты App через этот интерфейс, как если бы вы работали с обычными подами.

Patch и PatchParams - это способ обновить только часть объекта в Kubernetes, не перезаписывая его целиком.

Пример:
В коде обновляется только поле status у объекта App, а не весь объект.

Controller — это "наблюдатель", который следит за изменениями ваших кастомных ресурсов и вызывает reconcile, когда что-то меняется.

Пример:
Если кто-то создаст или изменит объект App, контроллер это заметит и вызовет reconcile, чтобы привести всё в порядок.

Для сборки и запуска оператора:

cargo build --release
Dockerfile для оператора:

FROM rust:slim-buster AS builder

WORKDIR /app

COPY . .

RUN cargo build --release

FROM debian:buster-slim

COPY --from=builder /app/target/release/k8s-operator /usr/local/bin/

ENTRYPOINT ["k8s-operator"]

Возьмите Docker-образ:

docker build -t your-username/k8s-operator:latest .

Загрузите образ в реестр Docker:

docker push your-username/k8s-operator:latest

Создайте манифест для оператора развертывания в Kubernetes:

apiVersion: apps/v1

kind: Deployment

metadata:

name: k8s-operator

spec:

replicas: 1

selector:

matchLabels:

app: k8s-operator

template:

metadata:

labels:

app: k8s-operator

spec:
containers:

- name: k8s-operator

image: your-username/k8s-operator:latest

imagePullPolicy: Always

Примените манифесты и разверните оператора:

kubectl apply -f operator-deployment.yaml

Создайте объект вашего пользовательского ресурса (CRD) и наблюдайте за действиями оператора - my-app.yaml:

apiVersion: example.com/v1

kind: App

metadata:

name: my-app

namespace: default

spec:

name: "My Rust App"

replicas: 3

Замените этот файл:

kubectl apply -f my-app.yaml

Теперь ваш оператор будет следить за типом объектов App и обновлять статус на основе количества реплик, указанного в характеристиках.