Асинхронное программирование в Python через asyncio позволяет эффективно выполнять I/O-операции без блокировок. Однако при работе с ограниченными ресурсами (сетевые соединения, API с rate-limiting, базы данных) неконтролируемый параллелизм вызывает проблемы:
- Перегрузка серверов/API.
- Исчерпание файловых дескрипторов.
- Ошибки типа Too many open files.
Семафоры решают эти задачи, ограничивая количество одновременных операций.
2. Что такое семафор? Теория
Семафор — примитив синхронизации, управляющий доступом к общему ресурсу. Состоит из счётчика и очереди ожидания:
- Счётчик: Максимальное число "разрешений" на доступ.
- Принцип работы:
- Корутина "захватывает" семафор (`acquire()`), уменьшая счётчик. Если счётчик = 0 — корутина ждёт.
- После завершения операции корутина "освобождает" семафор (`release()`), увеличивая счётчик и пробуждая ждущие корутины.
В `asyncio` реализован через `asyncio.Semaphore`.
3. Создание и базовое использование asyncio.Semaphore
import asyncio
sem = asyncio.Semaphore(value=3) # Максимум 3 одновременных операции
Контекстный менеджер `async with` (рекомендуемый способ):
async def task(id):
....async with sem: # Автоматический acquire/release
........print(f"Задача {id} начала работу")
........await asyncio.sleep(1)
........print(f"Задача {id} завершилась")
Ручное управление:
async def task(id):
....await sem.acquire() # Ждём разрешения
....try:
........print(f"Задача {id} начала работу")
........await asyncio.sleep(1)
....finally:
....sem.release() # Важно! Даже при ошибке
4. Подробный пример: Ограничение запросов к API
Задача: Отправить 100 запросов к API с ограничением 5 одновременных запросов.
import aiohttp
import asyncio
async def fetch(url, sem, id):
....async with sem:
........async with aiohttp.ClientSession() as session:
............print(f"Запрос {id} начат")
............async with session.get(url) as response:
................data = await response.json()
................print(f"Запрос {id} завершён, статус: {response.status}")
................return data
async def main():
....sem = asyncio.Semaphore(5)
....urls = ["https://api.example.com/data"] * 100
....tasks = [
........fetch(url, sem, i)
........for i, url in enumerate(urls)
....]
....results = await asyncio.gather(*tasks)
asyncio.run(main())
Как это работает:
1. Создаётся семафор на 5 разрешений.
2. 100 корутин `fetch` пытаются выполнить запрос.
3. Только 5 корутин одновременно получают доступ к API.
4. Как только одна завершается, следующая в очереди активируется.
5. Обработка ошибок и гарантия освобождения семафора
Проблема: Если в корутине возникает ошибка до вызова `release()`, семафор "застревает".
Решение: Использовать `try/finally` или контекстный менеджер.
Пример с обработкой исключений:
async def safe_task(id):
....await sem.acquire()
....try:
........print(f"Задача {id} работает")
........await asyncio.sleep(0.5)
........if id == 3:
........raise ValueError("Ошибка в задаче 3!")
....finally:
........sem.release() # Гарантированное освобождение
6. Динамическое изменение лимита семафора
Лимит семафора можно менять "на лету":
sem = asyncio.Semaphore(3)
print(sem._value) # 3
# Увеличить лимит до 5
sem._value = 5 # Внутренняя переменная, осторожно!
# Безопасный способ через наследование:
class DynamicSemaphore(asyncio.Semaphore):
....def set_limit(self, value):
........if value < 0:
............raise ValueError("Limit must be >= 0")
........self._value = value
7. Очередь ожидания и приоритеты
Семафор использует FIFO-очередь (первый пришёл — первый вышел). Приоритеты задаются через отдельную логику:
# Пример с приоритетными задачами
high_priority_tasks = []
low_priority_tasks = []
async def prioritized_task(id, priority):
....if priority == "high":
........high_priority_tasks.append(id)
....else:
........low_priority_tasks.append(id)
....await sem.acquire()
....try:
........print(f"Выполняется задача {id} ({priority})")
........await asyncio.sleep(1)
....finally:
........sem.release()
8. Семафор vs Библютоки (BoundedSemaphore)
- `Semaphore`: Число `release()` может превысить начальное значение.
- `BoundedSemaphore`: Выбрасывает `ValueError` при превышении лимита.
sem = asyncio.BoundedSemaphore(2)
await sem.acquire()
sem.release()
sem.release() # ValueError: Semaphore released too many times
Рекомендация: Используйте `BoundedSemaphore` для избежания логических ошибок.
9. Реальные кейсы использования
1. Парсинг веб-страниц: Ограничение параллельных запросов к сайту.
2. Работа с базами данных: Контроль соединений с СУБД.
3. Интеграция с API с rate-limiting: 100 запросов в минуту.
4. Управление памятью: Ограничение одновременной обработки больших файлов.
10. Пример: Семафоры в цепочках корутин
Семафоры можно передавать между корутинами для координации:
async def worker(sem, data):
....async with sem:
........processed = await process_data(data)
........await send_to_db(processed) # Внутри тоже может быть семафор!
async def process_data(data):
...
async def send_to_db(data):
....async with db_sem: # Другой семафор для БД
...
11. Отладка и мониторинг
Проверка состояния семафора:
print("Свободные слоты:", sem._value) # Текущий счётчик
print("Ожидающие задачи:", len(sem._waiters)) # Корутины в очереди
Визуализация через логи:
async def task(id):
....print(f"[DEBUG] Задача {id} ждёт семафора (свободно: {sem._value})")
async with sem:
....print(f"[DEBUG] Задача {id} получила семафор (свободно: {sem._value})")
12. Ограничения и лучшие практики
- Не блокируйте семафор надолго: Используйте только для I/O, не для CPU-bound операций.
- Избегайте вложенных семафоров: Риск взаимоблокировок (deadlocks).
- Тестируйте под нагрузкой: Убедитесь, что лимит оптимален.
- Комбинируйте с другими примитивами: `asyncio.Event`, `asyncio.Lock` для сложных сценариев.
13. Полный пример: Скачивание файлов с ограничением
import aiohttp
import asyncio
import os
async def download_file(url, sem, folder, id):
....async with sem:
........async with aiohttp.ClientSession() as session:
............async with session.get(url) as response:
................filename = url.split("/")[-1]
................path = os.path.join(folder, filename)
................with open(path, "wb") as f:
....................while chunk := await response.content.read(1024):
........................f.write(chunk)
................print(f"Файл {id} скачан: {filename}")
async def main():
....sem = asyncio.Semaphore(10) # 10 параллельных загрузок
....urls = [
........"https://example.com/file1.zip",
........"https://example.com/file2.jpg",
........# ... 100 ссылок
....]
....tasks = [
........download_file(url, sem, "downloads", i)
........for i, url in enumerate(urls)
....]
....await asyncio.gather(*tasks)
asyncio.run(main())
14. Заключение
`asyncio.Semaphore` — ключевой инструмент для:
- Контроля параллелизма в асинхронных приложениях.
- Предотвращения перегрузки ресурсов.
- Реализации сложных сценариев синхронизации.
Главные правила:
- Всегда используйте `async with` для гарантии освобождения.
- Выбирайте лимит на основе тестов.
- Для жёстких ограничений применяйте `BoundedSemaphore`.
Используя семафоры, вы создаёте отказоустойчивые и эффективные асинхронные системы, готовые к высоким нагрузкам.
Подписывайтесь:
Телеграм https://t.me/lets_go_code
Канал "Просто о программировании" https://dzen.ru/lets_go_code