Найти в Дзене
Кирилл Ледовский

📚 Пошаговое руководство для начинающего разработчика ИИ-агентов с RabbitMQ

ИИ-агент — это программа, которая: RabbitMQ — это "почтовая служба" для программ. Он гарантирует, что сообщения дойдут до нужного агента и не потеряются. ✅ RabbitMQ работает на вашем компьютере
✅ Вы можете зайти в веб-интерфейс
✅ Установлена библиотека для работы из Python Это программа, которая постоянно работает и ждёт сообщений. Как почтальон, сидящий у почтового ящика и проверяющий, не пришла ли почта. python # consumer_simple.py
import pika
import time
# Термины:
# pika - библиотека для работы с RabbitMQ на Python
# connection - соединение с RabbitMQ (как телефонная линия)
# channel - канал внутри соединения (как отдельная линия на многоканальном телефоне)
print("🐇 Подключаемся к RabbitMQ...")
# 1. Подключаемся к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # Адрес RabbitMQ
)
channel = connection.channel()
# 2. Создаём очередь (если её нет)
# Очередь - это "почтовый ящик" с названием
channel.queue_declare(queue='hello_queue')
pri
Оглавление

🎯 Введение: Что такое ИИ-агент и почему RabbitMQ?

ИИ-агент — это программа, которая:

  • Получает информацию (например, текст, картинку, данные)
  • "Думает" (обрабатывает с помощью ИИ-моделей)
  • Действует (отвечает, сохраняет, передаёт дальше)

RabbitMQ — это "почтовая служба" для программ. Он гарантирует, что сообщения дойдут до нужного агента и не потеряются.

🚀 Шаг 1: Подготовка рабочего окружения

Цель шага: Создать минимальную рабочую среду для экспериментов.

Что нужно сделать:

  1. Установите Docker — это программа, которая позволяет запускать другие программы в "контейнерах" (как виртуальные машины, но легче)
    Скачайте Docker Desktop Установите, запустите
  2. Запустите RabbitMQ через Docker:bashdocker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
  3. Проверьте установку:
    Откройте в браузере:
    http://localhost:15672 Логин: guest
    Пароль: guest
    Вы должны увидеть панель управления RabbitMQ
  4. Установите Python библиотеку:bashpip install pika

Что должно получиться:

✅ RabbitMQ работает на вашем компьютере
✅ Вы можете зайти в веб-интерфейс
✅ Установлена библиотека для работы из Python

На что обратить внимание:

  • Если порт 5672 или 15672 занят, Docker выдаст ошибку
  • Убедитесь, что Docker запущен (иконка в трее)
  • RabbitMQ может запускаться 20-30 секунд

🧪 Шаг 2: Ваш первый агент-получатель (Consumer)

Цель шага: Написать агента, который будет "слушать" сообщения.

Что такое агент-получатель:

Это программа, которая постоянно работает и ждёт сообщений. Как почтальон, сидящий у почтового ящика и проверяющий, не пришла ли почта.

python

# consumer_simple.py
import pika
import time

# Термины:
# pika - библиотека для работы с RabbitMQ на Python
# connection - соединение с RabbitMQ (как телефонная линия)
# channel - канал внутри соединения (как отдельная линия на многоканальном телефоне)

print("🐇 Подключаемся к RabbitMQ...")

# 1. Подключаемся к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
# Адрес RabbitMQ
)
channel = connection.channel()

# 2. Создаём очередь (если её нет)
# Очередь - это "почтовый ящик" с названием
channel.queue_declare(queue='hello_queue')

print("✅ Подключено! Ждём сообщения...")
print("ℹ️ Чтобы отправить сообщение, запустите producer_simple.py")
print("ℹ️ Для выхода нажмите Ctrl+C\n")

# 3. Функция, которая будет вызвана при получении сообщения
def callback(ch, method, properties, body):
"""Обработчик входящих сообщений"""
message = body.decode('utf-8')
print(f"📨 Получено сообщение: {message}")

# Имитируем "размышления" агента
print(f"🤔 Агент думает над сообщением '{message}'...")
time.sleep(2)
# Ждём 2 секунды

# Простой "ИИ" - отвечаем на сообщение
response = f"Я понял ваше сообщение: '{message}'. Спасибо!"
print(f"🗣️ Агент отвечает: {response}")
print("-" * 50)

# 4. Настраиваем получение сообщений
# 'hello_queue' - из какой очереди брать сообщения
# callback - какая функция будет обрабатывать сообщения
# auto_ack=True - автоматически подтверждаем получение
channel.basic_consume(
queue='hello_queue',
on_message_callback=callback,
auto_ack=True
)

# 5. Начинаем слушать сообщения (бесконечный цикл)
channel.start_consuming()

Как запустить и проверить:

bash

python consumer_simple.py

Что должно получиться:

✅ Программа запустилась и пишет "Ждём сообщения..."
✅ Программа не завершается - она ждёт сообщений
✅ Можно нажать Ctrl+C для выхода

На что обратить внимание:

  • Программа будет "висеть" в терминале - это нормально!
  • Она ждёт сообщений как веб-сервер ждёт запросов
  • Если соединение разорвётся, программа завершится с ошибкой

📤 Шаг 3: Ваш первый агент-отправитель (Producer)

Цель шага: Написать агента, который отправляет сообщения.

Что такое агент-отправитель:

Это программа, которая отправляет сообщение и завершает работу. Как человек, который бросил письмо в почтовый ящик и пошёл дальше.

python

# producer_simple.py
import pika

print("🐇 Подключаемся к RabbitMQ для отправки сообщения...")

# 1. Подключаемся (так же, как в получателе)
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# 2. Создаём очередь (такую же, как в получателе)
channel.queue_declare(queue='hello_queue')

# 3. Подготовим сообщение
message = "Привет от первого ИИ-агента!"

# 4. Отправляем сообщение в очередь
# exchange='' - используем стандартный обменник
# routing_key='hello_queue' - имя очереди
# body=message - само сообщение
channel.basic_publish(
exchange='',
routing_key='hello_queue',
body=message
)

print(f"✅ Отправлено сообщение: '{message}'")
print("ℹ️ Проверьте программу consumer_simple.py - там должно появиться сообщение")

# 5. Закрываем соединение
connection.close()

Как запустить и проверить:

  1. В первом терминале запустите получателя:bashpython consumer_simple.py
  2. Во втором терминале запустите отправителя:bashpython producer_simple.py

Что должно получиться:

✅ В терминале с получателем появится сообщение:

text

📨 Получено сообщение: Привет от первого ИИ-агента!
🤔 Агент думает над сообщением 'Привет от первого ИИ-агента'...
🗣️ Агент отвечает: Я понял ваше сообщение: 'Привет от первого ИИ-агента!'. Спасибо!

✅ В терминале с отправителем: сообщение об успешной отправке

На что обратить внимание:

  • Сообщения сохраняются в RabbitMQ, даже если получатель отключён
  • Можно запустить отправителя несколько раз - получатель обработает все сообщения
  • Можно запустить несколько получателей - они будут работать параллельно

🔄 Шаг 4: Работа с несколькими агентами

Цель шага: Понять, как несколько агентов могут работать вместе.

Что будем делать:

Создадим систему из:

  1. Главного агента - отправляет задачи
  2. Рабочих агентов - обрабатывают задачи
  3. Менеджера задач - распределяет задачи между рабочими

python

# task_manager.py - Главный агент, создающий задачи
import pika
import json
import random

print("👑 Главный агент запущен")
print("📋 Создаю задачи для рабочих агентов...")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Создаём очередь для задач
channel.queue_declare(queue='task_queue', durable=True)
# durable=True - очередь сохранится при перезапуске

# Список задач для ИИ-агентов
tasks = [
"Проанализировать сентимент текста: 'Я люблю программировать!'",
"Распознать объекты на изображении с котиком",
"Сгенерировать ответ на вопрос: 'Что такое ИИ-агент?'",
"Предсказать следующее число в последовательности: 2, 4, 6, 8, ...",
"Классифицировать эмоцию: 'Я так рад, что у меня получилось!'"
]

# Отправляем каждую задачу
for i, task in enumerate(tasks, 1):
# Создаём структурированное сообщение
message = {
'task_id': i,
'task': task,
'priority': random.choice(['low', 'medium', 'high'])
}

# Преобразуем в JSON (RabbitMQ работает с байтами)
body = json.dumps(message, ensure_ascii=False)

# Отправляем задачу
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=body.encode('utf-8'),
properties=pika.BasicProperties(
delivery_mode=2,
# Сохранять на диск
)
)

print(f"📤 Отправлена задача #{i}: {task[:50]}...")

connection.close()
print(f"\n✅ Отправлено {len(tasks)} задач")
print("🚀 Запустите несколько worker_ai.py в разных терминалах")

python

# worker_ai.py - Рабочий ИИ-агент
import pika
import json
import time
import random

print("🤖 ИИ-агент запущен и готов к работе!")
print("⏳ Ожидаю задачи...\n")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем очередь (такая же, как у менеджера)
channel.queue_declare(queue='task_queue', durable=True)

# Настраиваем политику: не давать новую задачу, пока не обработана текущая
channel.basic_qos(prefetch_count=1)

def process_task(task_id, task_description, priority):
"""Функция обработки задачи (имитация работы ИИ)"""

print(f"🔍 Анализирую задачу #{task_id}")
print(f"📝 Описание: {task_description}")
print(f"🎯 Приоритет: {priority}")

# Имитация "мышления" ИИ
thinking_time = random.uniform(1, 3)
print(f"🤔 Думаю... ({thinking_time:.1f} секунд)")
time.sleep(thinking_time)

# Простые "ИИ-ответы" в зависимости от типа задачи
if "сентимент" in task_description.lower():
result = "Сентимент: ПОЛОЖИТЕЛЬНЫЙ 😊"
elif "распознать" in task_description.lower():
result = "Обнаружены: кот, удивление, милота 🐱"
elif "сгенерировать" in task_description.lower():
result = "ИИ-агент — это автономная программа, которая воспринимает окружение и действует для достижения целей."
elif "предсказать" in task_description.lower():
result = "Следующее число: 10"
elif "классифицировать" in task_description.lower():
result = "Эмоция: РАДОСТЬ 🎉"
else:
result = "Задача обработана успешно!"

print(f"✅ Результат: {result}")
print("-" * 50)

return result

def callback(ch, method, properties, body):
"""Обработчик входящих задач"""

# Декодируем сообщение из JSON
message = json.loads(body.decode('utf-8'))

# Обрабатываем задачу
result = process_task(
message['task_id'],
message['task'],
message['priority']
)

# Сохраняем результат (в реальной системе - в базу данных)
with open('results.txt', 'a', encoding='utf-8') as f:
f.write(f"Задача #{message['task_id']}: {result}\n")

# Подтверждаем выполнение задачи
ch.basic_ack(delivery_tag=method.delivery_tag)

# Подписываемся на очередь задач
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)

print("🎧 Агент слушает очередь задач...")
print("💡 Запустите task_manager.py для создания задач")
print("📊 Можно запустить несколько копий этого файла для параллельной работы")
print("⏹️ Для выхода нажмите Ctrl+C\n")

try:
channel.start_consuming()
except KeyboardInterrupt:
print("\n👋 ИИ-агент завершает работу")
connection.close()

Как запустить и проверить:

  1. Запустите 2-3 рабочих агента (в разных терминалах):bashpython worker_ai.py
  2. Запустите менеджера задач:bashpython task_manager.py

Что должно получиться:

В терминале менеджера:

text

👑 Главный агент запущен
📋 Создаю задачи для рабочих агентов...
📤 Отправлена задача #1: Проанализировать сентимент текста: 'Я люблю пр...
📤 Отправлена задача #2: Распознать объекты на изображении с котиком...
...
✅ Отправлено 5 задач

В терминалах рабочих агентов (пример для одного):

text

🤖 ИИ-агент запущен и готов к работе!
⏳ Ожидаю задачи...

🎧 Агент слушает очередь задач...
💡 Запустите task_manager.py для создания задач
📊 Можно запустить несколько копий этого файла для параллельной работы
⏹️ Для выхода нажмите Ctrl+C

🔍 Анализирую задачу #3
📝 Описание: Сгенерировать ответ на вопрос: 'Что такое ИИ-агент?'
🎯 Приоритет: medium
🤔 Думаю... (2.3 секунд)
✅ Результат: ИИ-агент — это автономная программа, которая воспринимает...
--------------------------------------------------

На что обратить внимание:

  • Задачи распределяются между всеми запущенными рабочими агентами
  • Каждая задача обрабатывается только одним агентом
  • Можно добавлять новых рабочих "на лету"
  • Результаты сохраняются в файл results.txt

📊 Шаг 5: Мониторинг и отладка

Цель шага: Научиться отслеживать работу агентов и находить проблемы.

Что будем использовать:

  1. Веб-интерфейс RabbitMQ (http://localhost:15672)
  2. Логирование в файлы
  3. Проверка состояния очередей

python

# monitor.py - Простой монитор для отслеживания очередей
import pika
import json
from datetime import datetime

def get_queue_info():
"""Получает информацию об очередях"""

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

try:
# Получаем информацию об очереди
queue_info = channel.queue_declare(
queue='task_queue',
durable=True,
passive=True
# Только получить информацию, не создавать
)

message_count = queue_info.method.message_count
consumer_count = queue_info.method.consumer_count

print(f"\n📊 [{datetime.now().strftime('%H:%M:%S')}] Статус системы:")
print(f" 📨 Очередь 'task_queue':")
print(f" Сообщений в очереди: {message_count}")
print(f" Подключенных агентов: {consumer_count}")

if message_count > 0:
print(f" ⚠️ Есть необработанные задачи!")
elif consumer_count == 0:
print(f" ⚠️ Нет активных агентов!")
else:
print(f" ✅ Все задачи обработаны")

except Exception as e:
print(f"❌ Ошибка: {e}")
finally:
connection.close()

def check_agent_health(agent_id):
"""Проверяет здоровье конкретного агента"""

# В реальной системе здесь была бы проверка:
# - Ответа на ping
# - Загрузки памяти/CPU
# - Времени последней активности

print(f"\n🏥 Проверка здоровья агента #{agent_id}:")

# Имитация проверки
checks = {
"Соединение с RabbitMQ": "✅ OK",
"Доступность моделей ИИ": "✅ OK",
"Память": "⚠️ 85% (высокая загрузка)",
"Последняя активность": "✅ 2 минуты назад"
}

for check, status in checks.items():
print(f" {check}: {status}")

return all("✅" in status for status in checks.values())

if __name__ == "__main__":
print("🔍 Монитор системы ИИ-агентов")
print("=" * 40)

# Проверяем очередь
get_queue_info()

# Проверяем здоровье "агентов" (имитация)
healthy = True
for i in range(1, 4):
# Проверяем 3 агента
if not check_agent_health(i):
healthy = False

print("\n" + "=" * 40)
if healthy:
print("🎉 Все системы работают нормально!")
else:
print("⚠️ Обнаружены проблемы в системе!")

print("\n💡 Советы по мониторингу:")
print("1. Регулярно проверяйте веб-интерфейс RabbitMQ")
print("2. Настройте алерты при росте очереди")
print("3. Логируйте все ошибки агентов")
print("4. Следите за потреблением ресурсов")

Что можно увидеть в веб-интерфейсе RabbitMQ:

  1. Queuestask_queue:
    Ready: Сообщения, ожидающие обработки
    Unacked: Сообщения, взятые агентами, но не подтверждённые
    Total: Всего сообщений
    Consumers: Количество подключённых агентов
  2. Channels: Активные соединения
  3. Connections: Подключённые клиенты
  4. Exchanges: Обменники (пока у нас только стандартный)

Практическое задание:

  1. Запустите 2 рабочих агента
  2. Запустите менеджера задач
  3. Откройте веб-интерфейс RabbitMQ
  4. Наблюдайте, как уменьшается очередь
  5. Остановите одного агента (Ctrl+C)
  6. Посмотрите, как перераспределятся задачи

🚨 Шаг 6: Обработка ошибок и восстановление

Цель шага: Сделать систему устойчивой к сбоям.

Основные проблемы и решения:

Проблема 1: Агент упал во время обработки задачи
Решение: Не подтверждать задачу, пока она не обработана полностью

python

# worker_reliable.py - Надёжный агент
import pika
import json
import time
import random
import signal
import sys

class ReliableAIAgent:
def __init__(self, agent_id):
self.agent_id = agent_id
self.connection = None
self.channel = None
self.running = True

# Обработка Ctrl+C для graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)

def signal_handler(self, signum, frame):
"""Обработчик сигналов завершения"""
print(f"\n🛑 Агент #{self.agent_id} получает сигнал завершения...")
self.running = False

if self.connection and not self.connection.is_closed:
print("🔌 Закрываю соединение...")
self.connection.close()

sys.exit(0)

def connect(self):
"""Установка соединения с переподключением"""

while self.running:
try:
print(f"🔗 Агент #{self.agent_id} подключается к RabbitMQ...")

self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
'localhost',
heartbeat=600,
# Таймаут соединения
blocked_connection_timeout=300
)
)

self.channel = self.connection.channel()
self.channel.queue_declare(queue='task_queue', durable=True)
self.channel.basic_qos(prefetch_count=1)

print(f"✅ Агент #{self.agent_id} успешно подключён!")
return True

except Exception as e:
print(f"❌ Ошибка подключения: {e}")
print("🔄 Повторная попытка через 5 секунд...")
time.sleep(5)

return False

def process_task_safely(self, task_id, task_data):
"""Безопасная обработка задачи с обработкой ошибок"""

try:
print(f"\n🔍 Агент #{self.agent_id} начинает задачу #{task_id}")

# Имитация возможных ошибок
if random.random() < 0.1:
# 10% шанс на ошибку
raise ValueError("Имитация случайной ошибки в ИИ-модели")

# Основная обработка
result = self.simulate_ai_processing(task_data)

print(f"✅ Задача #{task_id} успешно обработана")
print(f"📊 Результат: {result[:50]}...")

return True, result

except Exception as e:
print(f"❌ Ошибка при обработке задачи #{task_id}: {e}")
print("💡 Задача будет возвращена в очередь для повторной обработки")
return False, str(e)

def simulate_ai_processing(self, task_data):
"""Имитация работы ИИ"""
time.sleep(random.uniform(0.5, 2))

# Простая "ИИ-логика"
task_lower = task_data['task'].lower()

if 'сентимент' in task_lower:
return {'sentiment': 'positive', 'confidence': 0.87}
elif 'распознать' in task_lower:
return {'objects': ['cat', 'sofa', 'window'], 'count': 3}
elif 'сгенерировать' in task_lower:
return {'answer': 'ИИ-агент — это автономная система...'}
else:
return {'status': 'processed', 'task': task_data['task'][:30]}

def callback(self, ch, method, properties, body):
"""Обработчик сообщений с подтверждением"""

try:
# Декодируем задачу
task_data = json.loads(body.decode('utf-8'))
task_id = task_data['task_id']

# Обрабатываем
success, result = self.process_task_safely(task_id, task_data)

if success:
# Сохраняем результат
self.save_result(task_id, result)
# Подтверждаем успешную обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"📨 Задача #{task_id} подтверждена")
else:
# Отказываемся от задачи (вернётся в очередь)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f"🔄 Задача #{task_id} возвращена в очередь")

except json.JSONDecodeError:
print("❌ Неверный формат сообщения")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f"❌ Непредвиденная ошибка: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def save_result(self, task_id, result):
"""Сохранение результата с проверкой записи"""
try:
with open(f'agent_{self.agent_id}_results.json', 'a', encoding='utf-8') as f:
json.dump({
'task_id': task_id,
'agent_id': self.agent_id,
'timestamp': time.time(),
'result': result
}, f, ensure_ascii=False)
f.write('\n')
except Exception as e:
print(f"⚠️ Не удалось сохранить результат: {e}")

def run(self):
"""Основной цикл работы агента"""

if not self.connect():
return

print(f"\n🤖 Агент #{self.agent_id} начинает работу")
print("=" * 50)

# Подписываемся на очередь
self.channel.basic_consume(
queue='task_queue',
on_message_callback=self.callback
)

try:
while self.running:
# Обрабатываем сообщения
self.connection.process_data_events(time_limit=1)

# Периодическая проверка здоровья
if time.time() % 10 < 0.1:
# Каждые ~10 секунд
print(f"❤️ Агент #{self.agent_id} жив и работает...")

except Exception as e:
print(f"❌ Критическая ошибка: {e}")
finally:
if self.connection and not self.connection.is_closed:
self.connection.close()

if __name__ == "__main__":
# Запускаем агента с ID
import sys
agent_id = sys.argv[1] if len(sys.argv) > 1 else "1"

agent = ReliableAIAgent(agent_id)
agent.run()

Как тестировать надёжность:

  1. Запустите агента:bashpython worker_reliable.py 1
  2. Отправьте задачи:bashpython task_manager.py
  3. Имитируйте сбои:
    Нажмите Ctrl+C во время обработки задачи
    Остановите RabbitMQ (docker stop rabbitmq)
    Удалите файл результатов во время записи

Что должно получиться:

✅ Задачи не теряются при сбоях
✅ Агент переподключается при разрыве соединения
✅ Ошибки логируются и обрабатываются
✅ Результаты сохраняются атомарно

Ключевые особенности надёжного агента:

  1. Graceful shutdown - корректное завершение по сигналам
  2. Reconnection logic - повторное подключение при сбоях
  3. Ack/Nack - подтверждение/отказ от задач
  4. Error boundaries - обработка ошибок на всех уровнях
  5. Heartbeats - поддержание соединения
  6. Result persistence - сохранение результатов с проверкой

📈 Шаг 7: Анализ производительности и оптимизация

Цель шага: Научиться измерять и улучшать производительность агентов.

python

# benchmark.py - Измерение производительности
import pika
import json
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

class PerformanceMonitor:
def __init__(self):
self.metrics = {
'tasks_processed': 0,
'avg_processing_time': 0,
'errors': 0,
'start_time': time.time()
}
self.processing_times = []

def send_test_tasks(self, count=100):
"""Отправка тестовых задач"""

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='benchmark_queue', durable=True)

print(f"📤 Отправка {count} тестовых задач...")

for i in range(count):
task = {
'task_id': i,
'type': 'benchmark',
'data': f"Тестовая задача #{i}",
'timestamp': time.time(),
'complexity': 'simple'
# или 'medium', 'complex'
}

channel.basic_publish(
exchange='',
routing_key='benchmark_queue',
body=json.dumps(task).encode('utf-8'),
properties=pika.BasicProperties(delivery_mode=2)
)

connection.close()
print(f"✅ Отправлено {count} задач")

def benchmark_worker(self, worker_id, task_count):
"""Рабочий для бенчмарка"""

results = {
'worker_id': worker_id,
'tasks_processed': 0,
'total_time': 0,
'errors': 0
}

try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='benchmark_queue', durable=True)
channel.basic_qos(prefetch_count=1)

start_time = time.time()

def callback(ch, method, properties, body):
task_start = time.time()

try:
task = json.loads(body.decode('utf-8'))

# Имитация обработки
processing_time = 0.1
# 100 мс
time.sleep(processing_time)

results['tasks_processed'] += 1
self.processing_times.append(time.time() - task_start)

ch.basic_ack(delivery_tag=method.delivery_tag)

except Exception as e:
results['errors'] += 1
ch.basic_nack(delivery_tag=method.delivery_tag)

channel.basic_consume(
queue='benchmark_queue',
on_message_callback=callback
)

# Обрабатываем задачи, пока не достигнем нужного количества
while results['tasks_processed'] < task_count:
connection.process_data_events(time_limit=0.1)

results['total_time'] = time.time() - start_time

connection.close()

except Exception as e:
print(f"❌ Ошибка в worker {worker_id}: {e}")

return results

def run_benchmark(self, worker_count=3, tasks_per_worker=50):
"""Запуск бенчмарка"""

print(f"\n🚀 Запуск бенчмарка:")
print(f" 👥 Количество воркеров: {worker_count}")
print(f" 📊 Задач на воркера: {tasks_per_worker}")
print(f" 📈 Всего задач: {worker_count * tasks_per_worker}")

# Очищаем очередь
self.clear_queue()

# Отправляем задачи
total_tasks = worker_count * tasks_per_worker
self.send_test_tasks(total_tasks)

print("\n⏳ Запускаем воркеров...")

# Запускаем воркеров в потоках
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = []
for i in range(worker_count):
future = executor.submit(self.benchmark_worker, i, tasks_per_worker)
futures.append(future)

# Собираем результаты
all_results = []
for future in futures:
all_results.append(future.result())

# Анализируем результаты
self.analyze_results(all_results)

def clear_queue(self):
"""Очистка очереди"""
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_delete(queue='benchmark_queue')
connection.close()
print("🧹 Очередь очищена")
except:
pass

def analyze_results(self, results):
"""Анализ результатов бенчмарка"""

print("\n" + "="*50)
print("📊 РЕЗУЛЬТАТЫ БЕНЧМАРКА")
print("="*50)

total_tasks = sum(r['tasks_processed'] for r in results)
total_errors = sum(r['errors'] for r in results)
total_time = max(r['total_time'] for r in results)
# Время самого медленного

if self.processing_times:
avg_time = statistics.mean(self.processing_times)
min_time = min(self.processing_times)
max_time = max(self.processing_times)
std_dev = statistics.stdev(self.processing_times) if len(self.processing_times) > 1 else 0

print(f"\n📈 Общая статистика:")
print(f" ✅ Обработано задач: {total_tasks}")
print(f" ❌ Ошибок: {total_errors}")
print(f" ⏱️ Общее время: {total_time:.2f} сек")

if total_tasks > 0:
print(f" 🚀 Скорость: {total_tasks/total_time:.2f} задач/сек")
print(f" 📉 Время на задачу: {(total_time/total_tasks)*1000:.1f} мс")

print(f"\n📊 Детальная статистика обработки:")
print(f" 📊 Среднее время: {avg_time*1000:.1f} мс")
print(f" ⬇️ Минимальное: {min_time*1000:.1f} мс")
print(f" ⬆️ Максимальное: {max_time*1000:.1f} мс")
print(f" 📐 Стандартное отклонение: {std_dev*1000:.1f} мс")

print(f"\n👥 Статистика по воркерам:")
for result in results:
if result['tasks_processed'] > 0:
speed = result['tasks_processed'] / result['total_time']
print(f" Воркер {result['worker_id']}: {result['tasks_processed']} задач, "
f"{speed:.2f} задач/сек, {result['errors']} ошибок")

# Рекомендации по оптимизации
print(f"\n💡 РЕКОМЕНДАЦИИ:")

if avg_time > 0.5:
print(" ⚠️ Высокое время обработки:")
print(" - Оптимизируйте ИИ-модели")
print(" - Используйте кэширование")
print(" - Рассмотрите GPU ускорение")

if std_dev > avg_time * 0.5:
print(" ⚠️ Высокий разброс времени:")
print(" - Стандартизируйте входные данные")
print(" - Добавьте таймауты на обработку")

if total_errors > total_tasks * 0.05:
# >5% ошибок
print(" ⚠️ Много ошибок:")
print(" - Улучшите обработку исключений")
print(" - Добавьте retry логику")
print(" - Валидируйте входные данные")

if total_time < 10 and worker_count > 5:
print(" 💡 Можно увеличить prefetch_count для уменьшения накладных расходов")

print("\n🎯 Следующие шаги:")
print(" 1. Запустите бенчмарк с разным количеством воркеров")
print(" 2. Протестируйте с задачами разной сложности")
print(" 3. Измерьте потребление памяти")
print(" 4. Протестируйте при высокой нагрузке")

if __name__ == "__main__":
monitor = PerformanceMonitor()

print("🎯 БЕНЧМАРК СИСТЕМЫ ИИ-АГЕНТОВ")
print("="*50)

# Тест 1: Базовый тест
print("\n🧪 ТЕСТ 1: Базовый тест (3 воркера, 50 задач каждый)")
monitor.run_benchmark(worker_count=3, tasks_per_worker=50)

# Тест 2: Тест под нагрузкой
print("\n" + "="*50)
print("🧪 ТЕСТ 2: Тест под нагрузкой (5 воркеров, 100 задач каждый)")
monitor.run_benchmark(worker_count=5, tasks_per_worker=100)

Как проводить оптимизацию:

  1. Измерьте базовую производительность:bashpython benchmark.py
  2. Экспериментируйте с параметрами:
    Количество воркеров
    Размер prefetch_count
    Параметры соединения
  3. Оптимизируйте код:
    Кэшируйте модели ИИ
    Используйте батчинговую обработку
    Оптимизируйте сериализацию

Метрики для отслеживания:

  1. Throughput - задачи/секунду
  2. Latency - время обработки задачи
  3. Error rate - процент ошибок
  4. Resource usage - память, CPU
  5. Queue length - размер очереди

🎓 Итог: Что вы узнали и что делать дальше

✅ Вы освоили:

  1. Основы RabbitMQ - очереди, обменники, routing keys
  2. Архитектуру агентов - producer/consumer, workers
  3. Надёжность - переподключение, подтверждения, обработка ошибок
  4. Мониторинг - веб-интерфейс, метрики, логирование
  5. Оптимизацию - бенчмаркинг, настройка параметров

🚀 Следующие шаги:

  1. Реальные ИИ-модели:python# Замените имитацию на реальные модели
    from transformers import pipeline

    sentiment_analyzer = pipeline('sentiment-analysis')
    result = sentiment_analyzer("I love programming!")
  2. Сложные архитектуры:
    Fanout exchanges для широковещательных сообщений
    Topic exchanges для маршрутизации по паттернам
    Headers exchanges для сложной фильтрации
  3. Масштабирование:
    Docker Compose для оркестрации
    Kubernetes для управления кластером
    Load balancers для распределения нагрузки
  4. Продвинутые функции:
    DLX (Dead Letter Exchanges) для обработки неудачных сообщений
    TTL (Time-To-Live) для автоматического удаления старых сообщений
    Priority queues для приоритетной обработки

📚 Ресурсы для дальнейшего изучения:

  1. Официальная документация RabbitMQ: https://www.rabbitmq.com/documentation.html
  2. Pika документация: https://pika.readthedocs.io
  3. Patterns for distributed systems: https://www.enterpriseintegrationpatterns.com

💡 Идеи для проектов:

  1. Чат-бот с ИИ - обработка сообщений через очередь
  2. Система классификации изображений - распределённая обработка
  3. Анализ логов в реальном времени - потоковая обработка
  4. Оркестратор ML pipelines - управление цепочками задач

🎯 Помните:

  • Начинайте просто - одна очередь, два агента
  • Тестируйте сбои - останавливайте RabbitMQ, убивайте агенты
  • Измеряйте всё - метрики помогут найти узкие места
  • Документируйте - схемы архитектуры, API, deployment

Удачи в создании ваших ИИ-агентов! 🚀

Часто задаваемые вопросы:

Q: Что делать, если RabbitMQ не запускается?
A: Проверьте, что порты 5672 и 15672 свободны. Попробуйте другой порт:

bash

docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management

Q: Как очистить все очереди?
A: В веб-интерфейсе: Queues → Delete. Или перезапустите RabbitMQ:

bash

docker restart rabbitmq

Q: Сколько агентов можно запустить?
A: Технически - сотни. Но начинайте с 2-3, следите за потреблением ресурсов.

Q: Как сохранять сообщения на диск?
A: При объявлении очереди: durable=True, при отправке: delivery_mode=2.

Q: Что делать, если агент "завис"?
A: Добавьте таймауты в обработку и мониторинг heartbeats.