Найти в Дзене

Сборщики стримов

JDK 25
JDK 25

Одним из наиболее важных нововведений, на мой взгляд, в стандартной библиотеке в грядущем релизе JDK 25 станут сборщики стримов (JEP 485 Stream Gatherers). Да, стабильными они стали ещё в JDK 24, но для себя я рассматриваю только LTS-релизы.

Сборщики стримов описываются типизированным интерфейсом Gatherer<T, A, R>, где T - тип входящего элемента стрима, R - тип выходящего элемента, а A - потенциально изменяемое состояние сборщика.

Внутри Gatherer используются четыре компонента:

  • Supplier<A> initializer - инициализатор состояния (если оно используется)
  • Integrator<A, T, R> integrator - интегратор, обрабатывающий полученный элемент, опционально использующий состояние. Так же существует жадный интегратор - Integrator.Greedy, который принимает все элементы стрима и передаёт элементы нисходящему стриму только, если тот их может принять. Жадные интеграторы не должны порождать бесконечные стримы.
  • BinaryOperator<A> - функция, принимающая два промежуточных состояния и объединяющая их
  • BiConsumer<A, Downstream<? super R>> finisher - функция, принимающая последнее промежуточное состояние и нисходящий стрим (даунстрим - стрим, который получается после применения сборщика)

Создать сборщик стрима можно при помощи метода Gatherer.of (для сборщиков без состояния) или Gatherer.ofSequential (для сборщиков с состоянием), передав в качестве аргумента, как минимум, функцию-интегратор или все вышеуказанные компоненты. Состояние позволяет реализовывать логику, опирающуюся на ранее обработанные элементы стрима, например исключение дублирующихся элементов из стрима.

Для применения сборщика к стриму нужно передать его в промежуточном метода Stream.gather(). Ниже приведён пример сборщика который для каждого элемента вычисляет сумму всех предыдущих элементов и передаёт её в нисходящий стрим:

// выведет 1, 3, 7, 12, 18
IO.println(
Stream.of(1, 2, 4, 5, 6)
.gather(Gatherer.ofSequential(AtomicInteger::new,
Gatherer.Integrator.ofGreedy(
(state, in, downstream) -> downstream.push(state.addAndGet(in)))))
.map(String::valueOf)
.collect(Collectors.joining(", "))
);

При помощи сборщиков можно реализовывать и стандартные операции, но с большим контролем, например комбинируя, map() и filter():

Stream.of(1, 2, 3, 4, 5)
// Оставит только чётные числа и преобразует их в строку в бинарном формате
.gather(Gatherer.of(
(_, num, downstream) -> {
if (num % 2 == 0) {
return !downstream.isRejecting() && downstream.push(Integer.toBinaryString(num));
} else {
return !downstream.isRejecting();
}
}))
.forEach(IO::println);
/*
Выведет
10
100
*/

Кроме этого в стандартную библиотеку добавлен класс Gatherers, предоставляющий стандартные сборщики:

  • Gatherers.fold - сборщик, реализующий свёртывание элементов стрима в один
  • Gatherers.scan - сборщик, реализующий сканирование префикса - инкрементное накопление, когда каждый следующий элемент содержит данные предыдущих
  • Gatherers.mapConcurrent - сборщик, реализующий конкурентное преобразование элементов стрима
  • Gatherers.windowFixed - сборщик, агрегирующий элементы стрима в окна фиксированного размера
  • Gatherers.windowSliding - сборщик, агрегирующий элементы стрима в плавающие окна фиксированного размера

Вполне вероятно, что стандартные промежуточные операции в будущем могут быть переписаны на сборщики стримов. Как вам нововведение?