🎯 Введение: Что такое ИИ-агент и почему RabbitMQ?
ИИ-агент — это программа, которая:
- Получает информацию (например, текст, картинку, данные)
- "Думает" (обрабатывает с помощью ИИ-моделей)
- Действует (отвечает, сохраняет, передаёт дальше)
RabbitMQ — это "почтовая служба" для программ. Он гарантирует, что сообщения дойдут до нужного агента и не потеряются.
🚀 Шаг 1: Подготовка рабочего окружения
Цель шага: Создать минимальную рабочую среду для экспериментов.
Что нужно сделать:
- Установите Docker — это программа, которая позволяет запускать другие программы в "контейнерах" (как виртуальные машины, но легче)
Скачайте Docker Desktop Установите, запустите - Запустите RabbitMQ через Docker:bashdocker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
- Проверьте установку:
Откройте в браузере: http://localhost:15672 Логин: guest
Пароль: guest
Вы должны увидеть панель управления RabbitMQ - Установите Python библиотеку:bashpip install pika
Что должно получиться:
✅ RabbitMQ работает на вашем компьютере
✅ Вы можете зайти в веб-интерфейс
✅ Установлена библиотека для работы из Python
На что обратить внимание:
- Если порт 5672 или 15672 занят, Docker выдаст ошибку
- Убедитесь, что Docker запущен (иконка в трее)
- RabbitMQ может запускаться 20-30 секунд
🧪 Шаг 2: Ваш первый агент-получатель (Consumer)
Цель шага: Написать агента, который будет "слушать" сообщения.
Что такое агент-получатель:
Это программа, которая постоянно работает и ждёт сообщений. Как почтальон, сидящий у почтового ящика и проверяющий, не пришла ли почта.
python
# consumer_simple.py
import pika
import time
# Термины:
# pika - библиотека для работы с RabbitMQ на Python
# connection - соединение с RabbitMQ (как телефонная линия)
# channel - канал внутри соединения (как отдельная линия на многоканальном телефоне)
print("🐇 Подключаемся к RabbitMQ...")
# 1. Подключаемся к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # Адрес RabbitMQ
)
channel = connection.channel()
# 2. Создаём очередь (если её нет)
# Очередь - это "почтовый ящик" с названием
channel.queue_declare(queue='hello_queue')
print("✅ Подключено! Ждём сообщения...")
print("ℹ️ Чтобы отправить сообщение, запустите producer_simple.py")
print("ℹ️ Для выхода нажмите Ctrl+C\n")
# 3. Функция, которая будет вызвана при получении сообщения
def callback(ch, method, properties, body):
"""Обработчик входящих сообщений"""
message = body.decode('utf-8')
print(f"📨 Получено сообщение: {message}")
# Имитируем "размышления" агента
print(f"🤔 Агент думает над сообщением '{message}'...")
time.sleep(2) # Ждём 2 секунды
# Простой "ИИ" - отвечаем на сообщение
response = f"Я понял ваше сообщение: '{message}'. Спасибо!"
print(f"🗣️ Агент отвечает: {response}")
print("-" * 50)
# 4. Настраиваем получение сообщений
# 'hello_queue' - из какой очереди брать сообщения
# callback - какая функция будет обрабатывать сообщения
# auto_ack=True - автоматически подтверждаем получение
channel.basic_consume(
queue='hello_queue',
on_message_callback=callback,
auto_ack=True
)
# 5. Начинаем слушать сообщения (бесконечный цикл)
channel.start_consuming()
Как запустить и проверить:
bash
python consumer_simple.py
Что должно получиться:
✅ Программа запустилась и пишет "Ждём сообщения..."
✅ Программа не завершается - она ждёт сообщений
✅ Можно нажать Ctrl+C для выхода
На что обратить внимание:
- Программа будет "висеть" в терминале - это нормально!
- Она ждёт сообщений как веб-сервер ждёт запросов
- Если соединение разорвётся, программа завершится с ошибкой
📤 Шаг 3: Ваш первый агент-отправитель (Producer)
Цель шага: Написать агента, который отправляет сообщения.
Что такое агент-отправитель:
Это программа, которая отправляет сообщение и завершает работу. Как человек, который бросил письмо в почтовый ящик и пошёл дальше.
python
# producer_simple.py
import pika
print("🐇 Подключаемся к RabbitMQ для отправки сообщения...")
# 1. Подключаемся (так же, как в получателе)
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# 2. Создаём очередь (такую же, как в получателе)
channel.queue_declare(queue='hello_queue')
# 3. Подготовим сообщение
message = "Привет от первого ИИ-агента!"
# 4. Отправляем сообщение в очередь
# exchange='' - используем стандартный обменник
# routing_key='hello_queue' - имя очереди
# body=message - само сообщение
channel.basic_publish(
exchange='',
routing_key='hello_queue',
body=message
)
print(f"✅ Отправлено сообщение: '{message}'")
print("ℹ️ Проверьте программу consumer_simple.py - там должно появиться сообщение")
# 5. Закрываем соединение
connection.close()
Как запустить и проверить:
- В первом терминале запустите получателя:bashpython consumer_simple.py
- Во втором терминале запустите отправителя:bashpython producer_simple.py
Что должно получиться:
✅ В терминале с получателем появится сообщение:
text
📨 Получено сообщение: Привет от первого ИИ-агента!
🤔 Агент думает над сообщением 'Привет от первого ИИ-агента'...
🗣️ Агент отвечает: Я понял ваше сообщение: 'Привет от первого ИИ-агента!'. Спасибо!
✅ В терминале с отправителем: сообщение об успешной отправке
На что обратить внимание:
- Сообщения сохраняются в RabbitMQ, даже если получатель отключён
- Можно запустить отправителя несколько раз - получатель обработает все сообщения
- Можно запустить несколько получателей - они будут работать параллельно
🔄 Шаг 4: Работа с несколькими агентами
Цель шага: Понять, как несколько агентов могут работать вместе.
Что будем делать:
Создадим систему из:
- Главного агента - отправляет задачи
- Рабочих агентов - обрабатывают задачи
- Менеджера задач - распределяет задачи между рабочими
python
# task_manager.py - Главный агент, создающий задачи
import pika
import json
import random
print("👑 Главный агент запущен")
print("📋 Создаю задачи для рабочих агентов...")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создаём очередь для задач
channel.queue_declare(queue='task_queue', durable=True) # durable=True - очередь сохранится при перезапуске
# Список задач для ИИ-агентов
tasks = [
"Проанализировать сентимент текста: 'Я люблю программировать!'",
"Распознать объекты на изображении с котиком",
"Сгенерировать ответ на вопрос: 'Что такое ИИ-агент?'",
"Предсказать следующее число в последовательности: 2, 4, 6, 8, ...",
"Классифицировать эмоцию: 'Я так рад, что у меня получилось!'"
]
# Отправляем каждую задачу
for i, task in enumerate(tasks, 1):
# Создаём структурированное сообщение
message = {
'task_id': i,
'task': task,
'priority': random.choice(['low', 'medium', 'high'])
}
# Преобразуем в JSON (RabbitMQ работает с байтами)
body = json.dumps(message, ensure_ascii=False)
# Отправляем задачу
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=body.encode('utf-8'),
properties=pika.BasicProperties(
delivery_mode=2, # Сохранять на диск
)
)
print(f"📤 Отправлена задача #{i}: {task[:50]}...")
connection.close()
print(f"\n✅ Отправлено {len(tasks)} задач")
print("🚀 Запустите несколько worker_ai.py в разных терминалах")
python
# worker_ai.py - Рабочий ИИ-агент
import pika
import json
import time
import random
print("🤖 ИИ-агент запущен и готов к работе!")
print("⏳ Ожидаю задачи...\n")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь (такая же, как у менеджера)
channel.queue_declare(queue='task_queue', durable=True)
# Настраиваем политику: не давать новую задачу, пока не обработана текущая
channel.basic_qos(prefetch_count=1)
def process_task(task_id, task_description, priority):
"""Функция обработки задачи (имитация работы ИИ)"""
print(f"🔍 Анализирую задачу #{task_id}")
print(f"📝 Описание: {task_description}")
print(f"🎯 Приоритет: {priority}")
# Имитация "мышления" ИИ
thinking_time = random.uniform(1, 3)
print(f"🤔 Думаю... ({thinking_time:.1f} секунд)")
time.sleep(thinking_time)
# Простые "ИИ-ответы" в зависимости от типа задачи
if "сентимент" in task_description.lower():
result = "Сентимент: ПОЛОЖИТЕЛЬНЫЙ 😊"
elif "распознать" in task_description.lower():
result = "Обнаружены: кот, удивление, милота 🐱"
elif "сгенерировать" in task_description.lower():
result = "ИИ-агент — это автономная программа, которая воспринимает окружение и действует для достижения целей."
elif "предсказать" in task_description.lower():
result = "Следующее число: 10"
elif "классифицировать" in task_description.lower():
result = "Эмоция: РАДОСТЬ 🎉"
else:
result = "Задача обработана успешно!"
print(f"✅ Результат: {result}")
print("-" * 50)
return result
def callback(ch, method, properties, body):
"""Обработчик входящих задач"""
# Декодируем сообщение из JSON
message = json.loads(body.decode('utf-8'))
# Обрабатываем задачу
result = process_task(
message['task_id'],
message['task'],
message['priority']
)
# Сохраняем результат (в реальной системе - в базу данных)
with open('results.txt', 'a', encoding='utf-8') as f:
f.write(f"Задача #{message['task_id']}: {result}\n")
# Подтверждаем выполнение задачи
ch.basic_ack(delivery_tag=method.delivery_tag)
# Подписываемся на очередь задач
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
print("🎧 Агент слушает очередь задач...")
print("💡 Запустите task_manager.py для создания задач")
print("📊 Можно запустить несколько копий этого файла для параллельной работы")
print("⏹️ Для выхода нажмите Ctrl+C\n")
try:
channel.start_consuming()
except KeyboardInterrupt:
print("\n👋 ИИ-агент завершает работу")
connection.close()
Как запустить и проверить:
- Запустите 2-3 рабочих агента (в разных терминалах):bashpython worker_ai.py
- Запустите менеджера задач:bashpython task_manager.py
Что должно получиться:
В терминале менеджера:
text
👑 Главный агент запущен
📋 Создаю задачи для рабочих агентов...
📤 Отправлена задача #1: Проанализировать сентимент текста: 'Я люблю пр...
📤 Отправлена задача #2: Распознать объекты на изображении с котиком...
...
✅ Отправлено 5 задач
В терминалах рабочих агентов (пример для одного):
text
🤖 ИИ-агент запущен и готов к работе!
⏳ Ожидаю задачи...
🎧 Агент слушает очередь задач...
💡 Запустите task_manager.py для создания задач
📊 Можно запустить несколько копий этого файла для параллельной работы
⏹️ Для выхода нажмите Ctrl+C
🔍 Анализирую задачу #3
📝 Описание: Сгенерировать ответ на вопрос: 'Что такое ИИ-агент?'
🎯 Приоритет: medium
🤔 Думаю... (2.3 секунд)
✅ Результат: ИИ-агент — это автономная программа, которая воспринимает...
--------------------------------------------------
На что обратить внимание:
- Задачи распределяются между всеми запущенными рабочими агентами
- Каждая задача обрабатывается только одним агентом
- Можно добавлять новых рабочих "на лету"
- Результаты сохраняются в файл results.txt
📊 Шаг 5: Мониторинг и отладка
Цель шага: Научиться отслеживать работу агентов и находить проблемы.
Что будем использовать:
- Логирование в файлы
- Проверка состояния очередей
python
# monitor.py - Простой монитор для отслеживания очередей
import pika
import json
from datetime import datetime
def get_queue_info():
"""Получает информацию об очередях"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
try:
# Получаем информацию об очереди
queue_info = channel.queue_declare(
queue='task_queue',
durable=True,
passive=True # Только получить информацию, не создавать
)
message_count = queue_info.method.message_count
consumer_count = queue_info.method.consumer_count
print(f"\n📊 [{datetime.now().strftime('%H:%M:%S')}] Статус системы:")
print(f" 📨 Очередь 'task_queue':")
print(f" Сообщений в очереди: {message_count}")
print(f" Подключенных агентов: {consumer_count}")
if message_count > 0:
print(f" ⚠️ Есть необработанные задачи!")
elif consumer_count == 0:
print(f" ⚠️ Нет активных агентов!")
else:
print(f" ✅ Все задачи обработаны")
except Exception as e:
print(f"❌ Ошибка: {e}")
finally:
connection.close()
def check_agent_health(agent_id):
"""Проверяет здоровье конкретного агента"""
# В реальной системе здесь была бы проверка:
# - Ответа на ping
# - Загрузки памяти/CPU
# - Времени последней активности
print(f"\n🏥 Проверка здоровья агента #{agent_id}:")
# Имитация проверки
checks = {
"Соединение с RabbitMQ": "✅ OK",
"Доступность моделей ИИ": "✅ OK",
"Память": "⚠️ 85% (высокая загрузка)",
"Последняя активность": "✅ 2 минуты назад"
}
for check, status in checks.items():
print(f" {check}: {status}")
return all("✅" in status for status in checks.values())
if __name__ == "__main__":
print("🔍 Монитор системы ИИ-агентов")
print("=" * 40)
# Проверяем очередь
get_queue_info()
# Проверяем здоровье "агентов" (имитация)
healthy = True
for i in range(1, 4): # Проверяем 3 агента
if not check_agent_health(i):
healthy = False
print("\n" + "=" * 40)
if healthy:
print("🎉 Все системы работают нормально!")
else:
print("⚠️ Обнаружены проблемы в системе!")
print("\n💡 Советы по мониторингу:")
print("1. Регулярно проверяйте веб-интерфейс RabbitMQ")
print("2. Настройте алерты при росте очереди")
print("3. Логируйте все ошибки агентов")
print("4. Следите за потреблением ресурсов")
Что можно увидеть в веб-интерфейсе RabbitMQ:
- Queues → task_queue:
Ready: Сообщения, ожидающие обработки
Unacked: Сообщения, взятые агентами, но не подтверждённые
Total: Всего сообщений
Consumers: Количество подключённых агентов - Channels: Активные соединения
- Connections: Подключённые клиенты
- Exchanges: Обменники (пока у нас только стандартный)
Практическое задание:
- Запустите 2 рабочих агента
- Запустите менеджера задач
- Откройте веб-интерфейс RabbitMQ
- Наблюдайте, как уменьшается очередь
- Остановите одного агента (Ctrl+C)
- Посмотрите, как перераспределятся задачи
🚨 Шаг 6: Обработка ошибок и восстановление
Цель шага: Сделать систему устойчивой к сбоям.
Основные проблемы и решения:
Проблема 1: Агент упал во время обработки задачи
Решение: Не подтверждать задачу, пока она не обработана полностью
python
# worker_reliable.py - Надёжный агент
import pika
import json
import time
import random
import signal
import sys
class ReliableAIAgent:
def __init__(self, agent_id):
self.agent_id = agent_id
self.connection = None
self.channel = None
self.running = True
# Обработка Ctrl+C для graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
"""Обработчик сигналов завершения"""
print(f"\n🛑 Агент #{self.agent_id} получает сигнал завершения...")
self.running = False
if self.connection and not self.connection.is_closed:
print("🔌 Закрываю соединение...")
self.connection.close()
sys.exit(0)
def connect(self):
"""Установка соединения с переподключением"""
while self.running:
try:
print(f"🔗 Агент #{self.agent_id} подключается к RabbitMQ...")
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
'localhost',
heartbeat=600, # Таймаут соединения
blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue='task_queue', durable=True)
self.channel.basic_qos(prefetch_count=1)
print(f"✅ Агент #{self.agent_id} успешно подключён!")
return True
except Exception as e:
print(f"❌ Ошибка подключения: {e}")
print("🔄 Повторная попытка через 5 секунд...")
time.sleep(5)
return False
def process_task_safely(self, task_id, task_data):
"""Безопасная обработка задачи с обработкой ошибок"""
try:
print(f"\n🔍 Агент #{self.agent_id} начинает задачу #{task_id}")
# Имитация возможных ошибок
if random.random() < 0.1: # 10% шанс на ошибку
raise ValueError("Имитация случайной ошибки в ИИ-модели")
# Основная обработка
result = self.simulate_ai_processing(task_data)
print(f"✅ Задача #{task_id} успешно обработана")
print(f"📊 Результат: {result[:50]}...")
return True, result
except Exception as e:
print(f"❌ Ошибка при обработке задачи #{task_id}: {e}")
print("💡 Задача будет возвращена в очередь для повторной обработки")
return False, str(e)
def simulate_ai_processing(self, task_data):
"""Имитация работы ИИ"""
time.sleep(random.uniform(0.5, 2))
# Простая "ИИ-логика"
task_lower = task_data['task'].lower()
if 'сентимент' in task_lower:
return {'sentiment': 'positive', 'confidence': 0.87}
elif 'распознать' in task_lower:
return {'objects': ['cat', 'sofa', 'window'], 'count': 3}
elif 'сгенерировать' in task_lower:
return {'answer': 'ИИ-агент — это автономная система...'}
else:
return {'status': 'processed', 'task': task_data['task'][:30]}
def callback(self, ch, method, properties, body):
"""Обработчик сообщений с подтверждением"""
try:
# Декодируем задачу
task_data = json.loads(body.decode('utf-8'))
task_id = task_data['task_id']
# Обрабатываем
success, result = self.process_task_safely(task_id, task_data)
if success:
# Сохраняем результат
self.save_result(task_id, result)
# Подтверждаем успешную обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"📨 Задача #{task_id} подтверждена")
else:
# Отказываемся от задачи (вернётся в очередь)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f"🔄 Задача #{task_id} возвращена в очередь")
except json.JSONDecodeError:
print("❌ Неверный формат сообщения")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f"❌ Непредвиденная ошибка: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def save_result(self, task_id, result):
"""Сохранение результата с проверкой записи"""
try:
with open(f'agent_{self.agent_id}_results.json', 'a', encoding='utf-8') as f:
json.dump({
'task_id': task_id,
'agent_id': self.agent_id,
'timestamp': time.time(),
'result': result
}, f, ensure_ascii=False)
f.write('\n')
except Exception as e:
print(f"⚠️ Не удалось сохранить результат: {e}")
def run(self):
"""Основной цикл работы агента"""
if not self.connect():
return
print(f"\n🤖 Агент #{self.agent_id} начинает работу")
print("=" * 50)
# Подписываемся на очередь
self.channel.basic_consume(
queue='task_queue',
on_message_callback=self.callback
)
try:
while self.running:
# Обрабатываем сообщения
self.connection.process_data_events(time_limit=1)
# Периодическая проверка здоровья
if time.time() % 10 < 0.1: # Каждые ~10 секунд
print(f"❤️ Агент #{self.agent_id} жив и работает...")
except Exception as e:
print(f"❌ Критическая ошибка: {e}")
finally:
if self.connection and not self.connection.is_closed:
self.connection.close()
if __name__ == "__main__":
# Запускаем агента с ID
import sys
agent_id = sys.argv[1] if len(sys.argv) > 1 else "1"
agent = ReliableAIAgent(agent_id)
agent.run()
Как тестировать надёжность:
- Запустите агента:bashpython worker_reliable.py 1
- Отправьте задачи:bashpython task_manager.py
- Имитируйте сбои:
Нажмите Ctrl+C во время обработки задачи
Остановите RabbitMQ (docker stop rabbitmq)
Удалите файл результатов во время записи
Что должно получиться:
✅ Задачи не теряются при сбоях
✅ Агент переподключается при разрыве соединения
✅ Ошибки логируются и обрабатываются
✅ Результаты сохраняются атомарно
Ключевые особенности надёжного агента:
- Graceful shutdown - корректное завершение по сигналам
- Reconnection logic - повторное подключение при сбоях
- Ack/Nack - подтверждение/отказ от задач
- Error boundaries - обработка ошибок на всех уровнях
- Heartbeats - поддержание соединения
- Result persistence - сохранение результатов с проверкой
📈 Шаг 7: Анализ производительности и оптимизация
Цель шага: Научиться измерять и улучшать производительность агентов.
python
# benchmark.py - Измерение производительности
import pika
import json
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'tasks_processed': 0,
'avg_processing_time': 0,
'errors': 0,
'start_time': time.time()
}
self.processing_times = []
def send_test_tasks(self, count=100):
"""Отправка тестовых задач"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='benchmark_queue', durable=True)
print(f"📤 Отправка {count} тестовых задач...")
for i in range(count):
task = {
'task_id': i,
'type': 'benchmark',
'data': f"Тестовая задача #{i}",
'timestamp': time.time(),
'complexity': 'simple' # или 'medium', 'complex'
}
channel.basic_publish(
exchange='',
routing_key='benchmark_queue',
body=json.dumps(task).encode('utf-8'),
properties=pika.BasicProperties(delivery_mode=2)
)
connection.close()
print(f"✅ Отправлено {count} задач")
def benchmark_worker(self, worker_id, task_count):
"""Рабочий для бенчмарка"""
results = {
'worker_id': worker_id,
'tasks_processed': 0,
'total_time': 0,
'errors': 0
}
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='benchmark_queue', durable=True)
channel.basic_qos(prefetch_count=1)
start_time = time.time()
def callback(ch, method, properties, body):
task_start = time.time()
try:
task = json.loads(body.decode('utf-8'))
# Имитация обработки
processing_time = 0.1 # 100 мс
time.sleep(processing_time)
results['tasks_processed'] += 1
self.processing_times.append(time.time() - task_start)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
results['errors'] += 1
ch.basic_nack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='benchmark_queue',
on_message_callback=callback
)
# Обрабатываем задачи, пока не достигнем нужного количества
while results['tasks_processed'] < task_count:
connection.process_data_events(time_limit=0.1)
results['total_time'] = time.time() - start_time
connection.close()
except Exception as e:
print(f"❌ Ошибка в worker {worker_id}: {e}")
return results
def run_benchmark(self, worker_count=3, tasks_per_worker=50):
"""Запуск бенчмарка"""
print(f"\n🚀 Запуск бенчмарка:")
print(f" 👥 Количество воркеров: {worker_count}")
print(f" 📊 Задач на воркера: {tasks_per_worker}")
print(f" 📈 Всего задач: {worker_count * tasks_per_worker}")
# Очищаем очередь
self.clear_queue()
# Отправляем задачи
total_tasks = worker_count * tasks_per_worker
self.send_test_tasks(total_tasks)
print("\n⏳ Запускаем воркеров...")
# Запускаем воркеров в потоках
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = []
for i in range(worker_count):
future = executor.submit(self.benchmark_worker, i, tasks_per_worker)
futures.append(future)
# Собираем результаты
all_results = []
for future in futures:
all_results.append(future.result())
# Анализируем результаты
self.analyze_results(all_results)
def clear_queue(self):
"""Очистка очереди"""
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_delete(queue='benchmark_queue')
connection.close()
print("🧹 Очередь очищена")
except:
pass
def analyze_results(self, results):
"""Анализ результатов бенчмарка"""
print("\n" + "="*50)
print("📊 РЕЗУЛЬТАТЫ БЕНЧМАРКА")
print("="*50)
total_tasks = sum(r['tasks_processed'] for r in results)
total_errors = sum(r['errors'] for r in results)
total_time = max(r['total_time'] for r in results) # Время самого медленного
if self.processing_times:
avg_time = statistics.mean(self.processing_times)
min_time = min(self.processing_times)
max_time = max(self.processing_times)
std_dev = statistics.stdev(self.processing_times) if len(self.processing_times) > 1 else 0
print(f"\n📈 Общая статистика:")
print(f" ✅ Обработано задач: {total_tasks}")
print(f" ❌ Ошибок: {total_errors}")
print(f" ⏱️ Общее время: {total_time:.2f} сек")
if total_tasks > 0:
print(f" 🚀 Скорость: {total_tasks/total_time:.2f} задач/сек")
print(f" 📉 Время на задачу: {(total_time/total_tasks)*1000:.1f} мс")
print(f"\n📊 Детальная статистика обработки:")
print(f" 📊 Среднее время: {avg_time*1000:.1f} мс")
print(f" ⬇️ Минимальное: {min_time*1000:.1f} мс")
print(f" ⬆️ Максимальное: {max_time*1000:.1f} мс")
print(f" 📐 Стандартное отклонение: {std_dev*1000:.1f} мс")
print(f"\n👥 Статистика по воркерам:")
for result in results:
if result['tasks_processed'] > 0:
speed = result['tasks_processed'] / result['total_time']
print(f" Воркер {result['worker_id']}: {result['tasks_processed']} задач, "
f"{speed:.2f} задач/сек, {result['errors']} ошибок")
# Рекомендации по оптимизации
print(f"\n💡 РЕКОМЕНДАЦИИ:")
if avg_time > 0.5:
print(" ⚠️ Высокое время обработки:")
print(" - Оптимизируйте ИИ-модели")
print(" - Используйте кэширование")
print(" - Рассмотрите GPU ускорение")
if std_dev > avg_time * 0.5:
print(" ⚠️ Высокий разброс времени:")
print(" - Стандартизируйте входные данные")
print(" - Добавьте таймауты на обработку")
if total_errors > total_tasks * 0.05: # >5% ошибок
print(" ⚠️ Много ошибок:")
print(" - Улучшите обработку исключений")
print(" - Добавьте retry логику")
print(" - Валидируйте входные данные")
if total_time < 10 and worker_count > 5:
print(" 💡 Можно увеличить prefetch_count для уменьшения накладных расходов")
print("\n🎯 Следующие шаги:")
print(" 1. Запустите бенчмарк с разным количеством воркеров")
print(" 2. Протестируйте с задачами разной сложности")
print(" 3. Измерьте потребление памяти")
print(" 4. Протестируйте при высокой нагрузке")
if __name__ == "__main__":
monitor = PerformanceMonitor()
print("🎯 БЕНЧМАРК СИСТЕМЫ ИИ-АГЕНТОВ")
print("="*50)
# Тест 1: Базовый тест
print("\n🧪 ТЕСТ 1: Базовый тест (3 воркера, 50 задач каждый)")
monitor.run_benchmark(worker_count=3, tasks_per_worker=50)
# Тест 2: Тест под нагрузкой
print("\n" + "="*50)
print("🧪 ТЕСТ 2: Тест под нагрузкой (5 воркеров, 100 задач каждый)")
monitor.run_benchmark(worker_count=5, tasks_per_worker=100)
Как проводить оптимизацию:
- Измерьте базовую производительность:bashpython benchmark.py
- Экспериментируйте с параметрами:
Количество воркеров
Размер prefetch_count
Параметры соединения - Оптимизируйте код:
Кэшируйте модели ИИ
Используйте батчинговую обработку
Оптимизируйте сериализацию
Метрики для отслеживания:
- Throughput - задачи/секунду
- Latency - время обработки задачи
- Error rate - процент ошибок
- Resource usage - память, CPU
- Queue length - размер очереди
🎓 Итог: Что вы узнали и что делать дальше
✅ Вы освоили:
- Основы RabbitMQ - очереди, обменники, routing keys
- Архитектуру агентов - producer/consumer, workers
- Надёжность - переподключение, подтверждения, обработка ошибок
- Мониторинг - веб-интерфейс, метрики, логирование
- Оптимизацию - бенчмаркинг, настройка параметров
🚀 Следующие шаги:
- Реальные ИИ-модели:python# Замените имитацию на реальные модели
from transformers import pipeline
sentiment_analyzer = pipeline('sentiment-analysis')
result = sentiment_analyzer("I love programming!") - Сложные архитектуры:
Fanout exchanges для широковещательных сообщений
Topic exchanges для маршрутизации по паттернам
Headers exchanges для сложной фильтрации - Масштабирование:
Docker Compose для оркестрации
Kubernetes для управления кластером
Load balancers для распределения нагрузки - Продвинутые функции:
DLX (Dead Letter Exchanges) для обработки неудачных сообщений
TTL (Time-To-Live) для автоматического удаления старых сообщений
Priority queues для приоритетной обработки
📚 Ресурсы для дальнейшего изучения:
💡 Идеи для проектов:
- Чат-бот с ИИ - обработка сообщений через очередь
- Система классификации изображений - распределённая обработка
- Анализ логов в реальном времени - потоковая обработка
- Оркестратор ML pipelines - управление цепочками задач
🎯 Помните:
- Начинайте просто - одна очередь, два агента
- Тестируйте сбои - останавливайте RabbitMQ, убивайте агенты
- Измеряйте всё - метрики помогут найти узкие места
- Документируйте - схемы архитектуры, API, deployment
Удачи в создании ваших ИИ-агентов! 🚀
❓ Часто задаваемые вопросы:
Q: Что делать, если RabbitMQ не запускается?
A: Проверьте, что порты 5672 и 15672 свободны. Попробуйте другой порт:
bash
docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management
Q: Как очистить все очереди?
A: В веб-интерфейсе: Queues → Delete. Или перезапустите RabbitMQ:
bash
docker restart rabbitmq
Q: Сколько агентов можно запустить?
A: Технически - сотни. Но начинайте с 2-3, следите за потреблением ресурсов.
Q: Как сохранять сообщения на диск?
A: При объявлении очереди: durable=True, при отправке: delivery_mode=2.
Q: Что делать, если агент "завис"?
A: Добавьте таймауты в обработку и мониторинг heartbeats.