- /** Submits new elements until stopped */
- private static class InfiniteFlux {
- private FluxSink<Integer> sink;
- private int counter = 0;
-
- public synchronized Flux<Integer> start() {
- stop();
- return Flux.create(this::next).doOnRequest(this::onRequest);
- }
-
- public synchronized void stop() {
- if (this.sink != null) {
- this.sink.complete();
- this.sink = null;
- }
- }
-
- void onRequest(long no) {
- logger.debug("InfiniteFlux.onRequest {}", no);
- for (long i = 0; i < no; ++i) {
- sink.next(counter++);
- }
- }
-
- void next(FluxSink<Integer> sink) {
- logger.debug("InfiniteFlux.next");
- this.sink = sink;
- sink.next(counter++);
- }
- }
-