in core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java [296:415]
public V call() {
// Blocks until the attribute `sensor` on `source` satisfies ready(...), then returns
// postProcess(value) for that value.
//
// Other ways this method can finish:
//  - timeout: once the countdown built from `timeout` has expired, returns onTimeout.get()
//    if present, otherwise throws RuntimeTimeoutException (no timer is used when `timeout` is null);
//  - unmanaged: unless `ignoreUnmanaged` is set, when the context entity is no longer managed,
//    returns onUnmanaged.get() if present, otherwise throws NotManagedException;
//  - abort: throws CompoundRuntimeException when the predicate of any entry in
//    `abortSensorConditions` holds for that entry's sensor value;
//  - IllegalStateException when there is no current task, or the task has no entity;
//  - an InterruptedException raised while waiting is rethrown via Exceptions.propagate.
//
// Fail fast on missing inputs (Preconditions.checkNotNull rejects null arguments).
Preconditions.checkNotNull(source, "source");
Preconditions.checkNotNull(sensor, "sensor on "+source);
T value = source.getAttribute(sensor);
// return immediately if either the ready predicate or the abort conditions hold
if (ready(value)) return postProcess(value);
// Collected abort reasons; also appended to by the abort listeners registered below,
// presumably from a different thread than this one (hence the copy-on-write list).
final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
// Start time, used only to report the elapsed time in the timeout message.
long start = System.currentTimeMillis();
// First abort check, against current attribute values, before any subscription is created.
for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
if (abortCondition.predicate.apply(currentValue)) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
}
}
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
// The current task supplies the blocking-details hook and the entity whose
// subscription support is used below, so both are required.
TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
current+" has no entity tag ("+current.getStatusDetail(false)+")");
// Values delivered by the sensor subscription: appended by the listener, consumed
// from the head by the wait loop; every access is synchronized on the list itself.
final LinkedList<T> publishedValues = new LinkedList<T>();
// Released by the listeners to wake the wait loop early when something has happened.
final Semaphore semaphore = new Semaphore(0); // could use Exchanger
SubscriptionHandle subscription = null;
List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
try {
// Queue every published value of the target sensor and wake the waiter.
subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() {
@Override public void onEvent(SensorEvent<T> event) {
synchronized (publishedValues) { publishedValues.add(event.getValue()); }
semaphore.release();
}});
for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
// Record an abort reason and wake the waiter whenever a published value matches the abort predicate.
abortSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
if (abortCondition.predicate.apply(event.getValue())) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+event.getValue()));
semaphore.release();
}
}}));
// Re-check the current value now that the subscription exists; presumably this covers
// a change that happened between the first abort check and the subscribe call.
Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
if (abortCondition.predicate.apply(currentValue)) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
}
}
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
// No timer means the loop below has no timeout exit.
CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
// Wait period per iteration: starts at REAL_QUICK_PERIOD and is multiplied by 1.2 after
// each iteration, bounded above by PRETTY_QUICK_WAIT.
Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
while (true) {
// check the source on initial run (could be done outside the loop)
// and also (optionally) on each iteration in case it is more recent
value = source.getAttribute(sensor);
if (ready(value)) break;
if (timer!=null) {
// Never wait longer than the time left on the timer.
if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
nextPeriod = timer.getDurationRemaining();
}
if (timer.isExpired()) {
if (onTimeout.isPresent()) return onTimeout.get();
throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
}
}
// Publish the blocking details on the task while waiting; the previous details are restored afterwards.
String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
try {
// Wait for a listener notification, for at most nextPeriod.
if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
// immediately release so we are available for the next check
semaphore.release();
// if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
// NOTE(review): the release above is taken back by this drain, so the pair leaves no
// permits from before this point; the release looks redundant -- confirm before simplifying.
semaphore.drainPermits();
}
} finally {
current.setBlockingDetails(prevBlockingDetails);
}
// check any subscribed values which have come in first
// Pops queued values oldest-first and stops at the first ready one, leaving later ones queued.
// NOTE(review): this break only leaves the inner loop; `value` is overwritten by the
// attribute read at the top of the outer loop, so a ready published value ends the wait
// only if the attribute is still ready at that point -- confirm this is intended.
while (true) {
synchronized (publishedValues) {
if (publishedValues.isEmpty()) break;
value = publishedValues.pop();
}
if (ready(value)) break;
}
// if unmanaged then ignore the other abort conditions
if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
if (onUnmanaged.isPresent()) return onUnmanaged.get();
throw new NotManagedException(entity);
}
// Abort reasons recorded by the listeners since the last check end the wait here.
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
// Back off: lengthen the next wait by 20%, capped at maxPeriod.
nextPeriod = nextPeriod.multiply(1.2).upperBound(maxPeriod);
}
if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
return postProcess(value);
} catch (InterruptedException e) {
// Only the semaphore wait declares InterruptedException; hand it to the shared propagation helper.
throw Exceptions.propagate(e);
} finally {
// Always remove the subscriptions created above, on every exit path.
if (subscription != null) {
entity.subscriptions().unsubscribe(subscription);
}
for (SubscriptionHandle handle : abortSubscriptions) {
entity.subscriptions().unsubscribe(handle);
}
}
}