Найти в Дзене
Павлин Шарит

Асинхронное программирование в Python: от простого к сложному

Асинхронное программирование в Python: от простого к сложному. Часть 5 Сегодня завершаем нашу серию про асинхронное программирование и поговорим про очереди (Queue) и асинхронные генераторы. Это те инструменты, которые помогают организовать поток данных между корутинами Начнем с самого частого сценария - обработка данных через очередь: async def download_images(queue: asyncio.Queue, urls: list[str]): """Загружает изображения и кладет их в очередь""" async with aiohttp.ClientSession() as session: for url in urls: try: async with session.get(url) as response: image_data = await response.read() await queue.put((url, image_data)) print(f"Загружено: {url}") except Exception as e: print(f"Ошибка загрузки {url}: {e}") async def process_images(queue: asyncio.Queue): """Обрабатывает изображения из очереди""" while True: try: url, image_data = await queue.get() try: # Какая-то обработка изображения await process_image(image_data) print(f"Обработано: {url}") finally: # Важно! Отмечаем задачу к

Асинхронное программирование в Python: от простого к сложному. Часть 5

Сегодня завершаем нашу серию про асинхронное программирование и поговорим про очереди (Queue) и асинхронные генераторы. Это те инструменты, которые помогают организовать поток данных между корутинами

Начнем с самого частого сценария - обработка данных через очередь:

async def download_images(queue: asyncio.Queue, urls: list[str]):

"""Загружает изображения и кладет их в очередь"""

async with aiohttp.ClientSession() as session:

for url in urls:

try:

async with session.get(url) as response:

image_data = await response.read()

await queue.put((url, image_data))

print(f"Загружено: {url}")

except Exception as e:

print(f"Ошибка загрузки {url}: {e}")

async def process_images(queue: asyncio.Queue):

"""Обрабатывает изображения из очереди"""

while True:

try:

url, image_data = await queue.get()

try:

# Какая-то обработка изображения

await process_image(image_data)

print(f"Обработано: {url}")

finally:

# Важно! Отмечаем задачу как выполненную

queue.task_done()

except asyncio.CancelledError:

break

async def main():

# Создаем очередь с ограничением

queue = asyncio.Queue(maxsize=10)

# Список URL для загрузки

urls = [

"https://example.com/image1.jpg",

"https://example.com/image2.jpg",

"https://example.com/image3.jpg"

]

# Запускаем загрузчик и обработчики

downloader = asyncio.create_task(download_images(queue, urls))

processors = [

asyncio.create_task(process_images(queue))

for _ in range(3) # 3 обработчика

]

# Ждем завершения загрузки

await downloader

# Ждем обработки всех изображений

await queue.join()

# Отменяем обработчики

for proc in processors:

proc.cancel()

Queue в asyncio бывает трёх видов:

- Queue - обычная очередь, первым пришел - первым вышел

- PriorityQueue - элементы обрабатываются по приоритету

- LifoQueue - последним пришел - первым вышел

Важные моменты при работе с очередями:

1. Не забывайте про queue.task_done():

try:

item = await queue.get()

await process_item(item)

finally:

queue.task_done() # Выполняем всегда!

2. Используйте maxsize чтобы контролировать память:

# Очередь не больше 1000 элементов

queue = asyncio.Queue(maxsize=1000)

3. Дожидайтесь обработки всех элементов через join():

# Положили все элементы

await producer()

# Дождались обработки всех

await queue.join()

# Теперь можно останавливать обработчики

for consumer in consumers:

consumer.cancel()

Практические сценарии использования очередей:

- Загрузка и обработка файлов

- Парсинг данных с сайтов

- Обработка сообщений из брокера

- Параллельная обработка задач с ограничением нагрузки

На этом завершаем нашу серию про асинхронное программирование!

Надеюсь, что смогли освежить свои знания и узнать что-то новое

Поддержать на Boosty

Посмотреть на Youtube