Skip to content

Commit

Permalink
Add example for stream gatherers
Browse files Browse the repository at this point in the history
  • Loading branch information
nipafx committed Nov 2, 2023
1 parent b9facd7 commit a84483f
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ If an API that was introduced in Java 9+ was later updated, the update is listed

## Updated APIs

* ⓧ Stream gatherers: [custom gatherers](src/main/java/dev/nipafx/demo/java_next/api/gather/CustomGatherers.java)
(videos [1](https://www.youtube.com/watch?v=epgJm2dZTSg),
[2](https://www.youtube.com/watch?v=pNQ5OXMXDbY);
[JEP 461](https://openjdk.org/jeps/461))
* ⑯ (server) socket channels: [Unix domain socket support](src/main/java/dev/nipafx/demo/java16/api/unix_sockets)
([article](https://nipafx.dev/java-unix-domain-sockets/),
[JEP 380](https://openjdk.java.net/jeps/380))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package dev.nipafx.demo.java_next.api.gather;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

/* --- UNTIL JEP 461 IS MERGED --- */
import dev.nipafx.demo.java_next.api.gather.stream.Gatherer;
import dev.nipafx.demo.java_next.api.gather.stream.Gatherer.Downstream;
import dev.nipafx.demo.java_next.api.gather.stream.Gatherer.Integrator;

/* --- AFTER JEP 461 IS MERGED --- */
//import java.util.stream.Gatherer;
//import java.util.stream.Gatherer.Downstream;
//import java.util.stream.Gatherer.Integrator;

import static java.lang.StringTemplate.STR;

/**
* This demo is built for JEP 461, which is not yet merged. To allow experimentation, I rebuilt the essential parts:
*
* <ul>
* <li>the interface hierarchy in {@link dev.nipafx.demo.java_next.api.gather.stream.Gatherer}</li>
* <li>the iteration mechanism in {@code apply_manually} (without any guarantee of correctness</li>
* </ul>
*
* The code should work on any recent Java version as is.
*/
public class CustomGatherers {

public static void main(String[] args) {
var letters = List.of("A", "B", "D", "C", "B", "F", "E");
Predicate<String> isEven = letter -> ((int) letter.charAt(0)) % 2 == 0;

/* --- UNTIL JEP 461 IS MERGED --- */
var result = apply_manually(
letters,
doNothing());
/* --- AFTER JEP 461 IS MERGED --- */
// var result = apply_jep461(
// letters,
// doNothing());

System.out.println(STR."""
in: \{letters}
out: \{result}
""");
}

/* --- UNTIL JEP 461 IS MERGED --- */
private static <R> List<R> apply_manually(List<String> letters, Gatherer<String, ?, R> gatherer) {
// the raw type is needed because the compiler won't let us pass a `?` to a `?`
var rawGatherer = (Gatherer) gatherer;
var result = new ArrayList<R>();
Downstream<? super R> downstream = result::add;

Object state = gatherer.initializer().get();
boolean integrateMore = true;
var iterator = letters.iterator();
while (integrateMore && iterator.hasNext())
integrateMore = rawGatherer.integrator().integrate(state, iterator.next(), downstream);
rawGatherer.finisher().accept(state, downstream);

return result;
}

/* --- AFTER JEP 461 IS MERGED --- */
// private static <R> List<R> apply_jep461(List<String> letters, Gatherer<String,?, R> gatherer) {
// return letters.stream()
// .gather(gatherer)
// .toList();
// }

public static <T> Gatherer<T, ?, T> doNothing() {
Integrator<Void, T, T> integrator = (_, element, downstream) -> {
downstream.push(element);
return true;
};
return Gatherer.of(integrator);
}

public static <T, R> Gatherer<T, ?, R> map(
Function<? super T, ? extends R> mapper) {
Integrator<Void, T, R> integrator = (_, element, downstream) -> {
R newElement = mapper.apply(element);
downstream.push(newElement);
return true;
};
return Gatherer.of(integrator);
}

public static <T> Gatherer<T, ?, T> filter(
Predicate<? super T> filter) {
Integrator<Void, T, T> integrator = (_, element, downstream) -> {
var passOn = filter.test(element);
if (passOn)
downstream.push(element);
return true;
};
return Gatherer.of(integrator);
}

public static <T> Gatherer<T, ?, T> flatMapIf(
Predicate<? super T> test,
Function<? super T, Stream<? extends T>> mapper) {
Integrator<Void, T, T> integrator = (_, element, downstream) -> {
var expand = test.test(element);
if (expand)
mapper.apply(element).forEach(downstream::push);
else
downstream.push(element);
return true;
};
return Gatherer.of(integrator);
}

public static <T> Gatherer<T, ?, T> takeWhileIncluding(
Predicate<? super T> predicate) {
Integrator<Void, T, T> integrator = (_, element, downstream) -> {
downstream.push(element);
return predicate.test(element);
};
return Gatherer.of(integrator);
}

public static <T> Gatherer<T, ?, T> limit(int numberOfElements) {
Supplier<AtomicInteger> initializer = AtomicInteger::new;
Integrator<AtomicInteger, T, T> integrator = (state, element, downstream) -> {
var currentIndex = state.getAndIncrement();
if (currentIndex < numberOfElements)
downstream.push(element);
return currentIndex + 1 < numberOfElements;
};
return Gatherer.ofSequential(initializer, integrator);
}

public static <T> Gatherer<T, ?, T> increasing(Comparator<T> comparator) {
Supplier<AtomicReference<T>> initializer = AtomicReference::new;
Integrator<AtomicReference<T>, T, T> integrator = (state, element, downstream) -> {
T largest = state.get();
var isLarger = largest == null || comparator.compare(element, largest) > 0;
if (isLarger) {
downstream.push(element);
state.set(element);
}
return true;
};
return Gatherer.ofSequential(initializer, integrator);
}

public static Gatherer<Integer, ?, Double> runningAverage() {
class State {

private long sum;
private long count;

}
Supplier<State> initializer = State::new;
Integrator<State, Integer, Double> integrator = (state, element, downstream) -> {
state.sum += element;
state.count++;
double average = (double) state.sum / state.count;
downstream.push(average);
return true;
};
return Gatherer.ofSequential(initializer, integrator);
}

public static <T> Gatherer<T, ?, List<T>> slidingWindow(int size) {
Supplier<List<T>> initializer = ArrayList::new;
Integrator<List<T>, T, List<T>> integrator = (state, element, downstream) -> {
state.addFirst(element);
if (state.size() > size) {
state.removeLast();
}
var group = List.copyOf(state);
downstream.push(group);
return true;
};
return Gatherer.ofSequential(initializer, integrator);
}

public static <T> Gatherer<T, ?, List<T>> fixedGroups(int size) {
Supplier<List<T>> initializer = ArrayList::new;
Integrator<List<T>, T, List<T>> integrator = (state, element, downstream) -> {
state.add(element);
if (state.size() == size) {
var group = List.copyOf(state);
downstream.push(group);
state.clear();
}
return true;
};
BiConsumer<List<T>, Downstream<? super List<T>>> finisher = (state, downstream) -> {
var group = List.copyOf(state);
downstream.push(group);
};
return Gatherer.ofSequential(initializer, integrator, finisher);
}

public static <T> Gatherer<T, ?, T> sorted(Comparator<? super T> comparator) {
Supplier<List<T>> initializer = ArrayList::new;
Integrator<List<T>, T, T> integrator = (state, element, _) -> {
state.add(element);
return true;
};
BiConsumer<List<T>, Downstream<? super T>> finisher = (state, downstream) -> {
state.sort(comparator);
state.forEach(downstream::push);
};
return Gatherer.ofSequential(initializer, integrator, finisher);
}

public static <T> Gatherer<T, ?, List<T>> increasingSequences(Comparator<T> comparator) {
Supplier<List<T>> initializer = ArrayList::new;
Integrator<List<T>, T, List<T>> integrator = (state, element, downstream) -> {
boolean isInSequence = state.isEmpty()
|| comparator.compare(element, state.getLast()) >= 0;
if (!isInSequence) {
var group = List.copyOf(state);
downstream.push(group);
state.clear();
}
state.addLast(element);
return true;
};
BiConsumer<List<T>, Downstream<? super List<T>>> finisher = (state, downstream) -> {
var group = List.copyOf(state);
downstream.push(group);
};
return Gatherer.ofSequential(initializer, integrator, finisher);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package dev.nipafx.demo.java_next.api.gather.stream;

import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

public interface Gatherer<T, A, R> {

Supplier<A> initializer();
Integrator<A,T,R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Downstream<? super R>> finisher();

interface Integrator<A, T, R> {
boolean integrate(A state, T element, Downstream<? super R> downstream);
}

interface Downstream<R> {
boolean push(R element);
}

static <T, A, R> Gatherer<T, A, R> of(Integrator<A, T, R> integrator) {
return new Gatherer<T, A, R>() {
@Override
public Supplier<A> initializer() {
return () -> null;
}

@Override
public Integrator<A, T, R> integrator() {
return integrator;
}

@Override
public BinaryOperator<A> combiner() {
return (state1, state2) -> { throw new IllegalStateException(); };
}

@Override
public BiConsumer<A, Downstream<? super R>> finisher() {
return (state, downstream) -> { };
}
};
}

static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer, Integrator<A, T, R> integrator) {
return new Gatherer<T, A, R>() {
@Override
public Supplier<A> initializer() {
return initializer;
}

@Override
public Integrator<A, T, R> integrator() {
return integrator;
}

@Override
public BinaryOperator<A> combiner() {
return (state1, state2) -> { throw new IllegalStateException(); };
}

@Override
public BiConsumer<A, Downstream<? super R>> finisher() {
return (state, downstream) -> { };
}
};
}

static <T, A, R> Gatherer<T, A, R> ofSequential(Supplier<A> initializer, Integrator<A, T, R> integrator, BiConsumer<A, Downstream<? super R>> finisher) {
return new Gatherer<T, A, R>() {
@Override
public Supplier<A> initializer() {
return () -> null;
}

@Override
public Integrator<A, T, R> integrator() {
return integrator;
}

@Override
public BinaryOperator<A> combiner() {
return (state1, state2) -> { throw new IllegalStateException(); };
}

@Override
public BiConsumer<A, Downstream<? super R>> finisher() {
return finisher;
}
};
}

}

0 comments on commit a84483f

Please sign in to comment.