in pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java [771:889]
public <R> PushStream<R> window(Supplier<Duration> time,
IntSupplier maxEvents, Executor ex,
BiFunction<Long,Collection<T>,R> f) {
AtomicLong timestamp = new AtomicLong();
AtomicLong counter = new AtomicLong();
Object lock = new Object();
AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
null);
// This code is declared as a separate block to avoid any confusion
// about which instance's methods and variables are in scope
Consumer<AbstractPushStreamImpl<R>> begin = p -> {
synchronized (lock) {
timestamp.lazySet(System.nanoTime());
long count = counter.get();
scheduler.schedule(
getWindowTask(p, f, time, maxEvents, lock, count,
queueRef, timestamp, counter, ex),
time.get().toNanos(), NANOSECONDS);
}
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
};
@SuppressWarnings("resource")
AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
psp, ex, scheduler, this) {
@Override
protected void beginning() {
begin.accept(this);
}
};
AtomicBoolean endPending = new AtomicBoolean(false);
updateNext((event) -> {
try {
if (eventStream.closed.get() == CLOSED) {
return ABORT;
}
Queue<T> queue;
if (!event.isTerminal()) {
long elapsed;
long newCount;
synchronized (lock) {
for (;;) {
queue = queueRef.get();
if (queue == null) {
if (endPending.get()) {
return ABORT;
} else {
continue;
}
} else if (queue.offer(event.getData())) {
return CONTINUE;
} else {
queueRef.lazySet(null);
break;
}
}
long now = System.nanoTime();
elapsed = now - timestamp.get();
timestamp.lazySet(now);
newCount = counter.get() + 1;
counter.lazySet(newCount);
// This is a non-blocking call, and must happen in the
// synchronized block to avoid re=ordering the executor
// enqueue with a subsequent incoming close operation
aggregateAndForward(f, eventStream, event, queue,
ex, elapsed);
}
// These must happen outside the synchronized block as we
// call out to user code
queueRef.set(
getQueueForInternalBuffering(maxEvents.getAsInt()));
scheduler.schedule(
getWindowTask(eventStream, f, time, maxEvents, lock,
newCount, queueRef, timestamp, counter, ex),
time.get().toNanos(), NANOSECONDS);
return CONTINUE;
} else {
long elapsed;
synchronized (lock) {
queue = queueRef.get();
queueRef.lazySet(null);
endPending.set(true);
long now = System.nanoTime();
elapsed = now - timestamp.get();
counter.lazySet(counter.get() + 1);
}
Collection<T> collected = queue == null ? emptyList()
: queue;
ex.execute(() -> {
try {
eventStream
.handleEvent(PushEvent.data(f.apply(
Long.valueOf(NANOSECONDS
.toMillis(elapsed)),
collected)));
} catch (Exception e) {
close(PushEvent.error(e));
}
});
}
ex.execute(() -> eventStream.handleEvent(event.nodata()));
return ABORT;
} catch (Exception e) {
close(PushEvent.error(e));
return ABORT;
}
});
return eventStream;
}