diff --git a/README.md b/README.md index 8fd1efd..678a478 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,10 @@ If an API that was introduced in Java 9+ was later updated, the update is listed ## Updated APIs +* ⓧ Stream gatherers: [custom gatherers](src/main/java/dev/nipafx/demo/java_next/api/gather/CustomGatherers.java) + (videos [1](https://www.youtube.com/watch?v=epgJm2dZTSg), + [2](https://www.youtube.com/watch?v=pNQ5OXMXDbY); + [JEP 461](https://openjdk.org/jeps/461)) * ⑯ (server) socket channels: [Unix domain socket support](src/main/java/dev/nipafx/demo/java16/api/unix_sockets) ([article](https://nipafx.dev/java-unix-domain-sockets/), [JEP 380](https://openjdk.java.net/jeps/380)) diff --git a/src/main/java/dev/nipafx/demo/java_next/api/gather/CustomGatherers.java b/src/main/java/dev/nipafx/demo/java_next/api/gather/CustomGatherers.java new file mode 100644 index 0000000..405270f --- /dev/null +++ b/src/main/java/dev/nipafx/demo/java_next/api/gather/CustomGatherers.java @@ -0,0 +1,242 @@ +package dev.nipafx.demo.java_next.api.gather; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/* --- UNTIL JEP 461 IS MERGED --- */ +import dev.nipafx.demo.java_next.api.gather.stream.Gatherer; +import dev.nipafx.demo.java_next.api.gather.stream.Gatherer.Downstream; +import dev.nipafx.demo.java_next.api.gather.stream.Gatherer.Integrator; + +/* --- AFTER JEP 461 IS MERGED --- */ +//import java.util.stream.Gatherer; +//import java.util.stream.Gatherer.Downstream; +//import java.util.stream.Gatherer.Integrator; + +import static java.lang.StringTemplate.STR; + +/** + * This demo is built for JEP 461, which is not yet merged. To allow experimentation, I rebuilt the essential parts: + * + * + * + * The code should work on any recent Java version as is. + */ +public class CustomGatherers { + + public static void main(String[] args) { + var letters = List.of("A", "B", "D", "C", "B", "F", "E"); + Predicate isEven = letter -> ((int) letter.charAt(0)) % 2 == 0; + + /* --- UNTIL JEP 461 IS MERGED --- */ + var result = apply_manually( + letters, + doNothing()); + /* --- AFTER JEP 461 IS MERGED --- */ +// var result = apply_jep461( +// letters, +// doNothing()); + + System.out.println(STR.""" + + in: \{letters} + out: \{result} + """); + } + + /* --- UNTIL JEP 461 IS MERGED --- */ + private static List apply_manually(List letters, Gatherer gatherer) { + // the raw type is needed because the compiler won't let us pass a `?` to a `?` + var rawGatherer = (Gatherer) gatherer; + var result = new ArrayList(); + Downstream downstream = result::add; + + Object state = gatherer.initializer().get(); + boolean integrateMore = true; + var iterator = letters.iterator(); + while (integrateMore && iterator.hasNext()) + integrateMore = rawGatherer.integrator().integrate(state, iterator.next(), downstream); + rawGatherer.finisher().accept(state, downstream); + + return result; + } + + /* --- AFTER JEP 461 IS MERGED --- */ +// private static List apply_jep461(List letters, Gatherer gatherer) { +// return letters.stream() +// .gather(gatherer) +// .toList(); +// } + + public static Gatherer doNothing() { + Integrator integrator = (_, element, downstream) -> { + downstream.push(element); + return true; + }; + return Gatherer.of(integrator); + } + + public static Gatherer map( + Function mapper) { + Integrator integrator = (_, element, downstream) -> { + R newElement = mapper.apply(element); + downstream.push(newElement); + return true; + }; + return Gatherer.of(integrator); + } + + public static Gatherer filter( + Predicate filter) { + Integrator integrator = (_, element, downstream) -> { + var passOn = filter.test(element); + if (passOn) + downstream.push(element); + return true; + }; + return Gatherer.of(integrator); + } + + public static Gatherer flatMapIf( + Predicate test, + Function> mapper) { + Integrator integrator = (_, element, downstream) -> { + var expand = test.test(element); + if (expand) + mapper.apply(element).forEach(downstream::push); + else + downstream.push(element); + return true; + }; + return Gatherer.of(integrator); + } + + public static Gatherer takeWhileIncluding( + Predicate predicate) { + Integrator integrator = (_, element, downstream) -> { + downstream.push(element); + return predicate.test(element); + }; + return Gatherer.of(integrator); + } + + public static Gatherer limit(int numberOfElements) { + Supplier initializer = AtomicInteger::new; + Integrator integrator = (state, element, downstream) -> { + var currentIndex = state.getAndIncrement(); + if (currentIndex < numberOfElements) + downstream.push(element); + return currentIndex + 1 < numberOfElements; + }; + return Gatherer.ofSequential(initializer, integrator); + } + + public static Gatherer increasing(Comparator comparator) { + Supplier> initializer = AtomicReference::new; + Integrator, T, T> integrator = (state, element, downstream) -> { + T largest = state.get(); + var isLarger = largest == null || comparator.compare(element, largest) > 0; + if (isLarger) { + downstream.push(element); + state.set(element); + } + return true; + }; + return Gatherer.ofSequential(initializer, integrator); + } + + public static Gatherer runningAverage() { + class State { + + private long sum; + private long count; + + } + Supplier initializer = State::new; + Integrator integrator = (state, element, downstream) -> { + state.sum += element; + state.count++; + double average = (double) state.sum / state.count; + downstream.push(average); + return true; + }; + return Gatherer.ofSequential(initializer, integrator); + } + + public static Gatherer> slidingWindow(int size) { + Supplier> initializer = ArrayList::new; + Integrator, T, List> integrator = (state, element, downstream) -> { + state.addFirst(element); + if (state.size() > size) { + state.removeLast(); + } + var group = List.copyOf(state); + downstream.push(group); + return true; + }; + return Gatherer.ofSequential(initializer, integrator); + } + + public static Gatherer> fixedGroups(int size) { + Supplier> initializer = ArrayList::new; + Integrator, T, List> integrator = (state, element, downstream) -> { + state.add(element); + if (state.size() == size) { + var group = List.copyOf(state); + downstream.push(group); + state.clear(); + } + return true; + }; + BiConsumer, Downstream>> finisher = (state, downstream) -> { + var group = List.copyOf(state); + downstream.push(group); + }; + return Gatherer.ofSequential(initializer, integrator, finisher); + } + + public static Gatherer sorted(Comparator comparator) { + Supplier> initializer = ArrayList::new; + Integrator, T, T> integrator = (state, element, _) -> { + state.add(element); + return true; + }; + BiConsumer, Downstream> finisher = (state, downstream) -> { + state.sort(comparator); + state.forEach(downstream::push); + }; + return Gatherer.ofSequential(initializer, integrator, finisher); + } + + public static Gatherer> increasingSequences(Comparator comparator) { + Supplier> initializer = ArrayList::new; + Integrator, T, List> integrator = (state, element, downstream) -> { + boolean isInSequence = state.isEmpty() + || comparator.compare(element, state.getLast()) >= 0; + if (!isInSequence) { + var group = List.copyOf(state); + downstream.push(group); + state.clear(); + } + state.addLast(element); + return true; + }; + BiConsumer, Downstream>> finisher = (state, downstream) -> { + var group = List.copyOf(state); + downstream.push(group); + }; + return Gatherer.ofSequential(initializer, integrator, finisher); + } + +} diff --git a/src/main/java/dev/nipafx/demo/java_next/api/gather/stream/Gatherer.java b/src/main/java/dev/nipafx/demo/java_next/api/gather/stream/Gatherer.java new file mode 100644 index 0000000..0767429 --- /dev/null +++ b/src/main/java/dev/nipafx/demo/java_next/api/gather/stream/Gatherer.java @@ -0,0 +1,94 @@ +package dev.nipafx.demo.java_next.api.gather.stream; + +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; + +public interface Gatherer { + + Supplier initializer(); + Integrator integrator(); + BinaryOperator combiner(); + BiConsumer> finisher(); + + interface Integrator { + boolean integrate(A state, T element, Downstream downstream); + } + + interface Downstream { + boolean push(R element); + } + + static Gatherer of(Integrator integrator) { + return new Gatherer() { + @Override + public Supplier initializer() { + return () -> null; + } + + @Override + public Integrator integrator() { + return integrator; + } + + @Override + public BinaryOperator combiner() { + return (state1, state2) -> { throw new IllegalStateException(); }; + } + + @Override + public BiConsumer> finisher() { + return (state, downstream) -> { }; + } + }; + } + + static Gatherer ofSequential(Supplier initializer, Integrator integrator) { + return new Gatherer() { + @Override + public Supplier initializer() { + return initializer; + } + + @Override + public Integrator integrator() { + return integrator; + } + + @Override + public BinaryOperator combiner() { + return (state1, state2) -> { throw new IllegalStateException(); }; + } + + @Override + public BiConsumer> finisher() { + return (state, downstream) -> { }; + } + }; + } + + static Gatherer ofSequential(Supplier initializer, Integrator integrator, BiConsumer> finisher) { + return new Gatherer() { + @Override + public Supplier initializer() { + return () -> null; + } + + @Override + public Integrator integrator() { + return integrator; + } + + @Override + public BinaryOperator combiner() { + return (state1, state2) -> { throw new IllegalStateException(); }; + } + + @Override + public BiConsumer> finisher() { + return finisher; + } + }; + } + +}