in core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java [303:481]
/**
 * Waits until {@code sensor} on {@code source} has a value accepted by {@code ready(...)},
 * and returns that value after passing it through {@code postProcess(...)}.
 * <p>
 * The order of checks is:
 * <ol>
 * <li>the current attribute value is returned immediately if it is already ready;</li>
 * <li>a timeout equal to {@link Duration#ZERO} means waiting is not permitted, so the timeout
 *     outcome is produced straight away;</li>
 * <li>abort conditions are evaluated against current attribute values, before and again after
 *     subscribing to their sensors;</li>
 * <li>the method then loops, re-reading the attribute, waiting on a semaphore released by the
 *     subscriptions, and checking unmanaged / abort / tighter-timeout conditions on every pass.</li>
 * </ol>
 * Every timeout outcome (zero timeout, overall timeout, tighter "timeout-if" timeout) behaves the
 * same way: the {@code onTimeout} value is returned if one is present, otherwise a
 * {@link RuntimeTimeoutException} is thrown.
 * <p>
 * All subscriptions created here are removed in the {@code finally} block.
 *
 * @return the post-processed ready value; or the {@code onTimeout} value if present when a timeout
 *         occurs; or the {@code onUnmanaged} value if present when the context entity is found to
 *         be no longer managed (and {@code ignoreUnmanaged} is false)
 * @throws NullPointerException if {@code source} or {@code sensor} is null
 * @throws RuntimeTimeoutException if a timeout occurs and no {@code onTimeout} value is present
 * @throws CompoundRuntimeException if any abort condition is satisfied
 * @throws IllegalStateException if not invoked in a running task, or the task has no entity tag
 * @throws NotManagedException if the context entity is no longer managed, {@code ignoreUnmanaged}
 *         is false and no {@code onUnmanaged} value is present
 */
public V call() {
    Preconditions.checkNotNull(source, "source");
    Preconditions.checkNotNull(sensor, "sensor on "+source);
    T value = source.getAttribute(sensor);

    // return immediately if either the ready predicate or the abort conditions hold
    if (ready(value)) return postProcess(value);

    // a zero timeout means the caller does not want to block at all
    if (Duration.ZERO.equals(timeout)) {
        if (onTimeout.isPresent()) return onTimeout.get();
        throw new RuntimeTimeoutException("Waiting not permitted");
    }

    // copy-on-write list because abort listeners add to it from subscription callbacks
    final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
    long start = System.currentTimeMillis();

    // first pass over the abort conditions, before any subscription is created
    for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
        Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
        if (abortCondition.predicate.apply(currentValue)) {
            abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
        }
    }
    if (!abortionExceptions.isEmpty()) {
        throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
    }

    // subscriptions are made through the entity associated with the current task
    TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
    if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
    Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
    if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
            current+" has no entity tag ("+current.getStatusDetail(false)+")");

    final LinkedList<T> publishedValues = new LinkedList<T>();
    final Semaphore semaphore = new Semaphore(0); // could use Exchanger
    SubscriptionHandle subscription = null;
    List<SubscriptionHandle> thisWaitSubscriptions = Lists.newArrayList();

    try {
        // queue every published value of the target sensor and wake the waiting loop
        subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() {
            @Override public void onEvent(SensorEvent<T> event) {
                synchronized (publishedValues) { publishedValues.add(event.getValue()); }
                semaphore.release();
            }});

        for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
            thisWaitSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
                @Override public void onEvent(SensorEvent<Object> event) {
                    if (abortCondition.predicate.apply(event.getValue())) {
                        abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+event.getValue()));
                        semaphore.release();
                    }
                }}));
            // re-check the current value now that the subscription exists, in case it changed
            // between the first pass and the subscription being created
            Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
            if (abortCondition.predicate.apply(currentValue)) {
                abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
            }
        }
        if (!abortionExceptions.isEmpty()) {
            throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
        }

        CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();

        // index of a "timeout-if" condition -> elapsed time (on the timer above) at which it expires;
        // an entry exists only while that condition's predicate currently holds
        Map<Integer,Duration> customTimeouts = MutableMap.of();
        BiConsumer<Integer,Object> checkValueAtIndex = (index, val) -> {
            synchronized (customTimeouts) {
                Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
                if (timeoutIfCondition.getLeft().predicate.apply(val)) {
                    if (!customTimeouts.containsKey(index)) {
                        // start timer from this point
                        customTimeouts.put(index, timer.getDurationElapsed().add(timeoutIfCondition.getRight()));
                    }
                } else {
                    customTimeouts.remove(index);
                }
            }
        };
        if (timeoutIfTimeoutSensorConditions!=null) {
            for (int i=0; i<timeoutIfTimeoutSensorConditions.size(); i++) {
                int index = i;
                Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
                thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.getLeft().source, timeoutIfCondition.getLeft().sensor, new SensorEventListener<Object>() {
                    @Override public void onEvent(SensorEvent<Object> event) {
                        checkValueAtIndex.accept(index, event.getValue());
                    }}));
                // seed from the current value, as no event is awaited for a value that is already set
                Object val = timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
                checkValueAtIndex.accept(index, val);
            }
        }

        Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
        Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
        while (true) {
            // check the source on initial run (could be done outside the loop)
            // and also (optionally) on each iteration in case it is more recent
            value = source.getAttribute(sensor);
            if (ready(value)) break;

            if (timer!=null) {
                // never wait beyond the overall deadline
                if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
                    nextPeriod = timer.getDurationRemaining();
                }
                if (timer.isExpired()) {
                    if (onTimeout.isPresent()) return onTimeout.get();
                    throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
                }
            }

            String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
            try {
                if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
                    // immediately release so we are available for the next check
                    semaphore.release();
                    // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
                    semaphore.drainPermits();
                }
            } finally {
                current.setBlockingDetails(prevBlockingDetails);
            }

            // check any subscribed values which have come in first
            // (this only leaves the inner loop; the attribute is read again at the top of the outer loop)
            while (true) {
                synchronized (publishedValues) {
                    if (publishedValues.isEmpty()) break;
                    value = publishedValues.pop();
                }
                if (ready(value)) break;
            }

            // if unmanaged then ignore the other abort conditions
            if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
                if (onUnmanaged.isPresent()) return onUnmanaged.get();
                throw new NotManagedException(entity);
            }

            if (!abortionExceptions.isEmpty()) {
                throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
            }

            // snapshot under the lock, then evaluate outside it so listeners are not blocked
            Set<Map.Entry<Integer, Duration>> timeoutsHere = null;
            synchronized (customTimeouts) {
                if (!customTimeouts.isEmpty()) {
                    timeoutsHere = MutableSet.copyOf(customTimeouts.entrySet());
                }
            }
            if (timeoutsHere!=null) {
                for (Map.Entry<Integer, Duration> entry : timeoutsHere) {
                    Integer index = entry.getKey();
                    Duration specialTimeout = entry.getValue();
                    Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
                    if (timer.getDurationElapsed().isLongerThan(specialTimeout)) {
                        // confirm against the current value before timing out
                        Object val = timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
                        if (timeoutIfCondition.getLeft().predicate.apply(val)) {
                            // a tighter timeout is still a timeout: honour onTimeout exactly as the
                            // overall timeout does, rather than ignoring the expiry and waiting on
                            if (onTimeout.isPresent()) return onTimeout.get();
                            throw new RuntimeTimeoutException("Unsatisfied after " + Duration.sinceUtc(start) + " (tighter timeout due to " +
                                    timeoutIfCondition.getLeft() + ", with value " + val + ")");
                        }
                    }
                }
            }

            // back off gradually, capped at maxPeriod
            nextPeriod = nextPeriod.multiply(1.2).upperBound(maxPeriod);
        }
        if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
        return postProcess(value);
    } catch (InterruptedException e) {
        throw Exceptions.propagate(e);
    } finally {
        // always remove every subscription made for this wait
        if (subscription != null) {
            entity.subscriptions().unsubscribe(subscription);
        }
        for (SubscriptionHandle handle : thisWaitSubscriptions) {
            entity.subscriptions().unsubscribe(handle);
        }
    }
}