public PushStream window()

in pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java [771:889]


	/**
	 * Builds a downstream stream whose events are produced by applying
	 * {@code f} to batches ("windows") of events buffered from this stream.
	 * <p>
	 * Incoming data events are offered to a queue obtained from
	 * {@code getQueueForInternalBuffering(maxEvents.getAsInt())}. A window is
	 * closed from this method in two situations:
	 * <ul>
	 * <li>the queue rejects an offered event, in which case the queue and the
	 * rejected event are handed to {@code aggregateAndForward}, a fresh queue
	 * is installed and a new timer task is scheduled;</li>
	 * <li>a terminal event arrives, in which case whatever is buffered (or an
	 * empty list) is passed to {@code f}, and the terminal event is then
	 * forwarded downstream without data.</li>
	 * </ul>
	 * A task created by {@code getWindowTask} is also scheduled with a delay
	 * of {@code time.get()}; that helper is defined elsewhere, so its exact
	 * behaviour is not documented here (presumably it closes the window when
	 * the time limit elapses - confirm against its implementation).
	 *
	 * @param time supplies the delay used each time a window task is
	 *            scheduled; it is queried again for every scheduling
	 * @param maxEvents supplies the value passed to
	 *            {@code getQueueForInternalBuffering} each time a new buffer
	 *            is created (presumably the window's maximum event count -
	 *            confirm against that helper)
	 * @param ex executor used to run {@code f} and to deliver events to the
	 *            returned stream
	 * @param f aggregation function; receives the elapsed time converted to
	 *            milliseconds and the collection of buffered events, and
	 *            returns the payload of the downstream data event
	 * @return the new downstream stream that receives the aggregated events
	 */
	public <R> PushStream<R> window(Supplier<Duration> time,
			IntSupplier maxEvents, Executor ex,
			BiFunction<Long,Collection<T>,R> f) {

		// State shared between this handler and the tasks built by
		// getWindowTask. Every lazySet on these variables in this method is
		// performed while holding 'lock'.
		// timestamp: System.nanoTime() reading taken when the current window
		// started; used to compute the elapsed time handed to f.
		AtomicLong timestamp = new AtomicLong();
		// counter: incremented whenever this method closes a window; the value
		// at scheduling time is handed to getWindowTask (presumably so a stale
		// timer task can detect that its window is already closed - confirm).
		AtomicLong counter = new AtomicLong();
		Object lock = new Object();
		// Holds the buffer of the current window. It is null before the
		// downstream stream has begun and while a closed window's replacement
		// buffer has not yet been installed.
		AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
				null);

		// This code is declared as a separate block to avoid any confusion
		// about which instance's methods and variables are in scope
		Consumer<AbstractPushStreamImpl<R>> begin = p -> {

			synchronized (lock) {
				// Start the first window's clock and schedule its timer task
				timestamp.lazySet(System.nanoTime());
				long count = counter.get();


				// NOTE(review): time.get() is user-supplied code and is
				// invoked here while holding the lock, unlike the
				// size-triggered path below which calls it after releasing
				// the lock - confirm this is intended.
				scheduler.schedule(
						getWindowTask(p, f, time, maxEvents, lock, count,
								queueRef, timestamp, counter, ex),
						time.get().toNanos(), NANOSECONDS);
			}

			// Install the first buffer; done outside the lock because
			// maxEvents is user-supplied code
			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
		};

		// The downstream stream; its beginning() hook runs the 'begin' block
		// above with the new stream as argument.
		@SuppressWarnings("resource")
		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
				psp, ex, scheduler, this) {
			@Override
			protected void beginning() {
				begin.accept(this);
			}
		};

		// Set once a terminal event has been seen, so that a data event
		// spinning on a null buffer gives up instead of waiting forever
		AtomicBoolean endPending = new AtomicBoolean(false);
		updateNext((event) -> {
			try {
				// Nothing to do if the downstream stream is already closed
				if (eventStream.closed.get() == CLOSED) {
					return ABORT;
				}
				Queue<T> queue;
				if (!event.isTerminal()) {
					long elapsed;
					long newCount;
					synchronized (lock) {
						for (;;) {
							queue = queueRef.get();
							if (queue == null) {
								// No buffer is installed at the moment. Abort
								// if the stream is ending; otherwise retry
								// until a buffer appears (buffers are
								// installed via queueRef.set outside the
								// lock).
								// NOTE(review): this is a busy-wait performed
								// while holding the lock.
								if (endPending.get()) {
									return ABORT;
								} else {
									continue;
								}
							} else if (queue.offer(event.getData())) {
								// Event buffered; the window stays open
								return CONTINUE;
							} else {
								// The buffer rejected the event: detach it so
								// that this window can be closed below
								queueRef.lazySet(null);
								break;
							}
						}

						// Close the window: compute its duration in
						// nanoseconds, restart the clock and bump the counter
						long now = System.nanoTime();
						elapsed = now - timestamp.get();
						timestamp.lazySet(now);
						newCount = counter.get() + 1;
						counter.lazySet(newCount);

						// This is a non-blocking call, and must happen in the
						// synchronized block to avoid re-ordering the executor
						// enqueue with a subsequent incoming close operation
						aggregateAndForward(f, eventStream, event, queue,
								ex, elapsed);
					}
					// These must happen outside the synchronized block as we
					// call out to user code
					queueRef.set(
							getQueueForInternalBuffering(maxEvents.getAsInt()));
					scheduler.schedule(
							getWindowTask(eventStream, f, time, maxEvents, lock,
									newCount, queueRef, timestamp, counter, ex),
							time.get().toNanos(), NANOSECONDS);

					return CONTINUE;
				} else {
					// Terminal event: flush the last window, then fall through
					// to forward the terminal event itself
					long elapsed;
					synchronized (lock) {
						// Take the current buffer and leave none installed;
						// endPending releases any data event spinning above
						queue = queueRef.get();
						queueRef.lazySet(null);
						endPending.set(true);
						long now = System.nanoTime();
						elapsed = now - timestamp.get();
						counter.lazySet(counter.get() + 1);
					}
					// There may be no buffer at this point, in which case an
					// empty window is emitted
					Collection<T> collected = queue == null ? emptyList()
							: queue;
					ex.execute(() -> {
						try {
							// f receives the elapsed time in milliseconds
							eventStream
									.handleEvent(PushEvent.data(f.apply(
											Long.valueOf(NANOSECONDS
													.toMillis(elapsed)),
											collected)));
						} catch (Exception e) {
							// A failure in f or in downstream delivery closes
							// this (enclosing) stream with an error event
							close(PushEvent.error(e));
						}
					});
				}
				// Only reached for terminal events (every non-terminal path
				// returns above): forward the terminal event downstream
				// without data, submitted after the final window
				ex.execute(() -> eventStream.handleEvent(event.nodata()));
				return ABORT;
			} catch (Exception e) {
				// Any exception thrown while handling the event closes this
				// (enclosing) stream with an error event
				close(PushEvent.error(e));
				return ABORT;
			}
		});
		return eventStream;
	}