Найти тему
JavaDocs

14 Параллелизм (Concurrency). Структурированный параллелизм

Оглавление

Ссылка https://docs.oracle.com/en/java/javase/21/core/structured-concurrency.html

Структурированный параллелизм рассматривает группы связанных задач, выполняемых в разных потоках, как единую единицу работы, тем самым оптимизируя обработку ошибок и их отмену, повышая надежность и улучшая наблюдаемость.

Основной класс API структурированного параллелизма находится StructuredTaskScope в java.util.concurrent пакете. Этот класс позволяет координировать группу параллельных подзадач как единое целое. С помощью StructuredTaskScope экземпляра вы разветвляете каждую подзадачу, которая выполняет их в отдельном потоке. После вы объединяете их как единое целое. В результате, StructuredTaskScope гарантирует, что подзадачи будут завершены до продолжения основной задачи. В качестве альтернативы, вы можете указать, что приложение продолжит работу после успешного выполнения одной из подзадач.

Примечание:

Это функция предварительного просмотра. Функция предварительного просмотра - это функция, дизайн, спецификация и реализация которой завершены, но не являются постоянными. Функция предварительного просмотра может существовать в другой форме или отсутствовать вообще в будущих версиях Java SE. Чтобы скомпилировать и запустить код, содержащий функции предварительного просмотра, необходимо указать дополнительные параметры командной строки. См. раздел Язык предварительного просмотра и возможности виртуальной машины.

Для получения справочной информации о структурированном параллелизме см. JEP 453.

Базовое использование класса StructuredTaskScope

Чтобы использовать StructuredTaskScope класс, выполните следующие общие действия:

  1. Создайте StructuredTaskScope; используйте его с try инструкцией "С ресурсами".
  2. Определите свои подзадачи как экземпляры Callable.
  3. Внутри try блока разветвляйте каждую подзадачу в отдельном потоке с помощью StructuredTaskScope::fork.
  4. Вызов StructuredTaskScope::join.
  5. Обрабатывать результаты выполнения подзадач.
  6. Убедитесь, что StructuredTaskScope отключен.

На следующем рисунке показаны эти шаги. Обратите внимание, что область задачи должна ожидать завершения выполнения всех подзадач из-за join() метода. После этого она может обрабатывать результаты выполнения подзадачи.

Рисунок 14-2 С использованием класса StructuredTaskScope

В общем случае код, использующий класс StructuredTaskScope, имеет следующую структуру:

КопироватьCallable<String> task1 = ...
Callable<Integer> task2 = ...

try (var scope = new StructuredTaskScope<Object>()) {

Subtask<String> subtask1 = scope.fork(task1);
Subtask<Integer> subtask2 = scope.fork(task2);

scope.join();

... process results/exceptions ...

} // close

Поскольку StructuredTaskScope был определен в инструкции try с ресурсами, в конце try блока, StructuredTaskScope завершается, и область задач ожидает завершения потоков, выполняющих любые незавершенные подзадачи.

StructuredTaskScope Класс определяет shutdown метод для завершения работы области задач, не закрывая ее. Этот метод отменяет все незавершенные подзадачи, прерывая потоки. Кроме того, shutdown метод позволяет подклассам StructuredTaskScope реализовать политику, которая не требует выполнения всех подзадач. В разделе Общие политики завершения работы: ShutdownOnSuccess и ShutdownOnFailure описаны два подкласса StructuredTaskScope, ShutdownOnSuccess и ShutdownOnFailure. Первый реализует политику, которая отключает область задачи, как только подзадача успешно завершается, в то время как второй отключает область задачи, как только подзадача выдает исключение.

Общие политики завершения работы: ShutdownOnSuccess и ShutdownOnFailure

StructuredTaskScope Класс содержит два подкласса, ShutdownOnFailure и ShutdownOnSuccess. Эти подклассы реализуют две общие политики завершения работы. ShutdownOnFailure отменяет все подзадачи, если одна из них завершается неудачей, и ShutdownOnSuccess отменяет все оставшиеся подзадачи, если одна из них завершается успешно. Эти политики отключения являются примерами схем короткого замыкания. Схема короткого замыкания стимулирует быстрое выполнение подзадач, позволяя основной задаче прерывать и отменять подзадачи, результаты которых больше не нужны.

Следующий пример демонстрирует классы StructuredTaskScope.ShutdownOnFailure и StructuredTaskScope.ShutdownOnSuccess. Каждая область задач включает в себя пять подзадач, которые находятся в режиме ожидания произвольный промежуток времени. Однако, если длительность превышает указанное пороговое значение, подзадача выдает значение TooSlowException. Метод handleShutDownOnFailure() выводит общую продолжительность всех подзадач, если ни одна из них не вызвала исключения. Метод handleShutDownOnSuccess() выводит продолжительность подзадачи, которая выполняется первой:

Рисунок 14-3 SCRandomTasks.java

Копироватьimport java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.*;
import java.util.stream.*;

public class SCRandomTasks {

class TooSlowException extends Exception {
public TooSlowException(String s) {
super(s);
}
}

public Integer randomTask(int maxDuration, int threshold) throws InterruptedException, TooSlowException {
int t = new Random().nextInt(maxDuration);
System.out.println("Duration: " + t);
if (t > threshold) {
throw new TooSlowException("Duration " + t + " greater than threshold " + threshold);
}
Thread.sleep(t);
return Integer.valueOf(t);
}

void handleShutdownOnFailure() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// var t = new SCRandomTasks();
var subtasks = IntStream.range(0, 5)
.mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
.toList();
scope.join()
.throwIfFailed();
var totalDuration = subtasks.stream()
.map(t -> t.get())
.reduce(0, Integer::sum);
System.out.println("Total duration: " + totalDuration);
}
}

void handleShutdownOnSuccess() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
IntStream.range(0, 5)
.mapToObj(i -> scope.fork(() -> randomTask(1000, 850)))
.toList();
scope.join();
System.out.println("First task to finish: " + scope.result());
}
}

public static void main(String[] args) {
var myApp = new SCRandomTasks();
try {
System.out.println("Running handleShutdownOnFailure...");
myApp.handleShutdownOnFailure();
} catch (Exception e) {
System.out.println(e.getMessage());
}
try {
System.out.println("Running handleShutdownOnSuccess...");
myApp.handleShutdownOnSuccess();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}

Выводится вывод, подобный следующему:

КопироватьRunning handleShutdownOnFailure...
Duration: 359
Duration: 676
Duration: 322
Duration: 591
Duration: 315
Total duration: 2263
Running handleShutdownOnSuccess...
Duration: 480
Duration: 40
Duration: 868
Duration: 526
Duration: 532
First task to finish: 40

StructuredTaskScope.ShutdownOnFailure Класс фиксирует первое исключение, вызванное одной из его подзадач, затем вызывает метод shutdown. Это предотвращает запуск любых новых подзадач, прерывает все незавершенные потоки, выполняющие другие подзадачи, и позволяет приложению продолжить работу. Чтобы получить доступ к перехваченному исключению, вызовите метод ShutdownOnFailure::exception. Если вместо этого вы хотите повторно создать исключение, вызовите ShutdownOnFailure::throwIfFailedметод, который демонстрирует этот пример:

Копироватьscope.join()
.throwIfFailed();

StructuredTaskScope.ShutdownOnSuccess Класс фиксирует результат первой подзадачи, которая должна быть успешно выполнена, и, подобно ShutdownOnFailure, вызывает shutdown метод. Чтобы открыть результат подзадач, которые успешно завершена, вызовите ShutdownOnSuccess::result метод, который в этом примере демонстрирует:

КопироватьSystem.out.println("First task to finish: " + scope.result());

Реализуйте свои собственные политики StructuredTaskScope

Вы можете реализовать свои собственные StructuredTaskScope политики, которые обрабатывают подзадачи иначе, чем ShutdownOnFailure и ShutdownOnSuccess. Сделайте это, расширив StructuredTaskScope класс.

Следующий пример, CollectingScope содержит два метода, которые возвращают два потока подзадач одного и того же типа: successfulTasks() возвращает поток успешных подзадач и failedTasks() возвращает поток подзадач, вызвавших исключение:

Рисунок 14-4 CollectingScope.java

Копироватьimport java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CollectingScope<T> extends StructuredTaskScope<T> {
private final Queue<Subtask<? extends T>> successSubtasks = new LinkedTransferQueue<>();
private final Queue<Subtask<? extends T>> failedSubtasks = new LinkedTransferQueue<>();

@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
successSubtasks.add(subtask);
} else if (subtask.state() == Subtask.State.FAILED) {
failedSubtasks.add(subtask);
}
}

@Override
public CollectingScope<T> join() throws InterruptedException {
super.join();
return this;
}

public Stream<Subtask<? extends T>> successfulTasks() {
super.ensureOwnerAndJoined();
return successSubtasks.stream();
}

public Stream<Subtask<? extends T>> failedTasks() {
super.ensureOwnerAndJoined();
return failedSubtasks.stream();
}
}

Чтобы использовать этот класс в примере, SCRanndomTasks как описано в Общих политиках завершения работы: ShutdownOnSuccess и ShutdownOnFailure, добавьте следующий метод. Он выводит общую продолжительность успешно выполненных подзадач и исключения из подзадач, которые вызвали исключения.

Копироватьvoid handleBoth() throws InterruptedException {
try (var scope = new CollectingScope()) {
// var t = new SCRandomTasks();
var subtasks = IntStream.range(0, 5)
.mapToObj(i -> scope.fork(() -> randomTask(1000, 500)))
.toList();
scope.join();

var totalDuration = scope.successfulTasks()
.mapToInt(st -> (Integer)((Subtask)st).get())
.reduce(0, Integer::sum);
System.out.println("Total duration: " + totalDuration);

scope.failedTasks()
.forEach(ft ->
System.out.println(((Exception)((Subtask)ft).exception()).getMessage()));
}
}

Выводится вывод, подобный следующему:

КопироватьDuration: 501
Duration: 211
Duration: 661
Duration: 903
Duration: 839
Total duration: 211
Duration 501 greater than threshold 500
Duration 661 greater than threshold 500
Duration 903 greater than threshold 500
Duration 839 greater than threshold 500

В примере CollectingScope, перед successfulTasks() и failedTasks() возвращает потоки для successSubtasks и failedSubtasks, соответственно, вызывает StructuredTaskScope::ensureOwnerAndJoined. Это гарантирует, что пример может получить доступ только к successSubtasksи failedSubtasks при условии, что текущий поток является владельцем области задач , а область задач присоединилась к подзадачам после их разветвления.

Отладка StructuredTaskScope и его разветвленных подзадач с помощью команды jcmd

jcmd Инструмент может выдавать дамп потока в формате JSON. Этот дамп потока отображает потоки, выполняющие разветвленные подзадачи a StructuredTaskScope в массиве, вместе с их трассировками стека.

Рассмотрим следующий пример, который разбивает три подзадачи. Эти подзадачи постоянно чередуются между печатью сообщения и переходом в режим ожидания на одну секунду.

Рисунок 14-5 SCObervable.java

Копироватьimport java.util.*;
import java.util.function.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SCObservable {

static Long sleepOneSecond(String s) throws InterruptedException {
long pid = ProcessHandle.current().pid();
for (int i = 0; i<60; i++) {
System.out.println("[" + pid + ", " + s + "]" + " Sleeping for 1s...");
Thread.sleep(1000);
}
return Long.valueOf(pid);
}

void handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<Long> task1 = scope.fork(() -> sleepOneSecond("task1"));
Supplier<Long> task2 = scope.fork(() -> sleepOneSecond("task2"));
Supplier<Long> task3 = scope.fork(() -> sleepOneSecond("task3"));
scope.join()
.throwIfFailed();
}
}

public static void main(String[] args) {
try {
var myApp = new SCObservable();
myApp.handle();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Выводится аналогично следующему:

Копировать[10852, task1] Sleeping for 1s...
[10852, task2] Sleeping for 1s...
[10852, task3] Sleeping for 1s...
[10852, task1] Sleeping for 1s...
...

Во время выполнения этого примера вы можете создать дамп потока, выполнив следующую команду в другой консоли, где <pid> - это идентификатор процесса запущенного процесса Java:

Копироватьjcmd <pid> Thread.dump_to_file -format=json <output_file>

Следующий отрывок из образца выходного файла дампа потока показывает StructuredTaskScope с потоками его разветвленных подзадач в массиве. Дамп потока также показывает ссылку на родительский элемент программы StructuredTaskScope так что структура программы может быть восстановлена из дампа потока:

Копировать{
"threadDump": {
"processId": "10852",
"time": "2023-06-22T13:59:05.156805300Z",
"runtimeVersion": "21-ea+27-LTS-2343",
"threadContainers": [
{
"container": "<root>",
"parent": null,
"owner": null,
"threads": [
{
"tid": "1",
"name": "main",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/jdk.internal.misc.ThreadFlock.awaitAll(ThreadFlock.java:315)",
"java.base\/java.util.concurrent.StructuredTaskScope.implJoin(StructuredTaskScope.java:621)",
"java.base\/java.util.concurrent.StructuredTaskScope.join(StructuredTaskScope.java:647)",
"java.base\/java.util.concurrent.StructuredTaskScope$ShutdownOnFailure.join(StructuredTaskScope.java:1200)",
"SCObservable.handle(SCObservable.java:22)",
"SCObservable.main(SCObservable.java:30)"
]
},

...

],
"threadCount": "7"
},

...

{
"container": "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@5674cd4d",
"parent": "<root>",
"owner": "1",
"threads": [
{
"tid": "21",
"name": "",
"stack": [
"java.base\/java.lang.VirtualThread.parkNanos(VirtualThread.java:631)",
"java.base\/java.lang.VirtualThread.sleepNanos(VirtualThread.java:803)",
"java.base\/java.lang.Thread.sleep(Thread.java:507)",
"SCObservable.sleepOneSecond(SCObservable.java:12)",
"SCObservable.lambda$handle$0(SCObservable.java:19)",
"java.base\/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:883)",
"java.base\/java.lang.VirtualThread.run(VirtualThread.java:311)"
]
},
{
"tid": "23",
"name": "",
"stack": [
...
"SCObservable.sleepOneSecond(SCObservable.java:12)",
"SCObservable.lambda$handle$1(SCObservable.java:20)",
...
]
},
{
"tid": "24",
"name": "",
"stack": [
...
"SCObservable.sleepOneSecond(SCObservable.java:12)",
"SCObservable.lambda$handle$2(SCObservable.java:21)",
...
]
}
],
"threadCount": "3"
}
]
}
}