Добавить в корзинуПозвонить
Найти в Дзене
Умный пульс

Kotlin Coroutines: Ожидание завершения нескольких потоков (корутин)

В этом руководстве мы рассмотрим, как запускать группу задач параллельно и ожидать их завершения. Вместо потоков мы будем использовать корутины, поскольку это рекомендуемый способ реализации конкурентности в Kotlin.
Функция async в Kotlin позволяет запускать конкурентные корутины и возвращает результат типа Deferred<T>.
Deferred — это неблокирующий, отменяемый future-объект, который представляет результат, изначально неизвестный. Например, вызвав метод await у Deferred, мы ждём завершения задачи и затем получаем результат. Если у нас есть коллекция Deferred, мы можем использовать расширение awaitAll, чтобы дождаться завершения всех: @Test
fun whenAwaitAsyncCoroutines_thenAllTerminated() {
val count = AtomicInteger()
runBlocking {
val tasks = listOf(
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) },
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
)
tasks.awaitAll()
Assertions.assertEquals(2, count
Оглавление

1. Введение

В этом руководстве мы рассмотрим, как запускать группу задач параллельно и ожидать их завершения. Вместо потоков мы будем использовать корутины, поскольку это рекомендуемый способ реализации конкурентности в Kotlin.

2. async-await


Функция async в Kotlin позволяет запускать конкурентные корутины и возвращает результат типа Deferred<T>.

Deferred — это неблокирующий, отменяемый future-объект, который представляет результат, изначально неизвестный. Например, вызвав метод await у Deferred, мы ждём завершения задачи и затем получаем результат.

Если у нас есть коллекция Deferred, мы можем использовать расширение awaitAll, чтобы дождаться завершения всех:

@Test
fun whenAwaitAsyncCoroutines_thenAllTerminated() {
val count = AtomicInteger()
runBlocking {
val tasks = listOf(
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) },
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
)
tasks.awaitAll()
Assertions.assertEquals(2, count.get())
}
}

Здесь мы передали Dispatchers.IO в async, но доступны и другие диспетчеры.

Диспетчер (Dispatcher) отвечает за то, в каком потоке будет выполняться корутина.
Если не указать диспетчер явно, будет использован тот же, что и у родительской корутины.

2.1. Структурная конкурентность с async


Однако есть лучший способ — использовать преимущества
структурной конкурентности в Kotlin.

Структурная конкурентность означает, что жизненный цикл корутины ограничен областью CoroutineScope, и корутина может быть запущена только внутри этой области. Родительская область не завершится, пока не завершатся все её дочерние корутины. Это предотвращает утечки, потерю ошибок и делает контроль за выполнением более предсказуемым.

Пример с использованием родительской корутины:

@Test
fun whenParentCoroutineRunAsyncCoroutines_thenAllTerminated() {
val count = AtomicInteger()
runBlocking {
withContext(coroutineContext) {
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
}
Assertions.assertEquals(2, count.get())
}
}

Ассерты выполняются после withContext. Этот блок завершится только после завершения всех дочерних async. Нам не нужно вручную вызывать await или awaitAll.

Помимо withContext, существуют и другие билдёры, такие как runBlocking и coroutineScope.

coroutineScope создаёт новую область видимости для корутин, и блок не завершится, пока все дочерние корутины не завершатся.

Важно:

  • runBlocking блокирует текущий поток;
  • coroutineScope просто приостанавливает выполнение, не блокируя поток.

3. launch-join


Если результат от корутины нам не нужен, можно использовать launch.

Функция launch возвращает Job, и мы можем вызвать join(), чтобы дождаться его завершения.

Если у нас несколько Job-ов, можно вызвать joinAll():

@Test
fun whenJoinLaunchedCoroutines_thenAllTerminated() {
val count = AtomicInteger()
runBlocking {
val tasks = listOf(
launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) },
launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
)
tasks.joinAll()
Assertions.assertEquals(2, count.get())
}
}

Пример со структурной конкурентностью:

@Test
fun whenParentCoroutineLaunchCoroutines_thenAllTerminated() {
val count = AtomicInteger()
runBlocking {
withContext(coroutineContext) {
launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
}
Assertions.assertEquals(2, count.get())
}
}

Все сказанное ранее о родительских корутинах и диспетчерах также применимо к launch.

4. Заключение


В этом руководстве мы познакомились со
структурной конкурентностью и рассмотрели примеры использования async и launch для параллельного выполнения корутин.

Оригинал статьи: https://www.baeldung.com/kotlin/coroutines-waiting-for-multiple-threads