Потоки (Streams) - это высокоуровневые асинхронные / ожидающие примитивы для работы с сетевыми соединениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.
Вот пример эхо-клиента TCP, написанного с использованием потоков asyncio:
Stream Functions
Для создания потоков и работы с ними можно использовать следующие функции asyncio верхнего уровня:
coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
Установит сетевое соединение и вернёт пару объектов (reader, writer).
Возвращённые объекты reader и writer являются экземплярами классов StreamReader и StreamWriter.
Аргумент loop опциональный и может быть всегда определён автоматически когда функция ожидается (awaited) из корутины (coroutine).
limit определяет размер буфера используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 KiB.
Остальные аргументы передаются напрямую в loop.create_connection().
В python 3.7 добавили: параметр ssl_handshake_timeout.
coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
Запускает сокет-сервер.
client_connected_cb callback вызывается всякий раз когда устанавливается новое соединение. Он получает пару (reader, writer) как два аргумента, которые являются экземплярами классов StreamReader и StreamWriter.
client_connected_cb может быть обычной callable или coroutine function; если это coroutine function, она будет автоматически запланирована как Task.
Аргумент loop опциональный и может быть всегда определён автоматически когда функция ожидается (awaited) из корутины (coroutine).
limit определяет размер буфера используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 KiB.
Остальные аргументы передаются напрямую в loop.create_server().
В python 3.7 добавили: параметры ssl_handshake_timeout и start_serving.
Unix Sockets
coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
Устанавливает Unix socket соединение и возвращает пару (reader, writer).
Тоже что open_connection() но работает с сокетами Unix.
Также смотрите документацию loop.create_unix_connection().
Доступно на: Unix
В python 3.7 добавили: параметр ssl_handshake_timeout.
Изменено в версии 3.7: Параметр path может быть path-like object
coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
Запускает Unix socket сервер.
Тоже что и start_server() но работает с Unix sockets.
Также смотри документацию по loop.create_unix_server().
Доступно на: Unix
В python 3.7 добавили: параметр ssl_handshake_timeout.
Изменено в версии 3.7: Параметр path может быть path-like object
StreamReader
class asyncio.StreamReader
Представляет объект поставляющий API для чтения данных из потока ввода/вывода.
Не рекомендуется создавать экземпляр StreamReader явно; используйте вместо этого open_connection() и start_server().
coroutine read(n=-1)
Читает до n байт. Если n не указан или установлен в -1, читает до EOF и возвращает все прочитанные байты. (А будет ли переполнение? Можно ли так забить всю память?)
Если достигнут EOF, и внутренний буфер пуст, возвращает пустой объект типа bytes.
coroutine readline()
Читает одну последовательность байт заканчивающуюся на \n
Если EOF получен и \n не найден, метод вернёт частично прочитанные данные.
Если достигнут EOF, и внутренний буфер пуст, возвращает пустой объект типа bytes.
coroutine readexactly(n)
Читает точно n байт.
Выбрасывает IncompleteReadError если EOF достигнут до n. Используйте атрибут IncompleteReadError.partial чтобы получить частично прочитанные данные.
coroutine readuntil(separator=b'\n')
Читает данные из потока пока ни найдёт separator.
В случае успеха, данные и separator будут удалены из внутреннего буфера (consumed). Возвращённые данные будут иметь separator в конце.
Если кол-во прочитанных данных достигает stream limit, выбрасывается LimitOverrunError, и данные остаются во внутреннем буфере и могут быть прочитаны снова.
Если EOF достигнут до того как был найден separator, выбрасывается IncompleteReadError, и внутренний буфер сбрасывается. Атрибут IncompleteReadError.partial может содержать часть separator.
at_eof()
Вернёт True если буфер пуст и была вызвана функция feed_eof()
StreamWriter
class asyncio.StreamWriter
Представляет объект поставляющий API для записи данных в поток ввода/вывода.
Не рекомендуется создавать экземпляры StreamWriter явно; используйте вместо этого open_connection() и start_server().
write(data)
Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены.
Этот метод следует использовать вместе с методом drain():
writelines(data)
Метод записывает список (или любой итерируемый объект) байт в основной сокет немедленно. Если не удаётся, данные помещаются в очередь во внутренний буфер записи в ожидании возможности их записать.
Этот метод следует использовать вместе с методом drain():
close()
Закрывает поток и основной сокет
Должен использоваться с методом wait_closed()
can_write_eof()
Возвращает True если основной транспорт поддерживает метод write_eof(), иначе False.
write_eof()
Закрывает поток на запись после того как буферизированные данные были сброшены.
transport
Возвращает базовый транспорт asyncio
get_extra_info(name, default=None)
Позволяет получить доступ к необязательной информации о транспорте. Подробнее см. BaseTransport.get_extra_info().
coroutine drain()
Ждёт момент, когда будет уместно возобновить запись в поток. Пример:
Это метод управления потоком, который взаимодействует с базовым буфером записи ввода-вывода. Когда размер буфера достигает предела, drain() блокирует его до тех пор, пока размер буфера не опустится до низкого уровня и запись не будет возобновлена. Когда ждать уже нечего, функция drain() немедленно возвращается.
is_closing()
Возвращает True, если поток закрыт или находится в процессе закрытия.
coroutine wait_closed()
Ждёт пока закроется поток.
Должен быть вызван после close() чтобы подождать пока базовое соединение закроется.