Friday, October 19, 2018

Implementing a Sliding Window Stream/Spliterator in Java

Source: https://dzone.com/articles/implementing-a-sliding-window-streamspliterator-in

Let's go through this tutorial on creating a sliding window in Java using streams and spliterators! Trust me — you'll be happy you did.

In this article, we'll take a look at how to implement a custom sliding window Stream/Spliterator in Java. Does the world need another way of implementing a sliding window operation in Java? Probably not, but you do — for your self-development.

Sliding Window

Simply put, the Sliding Window algorithm is a method of traversing data structures by moving a fixed-size window (sublist) over a sequence in fixed steps.
It gets much more intuitive when shown in an example.
If we wanted to traverse the list [1 2 3 4 5] using a window of size 3, we'd simply look at the following groups:
  1. [1 2 3]
  2. [2 3 4]
  3. [3 4 5]
But if we wanted to traverse the same list using a window that's bigger than the collection's size, we wouldn't get a single group.
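
To make both cases concrete, here is a minimal sketch that computes the windows eagerly with List.subList. It is not the Spliterator-based approach built in this article; the class name SlidingWindowSketch and the windows method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, shown only to illustrate the sliding window idea.
class SlidingWindowSketch {

    // Returns every contiguous window of the given size, moving one element at a time.
    static <T> List<List<T>> windows(List<T> source, int windowSize) {
        List<List<T>> result = new ArrayList<>();
        if (windowSize < 1) {
            return result; // no meaningful window exists
        }
        for (int start = 0; start + windowSize <= source.size(); start++) {
            // Copy the subList view so each window is independent of the source.
            result.add(new ArrayList<>(source.subList(start, start + windowSize)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(windows(numbers, 3)); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(windows(numbers, 6)); // []
    }
}
```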

Implementation

To be able to create a custom Stream, we need to implement a custom Spliterator.
In our case, we need to be able to iterate over groups represented by Stream<T> sequences, so we need to implement the Spliterator interface and specify the generic type parameter:
public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
// ...
}

Then, it turns out we have a bunch of methods to implement:
public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        return false;
    }
    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }
    @Override
    public long estimateSize() {
        return 0;
    }
    @Override
    public int characteristics() {
        return 0;
    }
}
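
To see how these four methods cooperate before we fill them in, here is a small standalone sketch of a different, much simpler spliterator that counts down from a number. CountdownSpliterator and countdown are hypothetical names and are not part of the sliding window implementation.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical example class: emits start, start - 1, ..., 1.
class CountdownSpliterator implements Spliterator<Integer> {

    private int remaining;

    CountdownSpliterator(int start) {
        this.remaining = Math.max(start, 0);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Integer> action) {
        if (remaining == 0) {
            return false;           // nothing was left on entry
        }
        action.accept(remaining--); // hand over exactly one element
        return true;                // an element was consumed
    }

    @Override
    public Spliterator<Integer> trySplit() {
        return null; // this sketch cannot be split for parallel processing
    }

    @Override
    public long estimateSize() {
        return remaining;
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | SIZED;
    }

    // Wraps the spliterator in a sequential Stream.
    static Stream<Integer> countdown(int start) {
        return StreamSupport.stream(new CountdownSpliterator(start), false);
    }

    public static void main(String[] args) {
        List<Integer> values = countdown(3).collect(Collectors.toList());
        System.out.println(values); // [3, 2, 1]
    }
}
```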

We'll also need a few fields for storing buffered elements, the window size parameter, an iterator of the source collection, and a precomputed size estimation (we'll need that later on):
private final Queue<T> buffer;
private final Iterator<T> sourceIterator;
private final int windowSize;
private final int size;

Before we can start implementing the interface methods, we need a way to instantiate our spliterator.
In this case, we'll restrict the visibility of the constructor and expose a public static factory method instead:
private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
    this.buffer = new ArrayDeque<>(windowSize);
    this.sourceIterator = Objects.requireNonNull(source).iterator();
    this.windowSize = windowSize;
    this.size = calculateSize(source, windowSize);
}
static <T> Stream<Stream<T>> windowed(Collection<T> source, int windowSize) {
    return StreamSupport.stream(
        new SlidingWindowSpliterator<>(source, windowSize), false);
}

Now, let's implement the easy part of the Spliterator methods.
In our case, there's no easy way to split the sequence, so trySplit() simply returns null, which is what the documentation specifies for a spliterator that cannot be split. Luckily, the size can be calculated quite easily:
private static int calculateSize(Collection<?> source, int windowSize) {
    return source.size() < windowSize
    ? 0
    : source.size() - windowSize + 1;
}
@Override 
public Spliterator<Stream<T>> trySplit() { 
    return null; 
} 
@Override 
public long estimateSize() { 
    return size; 
}
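
As a quick sanity check of the formula, the sketch below evaluates it for a few inputs. The class name WindowCount and the method windowCount are hypothetical. The extra guard for window sizes below 1 is an addition that the calculateSize method above does not have; it is included here on the assumption that the reported size should be 0 whenever no windows are emitted.

```java
// Hypothetical helper illustrating the size formula: n - w + 1 windows for n elements.
class WindowCount {

    static int windowCount(int sourceSize, int windowSize) {
        // Assumption: a non-positive window produces no groups at all.
        if (windowSize < 1 || sourceSize < windowSize) {
            return 0;
        }
        return sourceSize - windowSize + 1;
    }

    public static void main(String[] args) {
        System.out.println(windowCount(5, 3)); // 3
        System.out.println(windowCount(5, 5)); // 1
        System.out.println(windowCount(5, 6)); // 0
        System.out.println(windowCount(5, 0)); // 0
    }
}
```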

In characteristics(), we specify:
  1. ORDERED: the encounter order matters.
  2. NONNULL: the emitted Stream instances are never null. (The windows cannot contain null elements either, because the ArrayDeque buffer rejects them with a NullPointerException.)
  3. SIZED: the number of groups is known up front.
@Override
public int characteristics() {
    return ORDERED | NONNULL | SIZED;
}
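
The characteristics are plain int bit flags combined with a bitwise OR. The following sketch shows how such a mask can be queried, and how the spliterators of two standard collections report their own flags. The class CharacteristicsDemo and its has method are hypothetical helpers.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Spliterator;

class CharacteristicsDemo {

    // True when every bit of `flag` is set in `characteristics`.
    static boolean has(int characteristics, int flag) {
        return (characteristics & flag) == flag;
    }

    public static void main(String[] args) {
        int ours = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.SIZED;
        System.out.println(has(ours, Spliterator.SIZED));    // true
        System.out.println(has(ours, Spliterator.DISTINCT)); // false

        // Built-in spliterators expose their flags through hasCharacteristics.
        Spliterator<Integer> list = Arrays.asList(1, 2, 3).spliterator();
        Spliterator<Integer> set = new HashSet<>(Arrays.asList(1, 2, 3)).spliterator();
        System.out.println(list.hasCharacteristics(Spliterator.ORDERED)); // true
        System.out.println(set.hasCharacteristics(Spliterator.ORDERED));  // false
    }
}
```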

Implementing tryAdvance

And, here comes the crucial part — the method responsible for the actual grouping and iteration.
First, if the window size is smaller than 1, there's nothing to iterate over, so we can short-circuit immediately:
@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
    if (windowSize < 1) {
        return false;
    }
    // ...
}

And now, to generate the first sublist, we need to start iterating and filling the buffer:
while (sourceIterator.hasNext()) {
    buffer.add(sourceIterator.next());
    // ...
}

Once the buffer is filled, we can dispatch the complete group and discard the oldest element from the buffer.
Here comes a crucial part: one might be tempted to pass buffer.stream() to the accept() method, which would be a serious mistake. A Stream created from a collection binds to it lazily, so it doesn't read the elements until a terminal operation runs; if the buffer changes in the meantime, the Stream sees the changed contents.
To avoid the problem and decouple our groups from the internal buffer representation, we need to snapshot the current state of the buffer before creating each Stream instance. We'll back Stream instances with arrays to make them as lightweight as possible.
Since Java doesn't allow creating arrays of a generic type, we need to do some ugly casting:
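if (buffer.size() == windowSize) {
    // Snapshot the buffer into an array so the emitted Stream is independent of it.
    action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
    // Drop the oldest element so the next call slides the window by one.
    buffer.poll();
    // A group was emitted, so report success as the tryAdvance contract requires;
    // the next call returns false once the source iterator is exhausted.
    return true;
}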
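
The difference between a live Stream and a snapshot is easy to reproduce in isolation. The sketch below uses a hypothetical SnapshotDemo class: both methods create a Stream over a buffer holding 1, 2, 3, slide the buffer by one element, and only then run the terminal operation.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SnapshotDemo {

    // Stream obtained directly from the buffer: it reads the elements lazily.
    static List<Integer> liveView() {
        Queue<Integer> buffer = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        Stream<Integer> live = buffer.stream();
        buffer.poll(); // drop 1
        buffer.add(4); // slide the window
        return live.collect(Collectors.toList());
    }

    // Stream backed by an array copy taken before the buffer changes.
    static List<Integer> snapshot() {
        Queue<Integer> buffer = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        Stream<Integer> copy = Arrays.stream(buffer.toArray(new Integer[0]));
        buffer.poll();
        buffer.add(4);
        return copy.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(liveView()); // [2, 3, 4]
        System.out.println(snapshot()); // [1, 2, 3]
    }
}
```

The live Stream reports the window after the slide, which is exactly the kind of corruption the array snapshot prevents.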

...and, voila, we are ready to use it:
// assumes: import static java.util.stream.Collectors.toList;
windowed(List.of(1, 2, 3, 4, 5), 3)
    .map(group -> group.collect(toList()))
    .forEach(System.out::println);
// result:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]

As an additional exercise, you can add support for a custom step size (currently it's implicitly set to 1).
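
One possible way to approach a custom step is sketched below. It is only a sketch, not the article's implementation: the class name SteppedWindowSpliterator, the step parameter, the toSkip field, and the toLists helper are all hypothetical. It also assumes that invalid arguments should be rejected with an exception, extends Spliterators.AbstractSpliterator to keep the code short, and leaves the size unknown instead of reporting SIZED.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical variant with a configurable step between consecutive windows.
class SteppedWindowSpliterator<T> extends Spliterators.AbstractSpliterator<Stream<T>> {

    private final Queue<T> buffer = new ArrayDeque<>();
    private final Iterator<T> sourceIterator;
    private final int windowSize;
    private final int step;
    private int toSkip; // source elements to discard before the next window starts

    private SteppedWindowSpliterator(Collection<T> source, int windowSize, int step) {
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL); // size left unknown
        if (windowSize < 1 || step < 1) {
            throw new IllegalArgumentException("windowSize and step must be positive");
        }
        this.sourceIterator = source.iterator();
        this.windowSize = windowSize;
        this.step = step;
    }

    static <T> Stream<Stream<T>> windowed(Collection<T> source, int windowSize, int step) {
        return StreamSupport.stream(
            new SteppedWindowSpliterator<>(source, windowSize, step), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        // When step > windowSize, skip the elements that fall between two windows.
        while (toSkip > 0 && sourceIterator.hasNext()) {
            sourceIterator.next();
            toSkip--;
        }
        while (sourceIterator.hasNext()) {
            buffer.add(sourceIterator.next());
            if (buffer.size() == windowSize) {
                @SuppressWarnings("unchecked")
                T[] snapshot = (T[]) buffer.toArray();
                action.accept(Arrays.stream(snapshot));
                // Advance by `step`: drop buffered elements first, then mark the rest to skip.
                for (int i = 0; i < step; i++) {
                    if (buffer.isEmpty()) {
                        toSkip++;
                    } else {
                        buffer.poll();
                    }
                }
                return true;
            }
        }
        return false;
    }

    // Convenience helper for the demo: materializes every window as a List.
    static <T> List<List<T>> toLists(Collection<T> source, int windowSize, int step) {
        return windowed(source, windowSize, step)
            .map(window -> window.collect(Collectors.toList()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toLists(Arrays.asList(1, 2, 3, 4, 5), 3, 2));       // [[1, 2, 3], [3, 4, 5]]
        System.out.println(toLists(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 2, 3)); // [[1, 2], [4, 5]]
    }
}
```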

Complete Example

package com.pivovarit.stream;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {

    static <T> Stream<Stream<T>> windowed(Collection<T> source, int windowSize) {
        return StreamSupport.stream(
            new SlidingWindowSpliterator<>(source, windowSize), false);
    }
    private final Queue<T> buffer;
    private final Iterator<T> sourceIterator;
    private final int windowSize;
    private final int size;
    private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
        this.buffer = new ArrayDeque<>(windowSize);
        this.sourceIterator = Objects.requireNonNull(source).iterator();
        this.windowSize = windowSize;
        this.size = calculateSize(source, windowSize);
    }
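    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        if (windowSize < 1) {
            return false;
        }
        while (sourceIterator.hasNext()) {
            buffer.add(sourceIterator.next());
            if (buffer.size() == windowSize) {
                // Snapshot the buffer so the emitted Stream is independent of it.
                action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
                // Drop the oldest element so the next call slides the window by one.
                buffer.poll();
                // A group was emitted, so report success as the contract requires.
                return true;
            }
        }
        // The source is exhausted before another full window could be built.
        return false;
    }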
    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }
    @Override
    public long estimateSize() {
        return size;
    }
    @Override
    public int characteristics() {
        return ORDERED | NONNULL | SIZED;
    }
    private static int calculateSize(Collection<?> source, int windowSize) {
        return source.size() < windowSize
        ? 0
        : source.size() - windowSize + 1;
    }
}

Source

The complete example can also be found on GitHub.
Have a good idea about how to improve it? Feel free to issue a PR!