Найти в Дзене
11 подписчиков

Асинхронное программирование в Python: от простого к сложному. Часть 2


Продолжаем разбираться с асинхронным программированием. В прошлый раз мы познакомились с базовыми концептами, а сегодня поговорим о задачах (Task) и главных инструментах для управления ими

Начнем с самого важного - что такое Task?
Task - это "обертка" вокруг корутины, которая позволяет:
- Отслеживать состояние выполнения (запущена, завершена, отменена)
- Получать результат выполнения
- Управлять жизненным циклом (запуск, отмена, ожидание)
- Обрабатывать ошибки

У Task есть несколько полезных свойств и методов:
task = asyncio.create_task(some_coro())

# Свойства таски
print(task.done()) # Завершена ли задача
print(task.cancelled()) # Была ли отменена
print(task.get_name()) # Имя задачи
print(task.get_coro()) # Получить корутину
print(task._state) # Текущее состояние (PENDING, RUNNING, FINISHED, CANCELLED)

# Методы управления
task.cancel() # Отменить задачу
task.set_name("my_task") # Установить имя
result = await task # Дождаться результата

Теперь про инструменты для работы с несколькими задачами. Их два основных:
1. gather() - когда нужно запустить все задачи параллельно и дождаться их результатов:
# Запускаем все задачи параллельно
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True # Не падаем при ошибках
)

# Результаты в том же порядке, что и задачи
for result in results:
if isinstance(result, Exception):
print(f"Ошибка: {result}")
else:
process_result(result)

2. wait() - когда нужен более гибкий контроль:
# Создаем задачи
tasks = [
asyncio.create_task(long_operation1()),
asyncio.create_task(long_operation2()),
asyncio.create_task(long_operation3())
]

# Варианты return_when:
# FIRST_COMPLETED - дождаться первой завершенной задачи
# FIRST_EXCEPTION - дождаться первой ошибки или завершения всех задач
# ALL_COMPLETED - дождаться завершения всех задач

# Пример: ждем первую завершенную
done, pending = await asyncio.wait(
tasks,
timeout=10,
return_when=asyncio.FIRST_COMPLETED
)

# Пример: ждем все с таймаутом
done, pending = await asyncio.wait(
tasks,
timeout=5,
return_when=asyncio.ALL_COMPLETED
)

# Обработка результатов
for task in done:
try:
result = task.result()
process_result(result)
except Exception as e:
handle_error(e)

# Отменяем незавершенные
for task in pending:
task.cancel()

Про отмену задач:
CancelledError возникает в трех случаях:
- Явный вызов task.cancel()
- Таймаут в wait()
- Отмена родительской задачи

Правильная обработка отмены:
async def process_queue():
# Создаем подключение к базе
db = await create_db_connection()
# Создаем подзадачи для обработки
worker_tasks = []

try:
while True:
# Получаем новые задания
messages = await db.fetch_messages(limit=10)

# Создаем задачи для обработки
for msg in messages:
task = asyncio.create_task(
process_message(msg, db),
name=f"worker-{msg.id}"
)
worker_tasks.append(task)

# Ждем немного перед следующей проверкой
await asyncio.sleep(1)

except asyncio.CancelledError:
# 1. Отменяем все рабочие задачи
for task in worker_tasks:
if not task.done():
task.cancel()

# 2. Ждем их завершение
if worker_tasks:
await asyncio.wait(worker_tasks)

# 3. Закрываем соединение с БД
await db.close()

# 4. Пробрасываем отмену дальше
raise

В следующей части поговорим про управление ресурсами: Semaphore, Lock и Event

2 минуты