public V call()

in core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java [296:415]


        public V call() {
            Preconditions.checkNotNull(source, "source");
            Preconditions.checkNotNull(sensor, "sensor on "+source);
            T value = source.getAttribute(sensor);

            // return immediately if either the ready predicate or the abort conditions hold
            if (ready(value)) return postProcess(value);
            
            final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
            long start = System.currentTimeMillis();
            
            for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
                Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
                if (abortCondition.predicate.apply(currentValue)) {
                    abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
                }
            }
            if (!abortionExceptions.isEmpty()) {
                throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
            }

            TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
            if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
            Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
            if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
                current+" has no entity tag ("+current.getStatusDetail(false)+")");
            
            final LinkedList<T> publishedValues = new LinkedList<T>();
            final Semaphore semaphore = new Semaphore(0); // could use Exchanger
            SubscriptionHandle subscription = null;
            List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
            
            try {
                subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() {
                    @Override public void onEvent(SensorEvent<T> event) {
                        synchronized (publishedValues) { publishedValues.add(event.getValue()); }
                        semaphore.release();
                    }});
                for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
                    abortSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
                        @Override public void onEvent(SensorEvent<Object> event) {
                            if (abortCondition.predicate.apply(event.getValue())) {
                                abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+event.getValue()));
                                semaphore.release();
                            }
                        }}));
                    Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
                    if (abortCondition.predicate.apply(currentValue)) {
                        abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
                    }
                }
                if (!abortionExceptions.isEmpty()) {
                    throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
                }

                CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
                Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
                Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
                while (true) {
                    // check the source on initial run (could be done outside the loop) 
                    // and also (optionally) on each iteration in case it is more recent 
                    value = source.getAttribute(sensor);
                    if (ready(value)) break;

                    if (timer!=null) {
                        if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
                            nextPeriod = timer.getDurationRemaining();
                        }
                        if (timer.isExpired()) {
                            if (onTimeout.isPresent()) return onTimeout.get();
                            throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
                        }
                    }

                    String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
                    try {
                        if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
                            // immediately release so we are available for the next check
                            semaphore.release();
                            // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
                            semaphore.drainPermits();
                        }
                    } finally {
                        current.setBlockingDetails(prevBlockingDetails);
                    }

                    // check any subscribed values which have come in first
                    while (true) {
                        synchronized (publishedValues) {
                            if (publishedValues.isEmpty()) break;
                            value = publishedValues.pop(); 
                        }
                        if (ready(value)) break;
                    }

                    // if unmanaged then ignore the other abort conditions
                    if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
                        if (onUnmanaged.isPresent()) return onUnmanaged.get();
                        throw new NotManagedException(entity);                        
                    }
                    
                    if (!abortionExceptions.isEmpty()) {
                        throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
                    }

                    nextPeriod = nextPeriod.multiply(1.2).upperBound(maxPeriod);
                }
                if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
                return postProcess(value);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            } finally {
                if (subscription != null) {
                    entity.subscriptions().unsubscribe(subscription);
                }
                for (SubscriptionHandle handle : abortSubscriptions) {
                    entity.subscriptions().unsubscribe(handle);
                }
            }
        }