Найти в Дзене

Asyncio ensure_future в Python: Подробное руководство с примерами

Asyncio — это мощная библиотека Python для написания параллельного кода с использованием синтаксиса async/await. Одной из ключевых возможностей asyncio является запуск корутин в качестве задач (Task), которые выполняются конкурентно в цикле событий. В этой статье мы глубоко погрузимся в функцию `ensure_future`, изучим её назначение, особенности и практическое применение. `asyncio.ensure_future` — это функция, которая преобразует объект корутины в объект Task, планируя его выполнение в цикле событий. Если переданный объект уже является Future или Task, он возвращается без изменений. - Преобразование корутин в задачи для параллельного выполнения - Обеспечение совместимости между разными типами асинхронных объектов - Планирование выполнения в цикле событий Рассмотрим простой пример использования `ensure_future`: import asyncio async def simple_coroutine(): ....print("Начало корутины") ....await asyncio.sleep(1) ....print("Корутина завершена") async def main(): ....# Создаем задачу из ко
Оглавление

Asyncio — это мощная библиотека Python для написания параллельного кода с использованием синтаксиса async/await. Одной из ключевых возможностей asyncio является запуск корутин в качестве задач (Task), которые выполняются конкурентно в цикле событий. В этой статье мы глубоко погрузимся в функцию `ensure_future`, изучим её назначение, особенности и практическое применение.

Что такое ensure_future?

`asyncio.ensure_future` — это функция, которая преобразует объект корутины в объект Task, планируя его выполнение в цикле событий. Если переданный объект уже является Future или Task, он возвращается без изменений.

Основное назначение

- Преобразование корутин в задачи для параллельного выполнения

- Обеспечение совместимости между разными типами асинхронных объектов

- Планирование выполнения в цикле событий

Базовое использование

Рассмотрим простой пример использования `ensure_future`:

import asyncio
async def simple_coroutine():
....print("Начало корутины")
....await asyncio.sleep(1)
....print("Корутина завершена")
async def main():
....# Создаем задачу из корутины
....task = asyncio.ensure_future(simple_coroutine())
....# Задача уже запланирована на выполнение
....# Ждем завершения задачи
....await task
....# Запуск основной программы
....loop = asyncio.get_event_loop()
....loop.run_until_complete(main())

ensure_future vs create_task

В Python 3.7+ появилась функция `asyncio.create_task()`, которая рекомендуется для создания задач. Однако между ними есть важные различия:

async def example():
....coro = some_coroutine()
....# Рекомендуемый способ в Python 3.7+
....task1 = asyncio.create_task(coro)
....# Альтернативный способ (работает в более старых версиях)
....task2 = asyncio.ensure_future(coro)
....# ensure_future может принимать любые awaitable объекты
....future = asyncio.Future()
....task3 = asyncio.ensure_future(future) # Возвращает тот же future

Ключевые различия:

- `create_task` принимает только корутины

- `ensure_future` принимает корутины, Future и другие awaitable объекты

- `create_task` доступен только в Python 3.7+

Практические примеры использования

Параллельное выполнение задач

import asyncio
import time
async def task(name, duration):
....print(f"Задача '{name}' началась")
....await asyncio.sleep(duration)
....print(f"Задача '{name}' завершена после {duration} сек")
....return f"Результат {name}"
async def main():
....start_time = time.time()
....# Создаем несколько задач
....task1 = asyncio.ensure_future(task("A", 2))
....task2 = asyncio.ensure_future(task("B", 1))
....task3 = asyncio.ensure_future(task("C", 3))
....# Ждем завершения всех задач
....results = await asyncio.gather(task1, task2, task3)
....print(f"Все задачи завершены. Результаты: {results}")
....end_time = time.time()
....print(f"Общее время выполнения: {end_time - start_time:.2f} сек")
asyncio.run(main())

Обработка исключений в задачах

import asyncio
async def risky_task(name):
....print(f"Задача '{name}' выполняется")
....await asyncio.sleep(1)
....if name == "B":
........raise ValueError("Искусственная ошибка в задаче B")
....return f"Успех {name}"
async def main():
....tasks = [
........asyncio.ensure_future(risky_task("A")),
........asyncio.ensure_future(risky_task("B")),
........asyncio.ensure_future(risky_task("C"))
....]
....try:
........# Ждем завершения всех задач
........results = await asyncio.gather(*tasks, return_exceptions=True)
........print("Результаты:", results)
....except Exception as e:
........print(f"Поймано исключение: {e}")
....# Альтернативный подход - обработка для каждой задачи
....for task in tasks:
........try:
............result = await task
............print(f"Результат задачи: {result}")
........except Exception as e:
............print(f"Ошибка в задаче: {e}")
asyncio.run(main())

Ожидание с таймаутом

import asyncio
async def long_running_task():
....print("Долгая задача началась")
....await asyncio.sleep(5)
....print("Долгая задача завершена")
....return "Готово"
async def main():
....# Создаем задачу
....task = asyncio.ensure_future(long_running_task())
....try:
........# Ждем с таймаутом
........result = await asyncio.wait_for(task, timeout=2.0)
........print(f"Результат: {result}")
....except asyncio.TimeoutError:
........print("Задача не завершилась в срок")
....# Проверяем состояние задачи
....if not task.done():
........print("Задача все еще выполняется")
........# Можно отменить задачу
........task.cancel()
....try:
........await task
....except asyncio.CancelledError:
........print("Задача была отменена")
asyncio.run(main())

Работа с результатами задач

Получение результатов по завершении

import asyncio
async def calculate_square(x):
....await asyncio.sleep(1)
....return x * x
async def main():
....# Создаем несколько задач
....numbers = [1, 2, 3, 4, 5]
....tasks = [asyncio.ensure_future(calculate_square(n)) for n in numbers]
....# Ждем завершения каждой задачи и обрабатываем результат
....for task in asyncio.as_completed(tasks):
........result = await task
........print(f"Получен результат: {result}")
........# Альтернативно: получить все результаты сразу
........results = await asyncio.gather(*tasks)
....print("Все результаты:", results)
asyncio.run(main())

Использование callback-функций

import asyncio
def handle_result(future):
....try:
........result = future.result()
........print(f"Callback: Задача завершена с результатом {result}")
....except Exception as e:
........print(f"Callback: Задача завершена с ошибкой {e}")
async def successful_task():
....await asyncio.sleep(1)
....return "Успех"
async def failing_task():
....await asyncio.sleep(1)
....raise ValueError("Что-то пошло не так")
async def main():
....# Создаем задачи и добавляем callback
....task1 = asyncio.ensure_future(successful_task())
....task1.add_done_callback(handle_result)
....task2 = asyncio.ensure_future(failing_task())
....task2.add_done_callback(handle_result)
....# Ждем завершения задач
....await asyncio.sleep(2)
asyncio.run(main())

Продвинутые техники

Цепочки задач

import asyncio
async def first_task():
....print("Первая задача началась")
....await asyncio.sleep(1)
....print("Первая задача завершена")
....return 10
async def second_task(x):
....print("Вторая задача началась")
....await asyncio.sleep(1)
....print("Вторая задача завершена")
....return x * 2
async def third_task(x):
....print("Третья задача началась")
....await asyncio.sleep(1)
....print("Третья задача завершена")
....return x + 5
async def main():
....# Создаем цепочку задач
....first = asyncio.ensure_future(first_task())
....# Создаем задачу, которая зависит от результата первой
async def chained_tasks():
....result1 = await first
....result2 = await asyncio.ensure_future(second_task(result1))
....result3 = await asyncio.ensure_future(third_task(result2))
....return result3
# Запускаем цепочку
final_result = await chained_tasks()
print(f"Финальный результат: {final_result}")
asyncio.run(main())

Ограничение количества одновременных задач

import asyncio
from asyncio import Semaphore
async def limited_task(name, semaphore):
....async with semaphore:
....print(f"Задача '{name}' начала выполнение")
....await asyncio.sleep(2)
....print(f"Задача '{name}' завершила выполнение")
....return name
async def main():
....# Ограничиваем количество одновременных задач до 2
....semaphore = Semaphore(2)
....# Создаем задачи
....tasks = [
........asyncio.ensure_future(limited_task(f"Task-{i}", semaphore))
........for i in range(5)
....]
....# Ждем завершения всех задач
....results = await asyncio.gather(*tasks)
....print("Все задачи завершены:", results)
asyncio.run(main())

Обработка отмены задач

import asyncio
async def cancellable_task():
....try:
........print("Задача началась")
........for i in range(10):
............print(f"Шаг {i}")
............await asyncio.sleep(0.5)
............return "Завершено"
....except asyncio.CancelledError:
........print("Задача была отменена")
........raise
....finally:
........print("Задача завершает работу")
async def main():
....# Создаем задачу
....task = asyncio.ensure_future(cancellable_task())
....# Даем задаче немного поработать
....await asyncio.sleep(2)
....# Отменяем задачу
....task.cancel()
....try:
........# Пытаемся получить результат
........result = await task
........print(f"Результат: {result}")
....except asyncio.CancelledError:
........print("Задача была успешно отменена")
asyncio.run(main())

Лучшие практики и советы

1. Используйте create_task вместо ensure_future для корутин** (в Python 3.7+)

2. Всегда обрабатывайте исключения в задачах

3. Используйте таймауты для бесконечного ожидания

4. Отменяйте ненужные задачи для освобождения ресурсо

5. Ограничивайте количество одновременных задач при работе с ресурсами

Заключение

`asyncio.ensure_future` — это мощный инструмент для работы с асинхронными задачами в Python. Понимание его работы и правильное применение позволяет эффективно использовать возможности асинхронного программирования, создавать отзывчивые и производительные приложения.

Несмотря на появление более специализированной функции `create_task`, `ensure_future` остается важным инструментом, особенно при работе с различными типами awaitable-объектов и в коде, который должен быть совместим с разными версиями Python.

Помните, что с большой силой приходит и большая ответственность — всегда тщательно управляйте жизненным циклом задач, обрабатывайте исключения и обеспечивайте корректное завершение работы ваших асинхронных приложений.

Подписывайтесь:

Телеграм https://t.me/lets_go_code
Канал "Просто о программировании"
https://dzen.ru/lets_go_code