Найти в Дзене

Создание многокомпонентного веб-приложения на aiohttp: подробное руководство

aiohttp — это мощная библиотека Python для асинхронной работы с HTTP, которая позволяет создавать высокопроизводительные веб-серверы и клиенты. В отличие от традиционных синхронных фреймворков, aiohttp использует возможности async/await для обработки тысяч одновременных соединений с минимальными затратами ресурсов. В этой статье мы подробно разберем, как создать многокомпонентное веб-приложение на aiohttp, где различные компоненты работают вместе, обеспечивая полноценную функциональность. Перед тем как перейти к коду, давайте рассмотрим типичную архитектуру aiohttp-приложения: 1. Основное приложение (Application) - центральный компонент 2. Маршрутизатор (Router) - обработка URL-путей 3. Обработчики запросов (Request Handlers) - бизнес-логика 4. Middleware - промежуточное ПО для обработки запросов/ответов 5. Фоновые задачи - асинхронные процессы 6. Подключение к базам данных - работа с persistence-слоем Начнем с создания базовой структуры нашего приложения: from aiohttp import web impor
Оглавление

aiohttp — это мощная библиотека Python для асинхронной работы с HTTP, которая позволяет создавать высокопроизводительные веб-серверы и клиенты. В отличие от традиционных синхронных фреймворков, aiohttp использует возможности async/await для обработки тысяч одновременных соединений с минимальными затратами ресурсов.

В этой статье мы подробно разберем, как создать многокомпонентное веб-приложение на aiohttp, где различные компоненты работают вместе, обеспечивая полноценную функциональность.

Архитектура многокомпонентного приложения

Перед тем как перейти к коду, давайте рассмотрим типичную архитектуру aiohttp-приложения:

1. Основное приложение (Application) - центральный компонент

2. Маршрутизатор (Router) - обработка URL-путей

3. Обработчики запросов (Request Handlers) - бизнес-логика

4. Middleware - промежуточное ПО для обработки запросов/ответов

5. Фоновые задачи - асинхронные процессы

6. Подключение к базам данных - работа с persistence-слоем

Создание базовой структуры приложения

Начнем с создания базовой структуры нашего приложения:

from aiohttp import web
import aiohttp_jinja2
import jinja2
import asyncpg
import logging
async def create_app():
....# Создаем экземпляр приложения
....app = web.Application()
....# Настраиваем логирование
....logging.basicConfig(level=logging.INFO)
....# Инициализируем компоненты
....await setup_database(app)
....setup_routes(app)
....setup_middleware(app)
....setup_template_engine(app)
....setup_background_tasks(app)
....return app
if __name__ == '__main__':
....web.run_app(create_app(), port=8080)

Компонент 1: Маршрутизация и обработчики

Создадим несколько обработчиков для различных URL:

def setup_routes(app):
....# Добавляем маршруты
....app.router.add_get('/', handle_index)
....app.router.add_get('/users', handle_users)
....app.router.add_get('/users/{id}', handle_user_detail)
....app.router.add_post('/users', create_user)
....app.router.add_get('/api/data', api_handler)
....# Статические файлы
....app.router.add_static('/static/', path='static', name='static')
async def handle_index(request):
....return web.Response(text="Добро пожаловать на главную страницу!")
async def handle_users(request):
....# Получаем подключение к БД из приложения
....pool = request.app['db_pool']
....async with pool.acquire() as connection:
........users = await connection.fetch('SELECT * FROM users')
.......# Рендерим шаблон с данными
........context = {'users': users}
........return aiohttp_jinja2.render_template('users.html', request, context)
async def handle_user_detail(request):
....user_id = request.match_info['id']
....pool = request.app['db_pool']
....async with pool.acquire() as connection:
........user = await connection.fetchrow(
............'SELECT * FROM users WHERE id = $1', int(user_id)
........)
........if not user:
............return web.json_response({'error': 'User not found'}, status=404)
........return web.json_response(dict(user))
async def create_user(request):
....data = await request.post()
....# Валидация данных...
....pool = request.app['db_pool']
....async with pool.acquire() as connection:
........await connection.execute(
............'INSERT INTO users(name, email) VALUES($1, $2)',
............data['name'], data['email']
........)
....return web.Response(text="Пользователь создан", status=201)
async def api_handler(request):
....# Пример API обработчика
....return web.json_response({'status': 'ok', 'data': [1, 2, 3]})

Компонент 2: Подключение к базе данных

Реализуем компонент для работы с PostgreSQL с помощью asyncpg:

async def setup_database(app):
....# Создаем пул подключений к БД
....pool = await asyncpg.create_pool(
........host='localhost',
........port=5432,
........user='your_username',
........password='your_password',
........database='your_database',
........min_size=5,
........max_size=20
....)
....app['db_pool'] = pool
....# Закрываем пул при остановке приложения
....async def close_database(app):
........await pool.close()
........app.on_cleanup.append(close_database)
........# Создаем таблицы, если они не существуют
........async with pool.acquire() as connection:
............await connection.execute('''
................CREATE TABLE IF NOT EXISTS users (
................id SERIAL PRIMARY KEY,
................name TEXT NOT NULL,
................email TEXT NOT NULL UNIQUE,
................created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)
............''')

Компонент 3: Middleware для аутентификации и логирования

Middleware позволяет перехватывать запросы и ответы для выполнения общих задач:

def setup_middleware(app):
....# Middleware для логирования
....@web.middleware
....async def logging_middleware(request, handler):
........logger = logging.getLogger('aiohttp.access')
........start_time = time.time()
........# Продолжаем обработку запроса
........response = await handler(request)
........# Логируем информацию о запросе
........processing_time = time.time() - start_time
........logger.info(
............f"{request.method} {request.path} "
............f"processed in {processing_time:.3f}s, "
............f"status: {response.status}"
........)
........return response
....# Middleware для аутентификации
....@web.middleware
....async def auth_middleware(request, handler):
........# Пропускаем аутентификацию для некоторых путей
........if request.path in ['/login', '/static/', '/public']:
............return await handler(request)
........# Проверяем аутентификацию
........token = request.headers.get('Authorization', None)
........if not token or not validate_token(token):
............return web.json_response(
................{'error': 'Unauthorized'}, status=401
............)
........# Добавляем информацию о пользователе в запрос
........request['user'] = await get_user_from_token(token)
........return await handler(request)
....# Добавляем middleware в приложение
....app.middlewares.append(logging_middleware)
....app.middlewares.append(auth_middleware)
async def validate_token(token):
....# Здесь должна быть реальная проверка токена
....return token == 'secret_token'
async def get_user_from_token(token):
....# Здесь должен быть реальный код получения пользователя
....return {'id': 1, 'name': 'Test User'}

Компонент 4: Шаблонизатор Jinja2

Настроим рендеринг HTML-шаблонов:

def setup_template_engine(app):
....# Настраиваем Jinja2
....aiohttp_jinja2.setup(
........app,
........loader=jinja2.FileSystemLoader('templates')
....)
....# Добавляем глобальные переменные в шаблоны
....aiohttp_jinja2.get_env(app).globals.update(
........app_version='1.0.0',
........site_name='My Awesome Site'
....)

Создадим простой шаблон `templates/users.html`:

<!DOCTYPE html>
<html>
<head>
<title>Пользователи</title>
</head>
<body>
<h1>Список пользователей</h1>
<ul>
{% for user in users %}
<li>{{ user.name }} ({{ user.email }})</li>
{% endfor %}
</ul>
</body>
</html>

Компонент 5: Фоновые задачи

aiohttp позволяет запускать фоновые задачи, которые работают параллельно с обработкой запросов:

def setup_background_tasks(app):
....# Задача для периодической очистки кэша
....async def cache_cleaner(app):
........while True:
............await asyncio.sleep(3600) # Каждый час
............# Логика очистки кэша
............logging.info("Cleaning cache...")
............# Задача для отправки уведомлений
....async def notification_sender(app):
........while True:
............await asyncio.sleep(300) # Каждые 5 минут
............# Логика отправки уведомлений
............logging.info("Sending notifications...")
....# Запускаем задачи при старте приложения
....app.on_startup.append(start_background_tasks)
....app.on_cleanup.append(cleanup_background_tasks)
async def start_background_tasks(app):
....# Создаем задачи и сохраняем их в приложении
....app['cache_cleaner'] = asyncio.create_task(cache_cleaner(app))
....app['notification_sender'] = asyncio.create_task(notification_sender(app))
async def cleanup_background_tasks(app):
....# Отменяем задачи при остановке приложения
....app['cache_cleaner'].cancel()
....app['notification_sender'].cancel()
....# Ждем завершения задач
....await app['cache_cleaner']
....await app['notification_sender']

Компонент 6: Обработка ошибок

Создадим единый механизм обработки ошибок:

def setup_error_handlers(app):
....# Обработка 404 ошибки
....async def handle_404(request, response):
........return aiohttp_jinja2.render_template(
............'404.html', request, {}, status=404
........)
....# Обработка 500 ошибки
....async def handle_500(request, response):
........return aiohttp_jinja2.render_template(
............'500.html', request, {}, status=500
........)
....# Регистрируем обработчики ошибок
....app.middlewares.append(error_pages_middleware)
....@web.middleware
....async def error_pages_middleware(request, handler):
........try:
............response = await handler(request)
............if response.status == 404:
................return await handle_404(request, response)
............return response
........except web.HTTPException as ex:
............if ex.status == 404:
................return await handle_404(request, ex)
............raise
........except Exception:
............return await handle_500(request, None)

Компонент 7: Веб-сокеты для реального времени

Добавим поддержку веб-сокетов для функций реального времени:

def setup_websocket_endpoints(app):
....app.router.add_get('/ws', websocket_handler)
....async def websocket_handler(request):
........ws = web.WebSocketResponse()
........await ws.prepare(request)
........# Регистрируем подключение
........request.app['websockets'].add(ws)
........try:
............async for msg in ws:
................if msg.type == aiohttp.WSMsgType.TEXT:
....................# Обрабатываем сообщение
....................await handle_websocket_message(msg.data, ws, request.app)
................elif msg.type == aiohttp.WSMsgType.ERROR:
....................logging.error(f'WebSocket error: {ws.exception()}')
........finally:
............# Удаляем подключение при закрытии
............request.app['websockets'].remove(ws)
........return ws
async def handle_websocket_message(data, ws, app):
....# Обрабатываем полученное сообщение
....try:
........message = json.loads(data)
........if message['type'] == 'chat':
............# Рассылаем сообщение всем подключенным клиентам
............for client in app['websockets']:
................if client is not ws and not client.closed:
....................await client.send_json({
........................'type': 'chat',
........................'user': 'Anonymous',
........................'message': message['text']
....................})
....except json.JSONDecodeError:
........logging.error("Invalid JSON received over WebSocket")
async def start_websocket_storage(app):
....# Инициализируем хранилище для веб-сокет подключений
....app['websockets'] = set()
async def cleanup_websocket_storage(app):
....# Закрываем все подключения при остановке приложения
....for ws in set(app['websockets']):
........await ws.close(code=1001, message='Server shutdown')

Компонент 8: API с версионированием и документацией

Создадим структурированный API с поддержкой версионирования:

def setup_api_routes(app):
....# Создаем подмаршрутизатор для API
....api_v1 = web.RouteTableDef()
....@api_v1.get('/users')
....async def api_get_users(request):
........pool = request.app['db_pool']
........async with pool.acquire() as connection:
............users = await connection.fetch('SELECT * FROM users')
........return web.json_response([dict(user) for user in users])
....@api_v1.get('/users/{id}')
....async def api_get_user(request):
........user_id = request.match_info['id']
........pool = request.app['db_pool']
........async with pool.acquire() as connection:
............user = await connection.fetchrow(
................'SELECT * FROM users WHERE id = $1', int(user_id)
............)
........if not user:
............return web.json_response({'error': 'User not found'}, status=404)
........return web.json_response(dict(user))
....@api_v1.post('/users')
....async def api_create_user(request):
........data = await request.json()
........# Валидация данных
........if not data.get('name') or not data.get('email'):
............return web.json_response(
................{'error': 'Name and email are required'}, status=400
............)
........pool = request.app['db_pool']
........async with pool.acquire() as connection:
............try:
................await connection.execute(
....................'INSERT INTO users(name, email) VALUES($1, $2)',
....................data['name'], data['email']
................)
............except asyncpg.UniqueViolationError:
................return web.json_response(
....................{'error': 'Email already exists'}, status=409
................)
............return web.json_response({'status': 'created'}, status=201)
....# Подключаем API маршруты с префиксом /api/v1
....app.router.add_routes(api_v1, prefix='/api/v1')

Интеграция всех компонентов

Теперь модифицируем функцию создания приложения для интеграции всех компонентов:

async def create_app():
....app = web.Application()
....# Настройка логирования
....logging.basicConfig(level=logging.INFO)
....app['logger'] = logging.getLogger('aiohttp')
....# Инициализация всех компонентов
....await setup_database(app)
....await start_websocket_storage(app)
....setup_routes(app)
....setup_api_routes(app)
....setup_middleware(app)
....setup_template_engine(app)
....setup_error_handlers(app)
....setup_background_tasks(app)
....setup_websocket_endpoints(app)
....# Обработчики для управления жизненным циклом
....app.on_startup.append(start_background_tasks)
....app.on_startup.append(start_websocket_storage)
....app.on_cleanup.append(cleanup_background_tasks)
....app.on_cleanup.append(cleanup_websocket_storage)
....return app

Запуск приложения

Создадим точку входа для запуска нашего приложения:

if __name__ == '__main__':
....# Запускаем приложение
....app = create_app()
....web.run_app(app, host='0.0.0.0', port=8080)

Для production-окружения рекомендуется использовать ASGI-сервер, такой как Gunicorn с Uvicorn workers:

gunicorn myapp:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker

Тестирование приложения

Напишем базовые тесты для нашего приложения:

from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
class MyAppTestCase(AioHTTPTestCase):
....async def get_application(self):
........return await create_app()
....@unittest_run_loop
....async def test_index(self):
........resp = await self.client.request("GET", "/")
........assert resp.status == 200
........text = await resp.text()
........assert "Добро пожаловать" in text
....@unittest_run_loop
....async def test_users_api(self):
........resp = await self.client.request("GET", "/api/v1/users")
........assert resp.status == 200
........data = await resp.json()
........assert isinstance(data, list)

Заключение

Мы создали полнофункциональное многокомпонентное веб-приложение на aiohttp, которое включает:

1. Маршрутизацию с поддержкой REST API

2. Подключение к базе данных с пулом соединений

3. Middleware для аутентификации и логирования

4. Шаблонизацию с Jinja2

5. Фоновые задачи для периодических операций

6. Обработку ошибок с пользовательскими страницами

7. Веб-сокеты для работы в реальном времени

8. Структурированный API с версионированием

Такой подход позволяет создавать масштабируемые и поддерживаемые приложения, где каждый компонент отвечает за свою конкретную задачу. aiohttp предоставляет все необходимые инструменты для построения высокопроизводительных асинхронных веб-приложений на Python.

Дальнейшие шаги для развития приложения могут включать добавление кэширования, более сложную систему аутентификации, интеграцию с очередями сообщений и контейнеризацию приложения с помощью Docker.

Подписывайтесь:

Телеграм https://t.me/lets_go_code
Канал "Просто о программировании"
https://dzen.ru/lets_go_code