В современной разработке веб-приложений и API часто возникает необходимость выполнять множество HTTP-запросов одновременно. Традиционные синхронные подходы с использованием библиотек типа `requests` могут стать узким местом производительности, особенно когда приложению нужно обрабатывать множество одновременных соединений.
aiohttp — это мощная библиотека для Python, которая предоставляет асинхронный HTTP-клиент и сервер, построенный на основе `asyncio`. Она позволяет эффективно обрабатывать тысячи одновременных соединений, что делает её идеальным выбором для высокопроизводительных веб-приложений и сервисов.
Ключевые преимущества aiohttp:
- Асинхронность: Неблокирующие операции ввода-вывода
- Производительность: Высокая пропускная способность при малом потреблении ресурсов
- Удобный API: Простые и понятные методы для работы с HTTP
- Поддержка WebSockets: Полноценная работа с веб-сокетами
- Серверные возможности: Создание асинхронных HTTP-серверов
Установка и настройка
Для начала работы с aiohttp необходимо установить библиотеку:
pip install aiohttp
Дополнительно рекомендуется установить библиотеки для ускорения работы:
pip install aiohttp[speedups]
Эта команда установит оптимизированные зависимости (aiodns, cchardet и др.).
Базовое использование: HTTP-клиент
Простой GET-запрос
Рассмотрим простейший пример выполнения асинхронного GET-запроса:
import aiohttp
import asyncio
async def main():
....async with aiohttp.ClientSession() as session:
........async with session.get('https://api.example.com/data') as response:
............print("Status:", response.status)
............print("Content-type:", response.headers['content-type'])
............html = await response.text()
............print("Body:", html[:100], "...")
# Запуск асинхронной функции
asyncio.run(main())
Обработка ошибок
Важно правильно обрабатывать ошибки в асинхронном коде:
import aiohttp
import asyncio
from aiohttp import ClientError, ClientResponseError
async def fetch_data():
....try:
........async with aiohttp.ClientSession() as session:
............async with session.get('https://api.example.com/data') as response:
................response.raise_for_status() # Проверка статуса ответа
................return await response.json()
....except ClientResponseError as e:
........print(f"HTTP Error: {e.status} - {e.message}")
....except ClientError as e:
........print(f"Network Error: {str(e)}")
....except asyncio.TimeoutError:
........print("Request timed out")
....except Exception as e:
........print(f"Unexpected error: {str(e)}")
asyncio.run(fetch_data())
Продвинутые техники работы с клиентом
Использование сессий и контекстных менеджеров
Правильное управление сессиями критически важно для производительности:
import aiohttp
import asyncio
class APIClient:
....def __init__(self):
........self.session = None
....async def __aenter__(self):
........self.session = aiohttp.ClientSession()
........return self
....async def __aexit__(self, exc_type, exc_val, exc_tb):
........await self.session.close()
....async def get_user(self, user_id):
........url = f"https://api.example.com/users/{user_id}"
....async with self.session.get(url) as response:
........return await response.json()
....async def create_user(self, user_data):
........url = "https://api.example.com/users"
....async with self.session.post(url, json=user_data) as response:
........return await response.json()
....async def main():
........async with APIClient() as client:
............# Получаем пользователя
............user = await client.get_user(123)
............print(f"User: {user}")
............# Создаем нового пользователя
............new_user = await client.create_user({
................"name": "John Doe",
................"email": "john@example.com"
............})
...........print(f"Created user: {new_user}")
asyncio.run(main())
Параллельные запросы
Одно из главных преимуществ aiohttp — возможность выполнения множества запросов одновременно:
import aiohttp
import asyncio
async def fetch_url(session, url):
....async with session.get(url) as response:
........return await response.text()
async def main():
....urls = [
........'https://httpbin.org/html',
........'https://httpbin.org/json',
........'https://httpbin.org/xml'
....]
....async with aiohttp.ClientSession() as session:
........# Создаем список задач
........tasks = []
........for url in urls:
............task = asyncio.create_task(fetch_url(session, url))
............tasks.append(task)
........# Ждем завершения всех задач
........results = await asyncio.gather(*tasks, return_exceptions=True)
........# Обрабатываем результаты
........for url, result in zip(urls, results):
............if isinstance(result, Exception):
................print(f"Failed to fetch {url}: {str(result)}")
............else:
................print(f"Fetched {url}, length: {len(result)}")
asyncio.run(main())
Ограничение скорости запросов
Для избежания блокировок при частых запросах к API полезно ограничивать скорость:
import aiohttp
import asyncio
from aiolimiter import AsyncLimiter
# Ограничение: 10 запросов в секунду
limiter = AsyncLimiter(10, 1)
async def limited_request(session, url):
....async with limiter:
........async with session.get(url) as response:
............return await response.text()
async def main():
....urls = [f"https://httpbin.org/delay/{i%3}" for i in range(20)]
....async with aiohttp.ClientSession() as session:
........tasks = [limited_request(session, url) for url in urls]
........results = await asyncio.gather(*tasks)
........for url, content in zip(urls, results):
............print(f"URL: {url}, Length: {len(content)}")
asyncio.run(main())
Работа с cookies и сессиями
aiohttp автоматически управляет cookies, но также предоставляет ручное управление:
import aiohttp
import asyncio
async def login(session, username, password):
....login_data = {
........'username': username,
........'password': password
....}
....async with session.post('https://api.example.com/login', json=login_data) as response:
........# Проверяем успешность авторизации
............if response.status == 200:
................print("Login successful")
................# Возвращаем обновленную сессию с cookies
................return session
............else:
................raise Exception("Login failed")
async def get_protected_data(session):
....async with session.get('https://api.example.com/protected') as response:
........return await response.json()
async def main():
....# Создаем сессию с сохранением cookies между запросами
....async with aiohttp.ClientSession() as session:
........# Логинимся
........await login(session, "user", "pass")
........# Делаем запрос к защищенному ресурсу
........data = await get_protected_data(session)
........print(f"Protected data: {data}")
asyncio.run(main())
Создание HTTP-сервера с aiohttp
aiohttp также позволяет создавать асинхронные HTTP-серверы:
Простой HTTP-сервер
from aiohttp import web
import asyncio
async def handle_index(request):
....return web.Response(text="Hello, World!")
async def handle_user(request):
....user_id = request.match_info.get('user_id', "Anonymous")
....return web.Response(text=f"Hello, user {user_id}!")
async def handle_json(request):
....data = {"message": "Hello JSON!", "status": "success"}
....return web.json_response(data)
async def handle_post(request):
....# Чтение данных из POST-запроса
....data = await request.post()
....name = data.get('name', 'Unknown')
....# Или для JSON:
....# data = await request.json()
....return web.Response(text=f"Hello, {name}!")
# Создание приложения
app = web.Application()
# Добавление маршрутов
app.router.add_get('/', handle_index)
app.router.add_get('/user/{user_id}', handle_user)
app.router.add_get('/json', handle_json)
app.router.add_post('/greet', handle_post)
# Запуск сервера
if __name__ == '__main__':
....web.run_app(app, host='localhost', port=8080)
Промежуточное ПО (Middleware)
Middleware позволяет перехватывать и обрабатывать запросы и ответы:
from aiohttp import web
import time
async def auth_middleware(app, handler):
....async def middleware(request):
........# Проверяем аутентификацию
........auth_token = request.headers.get('Authorization', None)
........if not auth_token or not validate_token(auth_token):
............return web.json_response({'error': 'Unauthorized'}, status=401)
........# Продолжаем обработку запроса
........return await handler(request)
....return middleware
async def logging_middleware(app, handler):
....async def middleware(request):
........# Логируем входящий запрос
........start_time = time.time()
........print(f"Request: {request.method} {request.path}")
........# Обрабатываем запрос
........response = await handler(request)
........# Логируем ответ
........duration = time.time() - start_time
........print(f"Response: {response.status}, Time: {duration:.2f}s")
........return response
return middleware
def validate_token(token):
....# Простая проверка токена
....return token == "Bearer secret-token"
# Создание приложения с middleware
app = web.Application(middlewares=[logging_middleware, auth_middleware])
async def protected_handler(request):
....return web.json_response({"message": "Access granted"})
app.router.add_get('/protected', protected_handler)
if __name__ == '__main__':
....web.run_app(app, port=8080)
Работа с WebSockets
aiohttp предоставляет отличную поддержку WebSockets:
from aiohttp import web
import asyncio
import json
async def websocket_handler(request):
....ws = web.WebSocketResponse()
....await ws.prepare(request)
....print("WebSocket connection established")
....async for msg in ws:
....if msg.type == web.WSMsgType.TEXT:
........try:
............data = json.loads(msg.data)
............print(f"Received: {data}")
............# Ответ клиенту
............response = {"echo": data, "timestamp": asyncio.get_event_loop().time()}
............await ws.send_json(response)
........except json.JSONDecodeError:
............await ws.send_str("Invalid JSON")
....elif msg.type == web.WSMsgType.ERROR:
........print(f"WebSocket error: {ws.exception()}")
........print("WebSocket connection closed")
........return ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)
if __name__ == '__main__':
....web.run_app(app, port=8080)
Тестирование приложений aiohttp
Тестирование клиента
Для тестирования можно использовать aiohttp.test_utils:
import aiohttp
from aiohttp.test_utils import make_mocked_request
import pytest
async def my_handler(request):
....return aiohttp.web.json_response({"status": "ok"})
async def test_my_handler():
....# Создаем mock-запрос
....request = make_mocked_request('GET', '/test')
....# Вызываем обработчик
....response = await my_handler(request)
....# Проверяем результат
....assert response.status == 200
....data = await response.json()
....assert data['status'] == 'ok'
Тестирование сервера
Интеграционное тестирование сервера:
from aiohttp.test_utils import TestClient, TestServer
from aiohttp import web
import pytest
async def hello_handler(request):
....return web.Response(text="Hello, World!")
@pytest.fixture
async def app():
....app = web.Application()
....app.router.add_get('/', hello_handler)
....return app
async def test_hello_handler(aiohttp_client, app):
....client = await aiohttp_client(app)
....# Делаем запрос к приложению
....resp = await client.get('/')
....# Проверяем ответ
....assert resp.status == 200
....text = await resp.text()
....assert text == "Hello, World!"
Оптимизация производительности
Использование connection pool
Пул соединений позволяет переиспользовать соединения к серверу:
import aiohttp
import asyncio
from aiohttp import TCPConnector
async def main():
....# Создаем connector с ограничением пула соединений
....connector = TCPConnector(limit=10, limit_per_host=5)
....async with aiohttp.ClientSession(connector=connector) as session:
........tasks = []
........for i in range(20):
............task = asyncio.create_task(
................session.get(f'https://httpbin.org/delay/1')
............)
............tasks.append(task)
............responses = await asyncio.gather(*tasks)
........for response in responses:
............print(f"Status: {response.status}")
asyncio.run(main())
Кэширование ответов
Для уменьшения количества запросов можно реализовать кэширование:
import aiohttp
import asyncio
from datetime import datetime, timedelta
class CachedSession:
....def __init__(self, session):
........self.session = session
........self.cache = {}
....async def get(self, url, expire_in=300):
........now = datetime.now()
........# Проверяем наличие свежего кэша
........if url in self.cache:
............response, timestamp = self.cache[url]
............if now - timestamp < timedelta(seconds=expire_in):
................return response
........# Делаем новый запрос
........async with self.session.get(url) as response:
............text = await response.text()
............# Сохраняем в кэш
............self.cache[url] = (text, now)
............return text
async def main():
....async with aiohttp.ClientSession() as session:
........cached_session = CachedSession(session)
........# Первый запрос - загружает данные
........result1 = await cached_session.get('https://httpbin.org/json')
........print("First request done")
........# Второй запрос - берет из кэша
........result2 = await cached_session.get('https://httpbin.org/json')
........print("Second request (from cache)")
asyncio.run(main())
Заключение
aiohttp — это мощная и гибкая библиотека для работы с HTTP в асинхронном стиле. Она предоставляет все необходимые инструменты для создания высокопроизводительных клиентов и серверов, способных обрабатывать тысячи одновременных соединений с минимальным потреблением ресурсов.
Ключевые моменты:
1. Асинхронность — основа высокой производительности
2. Правильное управление сессиями критически важно
3. Обработка ошибок требует особого внимания в асинхронном коде
4. Middleware предоставляет мощные возможности для перехвата запросов
5. Тестирование асинхронного кода имеет свои особенности
aiohttp продолжает развиваться и является одним из наиболее популярных решений для асинхронной работы с HTTP в Python. Освоение этой библиотеки позволит вам создавать современные, высокопроизводительные веб-приложения и сервисы.
Подписывайтесь:
Телеграм https://t.me/lets_go_code
Канал "Просто о программировании" https://dzen.ru/lets_go_code