1. Обзор
В этой статье мы рассмотрим сопрограммы языка Kotlin. Проще говоря, сопрограммы позволяют нам свободно создавать асинхронные программы , и они основаны на концепции программирования в стиле передачи продолжения .
Язык Kotlin предоставляет нам базовые конструкции, но может получить доступ к более полезным сопрограммам с помощью библиотеки kotlinx-coroutines-core . Мы рассмотрим эту библиотеку, как только поймем основные строительные блоки языка Kotlin.
2. Создание сопрограммы с помощью BuildSequence
Давайте создадим первую сопрограмму, используя функцию buildSequence .
И давайте реализуем генератор последовательности Фибоначчи, используя эту функцию:
val fibonacciSeq = sequence {
var a = 0
var b = 1
yield(1)
while (true) {
yield(a + b)
val tmp = a + b
a = b
b = tmp
}
}
Сигнатура функции доходности:
public abstract suspend fun yield(value: T)
Ключевое слово suspend означает, что эта функция может быть блокирующей. Такая функция может приостановить сопрограмму buildSequence .
Приостанавливающие функции могут быть созданы как стандартные функции Kotlin, но мы должны помнить, что мы можем вызывать их только из сопрограммы. В противном случае мы получим ошибку компилятора.
Если мы приостановили вызов внутри buildSequence , этот вызов будет преобразован в выделенное состояние в конечном автомате. Сопрограмму можно передать и назначить переменной, как и любую другую функцию.
В сопрограмме fibonacciSeq у нас есть две точки подвески. Во-первых, когда мы вызываем выход(1), а во-вторых, когда мы вызываем выход(a+b).
Если эта функция доходности приводит к некоторому блокирующему вызову, текущий поток не будет блокироваться на ней. Он сможет выполнить какой-то другой код. Как только приостановленная функция завершит свое выполнение, поток может возобновить выполнение сопрограммы fibonacciSeq .
Мы можем протестировать наш код, взяв некоторые элементы из последовательности Фибоначчи:
val res = fibonacciSeq
.take(5)
.toList()
assertEquals(res, listOf(1, 1, 2, 3, 5))
3. Добавление зависимости Maven для сопрограмм kotlinx
Давайте посмотрим на библиотеку kotlinx-coroutines , в которой есть полезные конструкции, построенные на основе базовых сопрограмм.
Давайте добавим зависимость в библиотеку kotlinx-coroutines-core.
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>1.3.9</version>
</dependency>
4. Асинхронное программирование с использованием процедуры launch()
Библиотека kotlinx-coroutines добавляет множество полезных конструкций, позволяющих создавать асинхронные программы. Допустим, у нас есть дорогостоящая вычислительная функция, которая добавляет строку в список ввода:
suspend fun expensiveComputation(res: MutableList<String>) {
delay(1000L)
res.add("word!")
}
Мы можем использовать сопрограмму запуска , которая будет выполнять функцию приостановки неблокирующим способом.
@Test
fun givenAsyncCoroutine_whenStartIt_thenShouldExecuteItInTheAsyncWay() {
//given
val res = mutableListOf<String>()
//when
runBlocking {
launch { expensiveComputation(res) }
res.add("Hello,")
}
//then
assertEquals(res, listOf("Hello,", "word!"))
}
Чтобы иметь возможность протестировать наш код, мы передаем всю логику в сопрограмму runBlocking , которая является блокирующим вызовом. Поэтому наш метод AssertEquals() может выполняться синхронно после кода внутри метода runBlocking() .
Обратите внимание, что в этом примере, хотя метод launch() запускается первым, это вычисление с задержкой. Основной поток продолжит добавление строки «Hello» в список результатов.
После односекундной задержки, введенной в функции дорогойComputation() , слово «word!» К результату будет добавлена строка.
5. Сопрограммы очень легкие
Давайте представим ситуацию, в которой мы хотим выполнить 100000 операций асинхронно. Создание такого большого количества потоков будет очень дорогостоящим и может привести к исключению OutOfMemoryException.
К счастью, при использовании сопрограмм это не так. Мы можем выполнить столько операций блокировки, сколько захотим. «Под капотом» эти операции будут обрабатываться фиксированным количеством потоков без чрезмерного создания потоков:
@Test
fun givenHugeAmountOfCoroutines_whenStartIt_thenShouldExecuteItWithoutOutOfMemory() {
runBlocking<Unit> {
//given
val counter = AtomicInteger(0)
val numberOfCoroutines = 100_000
//when
val jobs = List(numberOfCoroutines) {
launch {
delay(1L)
counter.incrementAndGet()
}
}
jobs.forEach { it.join() }
//then
assertEquals(counter.get(), numberOfCoroutines)
}
}
Обратите внимание, что мы выполняем 100 000 сопрограмм, и каждый запуск добавляет существенную задержку. Тем не менее, нет необходимости создавать слишком много потоков, поскольку эти операции выполняются асинхронно с использованием потоков из общего фонового пула потоков.
6. Отмена и тайм-ауты
Иногда, после того как мы запустили какое-то длительное асинхронное вычисление, мы хотим отменить его, потому что результат нас больше не интересует.
Когда мы запускаем наше асинхронное действие с помощью сопрограммы launch() , мы можем проверить флаг isActive . Этот флаг устанавливается в значение false всякий раз, когда основной поток вызывает метод cancel() в экземпляре задания:
@Test
fun givenCancellableJob_whenRequestForCancel_thenShouldQuit() {
runBlocking<Unit> {
//given
val job = launch(Dispatchers.Default) {
while (isActive) {
//println("is working")
}
}
delay(1300L)
//when
job.cancel()
//then cancel successfully
}
}
Это очень элегантный и простой способ использования механизма отмены . В асинхронном действии нам нужно только проверить, равен ли флаг isActive значению false , и отменить нашу обработку.
Когда мы запрашиваем некоторую обработку и не уверены, сколько времени займет это вычисление, рекомендуется установить тайм-аут для такого действия. Если обработка не завершится в течение заданного таймаута, мы получим исключение и сможем отреагировать на него соответствующим образом.
Например, мы можем повторить действие:
@Test(expected = CancellationException::class)
fun givenAsyncAction_whenDeclareTimeout_thenShouldFinishWhenTimedOut() {
runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("Some expensive computation $i ...")
delay(500L)
}
}
}
}
Если мы не определим тайм-аут, возможно, что наш поток будет заблокирован навсегда из-за зависания вычислений. Мы не сможем обработать этот случай в нашем коде, если тайм-аут не определен.
7. Одновременное выполнение асинхронных действий
Допустим, нам нужно одновременно запустить два асинхронных действия и потом дождаться их результатов. Если наша обработка занимает одну секунду и нам нужно выполнить эту обработку дважды, время выполнения синхронного выполнения блокировки составит две секунды.
Было бы лучше, если бы мы могли запускать оба этих действия в отдельных потоках и ждать результатов в основном потоке.
Для этого мы можем использовать сопрограмму async() , начав обработку в двух отдельных потоках одновременно:
@Test
fun givenHaveTwoExpensiveAction_whenExecuteThemAsync_thenTheyShouldRunConcurrently() {
runBlocking<Unit> {
val delay = 1000L
val time = measureTimeMillis {
//given
val one = async(Dispatchers.Default) { someExpensiveComputation(delay) }
val two = async(Dispatchers.Default) { someExpensiveComputation(delay) }
//when
runBlocking {
one.await()
two.await()
}
}
//then
assertTrue(time < delay * 2)
}
}
После того как мы отправим два дорогостоящих вычисления, мы приостанавливаем сопрограмму, выполнив вызов runBlocking() . Как только результаты один и два будут доступны, сопрограмма возобновит работу и вернет результаты. Выполнение двух задач таким образом должно занять около одной секунды.
Мы можем передать CoroutineStart.LAZY в качестве второго аргумента метода async() , но это будет означать, что асинхронные вычисления не будут запущены до тех пор, пока они не будут запрошены. Поскольку мы запрашиваем вычисления в сопрограмме runBlocking , это означает, что вызов two.await() будет выполнен только после завершения one.await():
fun givenTwoExpensiveAction_whenExecuteThemLazy_thenTheyShouldNotConcurrently() {
runBlocking<Unit> {
val delay = 1000L
val time = measureTimeMillis {
//given
val one = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }
val two = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }
//when
runBlocking {
one.await()
two.await()
}
}
//then
assertTrue(time > delay * 2)
}
}
Ленивое выполнение в этом конкретном примере приводит к тому, что наш код выполняется синхронно. Это происходит потому, что когда мы вызываем await() , основной поток блокируется, и только после завершения первой задачи запускается вторая задача.
Нам необходимо помнить о ленивом выполнении асинхронных действий, поскольку они могут выполняться блокирующим образом.
8. Заключение
В этой статье мы рассмотрели основы сопрограмм Kotlin.
Мы видели, что последовательность является основным строительным блоком каждой сопрограммы. Мы описали, как выглядит поток выполнения в этом стиле программирования с передачей продолжения.
Наконец, мы рассмотрели библиотеку kotlinx-coroutines , которая содержит множество очень полезных конструкций для создания асинхронных программ.
Оригинал статьи: https://www.baeldung.com/kotlin/coroutines