Найти в Дзене

Введение в сопрограммы Kotlin

Оглавление

1. Обзор

В этой статье мы рассмотрим сопрограммы языка Kotlin. Проще говоря, сопрограммы позволяют нам свободно создавать асинхронные программы , и они основаны на концепции программирования в стиле передачи продолжения .

Язык Kotlin предоставляет нам базовые конструкции, но может получить доступ к более полезным сопрограммам с помощью библиотеки kotlinx-coroutines-core . Мы рассмотрим эту библиотеку, как только поймем основные строительные блоки языка Kotlin.

2. Создание сопрограммы с помощью BuildSequence

Давайте создадим первую сопрограмму, используя функцию buildSequence .

И давайте реализуем генератор последовательности Фибоначчи, используя эту функцию:

val fibonacciSeq = sequence {
var a = 0
var b = 1

yield(1)

while (true) {
yield(a + b)

val tmp = a + b
a = b
b = tmp
}
}

Сигнатура функции доходности:

public abstract suspend fun yield(value: T)

Ключевое слово suspend означает, что эта функция может быть блокирующей. Такая функция может приостановить сопрограмму buildSequence .

Приостанавливающие функции могут быть созданы как стандартные функции Kotlin, но мы должны помнить, что мы можем вызывать их только из сопрограммы. В противном случае мы получим ошибку компилятора.

Если мы приостановили вызов внутри buildSequence , этот вызов будет преобразован в выделенное состояние в конечном автомате. Сопрограмму можно передать и назначить переменной, как и любую другую функцию.

В сопрограмме fibonacciSeq у нас есть две точки подвески. Во-первых, когда мы вызываем выход(1), а во-вторых, когда мы вызываем выход(a+b).

Если эта функция доходности приводит к некоторому блокирующему вызову, текущий поток не будет блокироваться на ней. Он сможет выполнить какой-то другой код. Как только приостановленная функция завершит свое выполнение, поток может возобновить выполнение сопрограммы fibonacciSeq .

Мы можем протестировать наш код, взяв некоторые элементы из последовательности Фибоначчи:

val res = fibonacciSeq
.take(5)
.toList()

assertEquals(res, listOf(1, 1, 2, 3, 5))

3. Добавление зависимости Maven для сопрограмм kotlinx

Давайте посмотрим на библиотеку kotlinx-coroutines , в которой есть полезные конструкции, построенные на основе базовых сопрограмм.

Давайте добавим зависимость в библиотеку kotlinx-coroutines-core.

<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>1.3.9</version>
</dependency>

4. Асинхронное программирование с использованием процедуры launch()

Библиотека kotlinx-coroutines добавляет множество полезных конструкций, позволяющих создавать асинхронные программы. Допустим, у нас есть дорогостоящая вычислительная функция, которая добавляет строку в список ввода:

suspend fun expensiveComputation(res: MutableList<String>) {
delay(1000L)
res.add("word!")
}

Мы можем использовать сопрограмму запуска , которая будет выполнять функцию приостановки неблокирующим способом.

@Test
fun givenAsyncCoroutine_whenStartIt_thenShouldExecuteItInTheAsyncWay() {
//given
val res = mutableListOf<String>()

//when
runBlocking {
launch { expensiveComputation(res) }
res.add("Hello,")
}

//then
assertEquals(res, listOf("Hello,", "word!"))
}

Чтобы иметь возможность протестировать наш код, мы передаем всю логику в сопрограмму runBlocking , которая является блокирующим вызовом. Поэтому наш метод AssertEquals() может выполняться синхронно после кода внутри метода runBlocking() .

Обратите внимание, что в этом примере, хотя метод launch() запускается первым, это вычисление с задержкой. Основной поток продолжит добавление строки «Hello» в список результатов.

После односекундной задержки, введенной в функции дорогойComputation() , слово «word!» К результату будет добавлена ​​строка.

5. Сопрограммы очень легкие

Давайте представим ситуацию, в которой мы хотим выполнить 100000 операций асинхронно. Создание такого большого количества потоков будет очень дорогостоящим и может привести к исключению OutOfMemoryException.

К счастью, при использовании сопрограмм это не так. Мы можем выполнить столько операций блокировки, сколько захотим. «Под капотом» эти операции будут обрабатываться фиксированным количеством потоков без чрезмерного создания потоков:

@Test
fun givenHugeAmountOfCoroutines_whenStartIt_thenShouldExecuteItWithoutOutOfMemory() {
runBlocking<Unit> {
//given
val counter = AtomicInteger(0)
val numberOfCoroutines = 100_000

//when
val jobs = List(numberOfCoroutines) {
launch {
delay(1L)
counter.incrementAndGet()
}
}
jobs.forEach { it.join() }

//then
assertEquals(counter.get(), numberOfCoroutines)
}
}

Обратите внимание, что мы выполняем 100 000 сопрограмм, и каждый запуск добавляет существенную задержку. Тем не менее, нет необходимости создавать слишком много потоков, поскольку эти операции выполняются асинхронно с использованием потоков из общего фонового пула потоков.

6. Отмена и тайм-ауты

Иногда, после того как мы запустили какое-то длительное асинхронное вычисление, мы хотим отменить его, потому что результат нас больше не интересует.

Когда мы запускаем наше асинхронное действие с помощью сопрограммы launch() , мы можем проверить флаг isActive . Этот флаг устанавливается в значение false всякий раз, когда основной поток вызывает метод cancel() в экземпляре задания:

@Test
fun givenCancellableJob_whenRequestForCancel_thenShouldQuit() {
runBlocking<Unit> {
//given
val job = launch(Dispatchers.Default) {
while (isActive) {
//println("is working")
}
}

delay(1300L)

//when
job.cancel()

//then cancel successfully
}
}

Это очень элегантный и простой способ использования механизма отмены . В асинхронном действии нам нужно только проверить, равен ли флаг isActive значению false , и отменить нашу обработку.

Когда мы запрашиваем некоторую обработку и не уверены, сколько времени займет это вычисление, рекомендуется установить тайм-аут для такого действия. Если обработка не завершится в течение заданного таймаута, мы получим исключение и сможем отреагировать на него соответствующим образом.

Например, мы можем повторить действие:

@Test(expected = CancellationException::class)
fun givenAsyncAction_whenDeclareTimeout_thenShouldFinishWhenTimedOut() {
runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("Some expensive computation $i ...")
delay(500L)
}
}
}
}

Если мы не определим тайм-аут, возможно, что наш поток будет заблокирован навсегда из-за зависания вычислений. Мы не сможем обработать этот случай в нашем коде, если тайм-аут не определен.

7. Одновременное выполнение асинхронных действий

Допустим, нам нужно одновременно запустить два асинхронных действия и потом дождаться их результатов. Если наша обработка занимает одну секунду и нам нужно выполнить эту обработку дважды, время выполнения синхронного выполнения блокировки составит две секунды.

Было бы лучше, если бы мы могли запускать оба этих действия в отдельных потоках и ждать результатов в основном потоке.

Для этого мы можем использовать сопрограмму async() , начав обработку в двух отдельных потоках одновременно:

@Test
fun givenHaveTwoExpensiveAction_whenExecuteThemAsync_thenTheyShouldRunConcurrently() {
runBlocking<Unit> {
val delay = 1000L
val time = measureTimeMillis {
//given
val one = async(Dispatchers.Default) { someExpensiveComputation(delay) }
val two = async(Dispatchers.Default) { someExpensiveComputation(delay) }

//when
runBlocking {
one.await()
two.await()
}
}

//then
assertTrue(time < delay * 2)
}
}

После того как мы отправим два дорогостоящих вычисления, мы приостанавливаем сопрограмму, выполнив вызов runBlocking() . Как только результаты один и два будут доступны, сопрограмма возобновит работу и вернет результаты. Выполнение двух задач таким образом должно занять около одной секунды.

Мы можем передать CoroutineStart.LAZY в качестве второго аргумента метода async() , но это будет означать, что асинхронные вычисления не будут запущены до тех пор, пока они не будут запрошены. Поскольку мы запрашиваем вычисления в сопрограмме runBlocking , это означает, что вызов two.await() будет выполнен только после завершения one.await():

fun givenTwoExpensiveAction_whenExecuteThemLazy_thenTheyShouldNotConcurrently() {
runBlocking<Unit> {
val delay = 1000L
val time = measureTimeMillis {
//given
val one = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }
val two = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }

//when
runBlocking {
one.await()
two.await()
}
}

//then
assertTrue(time > delay * 2)
}
}

Ленивое выполнение в этом конкретном примере приводит к тому, что наш код выполняется синхронно. Это происходит потому, что когда мы вызываем await() , основной поток блокируется, и только после завершения первой задачи запускается вторая задача.

Нам необходимо помнить о ленивом выполнении асинхронных действий, поскольку они могут выполняться блокирующим образом.

8. Заключение

В этой статье мы рассмотрели основы сопрограмм Kotlin.

Мы видели, что последовательность является основным строительным блоком каждой сопрограммы. Мы описали, как выглядит поток выполнения в этом стиле программирования с передачей продолжения.

Наконец, мы рассмотрели библиотеку kotlinx-coroutines , которая содержит множество очень полезных конструкций для создания асинхронных программ.

Оригинал статьи: https://www.baeldung.com/kotlin/coroutines