Asyncio - библиотека для асинхронного программирования на python
Состоит из High Level и Low Level API
High Level API
Coroutines and Tasks
Определяются путём использования синтаксиса async/await. В документации описаны как preffered way of writing asyncio applications.
Простой вызов функции не приводит к выполнению кода. Чтобы запустить coroutine - asyncio поставляет три способа:
- asyncio.run()
- Использование инструкции await (только в пределах async функции)
- asyncio.create_task() для конкурентного запуска асинхронных программ
Awaitables
Объект можно назвать awaitable если он может использоваться с выражением await. Существует три типа: coroutines, Tasks, Futures
coroutines - могут быть вложены друг в друга
Tasks - когда coroutine обёрнута в Task с помощью функции типа asyncio.create_task - она автоматически планируется к запуску
Futures - специальный низкоуровневый объект который поставляет возможный результат асинхронной функции. Когда такой объект используется вместе с выражением await - имеется ввиду то, что coroutine будет ожидать выполнения Future которое происходит в каком-то другом месте. <== привести пример. С их помощью достигается поддержка механизма callback и в обычной ситуации их не нужно создавать явно в коде.
Запуск
asyncio.run(coro, *, debug=False)
Запускает переданную ф-ю, заботится о менеджменте цикла событий. Завершает асинхронные генераторы и закрывает threadpool.
Не может быть вызвана в то время как запущена другая такая же. Или иными словами если в том же потоке запущен другой цикл событий.
Всегда создаёт новый цикл событий и закрывает его когда работа окончена. Используется как точка входа в асинхронное приложение. В идеале вызывается лишь раз.
Создание Tasks
asyncio.create_task(coro, *, name=None)
Оборачивает coro в Task и планирует её выполнение. Возвращает объект Task
Запуск будет запланирован в цикле событий возвращённом функцией get_running_loop(), если цикл не запущен в текущем потоке - вызывается исключение RuntimeError.
Sleeping
coroutine asyncio.sleep(delay, result=None, *, loop=None)
Блокирует выполнение на delay секунд. Если передан result - он будет возвращён в вызывающий объект когда корутина завершится. Всегда приостанавливает задачу позволяя запускать другие задачи.
Running Tasks Concurrently
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
Конкурентно запускает awaitable objects в последовательности aws (очевидно в той последовательности в которой они хранятся в переменной aws).
Если все awaitables завершились успешно - result представляет собой агрегированный список возвращённых значений. Порядок значений такой же как и порядок выполнения.
Если return_exceptions = False - первое выброшенное исключение немедленно передаётся в ожидающий объект. Остальные awaitables не прерывают выполнение.
Если return_exceptions = True - исключение будет передано в список возвращённых значений.
Если gather прерван или отменён - все выполняющиеся корутины также прекращают выполнение.
Если отменяется или прерывается одна из корутин - gather не прерывает выполнение.
Сам оборачивает корутины в Task
Защита от отмены
awaitable asyncio.shield(aw, *, loop=None)
Защищает awaitable object от отмены
Если aw корутина - она автоматически планируется к запуску
res = await shield(some_func())
эквивалентно вызову
res = await some_func()
с той разницей, что при использовании shield, для some_func, при попытке её прервать, ничего не произойдёт. Но исключение CancellError всё равно будет выброшено и попадёт в res.
Timeouts
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
Если aw корутина, то она автоматически планируется к запуску в цикле событий
Если timeout None - выполнение блокируется до выполнения Future
Если происходит timeout, он отменяет задачу и выбрасывает asyncio.TimeoutError
Чтобы препятствовать отмене задачи - оберните её в shield
Если wait отменяется, то future aw тоже отменяется
Waiting Primitives
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
Конкурентно выполняет awaitable objects из переменной aws и не прекращает работу до выполнения условия в return_when
Возвращает два множества Tasks/Futures: (done, pending)
Использование
done, pending = await asyncio.wait(aws)
timeout = максимальное кол-во секунд которое будет продолжаться выполнение. Не выбрасывает TimeoutError. Незавершённые Futures и Tasks, при срабатывании timeout, просто возвращаются во втором множестве.
return_when принимает на вход константы:
- FIRST_COMPLETED
- FIRST_EXCEPTION
- ALL_COMPLETED
В отличие от wait_for - wait не отменяет futures при срабатывании timeout
coroutine asyncio.as_completed(aws, *, loop=None, timeout=None)
Конкурентно запускает awaitable objects. Возвращает iterator of coroutines. Каждая возвращённая корутина может быть использована в выражении await.
Выбрасывает asyncio.TimeoutError при истечении timeout до выполнения всех Futures.
Пример
for coro in as_completed(aws):
earliest_result = await coro
# ...
Запуск в потоках
coroutine asyncio.to_thread(func, /, *args, **kwargs)
Асинхронно запускает func в отдельном потоке. args и kwargs передаются в func. На func распространиться доступ к текущему объект contextvars.Context, что позволит получить доступ к переменным event loop из запущенного потока.
Возвращает корутину которая может быть использована в выражении await для получения результата func.
Такой способ запуска предназначен для работы с вводом/выводом который не будет блокировать поток выполнения.
Нужно отметить, что потоки не дают параллельности в сPython, так что максимум что возможно это работа с неблокирующим вводом/выводом. Подробнее читайте про GIL.
Планирование из других потоков
asyncio.run_coroutine_threadsafe(coro, loop)
Потокобезопасно добавляет корутину в указанный цикл событий.
Возвращает concurent.futures.Future для ожидания результата из другого потока ОС.
Предназначена для вызова из другого потока ОС, отличного от того, в котором запущен цикл событий.
Task Object
class asyncio.Task(coro, *, loop=None, name=None)
Future-like объект который запускает корутины. Не потокобезопасный.
Используется для запуска корутин в цикле событий. Если корутина ожидается во Future, Task приостанавливает её выполнение и ждёт ответа от Future. Когда Future выполнится - выполнение корутины продолжится.
event loop запускает один Task одновременно. (cooperative scheduling). Пока Task ожидает выполнения своей корутины - event loop запускает следующий Task.
Не создавайте объекты Task явно, используйте ф-ции asyncio.create_task(), loop.create_task(), ensure_future().
Для отмены работы Task используйте его метод cancel(). Его вызов приведёт к срабатыванию CancelledError. Если корутина ожидает выполнения объекта Future, после отмены от тоже будет отменён.
Для проверки отмены Task - используйте его метод cancelled()
Методы:
- cancel(msg=None)
- cancelled()
- done()
- result()
- exception()
- add_done_callback(callback, *, context=None)
- remove_done_callback(callback)
- get_stack(*, limit=None)
- print_stack(*, limit=None, file=None)
- get_coro()
- get_name()
- set_name()