В современной разработке программного обеспечения все большую популярность набирают микросервисные архитектуры и асинхронное программирование. Они позволяют создавать высокопроизводительные, масштабируемые и отзывчивые приложения. Если вы работаете с сообщениями и брокерами вроде Kafka, RabbitMQ или NATS, вам необходим инструмент, который упростит создание асинхронных потребителей и продюсеров. Именно таким инструментом является FastStream — молодой, но очень перспективный фреймворк, вдохновленный известными FastAPI и Hydra.
Что такое FastStream?
FastStream — это асинхронный фреймворк для построения микросервисов, которые взаимодействуют через брокеры сообщений. Он предоставляет простой и интуитивно понятный синтаксис для декларативного описания обработчиков сообщений, сильно напоминая подход FastAPI к созданию HTTP-ендпоинтов.
Ключевые особенности FastStream:
1. Высокая производительность: Благодаря асинхронной природе (async/await) и использованию под капотом мощных библиотек вроде `aio-pika` (для RabbitMQ) и `aiokafka`.
2. Простота и интуитивность: Вы описываете что нужно сделать с сообщением, а фреймворк берет на себя всю рутину: подключение к брокеру, подписку на топики, десериализацию, валидацию и даже документацию.
3. Встроенная валидация данных: Интеграция с Pydantic позволяет автоматически валидировать и парсить входящие сообщения в строго типизированные модели Python.
4. Зависимости (Dependency Injection): Как и в FastAPI, вы можете объявлять зависимости (например, подключение к БД), которые будут автоматически внедрены в ваши обработчики.
5. Автогенерация документации: FastStream умеет генерировать схему вашего приложения (AsyncAPI), что позволяет наглядно видеть, какие топики обрабатываются и какие сообщения ожидаются.
6. Поддержка нескольких брокеров: На момент написания статьи поддерживаются Kafka, RabbitMQ и NATS.
Установка FastStream
Установка осуществляется через pip. Вы можете установить базовую версию или с поддержкой конкретного брокера.
# Базовая установка (включает зависимости для RabbitMQ)
pip install faststream
# Или для конкретного брокера
pip install "faststream[kafka]"
pip install "faststream[nats]"
pip install "faststream[redis]"
Основные концепции на примере RabbitMQ
Давайте разберем основные концепции FastStream, создав простое приложение для RabbitMQ.
1. Базовое приложение: Продюсер и Потребитель
Создадим два файла: `producer.py` и `consumer.py`.
Потребитель (consumer.py):
from faststream import FastStream
from faststream.rabbit import RabbitBroker
import asyncio
# Создаем экземпляр брокера и приложения
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
# Объявляем очередь 'hello'
@broker.subscriber("hello")
async def handle_hello(msg_body: str):
...."""Эта функция будет вызываться при получении сообщения в очередь 'hello'."""
....print(f"Получено сообщение: {msg_body}")
....# Имитируем некоторую работу
....await asyncio.sleep(1)
....print("Обработка завершена!")
# Запускаем приложение
if __name__ == "__main__":
....import asyncio
....asyncio.run(app.run())
Продюсер (`producer.py`):
from faststream.rabbit import RabbitBroker
import asyncio
async def main():
....async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
....# Отправляем сообщение в очередь 'hello'
....await broker.publish("Hello, FastStream!", queue="hello")
....print("Сообщение отправлено!")
if __name__ == "__main__":
....asyncio.run(main())
Как это работает:
1. Запустите `consumer.py`. Он подключится к RabbitMQ и начнет слушать очередь `hello`.
2. Запустите `producer.py`. Он отправит сообщение в очередь и завершит работу.
3. В консоли с потребителем вы увидите: `Получено сообщение: Hello, FastStream!`.
FastStream автоматически создал очередь `hello` (если ее не было) и подписал на нее функцию `handle_hello`. Тип параметра `msg_body: str` указывает фреймворку, что тело сообщения нужно интерпретировать как строку.
2. Использование Pydantic для валидации
Чаще всего сообщения имеют структурированный формат, например JSON. FastStream отлично работает с Pydantic-моделями для валидации таких сообщений.
Обновленный потребитель с Pydantic:
from pydantic import BaseModel, Field
from faststream import FastStream
from faststream.rabbit import RabbitBroker
# Модель Pydantic для входящего сообщения
class UserCreated(BaseModel):
....user_id: int = Field(..., gt=0, description="ID пользователя")
....email: str = Field(..., pattern=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
....name: str = Field(..., min_length=1, max_length=100)
....broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
@broker.subscriber("user_created")
async def handle_user_created_event(event: UserCreated):
....# Теперь 'event' - это экземпляр UserCreated, а не сырая строка
....print(f"Пользователь #{event.user_id} создан!")
....print(f"Email для связи: {event.email}")
....# Можем быть уверены, что данные валидны
if __name__ == "__main__":
....import asyncio
....asyncio.run(app.run())
Обновленный продюсер с Pydantic:
from faststream.rabbit import RabbitBroker
from consumer import UserCreated # Импортируем ту же модель
import asyncio
async def main():
....async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
....# Отправляем данные в виде словаря, который будет сериализован в JSON
....# и валидирован на стороне потребителя.
....message_data = {"user_id": 123, "email": "alice@example.com", "name": "Alice"}
....await broker.publish(message_data, queue="user_created")
....# Или можно отправить напрямую экземпляр Pydantic-модели
....user = UserCreated(user_id=456, email="bob@example.com", name="Bob")
....await broker.publish(user, queue="user_created")
if __name__ == "__main__":
....asyncio.run(main())
Что дает Pydantic:
Валидация: Если сообщение в очереди `user_created` не будет содержать `user_id` или `email` будет невалидным, FastStream автоматически отклонит это сообщение (например, отправит в Dead Letter Exchange в RabbitMQ).
Автодокументирование: Модели используются для генерации AsyncAPI-схемы.
Type Hints и IDE Support: Ваша IDE будет подсказывать поля модели `UserCreated`.
3. Внедрение зависимостей (Dependency Injection)
Часто обработчикам нужны дополнительные сервисы: подключение к базе данных, кэш, клиент для внешнего API. FastStream предоставляет элегантный механизм зависимостей.
from faststream import Depends, FastStream
from faststream.rabbit import RabbitBroker
import asyncpg # Асинхронный драйвер для PostgreSQL
# Функция-зависимость для подключения к БД
async def get_db_connection():
....conn = await asyncpg.connect("postgresql://user:password@localhost/db")
....try:
........yield conn
....finally:
........await conn.close()
broker = RabbitBroker("amqp://localhost:5672/")
app = FastStream(broker)
@broker.subscriber("process_data")
async def process_data_handler(
....data: dict,
....db: asyncpg.Connection = Depends(get_db_connection) # Внедряем зависимость
):
....# Теперь мы можем использовать подключение к БД внутри обработчика
....user_id = data["user_id"]
....user = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
....if user:
........print(f"Найден пользователь: {user['username']}")
........# Зависимость автоматически закроет соединение после выполнения обработчика
if __name__ == "__main__":
....import asyncio
....asyncio.run(app.run())
4. Генерация документации AsyncAPI
Одним из killer features FastStream является автоматическая генерация схемы по стандарту AsyncAPI.
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost:5672/")
app = FastStream(broker)
@broker.subscriber("user_created")
async def handle_user_created(user_id: int, email: str):
...
# Генерация схемы AsyncAPI
asyncapi_schema = app.get_asyncapi_schema()
print(asyncapi_schema) # Выведет JSON-схему
# Если использовать FastStream CLI, можно посмотреть красивую документацию
# faststream run consumer:app --docs
Запустив приложение с флагом `--docs`, вы получите URL, по которому будет доступна интерактивная документация, похожая на Swagger UI в FastAPI.
Работа с Kafka
Принципы работы с Kafka абсолютно идентичны. Меняется только тип брокера.
from pydantic import BaseModel
from faststream import FastStream
from faststream.kafka import KafkaBroker
class OrderData(BaseModel):
....order_id: int
....product: str
....quantity: int
broker = KafkaBroker("localhost:9092") # Адрес Kafka-брокера
app = FastStream(broker)
# Подписываемся на топик 'orders'
@broker.subscriber("orders")
async def process_order(order: OrderData):
....print(f"Обрабатывается заказ #{order.order_id} на {order.quantity} шт. {order.product}")
....# Логика обработки заказа...
if __name__ == "__main__":
....import asyncio
....asyncio.run(app.run())
Заключение
FastStream — это современный, быстроразвивающийся фреймворк, который значительно упрощает разработку асинхронных микросервисов, работающих с очередями сообщений. Его сильные стороны:
Низкий порог входа: Синтаксис, знакомый по FastAPI.
Безопасность типов: Глубокая интеграция с Pydantic.
Модульность: Поддержка зависимостей и мидлварей.
Производительность: Полностью асинхронная архитектура.
Документированность: Автоматическая генерация AsyncAPI-схем.
Если ваша задача — создать отказоустойчивый, масштабируемый и хорошо документированный сервис для обработки сообщений из Kafka, RabbitMQ или NATS, FastStream является одним из лучших выборов в экосистеме Python на сегодняшний день. Начинайте с простых потребителей и продюсеров, постепенно осваивая более сложные паттерны, такие как RPC, обработка ошибок и кастомные мидлвари, чтобы раскрыть весь потенциал этого фреймворка.
Подписывайтесь:
Телеграм https://t.me/lets_go_code
Канал "Просто о программировании" https://dzen.ru/lets_go_code