Найти в Дзене
Один Rust не п...Rust

Планировщик на Rust с ML

GitHub - nicktretyakov/SchedulerML-

Для минимальной реализации планировщика задач с использоанием ML необходимо следующее:

Tokio для полноценной асинхронной поддержки.
Таймер Tokio для неблокирующих задержек.
Безблокировочная архитектура с асинхронными мьютексами.
Выделенный канал для передачи обучающих данных.
Фоновый поток для обучения модели.
Неблокирующее прогнозирование во время планирования.
Фьючерсы.
Система Waker - обрабатывается средой выполнения Tokio

use std::collections::{HashMap, BinaryHeap};

use std::sync::{Arc, Mutex};

use std::time::{Duration, Instant};

use tokio::sync::{mpsc, Mutex as AsyncMutex};

use ndarray::Array1;

use rand::Rng;

// Simplified task metadata

#[derive(Debug, Clone)]

struct TaskMetadata {

id: usize,

priority: f64,

dependencies: Vec<usize>,

start_time: Instant,

}

// ML Model (simplified)

struct MLModel {

weights: Array1<f64>,

}

impl MLModel {

fn new(num_features: usize) -> Self {

MLModel {

weights: Array1::ones(num_features),

}

}

fn predict(&self, features: &[f64]) -> f64 {

self.weights.dot(&Array1::from_vec(features.to_vec()))

}

async fn train_loop(receiver: mpsc::Receiver<(Array1<f64>, f64)>) {

// Batch training happens in background

while let Some((features, target)) = receiver.recv().await {

// Actual training logic here

println!("Training on data: {:?} -> {}", features, target);

}

}

}

// Async task scheduler

#[derive(Clone)]

struct Scheduler {

task_queue: Arc<AsyncMutex<BinaryHeap<PriorityTask>>>,

ml_sender: mpsc::Sender<(Array1<f64>, f64)>,

task_counter: Arc<Mutex<usize>>,

completed_tasks: Arc<Mutex<HashMap<usize, Duration>>>,

}

#[derive(Debug)]

struct PriorityTask {

metadata: TaskMetadata,

future: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,

}

impl Eq for PriorityTask {}

impl PartialEq for PriorityTask {

fn eq(&self, other: &Self) -> bool {

self.metadata.id == other.metadata.id

}

}

impl PartialOrd for PriorityTask {

fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {

other.metadata.priority.partial_cmp(&self.metadata.priority)

}

}

impl Ord for PriorityTask {

fn cmp(&self, other: &Self) -> std::cmp::Ordering {

self.partial_cmp(other).unwrap()

}

}

impl Scheduler {

fn new() -> Self {

let (ml_sender, ml_receiver) = mpsc::channel(100);

// Start ML training thread

tokio::spawn(async move {

MLModel::train_loop(ml_receiver).await

});

Scheduler {

task_queue: Arc::new(AsyncMutex::new(BinaryHeap::new())),

ml_sender,

task_counter: Arc::new(Mutex::new(0)),

completed_tasks: Arc::new(Mutex::new(HashMap::new())),

}

}

async fn spawn<F>(&self, future: F, dependencies: Vec<usize>)

where

F: Future<Output = ()> + Send + 'static,

{

let id = {

let mut counter = self.task_counter.lock().unwrap();

*counter += 1;

*counter

};

let metadata = TaskMetadata {

id,

priority: rand::thread_rng().gen_range(0.0..1.0), // Initial random priority

dependencies,

start_time: Instant::now(),

};

let task = PriorityTask {

metadata: metadata.clone(),

future: Box::pin(future),

};

self.task_queue.lock().await.push(task);

}

async fn run(&self) {

let model = MLModel::new(3); // Simplified model

while let Some(mut task) = self.task_queue.lock().await.pop() {

let features = self.calculate_features(&task.metadata).await;

let new_priority = model.predict(&features);

// Update task priority

task.metadata.priority = new_priority;

// Re-insert with new priority

self.task_queue.lock().await.push(task);

// Process highest priority task

if let Some(mut task) = self.task_queue.lock().await.pop() {

let start_time = Instant::now();

// Run the future to completion

task.future.await;

// Record completion time

let duration = start_time.elapsed();

self.completed_tasks.lock().unwrap()

.insert(task.metadata.id, duration);

// Send training data to ML thread

let target = if duration < Duration::from_millis(100) {

1.0

} else {

0.0

};

let _ = self.ml_sender.send((Array1::from_vec(features), target)).await;

}

}

}

async fn calculate_features(&self, metadata: &TaskMetadata) -> Vec<f64> {

let completed = self.completed_tasks.lock().unwrap();

let success_rate = metadata.dependencies.iter()

.filter(|id| completed.contains_key(id))

.count() as f64 / metadata.dependencies.len().max(1) as f64;

vec![

metadata.priority,

metadata.dependencies.len() as f64,

success_rate,

]

}

}

// Custom async sleep using Tokio's timers

async fn async_sleep(duration: Duration) {

tokio::time::sleep(duration).await;

}

#[tokio::main]

async fn main() {

let scheduler = Scheduler::new();

// Spawn tasks with dependencies

let sched_clone = scheduler.clone();

tokio::spawn(async move {

sched_clone.spawn(async {

println!("Task 1 started");

async_sleep(Duration::from_secs(1)).await;

println!("Task 1 completed");

}, vec![]).await;

});

// More tasks would be added here...

scheduler.run().await;

}