pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java [456:477]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
		// Downstream stage: every event accepted by the consumer below is
		// handed to this stream.
		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
				psp, defaultExecutor, scheduler, this);
		// Number of terminal events still expected before the downstream stage
		// is terminated. Presumably one per merged input; the second input is
		// wired up outside this excerpt - TODO confirm.
		AtomicInteger count = new AtomicInteger(2);
		PushEventConsumer<T> consumer = event -> {
			try {
				// Data events are passed straight through; the downstream
				// back-pressure value is returned to the caller unchanged.
				if (!event.isTerminal()) {
					return eventStream.handleEvent(event);
				}

				// Only the last expected terminal event is forwarded; earlier
				// ones are absorbed so that downstream stays open.
				if (count.decrementAndGet() == 0) {
					eventStream.handleEvent(event.nodata());
					return ABORT;
				}
				return CONTINUE;
			} catch (Exception e) {
				PushEvent<T> error = PushEvent.error(e);
				close(error);
				// PushEvent.nodata() throws IllegalStateException for DATA
				// events, so it may only be used when the failing event was
				// terminal. A failure while handling a DATA event closes the
				// downstream stage with the error event instead of throwing
				// out of this handler.
				PushEvent<T> downstreamClose;
				if (event.isTerminal()) {
					downstreamClose = event.nodata();
				} else {
					downstreamClose = error;
				}
				eventStream.close(downstreamClose);
				return ABORT;
			}
		};
		// Install the consumer as the next stage of this stream.
		updateNext(consumer);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java [503:524]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
		// Downstream stage: every event accepted by the consumer below is
		// handed to this stream.
		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
				psp, defaultExecutor, scheduler, this);
		// Number of terminal events still expected before the downstream stage
		// is terminated. Presumably one per merged input; the second input is
		// wired up outside this excerpt - TODO confirm.
		AtomicInteger count = new AtomicInteger(2);
		PushEventConsumer<T> consumer = event -> {
			try {
				// Data events are passed straight through; the downstream
				// back-pressure value is returned to the caller unchanged.
				if (!event.isTerminal()) {
					return eventStream.handleEvent(event);
				}

				// Only the last expected terminal event is forwarded; earlier
				// ones are absorbed so that downstream stays open.
				if (count.decrementAndGet() == 0) {
					eventStream.handleEvent(event.nodata());
					return ABORT;
				}
				return CONTINUE;
			} catch (Exception e) {
				PushEvent<T> error = PushEvent.error(e);
				close(error);
				// PushEvent.nodata() throws IllegalStateException for DATA
				// events, so it may only be used when the failing event was
				// terminal. A failure while handling a DATA event closes the
				// downstream stage with the error event instead of throwing
				// out of this handler.
				PushEvent<T> downstreamClose;
				if (event.isTerminal()) {
					downstreamClose = event.nodata();
				} else {
					downstreamClose = error;
				}
				eventStream.close(downstreamClose);
				return ABORT;
			}
		};
		// Install the consumer as the next stage of this stream.
		updateNext(consumer);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



