A minimal implementation requires the following:
- ML integration via ONNX models (tract-onnx):
  - Loading and running ONNX models
  - Image preprocessing for the neural network
  - Page content analysis with computer vision
- Advanced automation techniques:
  - Human-like scrolling with randomization
  - Captcha handling backed by ML
  - Bypassing anti-bot systems via the DevTools Protocol
  - Dynamic analysis of page content
- Architectural features:
  - Custom traits for extending functionality
  - Asynchronous image processing
  - Isolation of blocking ML operations
  - Configurable driver initialization
- Additional features:
  - Proxy support
  - Session management via cookies
  - ML-based analysis of action results
  - Popup handling
The Cargo.toml dependencies:
[dependencies]
anyhow = "1.0"
async-openai = "0.16"
async-trait = "0.1"
backoff = { version = "0.4", features = ["tokio"] }
bb8 = "0.8"
bb8-redis = "0.17"
image = "0.24"
ndarray = "0.15"
rand = "0.8"
redis = "0.23"
reqwest = "0.11"
serde_json = "1.0"
thirtyfour = "0.31"
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
tract-onnx = "0.18"
tracing = "0.1"
tracing-subscriber = "0.3"
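// The example assumes a chromedriver instance listening on http://localhost:9515,
// a Redis server at redis://127.0.0.1:6379, an ONNX image-classification model saved as
// model.onnx, and a valid OpenAI API key (all of these are set via MLDriverConfig in main()).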
use anyhow::{anyhow, Context, Result};
use async_openai::{
    config::OpenAIConfig,
    types::{ChatCompletionRequestMessageArgs, CreateChatCompletionRequestArgs, Role},
    Client,
};
use backoff::{future::retry, ExponentialBackoff};
use bb8::{Pool, PooledConnection};
use bb8_redis::RedisConnectionManager;
use redis::AsyncCommands;
use thirtyfour::{
    extensions::cdp::ChromeDevTools,
    prelude::*,
};
use tract_onnx::prelude::*;
use image::{io::Reader as ImageReader, DynamicImage};
use tokio::{
fs, sync::Mutex,
task, time::{sleep, Duration},
};
use async_trait::async_trait;
use std::{path::Path, sync::Arc, time::Instant};
use tracing::{debug, error, info, instrument, span, Level};
/// Custom error types for the automation pipeline
#[derive(thiserror::Error, Debug)]
enum AutomationError {
#[error("WebDriver error: {0}")]
WebDriver(String),
#[error("ML processing failed: {0}")]
MLProcessing(String),
#[error("OpenAI error: {0}")]
OpenAI(String),
#[error("Redis error: {0}")]
Redis(String),
#[error("Max retries exceeded")]
MaxRetriesExceeded,
}
/// Extended driver configuration
struct MLDriverConfig {
model_path: String,
headless: bool,
proxy: Option<String>,
user_agent: String,
openai_api_key: String,
redis_url: String,
webdriver_pool_size: u32,
}
/// WebDriver connection pool
type WebDriverPool = Pool<WebDriverConnectionManager>;
/// Connection manager that lets bb8 create and validate WebDriver sessions
struct WebDriverConnectionManager {
url: String,
}
#[async_trait]
impl bb8::ManageConnection for WebDriverConnectionManager {
type Connection = WebDriver;
type Error = WebDriverError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
WebDriver::new(&self.url, DesiredCapabilities::chrome()).await
}
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
conn.status().await.map(|_| ())
}
fn has_broken(&self, _: &mut Self::Connection) -> bool {
false
}
}
/// Distributed task execution backed by Redis queues
struct DistributedExecutor {
redis_pool: bb8::Pool<RedisConnectionManager>,
}
impl DistributedExecutor {
async fn new(redis_url: &str) -> Result<Self> {
let manager = RedisConnectionManager::new(redis_url)?;
let pool = bb8::Pool::builder().build(manager).await?;
Ok(Self { redis_pool: pool })
}
    async fn enqueue_task(&self, queue: &str, task: &str) -> Result<()> {
        let mut conn = self.redis_pool.get().await?;
        // LPUSH returns the new list length; annotate the type so redis can deserialize it.
        let _: i64 = conn.lpush(queue, task).await?;
        Ok(())
    }
    async fn dequeue_task(&self, queue: &str) -> Result<Option<String>> {
        let mut conn = self.redis_pool.get().await?;
        // RPOP returns nil when the queue is empty, so deserialize into an Option.
        let task: Option<String> = conn.rpop(queue, None).await?;
        Ok(task)
    }
}
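/// Worker-side counterpart (a minimal, hypothetical sketch): `distributed_task_execution`
/// below only pushes jobs onto the "automation_tasks" queue and polls "results", so a
/// separate worker process is assumed to consume one queue and feed the other, e.g.:
async fn run_worker(executor: &DistributedExecutor) -> Result<()> {
    loop {
        if let Some(task) = executor.dequeue_task("automation_tasks").await? {
            // Placeholder processing step; a real worker would drive its own WebDriver here.
            let result = format!("processed:{}", task);
            executor.enqueue_task("results", &result).await?;
        } else {
            // Nothing queued yet; back off briefly before polling again.
            sleep(Duration::from_millis(500)).await;
        }
    }
}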
/// Custom WebDriver with ML integration and distributed execution support
struct MLEnhancedWebDriver {
    driver: WebDriver,
    devtools: ChromeDevTools,
    model: SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>,
    openai_client: Client<OpenAIConfig>,
    executor: DistributedExecutor,
    logger: Arc<Mutex<tracing::Span>>,
}
/// Trait with the extended automation methods, including LLM-based analysis
#[async_trait]
trait BrowserAutomation {
async fn solve_captcha(&self, image_path: &str) -> Result<String>;
async fn behavioral_scroll(&self, scroll_steps: usize) -> Result<()>;
async fn ai_analyze_page(&self) -> Result<Vec<f32>>;
async fn llm_analyze_content(&self, html: &str, prompt: &str) -> Result<String>;
async fn distributed_task_execution(&self, task: &str) -> Result<String>;
}
#[async_trait]
impl BrowserAutomation for MLEnhancedWebDriver {
#[instrument(skip(self), level = "info")]
async fn solve_captcha(&self, image_path: &str) -> Result<String> {
let operation = || async {
let start_time = Instant::now();
let img = ImageReader::open(image_path)
.map_err(|e| backoff::Error::permanent(AutomationError::MLProcessing(e.to_string())))?
.decode()
.map_err(|e| backoff::Error::permanent(AutomationError::MLProcessing(e.to_string())))?;
let processed = preprocess_image(img).await
    .map_err(|e| backoff::Error::transient(AutomationError::MLProcessing(e.to_string())))?;
let result = self.model.run(tvec!(processed.into()))
.map_err(|e| backoff::Error::permanent(AutomationError::MLProcessing(e.to_string())))?;
let best = result[0]
.to_array_view::<f32>()
.map_err(|e| backoff::Error::permanent(AutomationError::MLProcessing(e.to_string())))?
.iter()
.enumerate()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap());
match best {
Some((idx, _)) => {
info!("Captcha solved in {:?}", start_time.elapsed());
Ok(format!("{}", idx))
},
None => Err(backoff::Error::permanent(AutomationError::MLProcessing("No solution found".to_string()))),
}
};
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(30)),
..ExponentialBackoff::default()
};
retry(backoff, operation).await.map_err(|e| anyhow!(e))
}
#[instrument(skip(self), level = "info")]
async fn behavioral_scroll(&self, scroll_steps: usize) -> Result<()> {
for i in 0..scroll_steps {
let scroll_distance = (300 + rand::random::<i32>() % 200) as i64;
let variance = (rand::random::<i32>() % 50) as i64;
let duration = 200 + (rand::random::<u64>() % 300);
// Dispatch a synthetic mouse-wheel event through the DevTools Protocol
if let Err(e) = self.devtools
    .execute_cdp_with_params("Input.dispatchMouseEvent", serde_json::json!({
"type": "mouseWheel",
"x": 100,
"y": 400 + i * 30,
"deltaX": 0,
"deltaY": scroll_distance + variance,
"pointerType": "mouse"
}))
.await
{
error!("Behavioral scroll failed: {}", e);
return Err(anyhow!(AutomationError::WebDriver(e.to_string())));
}
sleep(Duration::from_millis(duration)).await;
}
Ok(())
}
#[instrument(skip(self), level = "info")]
async fn ai_analyze_page(&self) -> Result<Vec<f32>> {
let screenshot = self.driver.screenshot_as_png().await
.map_err(|e| anyhow!(AutomationError::WebDriver(e.to_string())))?;
let img = image::load_from_memory(&screenshot)
.map_err(|e| anyhow!(AutomationError::MLProcessing(e.to_string())))?;
let tensor = preprocess_image(img).await?;
let result = self.model.run(tvec!(tensor))
.map_err(|e| anyhow!(AutomationError::MLProcessing(e.to_string())))?;
Ok(result[0].to_array_view::<f32>()?.iter().copied().collect())
}
#[instrument(skip(self), level = "info")]
async fn llm_analyze_content(&self, html: &str, prompt: &str) -> Result<String> {
let messages = vec![
ChatCompletionRequestMessageArgs::default()
.role(Role::System)
.content("You are an expert web content analyst. Analyze the provided HTML and answer the user's question.")
.build()
.map_err(|e| AutomationError::OpenAI(e.to_string()))?,
ChatCompletionRequestMessageArgs::default()
.role(Role::User)
.content(format!("HTML: {}\n\nQuestion: {}", html, prompt))
.build()
.map_err(|e| AutomationError::OpenAI(e.to_string()))?,
];
let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4")
.messages(messages)
.build()
.map_err(|e| AutomationError::OpenAI(e.to_string()))?;
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(60)),
..ExponentialBackoff::default()
};
let response = retry(backoff, || async {
self.openai_client
.chat()
.create(request.clone())
.await
.map_err(|e| backoff::Error::transient(AutomationError::OpenAI(e.to_string())))
})
.await?;
response
.choices
.into_iter()
.next()
.and_then(|c| c.message.content)
.ok_or_else(|| anyhow!(AutomationError::OpenAI("Empty response".to_string())))
}
#[instrument(skip(self), level = "info")]
async fn distributed_task_execution(&self, task: &str) -> Result<String> {
self.executor.enqueue_task("automation_tasks", task).await?;
info!("Task enqueued: {}", task);
let start_time = Instant::now();
while start_time.elapsed() < Duration::from_secs(300) {
if let Some(result) = self.executor.dequeue_task("results").await? {
info!("Task result received: {}", result);
return Ok(result);
}
sleep(Duration::from_secs(1)).await;
}
Err(anyhow!(AutomationError::Redis("Task timeout".to_string())))
}
}
/// Preprocess an image into a 1x3x224x224 NCHW tensor for the model
async fn preprocess_image(img: DynamicImage) -> Result<Tensor> {
    // Resizing and pixel conversion are CPU-bound, so run them off the async runtime.
    let tensor = task::spawn_blocking(move || {
        let resized = img.resize_exact(224, 224, image::imageops::FilterType::Lanczos3);
        let rgb = resized.to_rgb8();
        let mut array = ndarray::Array::zeros((1, 3, 224, 224));
        for (x, y, pixel) in rgb.enumerate_pixels() {
            array[[0, 0, y as usize, x as usize]] = pixel[0] as f32 / 255.0;
            array[[0, 1, y as usize, x as usize]] = pixel[1] as f32 / 255.0;
            array[[0, 2, y as usize, x as usize]] = pixel[2] as f32 / 255.0;
        }
        array.into_tensor()
    })
    .await?;
    Ok(tensor)
}
/// Initialize tracing-based logging
fn init_logging() {
tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.init();
}
impl MLEnhancedWebDriver {
#[instrument(level = "info")]
async fn new(config: &MLDriverConfig) -> Result<Self> {
// Chrome capabilities: headless mode, proxy, user agent and removal of the automation flag
let mut caps = DesiredCapabilities::chrome();
if config.headless {
    caps.add_chrome_arg("--headless=new").map_err(|e| anyhow!(e))?;
}
if let Some(proxy) = &config.proxy {
    caps.add_chrome_arg(&format!("--proxy-server={}", proxy)).map_err(|e| anyhow!(e))?;
}
caps.add_chrome_arg(&format!("--user-agent={}", config.user_agent)).map_err(|e| anyhow!(e))?;
caps.add_experimental_option("excludeSwitches", serde_json::json!(["enable-automation"])).map_err(|e| anyhow!(e))?;
let driver = WebDriver::new("http://localhost:9515", caps).await
    .map_err(|e| anyhow!(AutomationError::WebDriver(e.to_string())))?;
let devtools = ChromeDevTools::new(driver.handle());
// Load and optimize the ONNX model off the async runtime (blocking work)
let model = task::spawn_blocking({
let model_path = config.model_path.clone();
move || {
tract_onnx::onnx()
.model_for_path(Path::new(&model_path))
.context("Failed to load model")?
.with_input_fact(0, InferenceFact::dt_shape(f32::datum_type(), tvec!(1, 3, 224, 224)))
.context("Setting input fact")?
.into_optimized()
.context("Optimizing model")?
.into_runnable()
.context("Creating runnable")
}
})
.await?
.map_err(|e| anyhow!(AutomationError::MLProcessing(e.to_string())))?;
// OpenAI client
let openai_client = Client::with_config(OpenAIConfig::new().with_api_key(&config.openai_api_key));
// Distributed executor backed by Redis queues
let executor = DistributedExecutor::new(&config.redis_url).await?;
// Root tracing span for this driver instance
let logger = Arc::new(Mutex::new(span!(Level::INFO, "MLEnhancedWebDriver")));
Ok(Self {
driver,
devtools,
model,
openai_client,
executor,
logger,
})
}
#[instrument(skip(self), level = "info")]
async fn smart_navigate(&self, url: &str) -> Result<()> {
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(60)),
..ExponentialBackoff::default()
};
retry(backoff, || async {
self.driver.goto(url).await
.map_err(|e| backoff::Error::transient(AutomationError::WebDriver(e.to_string())))
}).await?;
sleep(Duration::from_secs(2)).await;
// Analyze page content with the ML model
match self.ai_analyze_page().await {
Ok(analysis) if analysis[0] > 0.7 => {
self.behavioral_scroll(5).await?;
}
Err(e) => error!("AI analysis failed: {}", e),
_ => {}
};
// Detect and hide modal popups
if self.driver.query(By::Css(".modal")).first().await.is_ok() {
self.driver.execute(
"document.querySelector('.modal').style.display = 'none';",
vec![],
).await?;
}
Ok(())
}
}
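/// A possible extension of the DevTools-based anti-bot measures (a hypothetical sketch):
/// inject a script before any page code runs to hide the navigator.webdriver flag.
/// It could be called right after ChromeDevTools::new; whether it is sufficient against
/// a given protection is site-specific.
async fn mask_webdriver_flag(devtools: &ChromeDevTools) -> Result<()> {
    devtools
        .execute_cdp_with_params(
            "Page.addScriptToEvaluateOnNewDocument",
            serde_json::json!({
                "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
            }),
        )
        .await
        .map_err(|e| anyhow!(AutomationError::WebDriver(e.to_string())))?;
    Ok(())
}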
#[tokio::main]
async fn main() -> Result<()> {
init_logging();
let config = MLDriverConfig {
model_path: "model.onnx".to_string(),
headless: false,
proxy: Some("socks5://127.0.0.1:9050".to_string()),
user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36".to_string(),
openai_api_key: "sk-...".to_string(),
redis_url: "redis://127.0.0.1:6379".to_string(),
webdriver_pool_size: 5,
};
// Create the WebDriver connection pool
let manager = WebDriverConnectionManager {
url: "http://localhost:9515".to_string(),
};
let webdriver_pool = bb8::Pool::builder()
.max_size(config.webdriver_pool_size)
.build(manager)
.await?;
// Example of checking a WebDriver session out of the pool
let _conn = webdriver_pool.get().await?;
let bot = MLEnhancedWebDriver::new(&config).await?;
// Navigation with ML-assisted analysis
bot.smart_navigate("https://example.com/protected").await?;
// Captcha handling with retries
if let Ok(captcha) = bot.driver.find(By::Id("captcha-image")).await {
let screenshot = captcha.screenshot_as_png().await?;
fs::write("captcha.png", &screenshot).await?;
let solution = bot.solve_captcha("captcha.png").await?;
bot.driver.find(By::Id("captcha-input")).await?.send_keys(&solution).await?;
}
// Analyze page content with the LLM
let html = bot.driver.source().await?;
let analysis = bot.llm_analyze_content(&html, "What is the main purpose of this page?").await?;
info!("LLM analysis: {}", analysis);
// Distributed task execution
let task_result = bot.distributed_task_execution("analyze_page").await?;
debug!("Task result: {}", task_result);
// Persist the session cookies
let cookies = bot.driver.get_all_cookies().await?;
fs::write("session.json", serde_json::to_string(&cookies)?).await?;
bot.driver.quit().await?;
Ok(())
}
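The cookies written to session.json can be reused on the next run. Below is a minimal sketch of a hypothetical restore_session helper; it assumes thirtyfour's Cookie type deserializes from the same JSON it was serialized to, and that the driver has already navigated to the target domain, since cookies apply to the current domain:
async fn restore_session(driver: &WebDriver, path: &str) -> Result<()> {
    let data = fs::read_to_string(path).await?;
    // Cookies were saved with serde_json in main(), so read them back the same way.
    let cookies: Vec<thirtyfour::Cookie> = serde_json::from_str(&data)?;
    for cookie in cookies {
        driver.add_cookie(cookie).await?;
    }
    // Reload so the restored cookies take effect for the current page.
    driver.refresh().await?;
    Ok(())
}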