in pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java [245:327]
private void startWorker() {
worker.execute(() -> {
try {
for(;;) {
PushEvent<T> event;
List<PushEventConsumer< ? super T>> toCall;
boolean resetWait = false;
synchronized (lock) {
if(waitForFinishes) {
semaphore.release();
while(waitForFinishes) {
lock.notifyAll();
lock.wait();
}
semaphore.acquire();
}
event = (PushEvent<T>) queue.poll();
if(event == null) {
break;
}
toCall = new ArrayList<>(connected);
if (event.isTerminal()) {
waitForFinishes = true;
resetWait = true;
connected.clear();
while (!semaphore.tryAcquire(parallelism - 1)) {
lock.wait();
}
}
}
List<Promise<Long>> calls = toCall.stream().map(pec -> {
if (semaphore.tryAcquire()) {
try {
return doSendWithBackPressure(pec, event);
} finally {
semaphore.release();
}
} else {
return Promises.resolved(
System.nanoTime() + safePush(pec, event));
}
}).collect(toList());
long toWait = Promises.<Long,Long>all(calls)
.map(l -> l.stream()
.max((a,b) -> a.compareTo(b))
.orElseGet(() -> System.nanoTime()))
.getValue() - System.nanoTime();
if (toWait > 0) {
scheduler.schedule(this::startWorker, toWait,
NANOSECONDS);
return;
}
if (resetWait == true) {
synchronized (lock) {
waitForFinishes = false;
lock.notifyAll();
}
}
}
semaphore.release();
} catch (Exception e) {
close(PushEvent.error(e));
}
if (queue.peek() != null && semaphore.tryAcquire()) {
try {
startWorker();
} catch (Exception e) {
close(PushEvent.error(e));
}
}
});
}