Для чего нужна данная статья? :
- Создать оператор в Kubernetes кластере с использованием ML.
Зачем Вам это уметь? :
Научиться управлять пользовательскими ресурсами (CRD) в кластере Kubernetes.
Создайте новый проект на Rust:
cargo new my-k8s-operator
cd my-k8s-operator
Откройте файл Cargo.toml и добавьте следующие зависимости:
[dependencies]
tokio = { version = "1", features = ["full"] }
kube = { version = "0.80", features = ["runtime", "derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
log = "0.4"
env_logger = "0.9"
Давайте определим нашу пользовательскую структуру данных (CRD), которая будет управляться оператором.
crd.rs
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct CustomCondition {
#[serde(rename = "type")]
pub type_: String,
pub status: String,
pub reason: String,
pub message: String,
#[serde(rename = "lastTransitionTime")]
pub last_transition_time: String,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "example.com",
version = "v1",
kind = "MLScaler",
plural = "mlscalers"
)]
#[kube(namespaced)]
#[kube(status = "MLScalerStatus")]
#[kube(schema = "disabled")]
pub struct MLScalerSpec {
pub target_deployment: String,
pub min_replicas: i32,
pub max_replicas: i32,
pub metric: String,
pub history_size: usize,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, Default)]
pub struct MLScalerStatus {
pub current_replicas: i32,
pub predicted_replicas: i32,
pub conditions: Vec<CustomCondition>,
}
В Kubernetes есть стандартные объекты: Pod, Service, Deployment и другие. Но иногда нужно создать свой тип объекта — например, для специфичных задач вашего приложения. Такой объект называется Custom Resource (CRD).Здесь мы говорим: "Создай новый тип ресурса для Kubernetes на основе этой структуры".
Пример:
Представьте, что вы пишете систему для управления приложениями. Вам нужен объект, который хранит информацию о приложении: его имя и количество копий (реплик). Это и есть ваш AppSpec.
- group = "example.com" — группа, к которой относится ресурс (обычно домен вашей компании).
- version = "v1" — версия API.
- kind = "App" — тип ресурса (будет использоваться в манифестах Kubernetes).
- namespaced — ресурс принадлежит определённому namespace в Kubernetes.
- AppSpec — это желаемое состояние вашего ресурса (что вы хотите получить).
- AppStatus — это текущее состояние (что есть на самом деле).
- AppSpec хранит имя приложения и количество реплик, которые вы хотите запустить.
- AppStatus хранит количество реально доступных реплик.
Пример:
Вы хотите запустить 3 реплики (replicas: 3), но пока доступно только 2 (available_replicas: 2). Kubernetes будет пытаться довести количество реплик до нужного.
- Serialize и Deserialize — позволяют преобразовывать структуру в JSON и обратно (чтобы Kubernetes мог читать и записывать данные).
- Default — позволяет создать объект с значениями по умолчанию.
Теперь реализуем логику оператора, которая будет отслеживать изменения в CRD и выполнять соответствующие действия.
main.rs
mod crd;
use crd::{MLScaler, MLScalerStatus, CustomCondition};
use futures::StreamExt;
use kube::{
api::{Api, Patch, PatchParams, ResourceExt},
client::Client,
runtime::{
controller::{Action, Controller},
finalizer::{finalizer, Event as Finalizer},
watcher::Config,
},
};
use k8s_openapi::api::apps::v1::Deployment;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::{error, info, warn};
use tch::{Device, Kind, Tensor};
const FINALIZER_NAME: &str = "mlscaler.example.com/finalizer";
#[derive(Debug, Error)]
enum Error {
#[error("Kubernetes API error: {0}")]
Kube(#[from] kube::Error),
#[error("ML computation error: {0}")]
Ml(String),
#[error("Finalizer error: {0}")]
Finalizer(#[from] kube::runtime::finalizer::Error<Box<Error>>),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
async fn reconcile(obj: Arc<MLScaler>, _ctx: Arc<()>) -> Result<Action, Error> {
let client = Client::try_default().await?;
let ns = obj.namespace().ok_or(Error::Ml("No namespace".to_string()))?;
let name = obj.name_any();
info!("Reconciling MLScaler '{}' in '{}'", name, ns);
let mlscalers: Api<MLScaler> = Api::namespaced(client.clone(), &ns);
finalizer(&mlscalers, FINALIZER_NAME, obj, |event| async {
match event {
Finalizer::Apply(mlscaler) => apply(&client, &ns, mlscaler).await.map_err(Box::new),
Finalizer::Cleanup(mlscaler) => cleanup(&client, &ns, mlscaler).await.map_err(Box::new),
}
})
.await
.map_err(Error::Finalizer)
}
async fn apply(client: &Client, ns: &str, mlscaler: Arc<MLScaler>) -> Result<Action, Error> {
// Simulate historical metrics (replace with real metrics fetch in production)
let history_size = mlscaler.spec.history_size;
let historical_times: Vec<f64> = (0..history_size).map(|i| i as f64).collect();
let historical_metrics: Vec<f64> = vec![0.1, 0.2, 0.3, 0.4, 0.5]; // Simulated CPU values
if historical_metrics.len() != history_size {
return Ok(Action::requeue(Duration::from_secs(10)));
}
// ML: Linear regression using tch-rs (simplified for compatibility)
let device = if tch::Cuda::is_available() { Device::Cuda(0) } else { Device::Cpu };
let times = Tensor::f_from_slice(&historical_times).unwrap().to_device(device);
let metrics = Tensor::f_from_slice(&historical_metrics).unwrap().to_device(device);
// Simple linear regression: y = ax + b
// Using least squares: slope = covariance(x,y) / variance(x)
let times_mean = times.mean(Kind::Float);
let metrics_mean = metrics.mean(Kind::Float);
let times_centered = × - ×_mean;
let metrics_centered = &metrics - &metrics_mean;
let covariance = (×_centered * &metrics_centered).mean(Kind::Float);
let variance = (×_centered * ×_centered).mean(Kind::Float);
let slope = covariance / variance;
let intercept = &metrics_mean - &slope * ×_mean;
// Predict next load
let next_time = Tensor::f_from_slice(&[(history_size as f64)]).unwrap().to_device(device);
let predicted_tensor = &slope * &next_time + &intercept;
let predicted_load: f64 = predicted_tensor.double_value(&[]);
// Compute desired replicas
let factor = 10.0; // Arbitrary scaling factor
let desired = ((predicted_load * factor) as i32).clamp(mlscaler.spec.min_replicas, mlscaler.spec.max_replicas);
// Scale target Deployment
let deps: Api<Deployment> = Api::namespaced(client.clone(), ns);
let target = &mlscaler.spec.target_deployment;
let dep = deps.get(target).await?;
let current = dep.spec.as_ref().and_then(|s| s.replicas).unwrap_or(1);
if desired != current {
let patch = json!({"spec": {"replicas": desired}});
deps.patch(target, &PatchParams::default(), &Patch::Merge(patch)).await?;
info!("Scaled '{}' to {} replicas based on prediction {}", target, desired, predicted_load);
}
// Update status with conditions
let condition = CustomCondition {
type_: "Scaled".to_string(),
status: "True".to_string(),
reason: "PredictionBased".to_string(),
message: format!("Predicted load: {:.2}", predicted_load),
last_transition_time: chrono::Utc::now().to_rfc3339(),
};
let status = MLScalerStatus {
current_replicas: desired,
predicted_replicas: desired,
conditions: vec![condition],
};
let status_patch = json!({"status": status});
let mlscalers: Api<MLScaler> = Api::namespaced(client.clone(), ns);
mlscalers.patch_status(&mlscaler.name_any(), &PatchParams::default(), &Patch::Merge(status_patch)).await?;
Ok(Action::requeue(Duration::from_secs(30)))
}
async fn cleanup(client: &Client, ns: &str, mlscaler: Arc<MLScaler>) -> Result<Action, Error> {
// Reset replicas to min on deletion
let deps: Api<Deployment> = Api::namespaced(client.clone(), ns);
let target = &mlscaler.spec.target_deployment;
let patch = json!({"spec": {"replicas": mlscaler.spec.min_replicas}});
deps.patch(target, &PatchParams::default(), &Patch::Merge(patch)).await?;
info!("Cleaned up '{}' for MLScaler deletion", target);
Ok(Action::await_change())
}
fn error_policy(_obj: Arc<MLScaler>, err: &Error, _ctx: Arc<()>) -> Action {
warn!("Reconciliation error: {:?}", err);
Action::requeue(Duration::from_secs(5))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let mlscalers: Api<MLScaler> = Api::all(client.clone());
let deps: Api<Deployment> = Api::all(client.clone()); // Watch Deployments for ownership
Controller::new(mlscalers, Config::default())
.owns(deps, Config::default())
.run(reconcile, error_policy, Arc::new(()))
.for_each(|res| async move {
match res {
Ok(_) => info!("Reconciled successfully"),
Err(e) => error!("Reconciliation failed: {:?}", e),
}
})
.await;
Ok(())
}
reconcile— Основная функция, которая "приводит в порядок" состояние вашего кастомного ресурса. Она сравнивает желаемое состояние (что вы хотите) с реальным (что есть сейчас) и исправляет расхождения и возникает при каждом выступе CRD. В ней определены, какие действия необходимо активировать в зависимости от текущего состояния ресурса. Если в AppSpec указано, что должно быть 3 реплики, а в реальности их 2 — функция может создать ещё одну реплику или обновить статус, чтобы отразить реальное положение дел.
error_policy— функция, которая определяет поведение оператора при устранении ошибок. В данном случае она просто повторяет это через 5 секунд.
Controller::new— Создаёт контроллер, который будет отслеживать изменения ресурсов CRD и сохранять функцию reconcile.
CRD — это способ добавить свои собственные типы объектов в Kubernetes. Например, если вам нужно управлять не только подами (pods) и сервисами, но и, скажем, "приложениями" (Apps) с уникальной логикой — вы создаёте CRD для этого типа.
Пример:
В коде есть структура App — это и есть ваш кастомный ресурс. Вы можете создавать объекты типа App в Kubernetes, как если бы это были стандартные поды или деплойменты.
- Client — это соединение с Kubernetes-кластером.
- Api<App> — это интерфейс для работы с вашими кастомными ресурсами (App) через Kubernetes API.
Пример:
Вы можете создавать, обновлять, удалять объекты App через этот интерфейс, как если бы вы работали с обычными подами.
Patch и PatchParams - это способ обновить только часть объекта в Kubernetes, не перезаписывая его целиком.
Пример:
В коде обновляется только поле status у объекта App, а не весь объект.
Controller — это "наблюдатель", который следит за изменениями ваших кастомных ресурсов и вызывает reconcile, когда что-то меняется.
Пример:
Если кто-то создаст или изменит объект App, контроллер это заметит и вызовет reconcile, чтобы привести всё в порядок.
Для сборки и запуска оператора:
cargo build --release
Dockerfile для оператора:
FROM rust:slim-buster AS builder
WORKDIR /app
COPY . .
RUN cargo build --release
FROM debian:buster-slim
COPY --from=builder /app/target/release/k8s-operator /usr/local/bin/
ENTRYPOINT ["k8s-operator"]
Возьмите Docker-образ:
docker build -t your-username/k8s-operator:latest .
Загрузите образ в реестр Docker:
docker push your-username/k8s-operator:latest
Создайте манифест для оператора развертывания в Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
name: k8s-operator
spec:
replicas: 1
selector:
matchLabels:
app: k8s-operator
template:
metadata:
labels:
app: k8s-operator
spec:
containers:
- name: k8s-operator
image: your-username/k8s-operator:latest
imagePullPolicy: Always
Примените манифесты и разверните оператора:
kubectl apply -f operator-deployment.yaml
Создайте объект вашего пользовательского ресурса (CRD) и наблюдайте за действиями оператора - my-app.yaml:
apiVersion: example.com/v1
kind: App
metadata:
name: my-app
namespace: default
spec:
name: "My Rust App"
replicas: 3
Замените этот файл:
kubectl apply -f my-app.yaml
Теперь ваш оператор будет следить за типом объектов App и обновлять статус на основе количества реплик, указанного в характеристиках.