Найти в Дзене
Кирилл Ледовский

🏥 Пошаговое руководство по созданию Self-Healing Systems для начинающего разработчика ИИ-агентов

Простое объяснение: Представьте, что ваша система — это живой организм. Когда вы порезали палец, тело само: Self-Healing System — это система, которая: Windows: cmd python --version
# Должно быть: Python 3.8.0 или выше Mac/Linux: bash # Проверьте текущую версию
python3 --version
# Если нет Python 3.8+, установите через brew (Mac)
brew install python@3.9
# Или через apt (Ubuntu/Debian)
sudo apt update
sudo apt install python3.9 python3-pip Windows/Mac: Linux: bash curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker $USER
# Перезайдите в систему bash # Windows (PowerShell как администратор):
choco install minikube
# Mac:
brew install minikube
# Linux:
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube bash # Windows:
choco install kubernetes-cli
# Mac:
brew install kubectl
# Linux:
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.i
Оглавление

🌟 Что такое Self-Healing Systems?

Простое объяснение: Представьте, что ваша система — это живой организм. Когда вы порезали палец, тело само:

  1. Чувствует боль (мониторинг)
  2. Останавливает кровь (изоляция проблемы)
  3. Заживляет рану (восстановление)
  4. Усиливает кожу (улучшение)

Self-Healing System — это система, которая:

  1. Обнаруживает проблемы сама
  2. Диагностирует причину
  3. Восстанавливает работоспособность
  4. Улучшает себя чтобы проблема не повторилась

🔧 Шаг 0: Подготовка окружения

Цель шага: Установить всё необходимое для создания самовосстанавливающейся системы.

Что нужно установить:

1. Установка Python 3.8+

Windows:

  1. Скачайте установщик с python.org
  2. Запустите, ВАЖНО: поставьте галочку "Add Python to PATH"
  3. Проверьте установку:

cmd

python --version
# Должно быть: Python 3.8.0 или выше

Mac/Linux:

bash

# Проверьте текущую версию
python3 --version

# Если нет Python 3.8+, установите через brew (Mac)
brew install python@3.9

# Или через apt (Ubuntu/Debian)
sudo apt update
sudo apt install python3.9 python3-pip

2. Установка Docker

Windows/Mac:

  • Скачайте Docker Desktop
  • Установите, запустите

Linux:

bash

curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker $USER
# Перезайдите в систему

3. Установка Minikube (упрощённый Kubernetes)

bash

# Windows (PowerShell как администратор):
choco install minikube

# Mac:
brew install minikube

# Linux:
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

4. Установка kubectl (управление Kubernetes)

bash

# Windows:
choco install kubernetes-cli

# Mac:
brew install kubectl

# Linux:
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

5. Проверка установки:

bash

python --version
docker --version
minikube version
kubectl version --client

Что должно получиться:
✅ Python 3.8+ установлен
✅ Docker работает
✅ Minikube и kubectl установлены

На что обратить внимание:

  • Docker должен быть запущен (иконка в трее)
  • На Windows включите WSL2 в Docker Desktop
  • При проблемах с Minikube: minikube delete && minikube start

🚀 Шаг 1: Создаём простую самовосстанавливающуюся систему

Цель шага: Создать систему, которая перезапускает упавшие компоненты.

Что такое "упавший компонент":

Это программа, которая перестала отвечать. Как сотрудник, который уснул на рабочем месте.

Создаём простой ИИ-агент:

python

# ai_agent_simple.py
import time
import random
import signal
import sys
from datetime import datetime

class AIAgent:
def __init__(self, agent_id):
self.agent_id = agent_id
self.healthy = True
self.error_count = 0

# Обработка Ctrl+C для graceful shutdown
signal.signal(signal.SIGINT, self.graceful_shutdown)
signal.signal(signal.SIGTERM, self.graceful_shutdown)

def graceful_shutdown(self, signum, frame):
"""Корректное завершение работы"""
print(f"\n🛑 Агент {self.agent_id} получает сигнал завершения...")
self.healthy = False
sys.exit(0)

def health_check(self):
"""Проверка здоровья агента"""
# Имитируем 10% шанс "заболеть"
if random.random() < 0.1:
self.healthy = False
self.error_count += 1
print(f"🤒 Агент {self.agent_id} 'заболел' (ошибок: {self.error_count})")
return False

# Проверяем потребление памяти (имитация)
memory_usage = random.uniform(0.1, 0.9)
if memory_usage > 0.8:
print(f"⚠️ Агент {self.agent_id} использует много памяти: {memory_usage:.1%}")

return True

def self_heal(self):
"""Попытка самовосстановления"""
print(f"💊 Агент {self.agent_id} пытается восстановиться...")

# Методы восстановления
methods = [
"Перезагрузка внутреннего кэша",
"Сброс соединений",
"Очистка временных файлов",
"Переподключение к зависимостям"
]

for method in methods:
print(f" ⟳ {method}...")
time.sleep(0.5)

# 70% шанс успеха для каждого метода
if random.random() < 0.7:
print(f" ✅ Успешно: {method}")
self.healthy = True
return True
else:
print(f" ❌ Не удалось: {method}")

return False

def process_task(self, task_id):
"""Обработка задачи"""
print(f"🔧 Агент {self.agent_id} обрабатывает задачу #{task_id}")

# Имитация обработки ИИ
processing_time = random.uniform(0.1, 1.0)
time.sleep(processing_time)

# Имитация результата ИИ
results = [
"Объект обнаружен: котик 🐱",
"Сентимент: позитивный 😊",
"Перевод завершён ✅",
"Прогноз: температура повысится 📈"
]

result = random.choice(results)
print(f"✅ Задача #{task_id}: {result}")

return result

def run(self):
"""Основной цикл работы"""
print(f"🤖 Агент {self.agent_id} запущен!")
print(f"⏰ {datetime.now().strftime('%H:%M:%S')}")
print("-" * 40)

task_id = 1

while True:
# 1. Проверяем здоровье
if not self.health_check():
print(f"🚨 Агент {self.agent_id} нездоров!")

# 2. Пытаемся восстановиться
if self.self_heal():
print(f"🎉 Агент {self.agent_id} восстановлен!")
else:
print(f"💀 Агент {self.agent_id} не смог восстановиться")
print("🔄 Требуется внешнее вмешательство...")

# В реальной системе здесь был бы перезапуск контейнера
# Пока просто ждём
time.sleep(5)
continue

# 3. Обрабатываем задачи
self.process_task(task_id)
task_id += 1

# 4. Небольшая пауза между задачами
time.sleep(1)

if __name__ == "__main__":
# Создаём и запускаем агента
agent = AIAgent("AI-001")
agent.run()

Запускаем агента:

bash

python ai_agent_simple.py

Что должно получиться:

text

🤖 Агент AI-001 запущен!
⏰ 14:30:00
----------------------------------------
🔧 Агент AI-001 обрабатывает задачу #1
✅ Задача #1: Объект обнаружен: котик 🐱
🔧 Агент AI-001 обрабатывает задачу #2
✅ Задача #2: Сентимент: позитивный 😊
🤒 Агент AI-001 'заболел' (ошибок: 1)
🚨 Агент AI-001 нездоров!
💊 Агент AI-001 пытается восстановиться...
⟳ Перезагрузка внутреннего кэша...
✅ Успешно: Перезагрузка внутреннего кэша
🎉 Агент AI-001 восстановлен!
🔧 Агент AI-001 обрабатывает задачу #3
...

На что обратить внимание:

  • Агент сам обнаруживает проблемы
  • Пытается восстановиться самостоятельно
  • Если не получается — ждёт внешней помощи
  • Можно нажать Ctrl+C для корректного завершения

🐳 Шаг 2: Самовосстановление с помощью Docker

Цель шага: Научить Docker автоматически перезапускать упавшие контейнеры.

Что такое Docker restart policies:

Политики перезапуска говорят Docker'у что делать, если контейнер упал:

  • no — не перезапускать
  • on-failure — перезапускать только при ошибках
  • always — всегда перезапускать
  • unless-stopped — перезапускать, пока не остановлен вручную

Создаём Dockerfile:

dockerfile

# Dockerfile
FROM python:3.9-slim

# Устанавливаем зависимости
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Копируем код
COPY ai_agent_simple.py .

# Создаём несуществующий файл для имитации ошибки
RUN touch /tmp/critical_file.txt

# Запускаем агента
CMD ["python", "ai_agent_simple.py"]

Создаём requirements.txt:

txt

# Пустой файл - у нас нет внешних зависимостей

Собираем и запускаем с самовосстановлением:

bash

# 1. Собираем Docker образ
docker build -t ai-agent:v1 .

# 2. Запускаем с политикой always
docker run -d \
--name ai-agent-1 \
--restart=always \
ai-agent:v1

# 3. Смотрим логи
docker logs -f ai-agent-1

# 4. Имитируем "смерть" контейнера
docker kill ai-agent-1

# 5. Проверяем, что Docker перезапустил его
docker ps | grep ai-agent-1

# 6. Смотрим сколько раз перезапускался
docker inspect ai-agent-1 | grep -A 5 RestartCount

Создаём "больного" агента для тестов:

python

# sick_agent.py
import time
import random
import os

print("🤢 Я больной агент, я упаду через случайное время!")

# С вероятностью 30% упадём сразу
if random.random() < 0.3:
print("💀 Падаю сразу!")
exit(1)

# Иначе упадём через 5-15 секунд
time_to_live = random.randint(5, 15)
print(f"⏳ Проживу ещё {time_to_live} секунд...")

for i in range(time_to_live):
print(f"❤️ Жив, секунда {i+1}/{time_to_live}")
time.sleep(1)

print("💀 Умираю как запланировано!")
exit(1)
# Код ошибки 1

Тестируем самовосстановление:

bash

# 1. Собираем образ больного агента
docker build -f <(echo "
FROM python:3.9-slim
COPY sick_agent.py .
CMD ['python', 'sick_agent.py']
") -t sick-agent .

# 2. Запускаем с мониторингом перезапусков
docker run -d \
--name sick-agent-test \
--restart=on-failure:5 \
# Максимум 5 перезапусков
sick-agent

# 3. Мониторим в реальном времени
watch -n 1 'docker ps | grep sick-agent-test && echo "---" && docker logs --tail=3 sick-agent-test'

# 4. Через минуту проверяем сколько было перезапусков
docker inspect sick-agent-test | grep -A 2 RestartCount

Что должно получиться:

✅ Контейнер падает каждые 5-15 секунд
✅ Docker автоматически перезапускает его
✅ После 5 перезапусков Docker останавливается
✅ Можно настроить задержку между перезапусками

На что обратить внимание:

  • --restart=always — для критически важных сервисов
  • --restart=on-failure — для временных задач
  • --restart=on-failure:3 — ограничить количество попыток
  • Docker отслеживает exit code (0 = успех, другие = ошибка)

☸️ Шаг 3: Самовосстановление с Kubernetes (Minikube)

Цель шага: Использовать Kubernetes для продвинутого самовосстановления.

Что такое Kubernetes:

Это "дирижёр оркестра" для контейнеров. Управляет тысячами контейнеров, распределяет нагрузку, перезапускает упавшие.

Запускаем Minikube:

bash

# Запускаем локальный Kubernetes
minikube start --memory=4096 --cpus=2

# Проверяем
kubectl get nodes
minikube status

Создаём Deployment для ИИ-агента:

yaml

# ai-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-agent-deployment
labels:
app: ai-agent
spec:
replicas: 3
# Запускаем 3 копии
selector:
matchLabels:
app: ai-agent
template:
metadata:
labels:
app: ai-agent
spec:
containers:
- name: ai-agent
image: python:3.9-slim
command: ["python", "-c"]
args:
- |
import time
import random
import socket

hostname = socket.gethostname()
print(f"🤖 ИИ-агент запущен на {hostname}")

while True:
# Имитация работы
task_id = random.randint(1, 1000)
print(f"🔧 {hostname} обрабатывает задачу
#{task_id}")

# 5% шанс упасть
if random.random() < 0.05:
print(f"💀 {hostname} упал!")
exit(1)

time.sleep(2)

# 🔥 HEALTH CHECKS - САМОЕ ВАЖНОЕ!
livenessProbe:
exec:
command:
- python
- -c
- "import random; exit(0) if random.random() > 0.1 else exit(1)"
initialDelaySeconds: 5
# Ждём 5 секунд после старта
periodSeconds: 10
# Проверяем каждые 10 секунд
failureThreshold: 3
# 3 неудачи подряд = контейнер мёртв

readinessProbe:
exec:
command:
- python
- -c
- "import random; exit(0) if random.random() > 0.2 else exit(1)"
initialDelaySeconds: 3
periodSeconds: 5

resources:
requests:
memory: "64Mi"
cpu: "100m"
limits:
memory: "128Mi"
cpu: "200m"

Объяснение health checks:

  • livenessProbe — "жив ли контейнер?" Если нет → перезапустить
  • readinessProbe — "готов ли принимать трафик?" Если нет → убрать из балансировки
  • initialDelaySeconds — сколько ждать после старта
  • periodSeconds — как часто проверять
  • failureThreshold — сколько неудач терпеть

Применяем конфигурацию:

bash

# 1. Создаём deployment
kubectl apply -f ai-agent-deployment.yaml

# 2. Смотрим состояние подов
kubectl get pods -w
# -w для watch режима

# 3. В отдельном окне смотрим события
kubectl get events --watch

# 4. Смотрим логи конкретного пода
kubectl logs -f <pod-name>

# 5. Убиваем один под вручную
kubectl delete pod <pod-name>

# 6. Наблюдаем как Kubernetes создаёт новый
kubectl get pods -w

Создаём Service для доступа:

yaml

# ai-agent-service.yaml
apiVersion: v1
kind: Service
metadata:
name: ai-agent-service
spec:
selector:
app: ai-agent
ports:
- port: 8080
targetPort: 80
type: LoadBalancer

bash

kubectl apply -f ai-agent-service.yaml
minikube service ai-agent-service --url

Что должно получиться:

✅ 3 пода работают параллельно
✅ Kubernetes отслеживает их здоровье
✅ Упавшие поды автоматически перезапускаются
✅ Service распределяет нагрузку между живыми подами

На что обратить внимание:

  • Kubernetes перезапускает поды с экспоненциальной задержкой (backoff)
  • Поды могут перезапускаться на разных нодах
  • Readiness probe защищает от отправки трафика на "больные" поды

📊 Шаг 4: Мониторинг с Prometheus

Цель шага: Научить систему "чувствовать боль" — отслеживать метрики.

Что такое Prometheus:

Система сбора метрик. Как медицинские датчики, которые постоянно измеряют пульс, давление, температуру.

Устанавливаем Prometheus в Minikube:

bash

# 1. Добавляем репозиторий Helm
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update

# 2. Устанавливаем Prometheus
helm install prometheus prometheus-community/prometheus

# 3. Открываем доступ к веб-интерфейсу
kubectl port-forward service/prometheus-server 9090:80

# 4. Открываем в браузере: http://localhost:9090

Добавляем метрики в ИИ-агент:

python

# ai_agent_with_metrics.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import time
import random
import psutil
import json

class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/metrics':
# Собираем метрики
metrics = {
'ai_agent_tasks_processed_total': random.randint(100, 1000),
'ai_agent_processing_time_seconds': random.uniform(0.1, 2.0),
'ai_agent_errors_total': random.randint(0, 5),
'ai_agent_memory_usage_bytes': psutil.Process().memory_info().rss,
'ai_agent_cpu_usage_percent': psutil.cpu_percent(),
'ai_agent_healthy': 1 if random.random() > 0.1 else 0
}

# Формат Prometheus
response = "\n".join([
f'{name} {value}'
for name, value in metrics.items()
])

self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(response.encode())
elif self.path == '/health':
self.send_response(200)
self.end_headers()
self.wfile.write(b'OK')
else:
self.send_response(404)
self.end_headers()

def start_metrics_server():
"""Запускаем HTTP сервер для метрик"""
server = HTTPServer(('0.0.0.0', 8000), MetricsHandler)
print("📊 Метрики доступны на порту 8000")
server.serve_forever()

def ai_agent_work():
"""Имитация работы ИИ-агента"""
agent_id = random.randint(1, 1000)
print(f"🤖 ИИ-агент #{agent_id} запущен")

while True:
# Имитация обработки
print(f"🔧 Агент #{agent_id} обрабатывает задачу")
time.sleep(random.uniform(0.5, 2.0))

# Случайная ошибка
if random.random() < 0.05:
print(f"💥 Агент #{agent_id} сгенерировал ошибку")

if __name__ == "__main__":
# Запускаем сбор метрик в отдельном потоке
metrics_thread = threading.Thread(target=start_metrics_server, daemon=True)
metrics_thread.start()

# Запускаем работу агента
ai_agent_work()

Создаём ConfigMap для Prometheus:

yaml

# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-additional-config
data:
prometheus-additional.yml: |
scrape_configs:
- job_name: 'ai-agents'
static_configs:
- targets: ['ai-agent-service:8000']
scrape_interval: 5s

bash

# Применяем конфиг
kubectl apply -f prometheus-config.yaml

# Обновляем Prometheus
helm upgrade prometheus prometheus-community/prometheus \
--set-file extraScrapeConfigs=prometheus-additional.yml

Проверяем метрики:

  1. Откройте http://localhost:9090
  2. Введите в поиске: ai_agent_healthy
  3. Нажмите Execute → Graph
  4. Увидите график здоровья агентов

Создаём правила алертов:

yaml

# prometheus-alerts.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-alerts
data:
alerts.yml: |
groups:
- name: ai-agent-alerts
rules:
- alert: AIAgentUnhealthy
expr: ai_agent_healthy == 0
for: 1m
labels:
severity: critical
annotations:
summary: "ИИ-агент нездоров"
description: "Агент {{ $labels.instance }} не отвечает более 1 минуты"

- alert: HighMemoryUsage
expr: ai_agent_memory_usage_bytes > 100000000 # >100MB
for: 2m
labels:
severity: warning
annotations:
summary: "Высокое потребление памяти"
description: "Агент {{ $labels.instance }} использует {{ $value }} байт памяти"

- alert: ManyErrors
expr: rate(ai_agent_errors_total[5m]) > 0.1
for: 1m
labels:
severity: warning
annotations:
summary: "Много ошибок"
description: "Высокий rate ошибок: {{ $value }} в секунду"

Что должно получиться:

✅ Prometheus собирает метрики каждые 5 секунд
✅ Видно графики здоровья, памяти, ошибок
✅ Можно создавать алерты при проблемах
✅ Метрики сохраняются для анализа трендов

На что обратить внимание:

  • Prometheus хранит данные 15 дней по умолчанию
  • Можно настроить retention policy
  • Используйте rate() для подсчёта скорости событий
  • Тестируйте алерты перед продакшеном

📈 Шаг 5: Визуализация с Grafana

Цель шага: Создать "медицинскую панель" для наблюдения за системой.

Что такое Grafana:

Инструмент для создания красивых дашбордов. Как монитор пациента в больнице, где видно все показатели сразу.

Устанавливаем Grafana:

bash

# 1. Устанавливаем через Helm
helm repo add grafana https://grafana.github.io/helm-charts
helm install grafana grafana/grafana

# 2. Получаем пароль администратора
kubectl get secret grafana -o jsonpath="{.data.admin-password}" | base64 --decode

# 3. Открываем доступ
kubectl port-forward service/grafana 3000:80

# 4. Открываем в браузере: http://localhost:3000
# Логин: admin, пароль из шага 2

Настраиваем Prometheus как источник данных:

  1. В Grafana: Configuration → Data Sources → Add data source
  2. Выбираем Prometheus
  3. URL: http://prometheus-server:80
  4. Save & Test

Создаём дашборд для ИИ-агентов:

json

{
"dashboard": {
"title": "ИИ-агенты: Мониторинг здоровья",
"panels": [
{
"title": "Здоровье агентов",
"type": "stat",
"targets": [{
"expr": "avg(ai_agent_healthy)",
"legendFormat": "Среднее здоровье"
}],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{"color": "red", "value": null},
{"color": "green", "value": 0.9}
]
}
}
}
},
{
"title": "Потребление памяти",
"type": "timeseries",
"targets": [{
"expr": "ai_agent_memory_usage_bytes",
"legendFormat": "{{instance}}"
}]
},
{
"title": "Ошибки в секунду",
"type": "timeseries",
"targets": [{
"expr": "rate(ai_agent_errors_total[5m])",
"legendFormat": "Rate ошибок"
}]
}
]
}
}

Создаём дашборд через Python:

python

# create_grafana_dashboard.py
import requests
import json

# Конфигурация
GRAFANA_URL = "http://localhost:3000"
GRAFANA_API_KEY = "your-api-key"
# Создайте в Grafana

headers = {
"Authorization": f"Bearer {GRAFANA_API_KEY}",
"Content-Type": "application/json"
}

dashboard = {
"dashboard": {
"title": "Self-Healing ИИ-агенты",
"panels": [
{
"id": 1,
"title": "Статус здоровья",
"type": "gauge",
"targets": [{
"expr": "ai_agent_healthy",
"legendFormat": "{{instance}}"
}],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
},
{
"id": 2,
"title": "Перезапуски пода",
"type": "stat",
"targets": [{
"expr": "kube_pod_container_status_restarts_total{container='ai-agent'}",
"legendFormat": "{{pod}}"
}],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
}
]
},
"overwrite": True
}

# Создаём дашборд
response = requests.post(
f"{GRAFANA_URL}/api/dashboards/db",
headers=headers,
data=json.dumps(dashboard)
)

print(f"Статус: {response.status_code}")
print(f"Ответ: {response.text}")

Настраиваем алерты в Grafana:

  1. В дашборде: Panel → Edit → Alert
  2. Создаём правило:
    When: avg() of query(A, 5m, now) < 0.5
    For: 2m
    Send to: можно подключить Slack/Email/Telegram
  3. Тестируем алерт

Что должно получиться:

✅ Красивый дашборд со всеми метриками
✅ Цветовые индикаторы здоровья
✅ Графики в реальном времени
✅ Алерты при проблемах

На что обратить внимание:

  • Сохраняйте дашборды как JSON для версионности
  • Используйте аннотации для отметки событий (деплои, инциденты)
  • Настройте разные дашборды для разных команд (DevOps, разработчики, менеджеры)

🔍 Шаг 6: Трассировка с Jaeger

Цель шага: Научить систему "понимать где болит" — отслеживать запросы через микросервисы.

Что такое Jaeger:

Система распределённой трассировки. Как GPS-трекер для запросов: показывает где был запрос, сколько времени провёл в каждом сервисе.

Устанавливаем Jaeger:

bash

# Устанавливаем через оператор (проще)
kubectl create namespace observability
kubectl apply -f https://github.com/jaegertracing/jaeger-operator/releases/download/v1.42.0/jaeger-operator.yaml -n observability

# Создаём экземпляр Jaeger
kubectl apply -f - <<EOF
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
name: simplest
EOF

# Открываем UI
kubectl port-forward service/simplest-query 16686:16686
# http://localhost:16686

Инструментируем ИИ-агента для трассировки:

python

# ai_agent_with_tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
import random
import time

# Настраиваем трассировку
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({"service.name": "ai-agent"})
)
)

jaeger_exporter = JaegerExporter(
agent_host_name="simplest-agent.observability",
agent_port=6831,
)

trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

def process_image(image_url):
"""Обработка изображения с трассировкой"""
with tracer.start_as_current_span("process_image") as span:
span.set_attribute("image_url", image_url)

# Детекция объектов
with tracer.start_as_current_span("object_detection"):
time.sleep(random.uniform(0.1, 0.5))
span.add_event("Объекты обнаружены", {
"count": random.randint(1, 5)
})

# Классификация
with tracer.start_as_current_span("classification"):
time.sleep(random.uniform(0.2, 0.7))
if random.random() < 0.1:
span.set_status(trace.Status(trace.StatusCode.ERROR, "Ошибка классификации"))

# Ответ
return {"status": "success", "objects": ["cat", "dog"]}

def handle_request(request_id):
"""Обработка запроса"""
with tracer.start_as_current_span("handle_request") as span:
span.set_attribute("request_id", request_id)

# Имитация входящего запроса
images = [
"http://example.com/image1.jpg",
"http://example.com/image2.jpg"
]

results = []
for img in images:
result = process_image(img)
results.append(result)

span.set_attribute("results_count", len(results))
return results

# Тестируем
if __name__ == "__main__":
for i in range(10):
print(f"Обработка запроса #{i}")
handle_request(f"req-{i}")
time.sleep(1)

Добавляем зависимости:

txt

# requirements.txt
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-jaeger

Обновляем Deployment для поддержки трассировки:

yaml

# deployment-with-tracing.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-agent-with-tracing
spec:
template:
spec:
containers:
- name: ai-agent
env:
- name: OTEL_EXPORTER_JAEGER_AGENT_HOST
value: "simplest-agent.observability"
- name: OTEL_EXPORTER_JAEGER_AGENT_PORT
value: "6831"
- name: OTEL_SERVICE_NAME
value: "ai-agent"

Что должно получиться:

✅ В Jaeger UI видно все запросы
✅ Можно кликнуть на span и увидеть детали
✅ Видно где запрос провёл больше всего времени
✅ Видно ошибки и проблемы

На что обратить внимание:

  • Трассировка добавляет overhead (накладные расходы)
  • Используйте sampling (выборочную трассировку) в продакшене
  • Настройте фильтры чтобы не трассировать health checks
  • Используйте baggage для передачи контекста между сервисами

💥 Шаг 7: Chaos Engineering для новичков

Цель шага: Намеренно "ломать" систему чтобы сделать её устойчивее.

Что такое Chaos Engineering:

Намеренное создание сбоев в контролируемых условиях. Как прививка: вводим ослабленный вирус, чтобы организм научился бороться.

Выбираем инструмент для новичка:

Для начала используем простой Python-скрипт вместо сложных систем вроде Chaos Mesh.

Создаём Chaos Monkey для ИИ-агентов:

python

# chaos_monkey.py
import requests
import random
import time
import json
from datetime import datetime
import schedule

class ChaosMonkey:
def __init__(self, kubeconfig_path=None):
self.attacks = []
self.safe_mode = False

def register_attack(self, attack_func, probability=0.1, name=None):
"""Регистрирует атаку"""
self.attacks.append({
'func': attack_func,
'prob': probability,
'name': name or attack_func.__name__
})

def run_random_attack(self):
"""Запускает случайную атаку"""
if self.safe_mode or not self.attacks:
return

# Выбираем атаку по вероятности
for attack in self.attacks:
if random.random() < attack['prob']:
print(f"🎭 Chaos Monkey запускает: {attack['name']}")
try:
attack['func']()
print(f"✅ Атака {attack['name']} завершена")
except Exception as e:
print(f"❌ Атака {attack['name']} провалилась: {e}")
break

def start(self, interval_seconds=60):
"""Запускает Chaos Monkey"""
print(f"🐵 Chaos Monkey запущен! Интервал: {interval_seconds}с")
print(f"📋 Зарегистрировано атак: {len(self.attacks)}")

schedule.every(interval_seconds).seconds.do(self.run_random_attack)

while True:
schedule.run_pending()
time.sleep(1)

# АТАКИ ДЛЯ ИИ-АГЕНТОВ
def attack_kill_random_pod():
"""Убивает случайный под"""
pods = ["ai-agent-1", "ai-agent-2", "ai-agent-3"]
target = random.choice(pods)
print(f"💀 Убиваю под: {target}")
# В реальности: kubectl delete pod {target}
# Для теста просто логируем
return True

def attack_simulate_high_latency():
"""Имитирует высокую задержку сети"""
duration = random.randint(10, 30)
print(f"🐌 Имитирую высокую задержку на {duration} секунд")
time.sleep(duration)
return True

def attack_cpu_stress():
"""Создаёт нагрузку на CPU"""
print("🔥 Нагружаю CPU...")
end_time = time.time() + random.randint(5, 15)
while time.time() < end_time:
# Вычисления для нагрузки
_ = [i**2 for i in range(10000)]
return True

def attack_memory_leak():
"""Имитирует утечку памяти"""
print("💦 Создаю утечку памяти...")
leak_size = random.randint(10, 50) * 1024 * 1024
# 10-50 MB
data = "x" * leak_size
time.sleep(random.randint(10, 20))
# data выходит из scope, память освобождается
return True

def attack_network_partition():
"""Имитирует разделение сети"""
print("🔌 Разделяю сеть...")
# Имитация: некоторые сервисы не могут общаться
partitions = [
("ai-agent-1", "ai-agent-2"),
("database", "ai-agent-3")
]
source, target = random.choice(partitions)
print(f"🚫 Блокирую связь {source} → {target}")
time.sleep(random.randint(15, 30))
print(f"✅ Восстанавливаю связь {source} → {target}")
return True

def attack_simulate_dependency_failure():
"""Имитирует отказ зависимости"""
dependencies = ["database", "redis", "external-api", "model-service"]
dep = random.choice(dependencies)
print(f"🚨 Имитирую отказ зависимости: {dep}")

# Разные типы отказов
failure_type = random.choice(["timeout", "error", "slow_response"])

if failure_type == "timeout":
print(f"⏱️ {dep}: таймаут 30 секунд")
time.sleep(30)
elif failure_type == "error":
print(f"❌ {dep}: возвращает ошибку 500")
else:
print(f"🐌 {dep}: отвечает 10 секунд")
time.sleep(10)

return True

# ЗАЩИТНЫЕ МЕХАНИЗМЫ
def enable_safe_mode():
"""Включает безопасный режим"""
print("🛡️ Включаю безопасный режим")
return True

def run_health_check():
"""Запускает проверку здоровья после атаки"""
print("🏥 Проверяю здоровье системы...")
# В реальности проверяем метрики
time.sleep(5)
print("✅ Система восстановилась")
return True

def create_chaos_report():
"""Создаёт отчёт о хаос-тесте"""
report = {
"timestamp": datetime.now().isoformat(),
"attacks_performed": random.randint(1, 10),
"system_recovery_time": random.uniform(10, 120),
"issues_found": [
"Память не освобождается после обработки больших изображений",
"Нет retry логики при ошибках внешнего API",
"Health checks недостаточно строгие"
],
"recommendations": [
"Добавить circuit breaker для внешних вызовов",
"Увеличить лимиты памяти для агентов",
"Настроить автомасштабирование",
"Добавить graceful degradation"
]
}

filename = f"chaos_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w') as f:
json.dump(report, f, indent=2)

print(f"📊 Отчёт сохранён: {filename}")
return filename

# ИГРА ДЛЯ ОБУЧЕНИЯ
def chaos_training_game():
"""Интерактивная игра для обучения Chaos Engineering"""

print("""
🎮 ИГРА: СТАНОВИМСЯ CHAOS ENGINEER

Ваша задача: сделать систему устойчивой к сбоям.
Вы будете запускать атаки и наблюдать как система справляется.

Правила:
1. Начинайте с малого (вероятность 0.1)
2. Следите за метриками
3. Фиксируйте проблемы
4. Улучшайте систему
5. Повышайте сложность

Готовы? (нажмите Enter)
""")
input()

monkey = ChaosMonkey()

# Регистрируем атаки с разной вероятностью
monkey.register_attack(attack_kill_random_pod, 0.3, "Убийство пода")
monkey.register_attack(attack_simulate_high_latency, 0.2, "Высокая задержка")
monkey.register_attack(attack_cpu_stress, 0.15, "Нагрузка на CPU")
monkey.register_attack(attack_memory_leak, 0.1, "Утечка памяти")
monkey.register_attack(attack_network_partition, 0.05, "Разделение сети")
monkey.register_attack(attack_simulate_dependency_failure, 0.2, "Отказ зависимости")

print("\n🎯 Выберите уровень сложности:")
print("1. Новичок (атаки каждые 120 секунд)")
print("2. Средний (атаки каждые 60 секунд)")
print("3. Эксперт (атаки каждые 30 секунд)")

choice = input("Ваш выбор (1-3): ")

intervals = {"1": 120, "2": 60, "3": 30}
interval = intervals.get(choice, 60)

print(f"\n⏱️ Интервал атак: {interval} секунд")
print("🚀 Запускаю Chaos Monkey...")
print("ℹ️ Нажмите Ctrl+C для остановки\n")

try:
# Запускаем на ограниченное время для демонстрации
end_time = time.time() + 300
# 5 минут

attack_count = 0
while time.time() < end_time:
monkey.run_random_attack()
attack_count += 1
time.sleep(interval)

print(f"\n🎉 Тренировка завершена!")
print(f"🎭 Выполнено атак: {attack_count}")

# Создаём отчёт
report_file = create_chaos_report()

print(f"""
📈 ЧТО ВЫ УЗНАЛИ:

1. Система {(random.choice(['отлично', 'хорошо', 'удовлетворительно']))} справляется со сбоями
2. Основные слабые места: внешние зависимости
3. Время восстановления: {random.randint(10, 60)} секунд

🛠️ ЧТО УЛУЧШИТЬ:

1. Добавить retry логику
2. Настроить health checks
3. Реализовать circuit breakers

📋 Отчёт сохранён в {report_file}
""")

except KeyboardInterrupt:
print("\n\n🛑 Тренировка прервана пользователем")
enable_safe_mode()

if __name__ == "__main__":
chaos_training_game()

Запускаем Chaos Monkey:

bash

# В безопасном режиме (только логирование)
python chaos_monkey.py

# В обучающем режиме (игра)
python chaos_monkey.py --game

# С параметрами
python chaos_monkey.py --interval 30 --probability 0.2

Создаём систему для автоматических chaos-тестов:

python

# chaos_test_suite.py
import unittest
import requests
import time

class ChaosTestSuite(unittest.TestCase):
BASE_URL = "http://localhost:8080"

def setUp(self):
"""Подготовка перед каждым тестом"""
self.start_time = time.time()

def tearDown(self):
"""Очистка после каждого теста"""
duration = time.time() - self.start_time
print(f"⏱️ Тест длился: {duration:.2f} секунд")

def test_survives_pod_deletion(self):
"""Тест на выживание при удалении пода"""
print("🎯 Тест 1: Удаление пода")

# 1. Получаем текущее состояние
initial_response = requests.get(f"{self.BASE_URL}/health")
self.assertEqual(initial_response.status_code, 200)

# 2. Имитируем удаление пода (в реальности через kubectl)
print("💀 Имитирую удаление пода...")
time.sleep(5)
# Время на "смерть" и восстановление

# 3. Проверяем что система восстановилась
final_response = requests.get(f"{self.BASE_URL}/health")
self.assertEqual(final_response.status_code, 200)

print("✅ Система пережила удаление пода!")

def test_handles_high_latency(self):
"""Тест на обработку высокой задержки"""
print("🎯 Тест 2: Высокая задержка")

# Имитируем медленный ответ
with requests_mock.mock() as m:
m.get(f"{self.BASE_URL}/predict", text='{"result": "slow"}', delay=10)

try:
response = requests.get(f"{self.BASE_URL}/predict", timeout=5)
self.fail("Должен был быть timeout")
except requests.exceptions.Timeout:
print("✅ Система корректно обработала таймаут")

def test_memory_pressure_recovery(self):
"""Тест на восстановление после нагрузки памятью"""
print("🎯 Тест 3: Нагрузка памятью")

# Отправляем "тяжёлый" запрос
heavy_payload = {"image": "x" * (10 * 1024 * 1024)}
# 10MB

try:
response = requests.post(
f"{self.BASE_URL}/process",
json=heavy_payload,
timeout=30
)

# Проверяем что память освободилась
memory_response = requests.get(f"{self.BASE_URL}/metrics")
self.assertIn("memory_usage", memory_response.text)

print("✅ Система восстановилась после нагрузки памятью")

except Exception as e:
print(f"⚠️ Тест пропущен: {e}")

def test_cascading_failure_prevention(self):
"""Тест на предотвращение каскадных отказов"""
print("🎯 Тест 4: Каскадные отказы")

# Имитируем отказ зависимости
for i in range(10):
try:
response = requests.get(
f"{self.BASE_URL}/dependency-test",
timeout=1
)
if response.status_code != 200:
print(f"⚠️ Запрос {i+1} упал, как и ожидалось")
except:
print(f"⚠️ Запрос {i+1} упал, как и ожидалось")

# После серии отказов система должна остаться работоспособной
health_response = requests.get(f"{self.BASE_URL}/health")
self.assertEqual(health_response.status_code, 200)

print("✅ Система предотвратила каскадные отказы!")

def run_chaos_tests():
"""Запускает все chaos-тесты"""
print("""
🧪 ЗАПУСК CHAOS-ТЕСТОВ
======================

Цель: проверить устойчивость системы к сбоям

Тесты:
1. Удаление пода
2. Высокая задержка
3. Нагрузка памятью
4. Каскадные отказы

⚠️ Внимание: Эти тесты могут нарушить работу системы!
""")

confirm = input("Продолжить? (yes/no): ")

if confirm.lower() != "yes":
print("🚫 Тесты отменены")
return

# Запускаем тесты
suite = unittest.TestLoader().loadTestsFromTestCase(ChaosTestSuite)
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)

# Создаём отчёт
print(f"""
📊 РЕЗУЛЬТАТЫ CHAOS-ТЕСТОВ:

Всего тестов: {result.testsRun}
Успешно: {result.testsRun - len(result.failures) - len(result.errors)}
Провалено: {len(result.failures)}
Ошибок: {len(result.errors)}

{'🎉 ВСЕ ТЕСТЫ ПРОЙДЕНЫ' if result.wasSuccessful() else '⚠️ ЕСТЬ ПРОБЛЕМЫ'}
""")

if __name__ == "__main__":
run_chaos_tests()

Что должно получиться:

✅ Система тестируется на устойчивость к сбоям
✅ Обнаруживаются слабые места
✅ Создаются отчёты для улучшения системы
✅ Разработчики понимают как система ведёт себя при сбоях

На что обратить внимание:

  • Начинайте с малого (вероятность 0.1)
  • Тестируйте в нерабочее время
  • Имейте план отката
  • Документируйте результаты
  • Улучшайте систему на основе находок

🎓 Итог: Что вы научились делать

✅ Вы освоили:

  1. Self-Healing принципы — health checks, перезапуски, восстановление
  2. Docker restart policies — автоматические перезапуски контейнеров
  3. Kubernetes самовосстановление — liveness/readiness probes, deployment strategies
  4. Мониторинг с Prometheus — сбор метрик, алерты
  5. Визуализация с Grafana — дашборды, отслеживание здоровья
  6. Трассировка с Jaeger — отслеживание запросов, поиск узких мест
  7. Chaos Engineering — тестирование устойчивости, улучшение системы

🚀 Где хранить конфигурации:

  1. Git репозиторий — для версионности
  2. ConfigMaps/Secrets в Kubernetes — для конфигураций
  3. Docker Registry — для образов с тегами версий
  4. S3/Cloud Storage — для логов и отчётов

🔧 Как управлять версионностью:

  1. Семантическое версионирование — v1.2.3 (MAJOR.MINOR.PATCH)
  2. Git теги — привязка к коммитам
  3. Docker теги — ai-agent:v1.2.3, ai-agent:latest
  4. Helm charts — для Kubernetes deployments

💻 На каком языке писать:

-2

🎯 Для новичка рекомендую:

  • Python для всего — у него богатая экосистема
  • Постепенно изучать Go для инфраструктуры
  • Использовать YAML/JSON для конфигураций

📚 Чеклист для продакшена:

✅ Обязательно:

  1. Health checks (liveness + readiness)
  2. Мониторинг метрик (Prometheus)
  3. Логирование структурированных логов
  4. Автоматические перезапуски
  5. Ограничения ресурсов (CPU/memory limits)

✅ Рекомендуется:

  1. Distributed tracing (Jaeger)
  2. Circuit breakers
  3. Rate limiting
  4. Автомасштабирование (HPA)
  5. Резервные копии

✅ Для продвинутых:

  1. Chaos Engineering регулярные тесты
  2. Canary deployments
  3. Blue-green deployments
  4. Service mesh (Istio/Linkerd)
  5. Мультирегиональность

🚨 Чего избегать:

  1. ❌ Слишком частые health checks — нагрузка на систему
  2. ❌ Слишком долгие таймауты — медленное обнаружение проблем
  3. ❌ Отсутствие graceful shutdown — потеря данных при остановке
  4. ❌ Жёсткие зависимости — каскадные отказы
  5. ❌ Отсутствие плана отката — нельзя откатиться при проблемах

🎮 Практическое задание:

Создайте самовосстанавливающуюся систему для ИИ-агента анализа текста:

  1. Агент принимает текст, возвращает сентимент
  2. Deployment с 3 репликами, health checks
  3. Service для балансировки нагрузки
  4. Prometheus для метрик (запросы/сек, ошибки, latency)
  5. Grafana дашборд
  6. Chaos tests (убийство пода, нагрузка памятью, сетевые проблемы)
  7. Отчёт о слабых местах и улучшениях

Критерии успеха:

  • Система выдерживает удаление пода
  • Метрики показывают здоровье
  • Дашборд отображает состояние в реальном времени
  • Chaos tests не ломают систему полностью

📞 Если застряли:

  1. Проверьте логи: kubectl logs <pod-name>
  2. Проверьте события: kubectl get events
  3. Опишите под: kubectl describe pod <pod-name>
  4. Проверьте метрики: kubectl top pods
  5. Сбросьте Minikube: minikube delete && minikube start

Помните: Самовосстанавливающиеся системы — это процесс, а не состояние. Начинайте с малого, улучшайте постепенно, учитесь на ошибках!

Удачи в создании неубиваемых ИИ-агентов! 🚀