private void startWorker()

in pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java [245:327]


	/**
	 * Submits a task to {@code worker} that drains {@code queue}, delivering
	 * each polled event to a snapshot of the currently {@code connected}
	 * consumers.
	 * <p>
	 * For every iteration the task:
	 * <ol>
	 * <li>under {@code lock}, parks while {@code waitForFinishes} is set
	 * (giving its semaphore permit back for the duration of the wait);</li>
	 * <li>polls the next event, leaving the loop when the queue is empty;</li>
	 * <li>for a terminal event, sets {@code waitForFinishes}, clears
	 * {@code connected} and waits until {@code parallelism - 1} further
	 * permits can be acquired;</li>
	 * <li>delivers the event to each consumer in the snapshot and blocks
	 * until all returned promises have resolved;</li>
	 * <li>if the largest returned value lies in the future (compared with
	 * {@link System#nanoTime()}), re-schedules itself on {@code scheduler}
	 * after that delay and returns.</li>
	 * </ol>
	 * When the loop exits normally one permit is released. Afterwards, if the
	 * queue is non-empty and a permit can be acquired, another worker is
	 * started. Any exception raised while draining is reported through
	 * {@code close(PushEvent.error(e))}; an {@link InterruptedException}
	 * additionally restores the thread's interrupt status.
	 * <p>
	 * NOTE(review): callers appear to be expected to hold one semaphore permit
	 * when invoking this method (see the {@code tryAcquire()} guard before the
	 * recursive call) - confirm against the other call sites.
	 */
	private void startWorker() {
		worker.execute(() -> {
			try {

				for (;;) {
					PushEvent<T> event;
					List<PushEventConsumer< ? super T>> toCall;
					boolean resetWait = false;
					synchronized (lock) {
						if (waitForFinishes) {
							// Hand the permit back while parked so that the
							// thread which set waitForFinishes can collect it
							semaphore.release();
							while (waitForFinishes) {
								lock.notifyAll();
								lock.wait();
							}
							semaphore.acquire();
						}

						event = (PushEvent<T>) queue.poll();

						if (event == null) {
							// Nothing left to deliver
							break;
						}

						// Snapshot taken under the lock; delivery itself
						// happens outside the synchronized block
						toCall = new ArrayList<>(connected);
						if (event.isTerminal()) {
							waitForFinishes = true;
							resetWait = true;
							connected.clear();
							// This thread presumably already holds one permit,
							// so the remaining parallelism - 1 are collected
							// here. NOTE(review): these extra permits are not
							// released anywhere in this method - confirm that
							// this is handled elsewhere.
							while (!semaphore.tryAcquire(parallelism - 1)) {
								lock.wait();
							}
						}
					}

					List<Promise<Long>> calls = toCall.stream().map(pec -> {
						if (semaphore.tryAcquire()) {
							try {
								return doSendWithBackPressure(pec, event);
							} finally {
								semaphore.release();
							}
						} else {
							// No spare permit: use safePush and wrap its
							// result, offset from the current nanoTime, in an
							// already resolved promise
							return Promises.resolved(
									System.nanoTime() + safePush(pec, event));
						}
					}).collect(toList());

					// Blocks until every delivery promise has resolved. The
					// values are presumably absolute nanoTime deadlines; the
					// latest one decides how long to hold off.
					long toWait = Promises.<Long,Long>all(calls)
							.map(l -> l.stream()
									.max(Long::compareTo)
									.orElseGet(System::nanoTime))
							.getValue() - System.nanoTime();

					if (toWait > 0) {
						// Back pressure requested: resume after the delay. The
						// permit is deliberately not released on this path.
						// NOTE(review): when resetWait is true this return
						// also skips clearing waitForFinishes - confirm that
						// this is intended.
						scheduler.schedule(this::startWorker, toWait,
								NANOSECONDS);
						return;
					}

					if (resetWait) {
						synchronized (lock) {
							waitForFinishes = false;
							lock.notifyAll();
						}
					}
				}

				semaphore.release();
			} catch (InterruptedException e) {
				close(PushEvent.error(e));
				// This task does not own the thread it runs on, so the
				// interrupt status is restored (after close, to leave the
				// behavior of close untouched) for the owning executor.
				Thread.currentThread().interrupt();
			} catch (Exception e) {
				close(PushEvent.error(e));
			}
			// Events may have been queued after the final poll; start another
			// worker if one is needed and a permit is available
			if (queue.peek() != null && semaphore.tryAcquire()) {
				try {
					startWorker();
				} catch (Exception e) {
					close(PushEvent.error(e));
				}
			}
		});

	}