Найти в Дзене

asyncio.Semaphore в Python: контроль параллелизма с примерами

Оглавление

Асинхронное программирование в Python через asyncio позволяет эффективно выполнять I/O-операции без блокировок. Однако при работе с ограниченными ресурсами (сетевые соединения, API с rate-limiting, базы данных) неконтролируемый параллелизм вызывает проблемы:

- Перегрузка серверов/API.

- Исчерпание файловых дескрипторов.

- Ошибки типа Too many open files.

Семафоры решают эти задачи, ограничивая количество одновременных операций.

2. Что такое семафор? Теория

Семафор — примитив синхронизации, управляющий доступом к общему ресурсу. Состоит из счётчика и очереди ожидания:

- Счётчик: Максимальное число "разрешений" на доступ.

- Принцип работы:

- Корутина "захватывает" семафор (`acquire()`), уменьшая счётчик. Если счётчик = 0 — корутина ждёт.

- После завершения операции корутина "освобождает" семафор (`release()`), увеличивая счётчик и пробуждая ждущие корутины.

В `asyncio` реализован через `asyncio.Semaphore`.

3. Создание и базовое использование asyncio.Semaphore

import asyncio
sem = asyncio.Semaphore(value=3) # Максимум 3 одновременных операции

Контекстный менеджер `async with` (рекомендуемый способ):

async def task(id):
....async with sem: # Автоматический acquire/release
........print(f"Задача {id} начала работу")
........await asyncio.sleep(1)
........print(f"Задача {id} завершилась")

Ручное управление:

async def task(id):
....await sem.acquire() # Ждём разрешения
....try:
........print(f"Задача {id} начала работу")
........await asyncio.sleep(1)
....finally:
....sem.release() # Важно! Даже при ошибке

4. Подробный пример: Ограничение запросов к API

Задача: Отправить 100 запросов к API с ограничением 5 одновременных запросов.

import aiohttp
import asyncio
async def fetch(url, sem, id):
....async with sem:
........async with aiohttp.ClientSession() as session:
............print(f"Запрос {id} начат")
............async with session.get(url) as response:
................data = await response.json()
................print(f"Запрос {id} завершён, статус: {response.status}")
................return data
async def main():
....sem = asyncio.Semaphore(5)
....urls = ["https://api.example.com/data"] * 100
....tasks = [
........fetch(url, sem, i)
........for i, url in enumerate(urls)
....]
....results = await asyncio.gather(*tasks)
asyncio.run(main())

Как это работает:

1. Создаётся семафор на 5 разрешений.

2. 100 корутин `fetch` пытаются выполнить запрос.

3. Только 5 корутин одновременно получают доступ к API.

4. Как только одна завершается, следующая в очереди активируется.

5. Обработка ошибок и гарантия освобождения семафора

Проблема: Если в корутине возникает ошибка до вызова `release()`, семафор "застревает".

Решение: Использовать `try/finally` или контекстный менеджер.

Пример с обработкой исключений:

async def safe_task(id):
....await sem.acquire()
....try:
........print(f"Задача {id} работает")
........await asyncio.sleep(0.5)
........if id == 3:
........raise ValueError("Ошибка в задаче 3!")
....finally:
........sem.release() # Гарантированное освобождение

6. Динамическое изменение лимита семафора

Лимит семафора можно менять "на лету":

sem = asyncio.Semaphore(3)
print(sem._value) # 3
# Увеличить лимит до 5
sem._value = 5 # Внутренняя переменная, осторожно!
# Безопасный способ через наследование:
class DynamicSemaphore(asyncio.Semaphore):
....def set_limit(self, value):
........if value < 0:
............raise ValueError("Limit must be >= 0")
........self._value = value

7. Очередь ожидания и приоритеты

Семафор использует FIFO-очередь (первый пришёл — первый вышел). Приоритеты задаются через отдельную логику:

# Пример с приоритетными задачами
high_priority_tasks = []
low_priority_tasks = []
async def prioritized_task(id, priority):
....if priority == "high":
........high_priority_tasks.append(id)
....else:
........low_priority_tasks.append(id)
....await sem.acquire()
....try:
........print(f"Выполняется задача {id} ({priority})")
........await asyncio.sleep(1)
....finally:
........sem.release()

8. Семафор vs Библютоки (BoundedSemaphore)

- `Semaphore`: Число `release()` может превысить начальное значение.

- `BoundedSemaphore`: Выбрасывает `ValueError` при превышении лимита.

sem = asyncio.BoundedSemaphore(2)
await sem.acquire()
sem.release()
sem.release() # ValueError: Semaphore released too many times

Рекомендация: Используйте `BoundedSemaphore` для избежания логических ошибок.

9. Реальные кейсы использования

1. Парсинг веб-страниц: Ограничение параллельных запросов к сайту.

2. Работа с базами данных: Контроль соединений с СУБД.

3. Интеграция с API с rate-limiting: 100 запросов в минуту.

4. Управление памятью: Ограничение одновременной обработки больших файлов.

10. Пример: Семафоры в цепочках корутин

Семафоры можно передавать между корутинами для координации:

async def worker(sem, data):
....async with sem:
........processed = await process_data(data)
........await send_to_db(processed) # Внутри тоже может быть семафор!
async def process_data(data):
...
async def send_to_db(data):
....async with db_sem: # Другой семафор для БД
...

11. Отладка и мониторинг

Проверка состояния семафора:

print("Свободные слоты:", sem._value) # Текущий счётчик
print("Ожидающие задачи:", len(sem._waiters)) # Корутины в очереди

Визуализация через логи:

async def task(id):
....print(f"[DEBUG] Задача {id} ждёт семафора (свободно: {sem._value})")
async with sem:
....print(f"[DEBUG] Задача {id} получила семафор (свободно: {sem._value})")

12. Ограничения и лучшие практики

- Не блокируйте семафор надолго: Используйте только для I/O, не для CPU-bound операций.

- Избегайте вложенных семафоров: Риск взаимоблокировок (deadlocks).

- Тестируйте под нагрузкой: Убедитесь, что лимит оптимален.

- Комбинируйте с другими примитивами: `asyncio.Event`, `asyncio.Lock` для сложных сценариев.

13. Полный пример: Скачивание файлов с ограничением

import aiohttp
import asyncio
import os
async def download_file(url, sem, folder, id):
....async with sem:
........async with aiohttp.ClientSession() as session:
............async with session.get(url) as response:
................filename = url.split("/")[-1]
................path = os.path.join(folder, filename)
................with open(path, "wb") as f:
....................while chunk := await response.content.read(1024):
........................f.write(chunk)
................print(f"Файл {id} скачан: {filename}")
async def main():
....sem = asyncio.Semaphore(10) # 10 параллельных загрузок
....urls = [
........"https://example.com/file1.zip",
........"https://example.com/file2.jpg",
........# ... 100 ссылок
....]
....tasks = [
........download_file(url, sem, "downloads", i)
........for i, url in enumerate(urls)
....]
....await asyncio.gather(*tasks)
asyncio.run(main())

14. Заключение

`asyncio.Semaphore` — ключевой инструмент для:

- Контроля параллелизма в асинхронных приложениях.

- Предотвращения перегрузки ресурсов.

- Реализации сложных сценариев синхронизации.

Главные правила:

- Всегда используйте `async with` для гарантии освобождения.

- Выбирайте лимит на основе тестов.

- Для жёстких ограничений применяйте `BoundedSemaphore`.

Используя семафоры, вы создаёте отказоустойчивые и эффективные асинхронные системы, готовые к высоким нагрузкам.

Подписывайтесь:

Телеграм https://t.me/lets_go_code
Канал "Просто о программировании"
https://dzen.ru/lets_go_code